From 3db949ed269e71b5c1160d575f7e3e0223bb7b6e Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Mon, 2 Oct 2023 23:29:09 +0800 Subject: [PATCH] =?UTF-8?q?=E8=AE=BE=E5=A4=87=E7=9B=AE=E5=BD=95=E6=9F=A5?= =?UTF-8?q?=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/gb28181/GB28181Controller.java | 16 +++ .../gb28181/catalog/CatalogController.java | 29 +++++ .../request/MessageRequestProcessor.java | 23 ++-- .../request/RegisterRequestProcessor.java | 4 +- .../subscribe/SipRequestSubscribe.java | 65 ++++++++++ .../subscribe/SipResponseSubscribe.java | 65 ++++++++++ .../sip/message/subscribe/SipSubscribe.java | 13 ++ .../core/sip/service/SipServiceImpl.java | 58 +++++++++ .../service/catalog/CatalogService.java | 112 ++++++++++++++++++ .../docking/gb28181/sip/utils/SipUtil.java | 4 + 10 files changed, 379 insertions(+), 10 deletions(-) create mode 100644 api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/GB28181Controller.java create mode 100644 api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/catalog/CatalogController.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipRequestSubscribe.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipResponseSubscribe.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/GB28181Controller.java b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/GB28181Controller.java new file mode 100644 index 0000000..c25cbb4 --- /dev/null +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/GB28181Controller.java @@ -0,0 +1,16 @@ +package cn.skcks.docking.gb28181.api.gb28181; + +import cn.skcks.docking.gb28181.annotation.web.JsonMapping; +import cn.skcks.docking.gb28181.config.SwaggerConfig; +import org.springdoc.core.models.GroupedOpenApi; +import org.springframework.context.annotation.Bean; +import org.springframework.web.bind.annotation.RestController; + +@JsonMapping("/api/gb28181") +@RestController +public class GB28181Controller { + @Bean + public GroupedOpenApi gb28181Api() { + return SwaggerConfig.api("GB28181", "/api/gb28181"); + } +} diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/catalog/CatalogController.java b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/catalog/CatalogController.java new file mode 100644 index 0000000..ee5264d --- /dev/null +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/gb28181/catalog/CatalogController.java @@ -0,0 +1,29 @@ +package cn.skcks.docking.gb28181.api.gb28181.catalog; + +import cn.skcks.docking.gb28181.annotation.web.JsonMapping; +import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; +import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.service.catalog.CatalogService; +import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO; +import io.swagger.v3.oas.annotations.tags.Tag; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +@Tag(name="获取设备目录信息") +@RestController +@JsonMapping("/api/gb28181/catalog") +@RequiredArgsConstructor +public class CatalogController { + private final CatalogService catalogService; + + @SneakyThrows + @GetJson + public JsonResponse> catalog(String gbDeviceId){ + CompletableFuture> catalog = catalogService.catalog(gbDeviceId); + return JsonResponse.success(catalog.get()); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java index ee3c5bd..2bc347b 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java @@ -2,11 +2,10 @@ package cn.skcks.docking.gb28181.core.sip.message.processor.message.request; import cn.skcks.docking.gb28181.common.json.ResponseStatus; import cn.skcks.docking.gb28181.common.xml.XmlUtils; -import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; +import cn.skcks.docking.gb28181.constant.CmdType; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; -import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO; import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; @@ -14,6 +13,9 @@ import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; +import cn.skcks.docking.gb28181.sip.manscdp.MessageDTO; +import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO; +import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import jakarta.annotation.PostConstruct; @@ -24,6 +26,7 @@ import org.springframework.stereotype.Component; import javax.sip.RequestEvent; import javax.sip.header.CallIdHeader; +import javax.sip.message.Request; import javax.sip.message.Response; import java.util.EventObject; import java.util.Optional; @@ -48,10 +51,10 @@ public class MessageRequestProcessor implements MessageProcessor { RequestEvent requestEvent = (RequestEvent) eventObject; SIPRequest request = (SIPRequest)requestEvent.getRequest(); String deviceId = SipUtil.getUserIdFromFromHeader(request); - CallIdHeader callIdHeader = request.getCallIdHeader(); + String callId = request.getCallIdHeader().getCallId(); byte[] content = request.getRawContent(); - MessageDTO messageDto = XmlUtils.parse(content, MessageDTO.class, GB28181Constant.CHARSET); + MessageDTO messageDto = MANSCDPUtils.parse(content, MessageDTO.class); log.debug("接收到的消息 => {}", messageDto); DockingDevice device = deviceService.getDevice(deviceId); @@ -70,13 +73,19 @@ public class MessageRequestProcessor implements MessageProcessor { response = ok; // 更新设备在线状态 deviceService.online(device, response); - } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.RECORD_INFO)){ + } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.RECORD_INFO)) { response = ok; RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET); String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn()); Optional.ofNullable(subscribe.getRecordInfoSubscribe().getPublisher(key)) - .ifPresentOrElse(publisher-> publisher.submit(dto), - ()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto)); + .ifPresentOrElse(publisher -> publisher.submit(dto), + () -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto)); + }else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){ + CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(content, CatalogResponseDTO.class); + Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(catalogResponseDTO.getSn())).ifPresent(publisher->{ + publisher.submit(request); + }); + response = ok; } else { response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage()); } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/register/request/RegisterRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/register/request/RegisterRequestProcessor.java index fd89d12..a1bab8c 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/register/request/RegisterRequestProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/register/request/RegisterRequestProcessor.java @@ -50,7 +50,7 @@ public class RegisterRequestProcessor implements MessageProcessor { @PostConstruct @Override public void init(){ - sipListener.addRequestProcessor(Method.REGISTER,this); + sipListener.addRequestProcessor(Request.REGISTER,this); } @SneakyThrows @@ -92,8 +92,6 @@ public class RegisterRequestProcessor implements MessageProcessor { return; } - - log.debug("认证信息 => {}", authorization); boolean authPass = StringUtils.isBlank(password) || DigestAuthenticationHelper.doAuthenticatePlainTextPassword(request,password); diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipRequestSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipRequestSubscribe.java new file mode 100644 index 0000000..77c1d68 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipRequestSubscribe.java @@ -0,0 +1,65 @@ +package cn.skcks.docking.gb28181.core.sip.message.subscribe; + +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.io.Closeable; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.*; + +@Slf4j +@RequiredArgsConstructor +public class SipRequestSubscribe implements GenericTimeoutSubscribe, Closeable { + private final Executor executor; + private final ScheduledExecutorService scheduledExecutorService; + private final ConcurrentMap> scheduledFutureManager = new ConcurrentHashMap<>(0); + private static final Map> publishers = new ConcurrentHashMap<>(); + + public void close() { + Helper.close(publishers); + } + + public void addPublisher(String key) { + Helper.addPublisher(executor, publishers, key); + } + + public SubmissionPublisher getPublisher(String key) { + return Helper.getPublisher(publishers, key); + } + + public void addSubscribe(String key, Flow.Subscriber subscribe) { + Helper.addSubscribe(publishers, key, subscribe); + } + + @Override + public void delPublisher(String key) { + ScheduledFuture schedule = scheduledFutureManager.remove(key); + Optional.ofNullable(schedule).ifPresent(scheduledFuture->scheduledFuture.cancel(true)); + Helper.delPublisher(publishers, key); + } + + @Override + public void addPublisher(String key, long time, TimeUnit timeUnit) { + addPublisher(key); + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { + scheduledFutureManager.remove(key); + delPublisher(key); + log.debug("清理超时 请求 订阅器 {}", key); + }, time, timeUnit); + scheduledFutureManager.put(key,schedule); + } + + @Override + public void refreshPublisher(String key, long time, TimeUnit timeUnit) { + ScheduledFuture schedule = scheduledFutureManager.remove(key); + Optional.ofNullable(schedule).ifPresent(scheduledFuture->scheduledFuture.cancel(true)); + schedule = scheduledExecutorService.schedule(() -> { + scheduledFutureManager.remove(key); + delPublisher(key); + log.debug("清理超时 请求 订阅器 {}", key); + }, time, timeUnit); + scheduledFutureManager.put(key,schedule); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipResponseSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipResponseSubscribe.java new file mode 100644 index 0000000..22d19e2 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipResponseSubscribe.java @@ -0,0 +1,65 @@ +package cn.skcks.docking.gb28181.core.sip.message.subscribe; + +import gov.nist.javax.sip.message.SIPResponse; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.io.Closeable; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.*; + +@Slf4j +@RequiredArgsConstructor +public class SipResponseSubscribe implements GenericTimeoutSubscribe, Closeable { + private final Executor executor; + private final ScheduledExecutorService scheduledExecutorService; + private final ConcurrentMap> scheduledFutureManager = new ConcurrentHashMap<>(0); + private static final Map> publishers = new ConcurrentHashMap<>(); + + public void close() { + Helper.close(publishers); + } + + public void addPublisher(String key) { + Helper.addPublisher(executor, publishers, key); + } + + public SubmissionPublisher getPublisher(String key) { + return Helper.getPublisher(publishers, key); + } + + public void addSubscribe(String key, Flow.Subscriber subscribe) { + Helper.addSubscribe(publishers, key, subscribe); + } + + @Override + public void delPublisher(String key) { + ScheduledFuture schedule = scheduledFutureManager.remove(key); + Optional.ofNullable(schedule).ifPresent(scheduledFuture->scheduledFuture.cancel(true)); + Helper.delPublisher(publishers, key); + } + + @Override + public void addPublisher(String key, long time, TimeUnit timeUnit) { + addPublisher(key); + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { + scheduledFutureManager.remove(key); + delPublisher(key); + log.debug("清理超时 响应 订阅器 {}", key); + }, time, timeUnit); + scheduledFutureManager.put(key,schedule); + } + + @Override + public void refreshPublisher(String key, long time, TimeUnit timeUnit) { + ScheduledFuture schedule = scheduledFutureManager.remove(key); + Optional.ofNullable(schedule).ifPresent(scheduledFuture->scheduledFuture.cancel(true)); + schedule = scheduledExecutorService.schedule(() -> { + scheduledFutureManager.remove(key); + delPublisher(key); + log.debug("清理超时 响应 订阅器 {}", key); + }, time, timeUnit); + scheduledFutureManager.put(key,schedule); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java index 9bc0e10..6c1559e 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java @@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.core.sip.message.subscribe; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; +import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; @@ -12,6 +13,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; @Slf4j @Data @@ -20,18 +23,28 @@ import java.util.concurrent.Executor; public class SipSubscribe { @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) private final Executor executor; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private GenericSubscribe recordInfoSubscribe; private GenericSubscribe inviteSubscribe; + private GenericTimeoutSubscribe sipResponseSubscribe; + private GenericTimeoutSubscribe sipRequestSubscribe; @PostConstruct private void init() { + // TODO 准备废弃 recordInfoSubscribe = new RecordInfoSubscribe(executor); inviteSubscribe = new InviteSubscribe(executor); + // 通用订阅器 + sipResponseSubscribe = new SipResponseSubscribe(executor, scheduledExecutorService); + sipRequestSubscribe = new SipRequestSubscribe(executor, scheduledExecutorService); } @PreDestroy private void destroy() { inviteSubscribe.close(); recordInfoSubscribe.close(); + + sipResponseSubscribe.close(); + sipRequestSubscribe.close(); } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java index e1236c0..8cc29ea 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/service/SipServiceImpl.java @@ -13,8 +13,11 @@ import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; import javax.sip.*; +import javax.sip.message.Request; +import javax.sip.message.Response; import java.util.ArrayList; import java.util.List; +import java.util.Objects; @Slf4j @RequiredArgsConstructor @@ -67,6 +70,61 @@ public class SipServiceImpl implements SipService { }).findFirst().orElse(null); } + public List getProviders(String transport) { + return sipConfig.getIp().stream().map(item -> getProvider(transport, item)) + .filter(Objects::nonNull) + .toList(); + } + + public void sendResponse(SipProvider sipProvider, SendResponse response) { + log.info("{}", sipProvider); + ListeningPoint[] listeningPoints = sipProvider.getListeningPoints(); + if (listeningPoints == null || listeningPoints.length == 0) { + log.error("发送响应失败, 未找到有效的监听地址"); + return; + } + ListeningPoint listeningPoint = listeningPoints[0]; + String ip = listeningPoint.getIPAddress(); + int port = listeningPoint.getPort(); + try { + sipProvider.sendResponse(response.build(sipProvider, ip, port)); + } catch (SipException e) { + log.error("向{} {}:{} 发送响应失败, 异常: {}", ip, listeningPoint.getPort(), listeningPoint.getTransport(), e.getMessage()); + } + } + + public void sendResponse(String senderIp,String transport, SendResponse response) { + SipProvider sipProvider = getProvider(transport, senderIp); + sendResponse(sipProvider, response); + } + + public void sendRequest(String transport, SendRequest request) { + getProviders(transport).parallelStream().forEach(sipProvider -> { + log.info("{}", sipProvider); + ListeningPoint[] listeningPoints = sipProvider.getListeningPoints(); + if (listeningPoints == null || listeningPoints.length == 0) { + log.error("发送请求失败, 未找到有效的监听地址"); + return; + } + ListeningPoint listeningPoint = listeningPoints[0]; + String ip = listeningPoint.getIPAddress(); + int port = listeningPoint.getPort(); + try { + sipProvider.sendRequest(request.build(sipProvider, ip, port)); + } catch (SipException e) { + log.error("向{} {}:{} 发送请求失败, 异常: {}", ip, listeningPoint.getPort(), listeningPoint.getTransport(), e.getMessage()); + } + }); + } + + public interface SendRequest { + Request build(SipProvider provider, String ip, int port); + } + + public interface SendResponse { + Response build(SipProvider provider, String ip, int port); + } + public void listen(String ip, int port){ try{ sipStack = (SipStackImpl)sipFactory.createSipStack(DefaultProperties.getProperties("GB28181_SIP")); diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java new file mode 100644 index 0000000..9875a7b --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/catalog/CatalogService.java @@ -0,0 +1,112 @@ +package cn.skcks.docking.gb28181.service.catalog; + +import cn.skcks.docking.gb28181.config.sip.SipConfig; +import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; +import cn.skcks.docking.gb28181.core.sip.service.SipService; +import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; +import cn.skcks.docking.gb28181.service.docking.device.cache.DockingDeviceCacheService; +import cn.skcks.docking.gb28181.sip.manscdp.catalog.query.CatalogQueryDTO; +import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO; +import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO; +import cn.skcks.docking.gb28181.sip.method.message.request.MessageRequestBuilder; +import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils; +import cn.skcks.docking.gb28181.sip.utils.SipUtil; +import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import javax.sip.SipProvider; +import javax.sip.message.Request; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +@Slf4j +@Service +@RequiredArgsConstructor +public class CatalogService { + private final SipService sipService; + private final DockingDeviceCacheService deviceCacheService; + private final SipConfig sipConfig; + private final SipSubscribe subscribe; + + @SneakyThrows + public CompletableFuture> catalog(String gbDeviceId){ + CompletableFuture> result = new CompletableFuture<>(); + result.completeOnTimeout(Collections.emptyList(), 60, TimeUnit.SECONDS); + DockingDevice device = deviceCacheService.getDevice(gbDeviceId); + SipProvider provider = sipService.getProvider(device.getTransport(), device.getLocalIp()); + MessageRequestBuilder requestBuilder = MessageRequestBuilder.builder() + .localIp(device.getLocalIp()) + .localId(sipConfig.getId()) + .localPort(sipConfig.getPort()) + .transport(device.getTransport()) + .targetId(device.getDeviceId()) + .targetIp(device.getIp()) + .targetPort(device.getPort()) + .build(); + String callId = provider.getNewCallId().getCallId(); + long cSeq = SipRequestBuilder.getCSeq(); + String sn = SipUtil.generateSn(); + CatalogQueryDTO catalogQueryDTO = CatalogQueryDTO.builder() + .deviceId(gbDeviceId) + .sn(sn) + .build(); + Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO)); + subscribe.getSipRequestSubscribe().addPublisher(sn, 60, TimeUnit.SECONDS); + subscribe.getSipRequestSubscribe().addSubscribe(sn, new Flow.Subscriber<>() { + private Flow.Subscription subscription; + private final AtomicLong num = new AtomicLong(0); + private long sumNum = 0; + + private final List data = new ArrayList<>(); + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(item.getRawContent(), CatalogResponseDTO.class); + sumNum = Math.max(sumNum,catalogResponseDTO.getSumNum()); + long curNum = num.addAndGet(catalogResponseDTO.getDeviceList().getNum()); + log.debug("当前获取数量: {}/{}", curNum, sumNum); + data.addAll(catalogResponseDTO.getDeviceList().getDeviceList()); + if(curNum >= sumNum){ + log.info("获取完成"); + onComplete(); + } else { + subscription.request(1); + } + } + + @Override + public void onError(Throwable throwable) { + if(throwable == null){ + return; + } + throwable.printStackTrace(); + onComplete(); + } + + @Override + public void onComplete() { + log.info("返回结果 {}",result.complete(data)); + subscribe.getSipRequestSubscribe().delPublisher(callId); + } + }); + provider.sendRequest(request); + return result; + } +} + + diff --git a/gb28181-sip/src/main/java/cn/skcks/docking/gb28181/sip/utils/SipUtil.java b/gb28181-sip/src/main/java/cn/skcks/docking/gb28181/sip/utils/SipUtil.java index 469c8a8..857126a 100644 --- a/gb28181-sip/src/main/java/cn/skcks/docking/gb28181/sip/utils/SipUtil.java +++ b/gb28181-sip/src/main/java/cn/skcks/docking/gb28181/sip/utils/SipUtil.java @@ -47,6 +47,10 @@ public class SipUtil { return getIdFromFromHeader(fromHeader); } + public static String generateSn(){ + return String.valueOf((int) (Math.random() * 9 + 1) * 100000); + } + /** * 从subject读取channelId * */