From e8f5bfd1b0d567ae4bdb2b00974f98a2ddaa2971 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Sun, 7 Jan 2024 16:12:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86=20=E4=B8=8E=20=E6=B8=85?= =?UTF-8?q?=E7=90=86=E9=83=A8=E5=88=86=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/catalog/CatalogController.java | 9 +-- .../sip/message/subscribe/SipSubscribe.java | 2 +- .../service/catalog/CatalogService.java | 51 +----------- .../service/catalog/CatalogSubscriber.java | 76 +++++++++++++++++ .../gb28181/service/record/RecordService.java | 52 +----------- .../service/record/RecordSubscriber.java | 81 +++++++++++++++++++ .../gb28181/utils/FutureDeferredResult.java | 52 ++++++++++++ 7 files changed, 216 insertions(+), 107 deletions(-) create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogSubscriber.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordSubscriber.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/utils/FutureDeferredResult.java diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/catalog/CatalogController.java b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/catalog/CatalogController.java index 31a257f..aeae2a7 100644 --- a/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/catalog/CatalogController.java +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/catalog/CatalogController.java @@ -5,6 +5,7 @@ import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.service.catalog.CatalogService; import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO; +import cn.skcks.docking.gb28181.utils.FutureDeferredResult; import io.swagger.v3.oas.annotations.tags.Tag; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; @@ -23,12 +24,8 @@ public class CatalogController { @SneakyThrows @GetJson - public DeferredResult>> catalog(String gbDeviceId){ - DeferredResult>> result = new DeferredResult<>(); + public DeferredResult>> catalog(String gbDeviceId){ CompletableFuture> catalog = catalogService.catalog(gbDeviceId); - catalog.whenComplete((data,throwable)->{ - result.setResult(JsonResponse.success(data)); - }); - return result; + return FutureDeferredResult.toDeferredResultWithJson(catalog); } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java index 21f90e6..cad6a73 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java @@ -23,7 +23,7 @@ import java.util.concurrent.ScheduledExecutorService; public class SipSubscribe { @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) private final Executor executor; - private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); private GenericTimeoutSubscribe sipResponseSubscribe; private GenericTimeoutSubscribe sipRequestSubscribe; diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java index c818a5d..ab803b5 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java @@ -67,56 +67,7 @@ public class CatalogService { Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO, device.getCharset())); String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, gbDeviceId, sn); subscribe.getSipRequestSubscribe().addPublisher(key, 60, TimeUnit.SECONDS); - subscribe.getSipRequestSubscribe().addSubscribe(key, new Flow.Subscriber<>() { - private Flow.Subscription subscription; - private final AtomicLong num = new AtomicLong(0); - private long sumNum = 0; - - private final List data = new ArrayList<>(); - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - subscription.request(1); - } - - @Override - public void onNext(SIPRequest item) { - CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(item.getRawContent(), CatalogResponseDTO.class); - sumNum = Math.max(sumNum,catalogResponseDTO.getSumNum()); - long curNum = num.addAndGet(catalogResponseDTO.getDeviceList().getNum()); - log.debug("当前获取数量: {}/{}", curNum, sumNum); - data.addAll(catalogResponseDTO.getDeviceList().getDeviceList()); - if(curNum >= sumNum){ - log.info("获取完成 {}", key); - subscribe.getSipRequestSubscribe().complete(key); - } else { - subscription.request(1); - } - } - - @Override - public void onError(Throwable throwable) { - throwable.printStackTrace(); - onComplete(); - } - - @Override - public void onComplete() { - log.info("{} 返回结果 {}", key, result.complete(data)); - - data.stream().map(item->{ - DockingDeviceChannel model = new DockingDeviceChannel(); - model.setGbDeviceId(device.getDeviceId()); - model.setGbDeviceChannelId(item.getDeviceId()); - model.setName(item.getName()); - model.setAddress(item.getAddress()); - return model; - }).forEach(deviceChannelService::add); - - subscribe.getSipRequestSubscribe().delPublisher(key); - } - }); + subscribe.getSipRequestSubscribe().addSubscribe(key, new CatalogSubscriber(subscribe, key, result, device.getDeviceId(), deviceChannelService::add)); provider.sendRequest(request); return result; } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogSubscriber.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogSubscriber.java new file mode 100644 index 0000000..db77293 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogSubscriber.java @@ -0,0 +1,76 @@ +package cn.skcks.docking.gb28181.service.catalog; + +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; +import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDeviceChannel; +import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO; +import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO; +import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; + +@Slf4j +@RequiredArgsConstructor +public class CatalogSubscriber implements Flow.Subscriber{ + private final SipSubscribe subscribe; + private final String key; + private final CompletableFuture> result; + private final String deviceId; + private final Consumer addDeviceChannelFunc; + + private Flow.Subscription subscription; + private final AtomicLong num = new AtomicLong(0); + private long sumNum = 0; + + private final List data = new ArrayList<>(); + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(item.getRawContent(), CatalogResponseDTO.class); + sumNum = Math.max(sumNum,catalogResponseDTO.getSumNum()); + long curNum = num.addAndGet(catalogResponseDTO.getDeviceList().getNum()); + log.debug("当前获取数量: {}/{}", curNum, sumNum); + data.addAll(catalogResponseDTO.getDeviceList().getDeviceList()); + if(curNum >= sumNum){ + log.info("获取完成 {}", key); + subscribe.getSipRequestSubscribe().complete(key); + } else { + subscription.request(1); + } + } + + @Override + public void onError(Throwable throwable) { + throwable.printStackTrace(); + onComplete(); + } + + @Override + public void onComplete() { + log.info("{} 返回结果 {}", key, result.complete(data)); + + data.stream().map(item->{ + DockingDeviceChannel model = new DockingDeviceChannel(); + model.setGbDeviceId(deviceId); + model.setGbDeviceChannelId(item.getDeviceId()); + model.setName(item.getName()); + model.setAddress(item.getAddress()); + return model; + }).forEach(addDeviceChannelFunc); + + subscribe.getSipRequestSubscribe().delPublisher(key); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java index e46f9c6..8befae2 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java @@ -93,58 +93,10 @@ public class RecordService { Request request = requestBuilder.createMessageRequest(callId,SipRequestBuilder.getCSeq(), MANSCDPUtils.toByteXml(dto, device.getCharset())); String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, channelId, sn); subscribe.getSipRequestSubscribe().addPublisher(key); - Flow.Subscriber subscriber = new Flow.Subscriber<>() { - final List list = new ArrayList<>(); - final AtomicLong atomicSum = new AtomicLong(0); - final AtomicLong atomicNum = new AtomicLong(0); - Flow.Subscription subscription; - - @Override - public void onSubscribe(Flow.Subscription subscription) { - this.subscription = subscription; - log.debug("建立订阅 => {}", key); - subscription.request(1); - } - - @Override - public void onNext(SIPRequest item) { - RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class); - atomicSum.set(Math.max(data.getSumNum(), atomicNum.get())); - atomicNum.addAndGet(data.getRecordList().getNum()); - list.addAll(data.getRecordList().getRecordList()); - long num = atomicNum.get(); - long sum = atomicSum.get(); - if(num > sum){ - log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get()); - } else { - log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get()); - } - - if (num >= sum) { - // 针对某些不按规范的设备 - // 如果已获取数量 >= 约定的总数 - // 就执行定时任务, 若 500ms 内未收到新的数据视为已结束 - subscribe.getSipRequestSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS); - } - subscription.request(1); - } - - @Override - public void onError(Throwable throwable) { - - } - - @Override - public void onComplete() { - result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list)))); - log.debug("订阅结束 => {}", key); - subscribe.getSipRequestSubscribe().delPublisher(key); - } - }; - subscribe.getSipRequestSubscribe().addSubscribe(key, subscriber); + subscribe.getSipRequestSubscribe().addSubscribe(key, new RecordSubscriber(subscribe, key, result, deviceId)); result.onTimeout(() -> { result.setResult(JsonResponse.build(ResponseStatus.PARTIAL_CONTENT, - RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(Collections.emptyList())), + RecordConvertor.INSTANCE.dto2Vo(Collections.emptyList()), "查询超时, 结果可能不完整")); subscribe.getSipRequestSubscribe().delPublisher(key); }); diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordSubscriber.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordSubscriber.java new file mode 100644 index 0000000..7c82d5b --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordSubscriber.java @@ -0,0 +1,81 @@ +package cn.skcks.docking.gb28181.service.record; + +import cn.hutool.core.date.DateUtil; +import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; +import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor; +import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO; +import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO; +import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Flow; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +@Slf4j +@RequiredArgsConstructor +public class RecordSubscriber implements Flow.Subscriber{ + private final SipSubscribe subscribe; + private final String key; + private final DeferredResult>> result; + private final String deviceId; + + private final List list = new ArrayList<>(); + private final AtomicLong atomicSum = new AtomicLong(0); + private final AtomicLong atomicNum = new AtomicLong(0); + private Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + log.debug("建立订阅 => {}", key); + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class); + atomicSum.set(Math.max(data.getSumNum(), atomicNum.get())); + atomicNum.addAndGet(data.getRecordList().getNum()); + list.addAll(data.getRecordList().getRecordList()); + long num = atomicNum.get(); + long sum = atomicSum.get(); + if(num > sum){ + log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get()); + } else { + log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get()); + } + + if (num >= sum) { + // 针对某些不按规范的设备 + // 如果已获取数量 >= 约定的总数 + // 就执行定时任务, 若 500ms 内未收到新的数据视为已结束 + subscribe.getSipRequestSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS); + } + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list)))); + log.debug("订阅结束 => {}", key); + subscribe.getSipRequestSubscribe().delPublisher(key); + } + + private List sortedRecordList(List list){ + return list.stream().sorted((a,b)-> DateUtil.compare(a.getStartTime(),b.getStartTime())).collect(Collectors.toList()); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/utils/FutureDeferredResult.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/utils/FutureDeferredResult.java new file mode 100644 index 0000000..050fe87 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/utils/FutureDeferredResult.java @@ -0,0 +1,52 @@ +package cn.skcks.docking.gb28181.utils; + +import cn.skcks.docking.gb28181.common.json.JsonResponse; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class FutureDeferredResult { + public static DeferredResult> toDeferredResultWithJson(CompletableFuture future){ + DeferredResult> result = new DeferredResult<>(); + future.whenComplete((data,throwable)->{ + result.setResult(JsonResponse.success(data)); + }); + future.exceptionally(e -> { + result.setResult(JsonResponse.error(e.getMessage())); + return null; + }); + return result; + } + + public static DeferredResult> toDeferredResultWithJsonAndTimeout(CompletableFuture future, long time, TimeUnit timeUnit){ + DeferredResult> result = new DeferredResult<>(timeUnit.toMillis(time)); + result.onTimeout(()-> result.setResult(JsonResponse.error("请求超时"))); + + future.whenComplete((data,throwable)->{ + result.setResult(JsonResponse.success(data)); + }); + future.exceptionally(e -> { + result.setResult(JsonResponse.error(e.getMessage())); + return null; + }); + return result; + } + + public static DeferredResult toDeferredResult(CompletableFuture future){ + DeferredResult result = new DeferredResult<>(); + future.whenComplete((data,throwable)->{ + result.setResult(data); + }); + return result; + } + + public static DeferredResult toDeferredResultWithTimeout(CompletableFuture future, T timeoutResult,long time, TimeUnit timeUnit){ + DeferredResult result = new DeferredResult<>(timeUnit.toMillis(time), timeoutResult); + future.completeOnTimeout(timeoutResult,time,timeUnit); + future.whenComplete((data, throwable) -> { + result.setResult(data); + }); + return result; + } +}