设备历史录像查询(调整/部分重构)

This commit is contained in:
shikong 2023-10-03 17:25:05 +08:00
parent 60ea2fc372
commit 4a69cbd8ac
6 changed files with 71 additions and 62 deletions

View File

@ -1,8 +1,8 @@
package cn.skcks.docking.gb28181.api.record; package cn.skcks.docking.gb28181.api.gb28181.record;
import cn.skcks.docking.gb28181.annotation.web.JsonMapping; import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.api.record.dto.GetInfoDTO; import cn.skcks.docking.gb28181.api.gb28181.record.dto.GetRecordInfoDTO;
import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.config.SwaggerConfig; import cn.skcks.docking.gb28181.config.SwaggerConfig;
import cn.skcks.docking.gb28181.service.record.RecordService; import cn.skcks.docking.gb28181.service.record.RecordService;
@ -20,7 +20,7 @@ import java.util.List;
@Tag(name="历史录像") @Tag(name="历史录像")
@RestController @RestController
@JsonMapping("/api/device/record") @JsonMapping("/api/gb28181/record")
@RequiredArgsConstructor @RequiredArgsConstructor
public class RecordController { public class RecordController {
private final RecordService recordService; private final RecordService recordService;
@ -30,8 +30,8 @@ public class RecordController {
return SwaggerConfig.api("Record", "/api/device/record"); return SwaggerConfig.api("Record", "/api/device/record");
} }
@GetJson("/getInfoList") @GetJson("/list")
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> getInfo(@ParameterObject @Validated GetInfoDTO dto){ public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> getInfo(@ParameterObject @Validated GetRecordInfoDTO dto){
return recordService.requestRecordInfo(dto.getDeviceId(), dto.getTimeout(), dto.getDate()); return recordService.requestRecordInfo(dto.getGbDeviceId(), dto.getGbDeviceChannelId(), dto.getTimeout(), dto.getDate());
} }
} }

View File

@ -1,4 +1,4 @@
package cn.skcks.docking.gb28181.api.record.dto; package cn.skcks.docking.gb28181.api.gb28181.record.dto;
import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DatePattern;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
@ -11,10 +11,14 @@ import java.util.Date;
@Schema(title = "查询历史录像") @Schema(title = "查询历史录像")
@Data @Data
public class GetInfoDTO { public class GetRecordInfoDTO {
@NotBlank @NotBlank
@Schema(description = "设备id", example = "44050100001180000001") @Schema(description = "设备id", example = "44050100002000000006")
private String deviceId; private String gbDeviceId;
@NotBlank
@Schema(description = "通道id", example = "34020000001310000001")
private String gbDeviceChannelId;
@Min(30) @Min(30)
@Schema(description = "超时时间(秒)", example = "30") @Schema(description = "超时时间(秒)", example = "30")

View File

@ -6,7 +6,6 @@ import cn.skcks.docking.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
@ -15,6 +14,7 @@ import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.sip.manscdp.MessageDTO; import cn.skcks.docking.gb28181.sip.manscdp.MessageDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO; import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
@ -25,8 +25,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.util.EventObject; import java.util.EventObject;
import java.util.Optional; import java.util.Optional;
@ -77,8 +75,8 @@ public class MessageRequestProcessor implements MessageProcessor {
response = ok; response = ok;
RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET); RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET);
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn()); String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn());
Optional.ofNullable(subscribe.getRecordInfoSubscribe().getPublisher(key)) Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(key))
.ifPresentOrElse(publisher -> publisher.submit(dto), .ifPresentOrElse(publisher -> publisher.submit(request),
() -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto)); () -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto));
}else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){ }else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){
CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(content, CatalogResponseDTO.class); CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(content, CatalogResponseDTO.class);

View File

@ -64,7 +64,7 @@ public class CatalogService {
.deviceId(gbDeviceId) .deviceId(gbDeviceId)
.sn(sn) .sn(sn)
.build(); .build();
Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO)); Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO, device.getCharset()));
String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, gbDeviceId, sn); String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, gbDeviceId, sn);
subscribe.getSipRequestSubscribe().addPublisher(key, 60, TimeUnit.SECONDS); subscribe.getSipRequestSubscribe().addPublisher(key, 60, TimeUnit.SECONDS);
subscribe.getSipRequestSubscribe().addSubscribe(key, new Flow.Subscriber<>() { subscribe.getSipRequestSubscribe().addSubscribe(key, new Flow.Subscriber<>() {

View File

@ -3,21 +3,26 @@ package cn.skcks.docking.gb28181.service.record;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.json.ResponseStatus; import cn.skcks.docking.gb28181.common.json.ResponseStatus;
import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder; import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.service.SipService; import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDeviceChannel;
import cn.skcks.docking.gb28181.service.device.DeviceChannelService;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor; import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.sip.method.message.request.MessageRequestBuilder;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -25,11 +30,8 @@ import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.SipProvider; import javax.sip.SipProvider;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import java.util.ArrayList; import java.util.*;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -38,11 +40,11 @@ import java.util.stream.Collectors;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
public class RecordService { public class RecordService {
private final SipConfig sipConfig;
private final DockingDeviceService deviceService; private final DockingDeviceService deviceService;
private final DeviceChannelService deviceChannelService;
private final SipService sipService; private final SipService sipService;
private final SipMessageSender sender;
private final SipSubscribe subscribe; private final SipSubscribe subscribe;
/** /**
* *
* @param deviceId 设备id * @param deviceId 设备id
@ -50,7 +52,7 @@ public class RecordService {
* @param date 查询日期 * @param date 查询日期
*/ */
@SneakyThrows @SneakyThrows
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestRecordInfo(String deviceId, long timeout, Date date) { public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestRecordInfo(String deviceId, String channelId, long timeout, Date date) {
log.info("查询 设备 => {} {} 的历史媒体记录, 超时时间 {} 秒", deviceId, DateUtil.formatDate(date), timeout); log.info("查询 设备 => {} {} 的历史媒体记录, 超时时间 {} 秒", deviceId, DateUtil.formatDate(date), timeout);
DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(timeout)); DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(timeout));
@ -60,33 +62,41 @@ public class RecordService {
result.setResult(JsonResponse.error(null, "未找到设备")); result.setResult(JsonResponse.error(null, "未找到设备"));
return result; return result;
} }
Optional<DockingDeviceChannel> deviceChannel = deviceChannelService.getDeviceChannel(deviceId, channelId);
if(deviceChannel.isEmpty()){
log.info("未能找到 设备编码为 => {}, 通道 => {} 的信息", deviceId, channelId);
result.setResult(JsonResponse.error(null, "未找到通道信息"));
return result;
}
String transport = device.getTransport(); String transport = device.getTransport();
String senderIp = device.getLocalIp(); String localIp = device.getLocalIp();
SipProvider provider = sipService.getProvider(transport, senderIp); SipProvider provider = sipService.getProvider(transport, localIp);
CallIdHeader callId = provider.getNewCallId(); String callId = provider.getNewCallId().getCallId();
String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000); String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000);
MessageRequestBuilder requestBuilder = MessageRequestBuilder.builder()
.targetIp(device.getIp())
.targetPort(device.getPort())
.targetId(deviceId)
.localId(sipConfig.getId())
.localIp(localIp)
.localPort(sipConfig.getPort())
.transport(transport)
.build();
RecordInfoRequestDTO dto = RecordInfoRequestDTO.builder() RecordInfoRequestDTO dto = RecordInfoRequestDTO.builder()
.deviceId(deviceId) .deviceId(channelId)
.startTime(DateUtil.beginOfDay(date)) .startTime(DateUtil.beginOfDay(date))
.endTime(DateUtil.endOfDay(date)) .endTime(DateUtil.endOfDay(date))
.sn(sn) .sn(sn)
.build(); .build();
Request request = SipRequestBuilder.createMessageRequest(device, Request request = requestBuilder.createMessageRequest(callId,SipRequestBuilder.getCSeq(), MANSCDPUtils.toByteXml(dto, device.getCharset()));
XmlUtils.toXml(dto), String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, channelId, sn);
SipUtil.generateViaTag(), subscribe.getSipRequestSubscribe().addPublisher(key);
SipUtil.generateFromTag(), Flow.Subscriber<SIPRequest> subscriber = new Flow.Subscriber<>() {
null, final List<RecordInfoItemDTO> list = new ArrayList<>();
callId);
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, deviceId, sn);
subscribe.getRecordInfoSubscribe().addPublisher(key);
List<RecordInfoItemDTO> list = new ArrayList<>();
AtomicLong atomicSum = new AtomicLong(0); AtomicLong atomicSum = new AtomicLong(0);
AtomicLong atomicNum = new AtomicLong(0); AtomicLong atomicNum = new AtomicLong(0);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<RecordInfoResponseDTO> subscriber = new Flow.Subscriber<>() {
Flow.Subscription subscription; Flow.Subscription subscription;
@Override @Override
@ -97,10 +107,11 @@ public class RecordService {
} }
@Override @Override
public void onNext(RecordInfoResponseDTO item) { public void onNext(SIPRequest item) {
atomicSum.set(item.getSumNum()); RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class);
atomicNum.getAndAdd(item.getRecordList().size()); atomicSum.set(data.getSumNum());
list.addAll(item.getRecordList()); atomicNum.getAndAdd(data.getRecordList().getNum());
list.addAll(data.getRecordList().getRecordList());
long num = atomicNum.get(); long num = atomicNum.get();
long sum = atomicSum.get(); long sum = atomicSum.get();
if(num > sum){ if(num > sum){
@ -113,10 +124,7 @@ public class RecordService {
// 针对某些不按规范的设备 // 针对某些不按规范的设备
// 如果已获取数量 >= 约定的总数 // 如果已获取数量 >= 约定的总数
// 就执行定时任务, 500ms 内未收到新的数据视为已结束 // 就执行定时任务, 500ms 内未收到新的数据视为已结束
if(schedule[0] != null){ subscribe.getSipRequestSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS);
schedule[0].cancel(true);
}
schedule[0] = scheduledExecutorService.schedule(this::onComplete, 500, TimeUnit.MILLISECONDS);
} }
subscription.request(1); subscription.request(1);
} }
@ -128,21 +136,20 @@ public class RecordService {
@Override @Override
public void onComplete() { public void onComplete() {
schedule[0].cancel(true);
subscribe.getRecordInfoSubscribe().delPublisher(key);
result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list)))); result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list))));
log.debug("订阅结束 => {}", key); log.debug("订阅结束 => {}", key);
subscribe.getRecordInfoSubscribe().delPublisher(key);
} }
}; };
subscribe.getRecordInfoSubscribe().addSubscribe(key, subscriber); subscribe.getSipRequestSubscribe().addSubscribe(key, subscriber);
sender.send(senderIp, request);
result.onTimeout(() -> { result.onTimeout(() -> {
result.setResult(JsonResponse.build(ResponseStatus.PARTIAL_CONTENT, result.setResult(JsonResponse.build(ResponseStatus.PARTIAL_CONTENT,
RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list)), RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(Collections.emptyList())),
"查询超时, 结果可能不完整")); "查询超时, 结果可能不完整"));
subscribe.getRecordInfoSubscribe().delPublisher(key); subscribe.getRecordInfoSubscribe().delPublisher(key);
}); });
provider.sendRequest(request);
return result; return result;
} }

View File

@ -1,7 +1,7 @@
package cn.skcks.docking.gb28181.service.record.convertor; package cn.skcks.docking.gb28181.service.record.convertor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO;
import org.mapstruct.Mapper; import org.mapstruct.Mapper;
import org.mapstruct.factory.Mappers; import org.mapstruct.factory.Mappers;