目录查询

This commit is contained in:
shikong 2023-09-20 10:24:10 +08:00
parent 65b492f8e7
commit f2835262fd
4 changed files with 69 additions and 17 deletions

View File

@ -5,11 +5,18 @@ import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig; import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig;
import cn.skcks.docking.gb28181.wvp.service.catalog.CatalogService; import cn.skcks.docking.gb28181.wvp.service.catalog.CatalogService;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogItemDTO;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.springdoc.core.models.GroupedOpenApi; import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@RequiredArgsConstructor @RequiredArgsConstructor
@RestController @RestController
@ -21,9 +28,16 @@ public class Gb28181Controller {
return SwaggerConfig.api("Gb28181Api", "/gb28181"); return SwaggerConfig.api("Gb28181Api", "/gb28181");
} }
@SneakyThrows
@GetJson("/catalog") @GetJson("/catalog")
public JsonResponse<Void> catalog(@RequestParam("gbDeviceId") String id){ public DeferredResult<JsonResponse<List<CatalogItemDTO>>> catalog(@RequestParam("gbDeviceId") String id){
catalogService.getCatalog(id); DeferredResult<JsonResponse<List<CatalogItemDTO>>> result = new DeferredResult<>();
return JsonResponse.success(null); CompletableFuture<CatalogResponseDTO> catalog = catalogService.getCatalog(id);
catalog.thenApplyAsync((dto)->{
List<CatalogItemDTO> deviceList = dto.getDeviceList().getDeviceList();
result.setResult(JsonResponse.success(deviceList));
return null;
});
return result;
} }
} }

View File

@ -2,11 +2,13 @@ package cn.skcks.docking.gb28181.wvp.service.catalog;
import cn.skcks.docking.gb28181.common.json.JsonException; import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogItemDTO;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogRequestDTO; import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogRequestDTO;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO; import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
@ -18,7 +20,8 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.sip.message.Request; import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -32,7 +35,8 @@ public class CatalogService {
private final DockingService dockingService; private final DockingService dockingService;
@SneakyThrows @SneakyThrows
public void getCatalog(String deviceId){ public CompletableFuture<CatalogResponseDTO> getCatalog(String deviceId){
CompletableFuture<CatalogResponseDTO> result = new CompletableFuture<>();
WvpProxyDocking device = dockingService.getDeviceByDeviceCode(deviceId).orElse(null); WvpProxyDocking device = dockingService.getDeviceByDeviceCode(deviceId).orElse(null);
if (device == null){ if (device == null){
throw new JsonException("设备不存在"); throw new JsonException("设备不存在");
@ -41,25 +45,30 @@ public class CatalogService {
catalogRequestDTO.setDeviceId(deviceId); catalogRequestDTO.setDeviceId(deviceId);
String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000); String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000);
catalogRequestDTO.setSn(sn); catalogRequestDTO.setSn(sn);
String key = GenericSubscribe.Helper.getKey(Request.MESSAGE, deviceId); String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, deviceId);
sipSubscribe.getCatalogSubscribe().addPublisher(key); sipSubscribe.getCatalogSubscribe().addPublisher(key);
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1]; final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPRequest> subscriber = catalog(key, device, schedule); Flow.Subscriber<SIPRequest> subscriber = catalog(key, device, schedule, result);
// 60秒超时计时器 // 60秒超时计时器
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS); schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,1L, XmlUtils.toXml(catalogRequestDTO), SipUtil.generateViaTag(), // 添加订阅
sipSubscribe.getCatalogSubscribe().addSubscribe(key, subscriber);
sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,SipRequestBuilder.getCSeq(), XmlUtils.toXml(catalogRequestDTO), SipUtil.generateViaTag(),
SipUtil.generateFromTag(), provider.getNewCallId())); SipUtil.generateFromTag(), provider.getNewCallId()));
return result;
} }
private Flow.Subscriber<SIPRequest> catalog(String key, WvpProxyDocking device, ScheduledFuture<?>[] schedule){ private Flow.Subscriber<SIPRequest> catalog(String key, WvpProxyDocking device, ScheduledFuture<?>[] schedule,CompletableFuture<CatalogResponseDTO> result){
List<CatalogItemDTO> deviceList = new ArrayList<>();
return new Flow.Subscriber<>() { return new Flow.Subscriber<>() {
Flow.Subscription subscription; Flow.Subscription subscription;
CatalogResponseDTO dto;
final AtomicLong getNum = new AtomicLong(0); final AtomicLong getNum = new AtomicLong(0);
@Override @Override
public void onSubscribe(Flow.Subscription subscription) { public void onSubscribe(Flow.Subscription subscription) {
log.info("创建 订阅 {}", key); log.info("开始订阅 {}", key);
this.subscription = subscription; this.subscription = subscription;
subscription.request(1); subscription.request(1);
} }
@ -67,9 +76,11 @@ public class CatalogService {
@Override @Override
public void onNext(SIPRequest item) { public void onNext(SIPRequest item) {
CatalogResponseDTO responseDTO = XmlUtils.parse(item.getRawContent(), CatalogResponseDTO.class, GB28181Constant.CHARSET); CatalogResponseDTO responseDTO = XmlUtils.parse(item.getRawContent(), CatalogResponseDTO.class, GB28181Constant.CHARSET);
dto = responseDTO;
Long sumNum = responseDTO.getSumNum(); Long sumNum = responseDTO.getSumNum();
log.info("{}",responseDTO); log.info("{}",responseDTO);
getNum.getAndAdd(responseDTO.getDeviceList().getDeviceList().size()); getNum.getAndAdd(responseDTO.getDeviceList().getDeviceList().size());
deviceList.addAll(responseDTO.getDeviceList().getDeviceList());
if(getNum.get() < sumNum){ if(getNum.get() < sumNum){
subscription.request(1); subscription.request(1);
} else{ } else{
@ -86,6 +97,11 @@ public class CatalogService {
public void onComplete() { public void onComplete() {
sipSubscribe.getCatalogSubscribe().delPublisher(key); sipSubscribe.getCatalogSubscribe().delPublisher(key);
schedule[0].cancel(true); schedule[0].cancel(true);
log.info("订阅结束 {}", key);
if(dto != null){
dto.getDeviceList().setDeviceList(deviceList);
}
result.complete(dto);
} }
}; };
} }

View File

@ -7,16 +7,15 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO; import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
import cn.skcks.docking.gb28181.service.docking.device.DockingDeviceService;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -37,9 +36,10 @@ public class MessageRequestProcessor implements MessageProcessor {
private final SipSubscribe subscribe; private final SipSubscribe subscribe;
private final DockingService dockingService; private final DockingService dockingService;
@PostConstruct
@Override @Override
public void init() { public void init() {
sipListener.addRequestProcessor(Method.MESSAGE,this);
} }
@Override @Override
@ -55,7 +55,7 @@ public class MessageRequestProcessor implements MessageProcessor {
String senderIp = request.getLocalAddress().getHostAddress(); String senderIp = request.getLocalAddress().getHostAddress();
if(dockingService.hasDeviceByDeviceCode(deviceId)){ if(!dockingService.hasDeviceByDeviceCode(deviceId)){
log.info("未找到相关设备信息 => {}", deviceId); log.info("未找到相关设备信息 => {}", deviceId);
Response response = response(request,Response.NOT_FOUND,"设备未注册"); Response response = response(request,Response.NOT_FOUND,"设备未注册");
sender.send(senderIp,response); sender.send(senderIp,response);
@ -69,8 +69,8 @@ public class MessageRequestProcessor implements MessageProcessor {
// 更新设备在线状态 // 更新设备在线状态
} else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){ } else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){
response = ok; response = ok;
RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET); CatalogResponseDTO dto = XmlUtils.parse(content, CatalogResponseDTO.class, GB28181Constant.CHARSET);
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn()); String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, dto.getDeviceId());
Optional.ofNullable(subscribe.getCatalogSubscribe().getPublisher(key)) Optional.ofNullable(subscribe.getCatalogSubscribe().getPublisher(key))
.ifPresentOrElse(publisher-> publisher.submit(request), .ifPresentOrElse(publisher-> publisher.submit(request),
()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto)); ()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto));

View File

@ -1,5 +1,7 @@
package cn.skcks.docking.gb28181.wvp.sip.request; package cn.skcks.docking.gb28181.wvp.sip.request;
import cn.skcks.docking.gb28181.common.redis.RedisUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant;
import cn.skcks.docking.gb28181.core.sip.message.MessageHelper; import cn.skcks.docking.gb28181.core.sip.message.MessageHelper;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
@ -123,4 +125,24 @@ public class SipRequestBuilder implements ApplicationContextAware {
request.setContent(content, contentTypeHeader); request.setContent(content, contentTypeHeader);
return request; return request;
} }
public static long getCSeq() {
String key = CacheUtil.getKey(CacheUtil.SIP_C_SEQ_PREFIX,sipConfig.getId());
long result = 1L;
if(RedisUtil.KeyOps.hasKey(key)){
try {
result = RedisUtil.StringOps.incrBy(key,1L);
} finally {
if (result > Integer.MAX_VALUE) {
RedisUtil.StringOps.set(key, String.valueOf(1L));
result = 1L;
}
}
} else {
RedisUtil.StringOps.set(key, String.valueOf(result));
}
return result;
}
} }