diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/record/RecordController.java similarity index 75% rename from api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java rename to api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/record/RecordController.java index 6f0c93e..0f619e2 100644 --- a/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/record/RecordController.java @@ -1,8 +1,8 @@ -package cn.skcks.docking.gb28181.api.record; +package cn.skcks.docking.gb28181.api.gb28181.record; import cn.skcks.docking.gb28181.annotation.web.JsonMapping; import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; -import cn.skcks.docking.gb28181.api.record.dto.GetInfoDTO; +import cn.skcks.docking.gb28181.api.gb28181.record.dto.GetRecordInfoDTO; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.config.SwaggerConfig; import cn.skcks.docking.gb28181.service.record.RecordService; @@ -20,7 +20,7 @@ import java.util.List; @Tag(name="历史录像") @RestController -@JsonMapping("/api/device/record") +@JsonMapping("/api/gb28181/record") @RequiredArgsConstructor public class RecordController { private final RecordService recordService; @@ -30,8 +30,8 @@ public class RecordController { return SwaggerConfig.api("Record", "/api/device/record"); } - @GetJson("/getInfoList") - public DeferredResult>> getInfo(@ParameterObject @Validated GetInfoDTO dto){ - return recordService.requestRecordInfo(dto.getDeviceId(), dto.getTimeout(), dto.getDate()); + @GetJson("/list") + public DeferredResult>> getInfo(@ParameterObject @Validated GetRecordInfoDTO dto){ + return recordService.requestRecordInfo(dto.getGbDeviceId(), dto.getGbDeviceChannelId(), dto.getTimeout(), dto.getDate()); } } diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/record/dto/GetInfoDTO.java b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/record/dto/GetRecordInfoDTO.java similarity index 66% rename from api/src/main/java/cn/skcks/docking/gb28181/api/record/dto/GetInfoDTO.java rename to api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/record/dto/GetRecordInfoDTO.java index a02b95e..a62fbd3 100644 --- a/api/src/main/java/cn/skcks/docking/gb28181/api/record/dto/GetInfoDTO.java +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/record/dto/GetRecordInfoDTO.java @@ -1,4 +1,4 @@ -package cn.skcks.docking.gb28181.api.record.dto; +package cn.skcks.docking.gb28181.api.gb28181.record.dto; import cn.hutool.core.date.DatePattern; import io.swagger.v3.oas.annotations.media.Schema; @@ -11,10 +11,14 @@ import java.util.Date; @Schema(title = "查询历史录像") @Data -public class GetInfoDTO { +public class GetRecordInfoDTO { @NotBlank - @Schema(description = "设备id", example = "44050100001180000001") - private String deviceId; + @Schema(description = "设备id", example = "44050100002000000006") + private String gbDeviceId; + + @NotBlank + @Schema(description = "通道id", example = "34020000001310000001") + private String gbDeviceChannelId; @Min(30) @Schema(description = "超时时间(秒)", example = "30") diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java index 80d3f1b..a0b4d47 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java @@ -6,7 +6,6 @@ import cn.skcks.docking.gb28181.constant.CmdType; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; -import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; @@ -15,6 +14,7 @@ import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; import cn.skcks.docking.gb28181.sip.manscdp.MessageDTO; import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; @@ -25,8 +25,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.sip.RequestEvent; -import javax.sip.header.CallIdHeader; -import javax.sip.message.Request; import javax.sip.message.Response; import java.util.EventObject; import java.util.Optional; @@ -77,8 +75,8 @@ public class MessageRequestProcessor implements MessageProcessor { response = ok; RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET); String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn()); - Optional.ofNullable(subscribe.getRecordInfoSubscribe().getPublisher(key)) - .ifPresentOrElse(publisher -> publisher.submit(dto), + Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(key)) + .ifPresentOrElse(publisher -> publisher.submit(request), () -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto)); }else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){ CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(content, CatalogResponseDTO.class); diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java index c1ae3ef..c818a5d 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java @@ -64,7 +64,7 @@ public class CatalogService { .deviceId(gbDeviceId) .sn(sn) .build(); - Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO)); + Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO, device.getCharset())); String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, gbDeviceId, sn); subscribe.getSipRequestSubscribe().addPublisher(key, 60, TimeUnit.SECONDS); subscribe.getSipRequestSubscribe().addSubscribe(key, new Flow.Subscriber<>() { diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java index d70eb9c..f5c8c4e 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java @@ -3,21 +3,26 @@ package cn.skcks.docking.gb28181.service.record; import cn.hutool.core.date.DateUtil; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.json.ResponseStatus; -import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.config.sip.SipConfig; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; -import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO; -import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO; -import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; + + import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder; -import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.service.SipService; -import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; +import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDeviceChannel; +import cn.skcks.docking.gb28181.service.device.DeviceChannelService; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor; import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO; +import cn.skcks.docking.gb28181.sip.method.message.request.MessageRequestBuilder; +import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; +import gov.nist.javax.sip.message.SIPRequest; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -25,11 +30,8 @@ import org.springframework.stereotype.Service; import org.springframework.web.context.request.async.DeferredResult; import javax.sip.SipProvider; -import javax.sip.header.CallIdHeader; import javax.sip.message.Request; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -38,11 +40,11 @@ import java.util.stream.Collectors; @Service @RequiredArgsConstructor public class RecordService { + private final SipConfig sipConfig; private final DockingDeviceService deviceService; + private final DeviceChannelService deviceChannelService; private final SipService sipService; - private final SipMessageSender sender; private final SipSubscribe subscribe; - /** * * @param deviceId 设备id @@ -50,7 +52,7 @@ public class RecordService { * @param date 查询日期 */ @SneakyThrows - public DeferredResult>> requestRecordInfo(String deviceId, long timeout, Date date) { + public DeferredResult>> requestRecordInfo(String deviceId, String channelId, long timeout, Date date) { log.info("查询 设备 => {} {} 的历史媒体记录, 超时时间 {} 秒", deviceId, DateUtil.formatDate(date), timeout); DeferredResult>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(timeout)); @@ -60,33 +62,41 @@ public class RecordService { result.setResult(JsonResponse.error(null, "未找到设备")); return result; } + Optional deviceChannel = deviceChannelService.getDeviceChannel(deviceId, channelId); + if(deviceChannel.isEmpty()){ + log.info("未能找到 设备编码为 => {}, 通道 => {} 的信息", deviceId, channelId); + result.setResult(JsonResponse.error(null, "未找到通道信息")); + return result; + } String transport = device.getTransport(); - String senderIp = device.getLocalIp(); - SipProvider provider = sipService.getProvider(transport, senderIp); - CallIdHeader callId = provider.getNewCallId(); + String localIp = device.getLocalIp(); + SipProvider provider = sipService.getProvider(transport, localIp); + String callId = provider.getNewCallId().getCallId(); String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000); + MessageRequestBuilder requestBuilder = MessageRequestBuilder.builder() + .targetIp(device.getIp()) + .targetPort(device.getPort()) + .targetId(deviceId) + .localId(sipConfig.getId()) + .localIp(localIp) + .localPort(sipConfig.getPort()) + .transport(transport) + .build(); + RecordInfoRequestDTO dto = RecordInfoRequestDTO.builder() - .deviceId(deviceId) + .deviceId(channelId) .startTime(DateUtil.beginOfDay(date)) .endTime(DateUtil.endOfDay(date)) .sn(sn) .build(); - Request request = SipRequestBuilder.createMessageRequest(device, - XmlUtils.toXml(dto), - SipUtil.generateViaTag(), - SipUtil.generateFromTag(), - null, - callId); - - String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, deviceId, sn); - subscribe.getRecordInfoSubscribe().addPublisher(key); - List list = new ArrayList<>(); - AtomicLong atomicSum = new AtomicLong(0); - AtomicLong atomicNum = new AtomicLong(0); - ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - final ScheduledFuture[] schedule = new ScheduledFuture[1]; - Flow.Subscriber subscriber = new Flow.Subscriber<>() { + Request request = requestBuilder.createMessageRequest(callId,SipRequestBuilder.getCSeq(), MANSCDPUtils.toByteXml(dto, device.getCharset())); + String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, channelId, sn); + subscribe.getSipRequestSubscribe().addPublisher(key); + Flow.Subscriber subscriber = new Flow.Subscriber<>() { + final List list = new ArrayList<>(); + AtomicLong atomicSum = new AtomicLong(0); + AtomicLong atomicNum = new AtomicLong(0); Flow.Subscription subscription; @Override @@ -97,10 +107,11 @@ public class RecordService { } @Override - public void onNext(RecordInfoResponseDTO item) { - atomicSum.set(item.getSumNum()); - atomicNum.getAndAdd(item.getRecordList().size()); - list.addAll(item.getRecordList()); + public void onNext(SIPRequest item) { + RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class); + atomicSum.set(data.getSumNum()); + atomicNum.getAndAdd(data.getRecordList().getNum()); + list.addAll(data.getRecordList().getRecordList()); long num = atomicNum.get(); long sum = atomicSum.get(); if(num > sum){ @@ -113,10 +124,7 @@ public class RecordService { // 针对某些不按规范的设备 // 如果已获取数量 >= 约定的总数 // 就执行定时任务, 若 500ms 内未收到新的数据视为已结束 - if(schedule[0] != null){ - schedule[0].cancel(true); - } - schedule[0] = scheduledExecutorService.schedule(this::onComplete, 500, TimeUnit.MILLISECONDS); + subscribe.getSipRequestSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS); } subscription.request(1); } @@ -128,21 +136,20 @@ public class RecordService { @Override public void onComplete() { - schedule[0].cancel(true); - subscribe.getRecordInfoSubscribe().delPublisher(key); result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list)))); log.debug("订阅结束 => {}", key); + subscribe.getRecordInfoSubscribe().delPublisher(key); } }; - subscribe.getRecordInfoSubscribe().addSubscribe(key, subscriber); - sender.send(senderIp, request); + subscribe.getSipRequestSubscribe().addSubscribe(key, subscriber); result.onTimeout(() -> { result.setResult(JsonResponse.build(ResponseStatus.PARTIAL_CONTENT, - RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list)), + RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(Collections.emptyList())), "查询超时, 结果可能不完整")); subscribe.getRecordInfoSubscribe().delPublisher(key); }); + provider.sendRequest(request); return result; } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/convertor/RecordConvertor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/convertor/RecordConvertor.java index f62ac13..5034a16 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/convertor/RecordConvertor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/convertor/RecordConvertor.java @@ -1,7 +1,7 @@ package cn.skcks.docking.gb28181.service.record.convertor; -import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO; import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO; import org.mapstruct.Mapper; import org.mapstruct.factory.Mappers;