整理 与 清理部分代码

This commit is contained in:
shikong 2024-01-07 16:12:28 +08:00
parent dfba129768
commit e8f5bfd1b0
7 changed files with 216 additions and 107 deletions

View File

@ -5,6 +5,7 @@ import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.service.catalog.CatalogService;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO;
import cn.skcks.docking.gb28181.utils.FutureDeferredResult;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
@ -23,12 +24,8 @@ public class CatalogController {
@SneakyThrows
@GetJson
public DeferredResult<JsonResponse<List<?>>> catalog(String gbDeviceId){
DeferredResult<JsonResponse<List<?>>> result = new DeferredResult<>();
public DeferredResult<JsonResponse<List<CatalogItemDTO>>> catalog(String gbDeviceId){
CompletableFuture<List<CatalogItemDTO>> catalog = catalogService.catalog(gbDeviceId);
catalog.whenComplete((data,throwable)->{
result.setResult(JsonResponse.success(data));
});
return result;
return FutureDeferredResult.toDeferredResultWithJson(catalog);
}
}

View File

@ -23,7 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private GenericTimeoutSubscribe<SIPResponse> sipResponseSubscribe;
private GenericTimeoutSubscribe<SIPRequest> sipRequestSubscribe;

View File

@ -67,56 +67,7 @@ public class CatalogService {
Request request = requestBuilder.createMessageRequest(callId, cSeq, MANSCDPUtils.toByteXml(catalogQueryDTO, device.getCharset()));
String key = GenericSubscribe.Helper.getKey(CmdType.CATALOG, gbDeviceId, sn);
subscribe.getSipRequestSubscribe().addPublisher(key, 60, TimeUnit.SECONDS);
subscribe.getSipRequestSubscribe().addSubscribe(key, new Flow.Subscriber<>() {
private Flow.Subscription subscription;
private final AtomicLong num = new AtomicLong(0);
private long sumNum = 0;
private final List<CatalogItemDTO> data = new ArrayList<>();
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(item.getRawContent(), CatalogResponseDTO.class);
sumNum = Math.max(sumNum,catalogResponseDTO.getSumNum());
long curNum = num.addAndGet(catalogResponseDTO.getDeviceList().getNum());
log.debug("当前获取数量: {}/{}", curNum, sumNum);
data.addAll(catalogResponseDTO.getDeviceList().getDeviceList());
if(curNum >= sumNum){
log.info("获取完成 {}", key);
subscribe.getSipRequestSubscribe().complete(key);
} else {
subscription.request(1);
}
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
onComplete();
}
@Override
public void onComplete() {
log.info("{} 返回结果 {}", key, result.complete(data));
data.stream().map(item->{
DockingDeviceChannel model = new DockingDeviceChannel();
model.setGbDeviceId(device.getDeviceId());
model.setGbDeviceChannelId(item.getDeviceId());
model.setName(item.getName());
model.setAddress(item.getAddress());
return model;
}).forEach(deviceChannelService::add);
subscribe.getSipRequestSubscribe().delPublisher(key);
}
});
subscribe.getSipRequestSubscribe().addSubscribe(key, new CatalogSubscriber(subscribe, key, result, device.getDeviceId(), deviceChannelService::add));
provider.sendRequest(request);
return result;
}

View File

@ -0,0 +1,76 @@
package cn.skcks.docking.gb28181.service.catalog;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.orm.mybatis.dynamic.model.DockingDeviceChannel;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogItemDTO;
import cn.skcks.docking.gb28181.sip.manscdp.catalog.response.CatalogResponseDTO;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
@Slf4j
@RequiredArgsConstructor
public class CatalogSubscriber implements Flow.Subscriber<SIPRequest>{
private final SipSubscribe subscribe;
private final String key;
private final CompletableFuture<List<CatalogItemDTO>> result;
private final String deviceId;
private final Consumer<? super DockingDeviceChannel> addDeviceChannelFunc;
private Flow.Subscription subscription;
private final AtomicLong num = new AtomicLong(0);
private long sumNum = 0;
private final List<CatalogItemDTO> data = new ArrayList<>();
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
CatalogResponseDTO catalogResponseDTO = MANSCDPUtils.parse(item.getRawContent(), CatalogResponseDTO.class);
sumNum = Math.max(sumNum,catalogResponseDTO.getSumNum());
long curNum = num.addAndGet(catalogResponseDTO.getDeviceList().getNum());
log.debug("当前获取数量: {}/{}", curNum, sumNum);
data.addAll(catalogResponseDTO.getDeviceList().getDeviceList());
if(curNum >= sumNum){
log.info("获取完成 {}", key);
subscribe.getSipRequestSubscribe().complete(key);
} else {
subscription.request(1);
}
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
onComplete();
}
@Override
public void onComplete() {
log.info("{} 返回结果 {}", key, result.complete(data));
data.stream().map(item->{
DockingDeviceChannel model = new DockingDeviceChannel();
model.setGbDeviceId(deviceId);
model.setGbDeviceChannelId(item.getDeviceId());
model.setName(item.getName());
model.setAddress(item.getAddress());
return model;
}).forEach(addDeviceChannelFunc);
subscribe.getSipRequestSubscribe().delPublisher(key);
}
}

View File

@ -93,58 +93,10 @@ public class RecordService {
Request request = requestBuilder.createMessageRequest(callId,SipRequestBuilder.getCSeq(), MANSCDPUtils.toByteXml(dto, device.getCharset()));
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, channelId, sn);
subscribe.getSipRequestSubscribe().addPublisher(key);
Flow.Subscriber<SIPRequest> subscriber = new Flow.Subscriber<>() {
final List<RecordInfoItemDTO> list = new ArrayList<>();
final AtomicLong atomicSum = new AtomicLong(0);
final AtomicLong atomicNum = new AtomicLong(0);
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.debug("建立订阅 => {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class);
atomicSum.set(Math.max(data.getSumNum(), atomicNum.get()));
atomicNum.addAndGet(data.getRecordList().getNum());
list.addAll(data.getRecordList().getRecordList());
long num = atomicNum.get();
long sum = atomicSum.get();
if(num > sum){
log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get());
} else {
log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get());
}
if (num >= sum) {
// 针对某些不按规范的设备
// 如果已获取数量 >= 约定的总数
// 就执行定时任务, 500ms 内未收到新的数据视为已结束
subscribe.getSipRequestSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list))));
log.debug("订阅结束 => {}", key);
subscribe.getSipRequestSubscribe().delPublisher(key);
}
};
subscribe.getSipRequestSubscribe().addSubscribe(key, subscriber);
subscribe.getSipRequestSubscribe().addSubscribe(key, new RecordSubscriber(subscribe, key, result, deviceId));
result.onTimeout(() -> {
result.setResult(JsonResponse.build(ResponseStatus.PARTIAL_CONTENT,
RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(Collections.emptyList())),
RecordConvertor.INSTANCE.dto2Vo(Collections.emptyList()),
"查询超时, 结果可能不完整"));
subscribe.getSipRequestSubscribe().delPublisher(key);
});

View File

@ -0,0 +1,81 @@
package cn.skcks.docking.gb28181.service.record;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class RecordSubscriber implements Flow.Subscriber<SIPRequest>{
private final SipSubscribe subscribe;
private final String key;
private final DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result;
private final String deviceId;
private final List<RecordInfoItemDTO> list = new ArrayList<>();
private final AtomicLong atomicSum = new AtomicLong(0);
private final AtomicLong atomicNum = new AtomicLong(0);
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.debug("建立订阅 => {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class);
atomicSum.set(Math.max(data.getSumNum(), atomicNum.get()));
atomicNum.addAndGet(data.getRecordList().getNum());
list.addAll(data.getRecordList().getRecordList());
long num = atomicNum.get();
long sum = atomicSum.get();
if(num > sum){
log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get());
} else {
log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get());
}
if (num >= sum) {
// 针对某些不按规范的设备
// 如果已获取数量 >= 约定的总数
// 就执行定时任务, 500ms 内未收到新的数据视为已结束
subscribe.getSipRequestSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list))));
log.debug("订阅结束 => {}", key);
subscribe.getSipRequestSubscribe().delPublisher(key);
}
private List<RecordInfoItemDTO> sortedRecordList(List<RecordInfoItemDTO> list){
return list.stream().sorted((a,b)-> DateUtil.compare(a.getStartTime(),b.getStartTime())).collect(Collectors.toList());
}
}

View File

@ -0,0 +1,52 @@
package cn.skcks.docking.gb28181.utils;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class FutureDeferredResult {
public static <T> DeferredResult<JsonResponse<T>> toDeferredResultWithJson(CompletableFuture<T> future){
DeferredResult<JsonResponse<T>> result = new DeferredResult<>();
future.whenComplete((data,throwable)->{
result.setResult(JsonResponse.success(data));
});
future.exceptionally(e -> {
result.setResult(JsonResponse.error(e.getMessage()));
return null;
});
return result;
}
public static <T> DeferredResult<JsonResponse<T>> toDeferredResultWithJsonAndTimeout(CompletableFuture<T> future, long time, TimeUnit timeUnit){
DeferredResult<JsonResponse<T>> result = new DeferredResult<>(timeUnit.toMillis(time));
result.onTimeout(()-> result.setResult(JsonResponse.error("请求超时")));
future.whenComplete((data,throwable)->{
result.setResult(JsonResponse.success(data));
});
future.exceptionally(e -> {
result.setResult(JsonResponse.error(e.getMessage()));
return null;
});
return result;
}
public static <T> DeferredResult<T> toDeferredResult(CompletableFuture<T> future){
DeferredResult<T> result = new DeferredResult<>();
future.whenComplete((data,throwable)->{
result.setResult(data);
});
return result;
}
public static <T> DeferredResult<T> toDeferredResultWithTimeout(CompletableFuture<T> future, T timeoutResult,long time, TimeUnit timeUnit){
DeferredResult<T> result = new DeferredResult<>(timeUnit.toMillis(time), timeoutResult);
future.completeOnTimeout(timeoutResult,time,timeUnit);
future.whenComplete((data, throwable) -> {
result.setResult(data);
});
return result;
}
}