From 071e2a249194315a0ab7e0417fca2cc39acaebe5 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Mon, 5 Feb 2024 16:43:43 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=8F=AF=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=20=E8=8E=B7=E5=8F=96=E5=8E=86=E5=8F=B2=E8=A7=86=E9=A2=91?= =?UTF-8?q?=E4=B9=8B=E5=89=8D=20=E6=98=AF=E5=90=A6=20=E5=85=88=E5=8F=91?= =?UTF-8?q?=E8=B5=B7=20RecordInfo=20=E8=AF=B7=E6=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wvp/api/gb28181/Gb28181Controller.java | 7 +- .../gb28181/wvp/config/ProxySipConfig.java | 10 ++- .../gb28181/Gb28181DownloadService.java | 62 ++++++++++++-- .../wvp/service/record/RecordInfoService.java | 28 +++++-- .../wvp/service/record/dto/RecordInfoDTO.java | 4 + .../request/MessageRequestProcessor.java | 8 ++ .../wvp/sip/subscribe/RecordSubscribe.java | 81 +++++++++++++++++++ .../wvp/sip/subscribe/SipSubscribe.java | 8 ++ .../src/main/resources/application-local.yml | 15 +++- 9 files changed, 199 insertions(+), 24 deletions(-) create mode 100644 gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/RecordSubscribe.java diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java index aad15fc..51314b2 100644 --- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java @@ -4,7 +4,7 @@ import cn.skcks.docking.gb28181.annotation.web.JsonMapping; import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; import cn.skcks.docking.gb28181.annotation.web.methods.PostJson; import cn.skcks.docking.gb28181.common.json.JsonResponse; -import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO; +import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; import cn.skcks.docking.gb28181.wvp.service.catalog.CatalogService; import cn.skcks.docking.gb28181.wvp.service.device.control.DeviceControlService; @@ -57,8 +57,7 @@ public class Gb28181Controller { } @PostJson("/recordInfo") - public JsonResponse recordInfo(RecordInfoDTO dto){ - recordInfoService.requestRecordInfo(dto); - return JsonResponse.success(null); + public DeferredResult>> recordInfo(RecordInfoDTO dto){ + return recordInfoService.requestRecordInfo(dto); } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ProxySipConfig.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ProxySipConfig.java index 99e2bb8..afa2fc2 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ProxySipConfig.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/ProxySipConfig.java @@ -2,7 +2,6 @@ package cn.skcks.docking.gb28181.wvp.config; import cn.skcks.docking.gb28181.config.sip.SipConfig; import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode; - import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -11,6 +10,7 @@ import org.springframework.stereotype.Component; import javax.sip.ListeningPoint; import java.util.List; +import java.util.concurrent.TimeUnit; @Component @ConfigurationProperties(prefix = "proxy.gb28181.sip", ignoreInvalidFields = true) @@ -44,6 +44,14 @@ public class ProxySipConfig { */ private String proxyMediaUrl = ""; + /** + * 调用 视频下载之前 是否使用 recordInfo 查询 + */ + private boolean useRecordInfoQueryBeforeDownload = true; + private int retryRecordInfoQueryBeforeDownloadTimes = 20; + private long retryRecordInfoQueryBeforeDownloadInterval = 3; + private TimeUnit retryRecordInfoQueryBeforeDownloadIntervalUnit = TimeUnit.SECONDS; + @Bean public SipConfig sipConfig(){ SipConfig sipConfig = new SipConfig(); diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index 70a582b..6b1817b 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -25,6 +25,7 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; import cn.skcks.docking.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder; import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode; +import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; import cn.skcks.docking.gb28181.service.ssrc.SsrcService; import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig; import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig; @@ -32,11 +33,18 @@ import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; +import cn.skcks.docking.gb28181.wvp.service.record.RecordInfoService; +import cn.skcks.docking.gb28181.wvp.service.record.dto.RecordInfoDTO; import cn.skcks.docking.gb28181.wvp.service.video.VideoService; import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.wvp.sip.response.SipResponseBuilder; import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; +import cn.skcks.docking.gb28181.wvp.utils.RetryUtil; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; import gov.nist.javax.sdp.MediaDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.URIField; @@ -62,6 +70,7 @@ import java.nio.charset.StandardCharsets; import java.text.MessageFormat; import java.time.ZoneId; import java.util.Date; +import java.util.List; import java.util.Optional; import java.util.Vector; import java.util.concurrent.*; @@ -80,6 +89,8 @@ public class Gb28181DownloadService { private final SipSubscribe subscribe; private final VideoService videoService; private final WvpProxyConfig wvpProxyConfig; + private final RecordInfoService recordInfoService; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap>> requestMap = new ConcurrentHashMap<>(); @@ -150,22 +161,59 @@ public class Gb28181DownloadService { } @SneakyThrows + @SuppressWarnings({"UnstableApiUsage", "unchecked"}) public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime, Boolean fileHeader, Boolean useDownload) { AsyncContext asyncContext = request.startAsync(); asyncContext.setTimeout(0); asyncContext.start(()->{ HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse(); try{ - download(deviceCode, startTime,endTime, useDownload).whenComplete((videoInfo, e)->{ - writeFileHeader(response,deviceCode,startTime,endTime,fileHeader); - if(e != null){ + if(proxySipConfig.isUseRecordInfoQueryBeforeDownload()){ + String name = MessageFormat.format("{0} {1}-{2}", deviceCode, startTime, endTime); + Retryer>> retryer = RetryerBuilder.>>newBuilder() + // 异常就重试 + .retryIfException() + .retryIfRuntimeException() + // 重试间隔 + .withWaitStrategy(WaitStrategies.fixedWait(proxySipConfig.getRetryRecordInfoQueryBeforeDownloadInterval(), proxySipConfig.getRetryRecordInfoQueryBeforeDownloadIntervalUnit())) + // 重试次数 + .withStopStrategy(StopStrategies.stopAfterAttempt(proxySipConfig.getRetryRecordInfoQueryBeforeDownloadTimes())) + .retryIfResult((result) -> { + log.info("{}", result); + return result == null || + result.getCode() != Response.OK || + result.getData() == null || + result.getData().isEmpty(); + }) + .withRetryListener(RetryUtil.defaultRetryListener(name)).build(); + + retryer.call(()->{ + CompletableFuture>> future = new CompletableFuture<>(); + // 发起设备录像查询 + DeferredResult>> requestedRecordInfo = + recordInfoService.requestRecordInfo(new RecordInfoDTO(deviceCode, startTime, endTime, "", 0, "all")); + + requestedRecordInfo.setResultHandler(result -> { + future.complete((JsonResponse>) result); + }); + requestedRecordInfo.onError((throwable)->{ + future.complete(JsonResponse.error(throwable.getMessage())); + }); + + return future.get(); + }); + } + + download(deviceCode, startTime, endTime, useDownload).whenComplete((videoInfo, e) -> { + writeFileHeader(response, deviceCode, startTime, endTime, fileHeader); + if (e != null) { writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage())); - } else if(videoInfo == null){ + } else if (videoInfo == null) { writeErrorToResponse(asyncResponse, JsonResponse.error("下载失败")); - } else if(wvpProxyConfig.getUseFfmpeg()){ - videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId()); + } else if (wvpProxyConfig.getUseFfmpeg()) { + videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime, endTime, DateUnit.SECOND) + 60, videoInfo.getDevice(), videoInfo.getCallId()); } else { - videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60); + videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime, endTime, DateUnit.SECOND) + 60); } asyncContext.complete(); }); diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/record/RecordInfoService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/record/RecordInfoService.java index 74177b4..24010a7 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/record/RecordInfoService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/record/RecordInfoService.java @@ -1,8 +1,12 @@ package cn.skcks.docking.gb28181.wvp.service.record; import cn.skcks.docking.gb28181.common.json.JsonException; +import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; @@ -11,16 +15,17 @@ import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import cn.skcks.docking.gb28181.wvp.service.record.dto.RecordInfoDTO; import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender; +import cn.skcks.docking.gb28181.wvp.sip.subscribe.RecordSubscribe; import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.web.context.request.async.DeferredResult; import java.text.MessageFormat; +import java.util.List; import java.util.Optional; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; @Slf4j @Service @@ -28,12 +33,11 @@ import java.util.concurrent.ScheduledExecutorService; public class RecordInfoService { private final SipSender sipSender; private final SipSubscribe sipSubscribe; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final DockingService dockingService; private final DeviceService deviceService; @SneakyThrows - public void requestRecordInfo(RecordInfoDTO dto){ + public DeferredResult>> requestRecordInfo(RecordInfoDTO dto){ String deviceCode = dto.getDeviceCode(); Optional deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode); if (deviceByDeviceCode.isEmpty()) { @@ -42,20 +46,23 @@ public class RecordInfoService { throw new JsonException(reason); } else { WvpProxyDevice device = deviceByDeviceCode.get(); - requestRecordInfo(device.getGbDeviceId(), device.getGbDeviceChannelId(), dto); + return requestRecordInfo(device.getGbDeviceId(), device.getGbDeviceChannelId(), dto); } } - public void requestRecordInfo(String gbDeviceId, String channel, RecordInfoDTO dto){ + public DeferredResult>> requestRecordInfo(String gbDeviceId, String channel, RecordInfoDTO dto){ + DeferredResult>> result = new DeferredResult<>(); Optional deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId); if(deviceByGbDeviceId.isEmpty()){ log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId); - return; + result.setResult(JsonResponse.error(MessageFormat.format("未能找到 设备编码 为 {0} 的设备", gbDeviceId))); + return result; } Optional deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel); if (deviceByGbDeviceIdAndChannel.isEmpty()) { log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel); - return; + result.setResult(JsonResponse.error(MessageFormat.format("未能找到 编码 {0}, 通道 {1} 的设备", gbDeviceId, channel))); + return result; } WvpProxyDocking device = deviceByGbDeviceId.get(); String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000); @@ -69,7 +76,12 @@ public class RecordInfoService { .filePath(dto.getFilePath()) .indistinctQuery(0) .build(); + + String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, channel, sn); + sipSubscribe.getMessageSubscribe().addPublisher(key); + sipSubscribe.getMessageSubscribe().addSubscribe(key, new RecordSubscribe(sipSubscribe, key, result, gbDeviceId)); sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,SipRequestBuilder.getCSeq(), XmlUtils.toXml(recordInfoRequestDTO), SipUtil.generateViaTag(), SipUtil.generateFromTag(), provider.getNewCallId())); + return result; } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/record/dto/RecordInfoDTO.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/record/dto/RecordInfoDTO.java index b026970..dcd1a5a 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/record/dto/RecordInfoDTO.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/record/dto/RecordInfoDTO.java @@ -3,11 +3,15 @@ package cn.skcks.docking.gb28181.wvp.service.record.dto; import cn.hutool.core.date.DatePattern; import cn.skcks.docking.gb28181.constant.GB28181Constant; import com.fasterxml.jackson.annotation.JsonFormat; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import org.springframework.format.annotation.DateTimeFormat; import java.util.Date; +@AllArgsConstructor +@NoArgsConstructor @Data public class RecordInfoDTO { /** diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java index 2e14d56..61ceeb9 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/processor/request/MessageRequestProcessor.java @@ -10,6 +10,7 @@ import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.M import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO; import cn.skcks.docking.gb28181.wvp.sip.message.message.notify.MediaStatusRequestDTO; @@ -81,6 +82,13 @@ public class MessageRequestProcessor implements MessageProcessor { if(StringUtils.equalsAnyIgnoreCase(messageDto.getCmdType(), CmdType.KEEPALIVE)){ response = ok; // 更新设备在线状态 + } else if(messageDto.getCmdType().equalsIgnoreCase(cn.skcks.docking.gb28181.constant.CmdType.RECORD_INFO)) { + response = ok; + RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET); + String key = GenericSubscribe.Helper.getKey(cn.skcks.docking.gb28181.constant.CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn()); + Optional.ofNullable(subscribe.getMessageSubscribe().getPublisher(key)) + .ifPresentOrElse(publisher -> publisher.submit(request), + () -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto)); } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){ response = ok; CatalogResponseDTO dto = XmlUtils.parse(content, CatalogResponseDTO.class, GB28181Constant.CHARSET); diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/RecordSubscribe.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/RecordSubscribe.java new file mode 100644 index 0000000..69daa4b --- /dev/null +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/RecordSubscribe.java @@ -0,0 +1,81 @@ +package cn.skcks.docking.gb28181.wvp.sip.subscribe; + +import cn.hutool.core.date.DateUtil; +import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor; +import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO; +import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Flow; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +@Slf4j +@RequiredArgsConstructor +public class RecordSubscribe implements Flow.Subscriber{ + private final SipSubscribe subscribe; + private final String key; + private final DeferredResult>> result; + private final String deviceId; + + private final List list = new ArrayList<>(); + private final AtomicLong atomicSum = new AtomicLong(0); + private final AtomicLong atomicNum = new AtomicLong(0); + private Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + log.debug("建立订阅 => {}", key); + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class); + atomicSum.set(Math.max(data.getSumNum(), atomicNum.get())); + atomicNum.addAndGet(data.getRecordList().getNum()); + list.addAll(data.getRecordList().getRecordList()); + long num = atomicNum.get(); + long sum = atomicSum.get(); + if(num > sum){ + log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get()); + } else { + log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get()); + } + + if (num >= sum) { + // 针对某些不按规范的设备 + // 如果已获取数量 >= 约定的总数 + // 就执行定时任务, 若 500ms 内未收到新的数据视为已结束 + subscribe.getMessageSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS); + } + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list)))); + log.debug("订阅结束 => {}", key); + subscribe.getMessageSubscribe().delPublisher(key); + } + + private List sortedRecordList(List list){ + return list.stream().sorted((a,b)-> DateUtil.compare(a.getStartTime(),b.getStartTime())).collect(Collectors.toList()); + } +} + diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java index 439c3dd..9ae70bf 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/subscribe/SipSubscribe.java @@ -2,7 +2,9 @@ package cn.skcks.docking.gb28181.wvp.sip.subscribe; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.InviteSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipRequestSubscribe; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import jakarta.annotation.PostConstruct; @@ -14,6 +16,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; @Slf4j @Data @@ -22,15 +26,18 @@ import java.util.concurrent.Executor; public class SipSubscribe { @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) private final Executor executor; + private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); private GenericSubscribe catalogSubscribe; private GenericSubscribe inviteSubscribe; private GenericSubscribe byeSubscribe; + private GenericTimeoutSubscribe messageSubscribe; @PostConstruct private void init() { catalogSubscribe = new CatalogSubscribe(executor); inviteSubscribe = new InviteSubscribe(executor); byeSubscribe = new ByeSubscribe(executor); + messageSubscribe = new SipRequestSubscribe(executor, scheduledExecutorService); } @PreDestroy @@ -38,5 +45,6 @@ public class SipSubscribe { catalogSubscribe.close(); inviteSubscribe.close(); byeSubscribe.close(); + messageSubscribe.close(); } } diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml index 26b9a14..a2774b1 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application-local.yml @@ -43,7 +43,8 @@ media: proxy: wvp: - url: http://127.0.0.1:18978 + #url: http://127.0.0.1:18978 + url: http://192.168.3.12:18978 user: admin passwd: admin use-ffmpeg: true @@ -55,6 +56,7 @@ proxy: - 44050100002000000003 - 44050100001180000001 - 44050100001320000001 + - 44050100001110000010 # 用于生成 代理 wvp 的 视频流 ws-flv 地址 #proxy-media-url: 'wss://192.168.1.241:9022/mf-config/media' proxy-media-url: 'ws://10.10.10.200:5080' @@ -62,16 +64,21 @@ proxy: realtime-video-duration: 15m gb28181: sip: - id: 44050100002000000003 + id: 44050100002000000005 domain: 4405010000 password: 123456 port: 5063 ip: - - 10.10.10.20 +# - 10.10.10.20 + - 192.168.0.195 stream-mode: udp use-playback-to-download: false proxy-media-url: 'https://10.10.10.200:18181/media' -# - 192.168.1.241 + use-record-info-query-before-download: true + retry-record-info-query-before-download-interval: 3 + retry-record-info-query-before-download-times: 20 + retry-record-info-query-before-download-interval-unit: seconds + # - 192.168.1.241 device-api: offset: forward: 0s