设备目录查询

This commit is contained in:
shikong 2023-10-02 23:53:10 +08:00
parent e1b8db65a0
commit 1c280ef64f
2 changed files with 9 additions and 6 deletions

View File

@ -82,7 +82,8 @@ public class MessageRequestProcessor implements MessageProcessor {
() -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto));
}else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){
CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(content, CatalogResponseDTO.class);
Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(catalogResponseDTO.getSn())).ifPresent(publisher->{
String key = GenericSubscribe.Helper.getKey(catalogResponseDTO.getDeviceId(), catalogResponseDTO.getSn());
Optional.ofNullable(subscribe.getSipRequestSubscribe().getPublisher(key)).ifPresent(publisher->{
publisher.submit(request);
});
response = ok;

View File

@ -2,6 +2,7 @@ package cn.skcks.docking.gb28181.service.catalog;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.core.sip.service.SipService;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDevice;
@ -60,8 +61,9 @@ public class CatalogService {
.sn(sn)
.build();
Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO));
subscribe.getSipRequestSubscribe().addPublisher(sn, 60, TimeUnit.SECONDS);
subscribe.getSipRequestSubscribe().addSubscribe(sn, new Flow.Subscriber<>() {
String key = GenericSubscribe.Helper.getKey(gbDeviceId,sn);
subscribe.getSipRequestSubscribe().addPublisher(key, 60, TimeUnit.SECONDS);
subscribe.getSipRequestSubscribe().addSubscribe(key, new Flow.Subscriber<>() {
private Flow.Subscription subscription;
private final AtomicLong num = new AtomicLong(0);
private long sumNum = 0;
@ -82,8 +84,8 @@ public class CatalogService {
log.debug("当前获取数量: {}/{}", curNum, sumNum);
data.addAll(catalogResponseDTO.getDeviceList().getDeviceList());
if(curNum >= sumNum){
log.info("获取完成");
subscribe.getSipRequestSubscribe().delPublisher(sn);
log.info("获取完成 {}", key);
subscribe.getSipRequestSubscribe().delPublisher(key);
} else {
subscription.request(1);
}
@ -97,7 +99,7 @@ public class CatalogService {
@Override
public void onComplete() {
log.info("返回结果 {} {}", result.complete(data),data);
log.info("{} 返回结果 {}", key, result.complete(data));
}
});
provider.sendRequest(request);