diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java index bab6238..d9f087d 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/processor/message/request/MessageRequestProcessor.java @@ -4,10 +4,10 @@ import cn.skcks.docking.gb28181.common.json.ResponseStatus; import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType; -import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; +import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.MessageDTO; import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; @@ -25,7 +25,7 @@ import org.springframework.stereotype.Component; import javax.sip.RequestEvent; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; -import java.util.concurrent.SubmissionPublisher; +import java.util.Optional; @Slf4j @RequiredArgsConstructor @@ -72,8 +72,9 @@ public class MessageRequestProcessor implements MessageProcessor { response = ok; RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET); String key = CacheUtil.getKey(CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn()); - SubmissionPublisher publisher = subscribe.getRecordInfoSubscribe().getPublisher(key); - publisher.submit(dto); + Optional.ofNullable(subscribe.getRecordInfoSubscribe().getPublisher(key)).ifPresentOrElse(publisher->{ + publisher.submit(dto); + },()-> log.warn("对应订阅 {} 已结束, 异常数据 => {}",key, dto)); } else { response = response(request, Response.NOT_IMPLEMENTED, ResponseStatus.NOT_IMPLEMENTED.getMessage()); } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java index ea61790..bd32e0a 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/record/RecordService.java @@ -26,7 +26,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Flow; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @Slf4j @@ -40,7 +40,8 @@ public class RecordService { @SneakyThrows public DeferredResult>> requestRecordInfo(String deviceId) { - DeferredResult>> result = new DeferredResult<>(30 * 1000L); + log.info("TimeUnit.SECONDS.toMillis(30) {}",TimeUnit.SECONDS.toMillis(30)); + DeferredResult>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(30)); DockingDevice device = deviceService.getDevice(deviceId); if (device == null) { @@ -73,7 +74,9 @@ public class RecordService { List list = new ArrayList<>(); AtomicLong sum = new AtomicLong(0); AtomicLong getNum = new AtomicLong(0); - subscribe.getRecordInfoSubscribe().addSubscribe(key, new Flow.Subscriber<>() { + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + final ScheduledFuture[] schedule = new ScheduledFuture[1]; + Flow.Subscriber subscriber = new Flow.Subscriber<>() { Flow.Subscription subscription; @Override @@ -90,10 +93,15 @@ public class RecordService { list.addAll(item.getRecordList()); log.info("获取订阅 => {}, {}/{}", key, getNum.get(), sum.get()); if (getNum.get() >= sum.get()) { - onComplete(); - } else { - subscription.request(1); + // 针对某些不按规范的设备 + // 如果已获取数量 >= 约定的总数 + // 就执行定时任务, 若 500ms 内未收到新的数据视为已结束 + if(schedule[0] != null){ + schedule[0].cancel(true); + } + schedule[0] = scheduledExecutorService.schedule(this::onComplete, 500, TimeUnit.MILLISECONDS); } + subscription.request(1); } @Override @@ -103,11 +111,14 @@ public class RecordService { @Override public void onComplete() { + schedule[0].cancel(true); subscribe.getRecordInfoSubscribe().delPublisher(key); result.setResult(JsonResponse.success(list)); log.debug("订阅结束 => {}", key); } - }); + }; + + subscribe.getRecordInfoSubscribe().addSubscribe(key, subscriber); result.onTimeout(()->{ result.setResult(JsonResponse.success(list,"查询超时, 结果可能不完整"));