diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java index 2bc347b..e4590f0 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java @@ -82,7 +82,8 @@ public class MessageRequestProcessor implements MessageProcessor { () -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto)); }else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){ CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(content, CatalogResponseDTO.class); - Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(catalogResponseDTO.getSn())).ifPresent(publisher->{ + String key = GenericSubscribe.Helper.getKey(catalogResponseDTO.getDeviceId(), catalogResponseDTO.getSn()); + Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(key)).ifPresent(publisher->{ publisher.submit(request); }); response = ok; diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java index 62f387a..8269a0a 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java @@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.service.catalog; import cn.skcks.docking.gb28181.config.sip.SipConfig; import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.service.SipService; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; @@ -60,8 +61,9 @@ public class CatalogService { .sn(sn) .build(); Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO)); - subscribe.getSipRequestSubscribe().addPublisher(sn, 60, TimeUnit.SECONDS); - subscribe.getSipRequestSubscribe().addSubscribe(sn, new Flow.Subscriber<>() { + String key = GenericSubscribe.Helper.getKey(gbDeviceId,sn); + subscribe.getSipRequestSubscribe().addPublisher(key, 60, TimeUnit.SECONDS); + subscribe.getSipRequestSubscribe().addSubscribe(key, new Flow.Subscriber<>() { private Flow.Subscription subscription; private final AtomicLong num = new AtomicLong(0); private long sumNum = 0; @@ -82,8 +84,8 @@ public class CatalogService { log.debug("当前获取数量: {}/{}", curNum, sumNum); data.addAll(catalogResponseDTO.getDeviceList().getDeviceList()); if(curNum >= sumNum){ - log.info("获取完成"); - subscribe.getSipRequestSubscribe().delPublisher(sn); + log.info("获取完成 {}", key); + subscribe.getSipRequestSubscribe().delPublisher(key); } else { subscription.request(1); } @@ -97,7 +99,7 @@ public class CatalogService { @Override public void onComplete() { - log.info("返回结果 {} {}", result.complete(data),data); + log.info("{} 返回结果 {}", key, result.complete(data)); } }); provider.sendRequest(request);