From f2835262fd9c5cff8b05c14f5b66bdc4f0d0200d Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Wed, 20 Sep 2023 10:24:10 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9B=AE=E5=BD=95=E6=9F=A5=E8=AF=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wvp/api/gb28181/Gb28181Controller.java | 20 +++++++++++-- .../wvp/service/catalog/CatalogService.java | 30 ++++++++++++++----- .../dto/request/MessageRequestProcessor.java | 14 ++++----- .../wvp/sip/request/SipRequestBuilder.java | 22 ++++++++++++++ 4 files changed, 69 insertions(+), 17 deletions(-) diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java index 1b7fcc6..11a6657 100644 --- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/gb28181/Gb28181Controller.java @@ -5,11 +5,18 @@ import cn.skcks.docking.gb28181.annotation.web.methods.GetJson; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; import cn.skcks.docking.gb28181.wvp.service.catalog.CatalogService; +import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogItemDTO; +import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import org.springdoc.core.models.GroupedOpenApi; import org.springframework.context.annotation.Bean; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.List; +import java.util.concurrent.CompletableFuture; @RequiredArgsConstructor @RestController @@ -21,9 +28,16 @@ public class Gb28181Controller { return SwaggerConfig.api("Gb28181Api", "/gb28181"); } + @SneakyThrows @GetJson("/catalog") - public JsonResponse catalog(@RequestParam("gbDeviceId") String id){ - catalogService.getCatalog(id); - return JsonResponse.success(null); + public DeferredResult>> catalog(@RequestParam("gbDeviceId") String id){ + DeferredResult>> result = new DeferredResult<>(); + CompletableFuture catalog = catalogService.getCatalog(id); + catalog.thenApplyAsync((dto)->{ + List deviceList = dto.getDeviceList().getDeviceList(); + result.setResult(JsonResponse.success(deviceList)); + return null; + }); + return result; } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/catalog/CatalogService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/catalog/CatalogService.java index 4bdc355..2bec621 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/catalog/CatalogService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/catalog/CatalogService.java @@ -2,11 +2,13 @@ package cn.skcks.docking.gb28181.wvp.service.catalog; import cn.skcks.docking.gb28181.common.json.JsonException; import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; +import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogItemDTO; import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogRequestDTO; import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO; import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; @@ -18,7 +20,8 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import javax.sip.message.Request; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @@ -32,7 +35,8 @@ public class CatalogService { private final DockingService dockingService; @SneakyThrows - public void getCatalog(String deviceId){ + public CompletableFuture getCatalog(String deviceId){ + CompletableFuture result = new CompletableFuture<>(); WvpProxyDocking device = dockingService.getDeviceByDeviceCode(deviceId).orElse(null); if (device == null){ throw new JsonException("设备不存在"); @@ -41,25 +45,30 @@ public class CatalogService { catalogRequestDTO.setDeviceId(deviceId); String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000); catalogRequestDTO.setSn(sn); - String key = GenericSubscribe.Helper.getKey(Request.MESSAGE, deviceId); + String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, deviceId); sipSubscribe.getCatalogSubscribe().addPublisher(key); final ScheduledFuture[] schedule = new ScheduledFuture[1]; - Flow.Subscriber subscriber = catalog(key, device, schedule); + Flow.Subscriber subscriber = catalog(key, device, schedule, result); // 60秒超时计时器 schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS); - sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,1L, XmlUtils.toXml(catalogRequestDTO), SipUtil.generateViaTag(), + // 添加订阅 + sipSubscribe.getCatalogSubscribe().addSubscribe(key, subscriber); + sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,SipRequestBuilder.getCSeq(), XmlUtils.toXml(catalogRequestDTO), SipUtil.generateViaTag(), SipUtil.generateFromTag(), provider.getNewCallId())); + return result; } - private Flow.Subscriber catalog(String key, WvpProxyDocking device, ScheduledFuture[] schedule){ + private Flow.Subscriber catalog(String key, WvpProxyDocking device, ScheduledFuture[] schedule,CompletableFuture result){ + List deviceList = new ArrayList<>(); return new Flow.Subscriber<>() { Flow.Subscription subscription; + CatalogResponseDTO dto; final AtomicLong getNum = new AtomicLong(0); @Override public void onSubscribe(Flow.Subscription subscription) { - log.info("创建 订阅 {}", key); + log.info("开始订阅 {}", key); this.subscription = subscription; subscription.request(1); } @@ -67,9 +76,11 @@ public class CatalogService { @Override public void onNext(SIPRequest item) { CatalogResponseDTO responseDTO = XmlUtils.parse(item.getRawContent(), CatalogResponseDTO.class, GB28181Constant.CHARSET); + dto = responseDTO; Long sumNum = responseDTO.getSumNum(); log.info("{}",responseDTO); getNum.getAndAdd(responseDTO.getDeviceList().getDeviceList().size()); + deviceList.addAll(responseDTO.getDeviceList().getDeviceList()); if(getNum.get() < sumNum){ subscription.request(1); } else{ @@ -86,6 +97,11 @@ public class CatalogService { public void onComplete() { sipSubscribe.getCatalogSubscribe().delPublisher(key); schedule[0].cancel(true); + log.info("订阅结束 {}", key); + if(dto != null){ + dto.getDeviceList().setDeviceList(deviceList); + } + result.complete(dto); } }; } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/request/MessageRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/request/MessageRequestProcessor.java index afa9893..5e66f6c 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/request/MessageRequestProcessor.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/message/catalog/dto/request/MessageRequestProcessor.java @@ -7,16 +7,15 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO; -import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; -import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice; -import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; +import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO; import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; +import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -37,9 +36,10 @@ public class MessageRequestProcessor implements MessageProcessor { private final SipSubscribe subscribe; private final DockingService dockingService; + @PostConstruct @Override public void init() { - + sipListener.addRequestProcessor(Method.MESSAGE,this); } @Override @@ -55,7 +55,7 @@ public class MessageRequestProcessor implements MessageProcessor { String senderIp = request.getLocalAddress().getHostAddress(); - if(dockingService.hasDeviceByDeviceCode(deviceId)){ + if(!dockingService.hasDeviceByDeviceCode(deviceId)){ log.info("未找到相关设备信息 => {}", deviceId); Response response = response(request,Response.NOT_FOUND,"设备未注册"); sender.send(senderIp,response); @@ -69,8 +69,8 @@ public class MessageRequestProcessor implements MessageProcessor { // 更新设备在线状态 } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){ response = ok; - RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET); - String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn()); + CatalogResponseDTO dto = XmlUtils.parse(content, CatalogResponseDTO.class, GB28181Constant.CHARSET); + String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, dto.getDeviceId()); Optional.ofNullable(subscribe.getCatalogSubscribe().getPublisher(key)) .ifPresentOrElse(publisher-> publisher.submit(request), ()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto)); diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java index 190230a..f20a0c4 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/request/SipRequestBuilder.java @@ -1,5 +1,7 @@ package cn.skcks.docking.gb28181.wvp.sip.request; +import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.message.MessageHelper; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; @@ -123,4 +125,24 @@ public class SipRequestBuilder implements ApplicationContextAware { request.setContent(content, contentTypeHeader); return request; } + + public static long getCSeq() { + String key = CacheUtil.getKey(CacheUtil.SIP_C_SEQ_PREFIX,sipConfig.getId()); + + long result = 1L; + if(RedisUtil.KeyOps.hasKey(key)){ + try { + result = RedisUtil.StringOps.incrBy(key,1L); + } finally { + if (result > Integer.MAX_VALUE) { + RedisUtil.StringOps.set(key, String.valueOf(1L)); + result = 1L; + } + } + } else { + RedisUtil.StringOps.set(key, String.valueOf(result)); + } + + return result; + } }