From bfa056d2ca6b5f6d63702bc96c896216aa8930af Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Wed, 23 Aug 2023 01:51:04 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=BE=E5=A4=87=20=E5=AA=92=E4=BD=93?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/api/record/RecordController.java | 10 +-- gb28181-service/pom.xml | 4 ++ .../request/MessageRequestProcessor.java | 19 ++++- .../message/subscribe/GenericSubscribe.java | 6 ++ .../subscribe/RecordInfoSubscribe.java | 5 ++ .../gb28181/service/record/RecordService.java | 71 ++++++++++++++++++- 6 files changed, 104 insertions(+), 11 deletions(-) diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java b/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java index b9cbe28..e1d438c 100644 --- a/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java @@ -5,6 +5,7 @@ import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; import cn.skcks.docking.gb28181.api.record.dto.GetInfoDTO; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.config.SwaggerConfig; +import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO; import cn.skcks.docking.gb28181.service.record.RecordService; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.RequiredArgsConstructor; @@ -15,6 +16,8 @@ import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; +import java.util.List; + @Tag(name="历史录像") @RestController @JsonMapping("/record") @@ -28,10 +31,7 @@ public class RecordController { } @GetJson("/getInfo") - public DeferredResult> getInfo(@ParameterObject @Validated GetInfoDTO dto){ - recordService.requestRecordInfo(dto.getDeviceId()); - DeferredResult> result = new DeferredResult<>(); - result.setResult(JsonResponse.success(null)); - return result; + public DeferredResult>> getInfo(@ParameterObject @Validated GetInfoDTO dto){ + return recordService.requestRecordInfo(dto.getDeviceId()); } } diff --git a/gb28181-service/pom.xml b/gb28181-service/pom.xml index 74bbda0..252a47c 100644 --- a/gb28181-service/pom.xml +++ b/gb28181-service/pom.xml @@ -78,6 +78,10 @@ junit-jupiter test + + org.springframework + spring-web + diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java index b751330..bab6238 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java @@ -2,12 +2,15 @@ package cn.skcks.docking.gb28181.core.sip.message.processor.message.request; import cn.skcks.docking.gb28181.common.json.ResponseStatus; import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; @@ -22,6 +25,7 @@ import org.springframework.stereotype.Component; import javax.sip.RequestEvent; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; +import java.util.concurrent.SubmissionPublisher; @Slf4j @RequiredArgsConstructor @@ -30,6 +34,7 @@ public class MessageRequestProcessor implements MessageProcessor { private final SipListener sipListener; private final DockingDeviceService deviceService; private final SipMessageSender sender; + private final SipSubscribe subscribe; @PostConstruct @Override @@ -43,7 +48,8 @@ public class MessageRequestProcessor implements MessageProcessor { String deviceId = SipUtil.getUserIdFromFromHeader(request); CallIdHeader callIdHeader = request.getCallIdHeader(); - MessageDTO messageDto = XmlUtils.parse(request.getRawContent(), MessageDTO.class, GB28181Constant.CHARSET); + byte[] content = request.getRawContent(); + MessageDTO messageDto = XmlUtils.parse(content, MessageDTO.class, GB28181Constant.CHARSET); log.debug("接收到的消息 => {}", messageDto); DockingDevice device = deviceService.getDevice(deviceId); @@ -56,15 +62,22 @@ public class MessageRequestProcessor implements MessageProcessor { return; } + Response ok = response(request, Response.OK, "OK"); Response response; if(messageDto.getCmdType().equalsIgnoreCase(CmdType.KEEPALIVE)){ - response = response(request, Response.OK, "OK"); + response = ok; // 更新设备在线状态 deviceService.online(device, response); + } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.RECORD_INFO)){ + response = ok; + RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET); + String key = CacheUtil.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn()); + SubmissionPublisher publisher = subscribe.getRecordInfoSubscribe().getPublisher(key); + publisher.submit(dto); } else { response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage()); } - sender.send(senderIp,response); + sender.send(senderIp, response); } @SneakyThrows diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/GenericSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/GenericSubscribe.java index 8263a7f..cf064a6 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/GenericSubscribe.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/GenericSubscribe.java @@ -15,6 +15,7 @@ public interface GenericSubscribe { SubmissionPublisher getPublisher(String key); void addSubscribe(String key,Flow.Subscriber subscribe); + void delPublisher(String key); class Helper { public static void close(Map> publishers){ @@ -22,6 +23,11 @@ public interface GenericSubscribe { publishers.clear(); } + public static void delPublisher(Map> publishers, String key){ + SubmissionPublisher publisher = publishers.remove(key); + publisher.close(); + } + public static void addPublisher(Executor executor, Map> publishers, String key){ SubmissionPublisher publisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); publishers.put(key, publisher); diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/RecordInfoSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/RecordInfoSubscribe.java index bf1a051..8206d01 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/RecordInfoSubscribe.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/RecordInfoSubscribe.java @@ -29,4 +29,9 @@ public class RecordInfoSubscribe implements GenericSubscribe subscribe) { Helper.addSubscribe(publishers, key, subscribe); } + + @Override + public void delPublisher(String key) { + Helper.delPublisher(publishers, key); + } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java index 75a5ec7..2a9fd44 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java @@ -1,10 +1,16 @@ package cn.skcks.docking.gb28181.service.record; import cn.hutool.core.date.DateUtil; +import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO; +import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoItemDTO; +import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.service.SipService; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; @@ -13,10 +19,15 @@ import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.web.context.request.async.DeferredResult; import javax.sip.SipProvider; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; @Slf4j @Service @@ -25,24 +36,29 @@ public class RecordService { private final DockingDeviceService deviceService; private final SipService sipService; private final SipMessageSender sender; + private final SipSubscribe subscribe; @SneakyThrows - public void requestRecordInfo(String deviceId){ + public DeferredResult>> requestRecordInfo(String deviceId) { + DeferredResult>> result = new DeferredResult<>(30 * 1000L); + DockingDevice device = deviceService.getDevice(deviceId); if (device == null) { log.info("未能找到 编码为 => {} 的设备", deviceId); - return; + result.setResult(JsonResponse.error(null, "未找到设备")); + return result; } String transport = device.getTransport(); String senderIp = device.getLocalIp(); SipProvider provider = sipService.getProvider(transport, senderIp); CallIdHeader callId = provider.getNewCallId(); + String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000); RecordInfoRequestDTO dto = RecordInfoRequestDTO.builder() .deviceId(deviceId) .startTime(DateUtil.beginOfDay(DateUtil.date())) .endTime(DateUtil.endOfDay(DateUtil.date())) - .sn(String.valueOf((int)(Math.random() * 9 + 1) * 100000)) + .sn(sn) .build(); Request request = SipRequestBuilder.createMessageRequest(device, XmlUtils.toXml(dto), @@ -50,6 +66,55 @@ public class RecordService { SipUtil.generateFromTag(), null, callId); + + String key = CacheUtil.getKey(CmdType.RECORD_INFO, deviceId, sn); + subscribe.getRecordInfoSubscribe().addPublisher(key); sender.send(senderIp, request); + List list = new ArrayList<>(); + AtomicLong sum = new AtomicLong(0); + AtomicLong getNum = new AtomicLong(0); + subscribe.getRecordInfoSubscribe().addSubscribe(key, new Flow.Subscriber<>() { + Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + log.debug("建立订阅 => {}", key); + subscription.request(1); + } + + @Override + public void onNext(RecordInfoResponseDTO item) { + sum.set(item.getSumNum()); + getNum.getAndAdd(item.getRecordList().size()); + log.info("{}", item); + list.addAll(item.getRecordList()); + log.info("{}/{}", getNum.get(), sum.get()); + if (getNum.get() >= sum.get()) { + onComplete(); + } else { + subscription.request(1); + } + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + subscribe.getRecordInfoSubscribe().delPublisher(key); + result.setResult(JsonResponse.success(list)); + log.debug("订阅结束 => {}", key); + } + }); + + result.onTimeout(()->{ + result.setResult(JsonResponse.success(list,"查询超时, 结果可能不完整")); + subscribe.getRecordInfoSubscribe().delPublisher(key); + }); + + return result; } }