diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index a671410..c72d4c3 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -81,6 +81,7 @@ public class Gb28181DownloadService { private final WvpProxyConfig wvpProxyConfig; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap>> requestMap = new ConcurrentHashMap<>(); + private final ConcurrentMap realtimeVideo = new ConcurrentHashMap<>(); private final ConcurrentMap realtimeVideoInfoMap = new ConcurrentHashMap<>(); @Getter @@ -224,7 +225,7 @@ public class Gb28181DownloadService { } @SneakyThrows - public DeferredResult> realtimeVideoUrl(String deviceCode){ + public synchronized DeferredResult> realtimeVideoUrl(String deviceCode){ DeferredResult> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(60)); result.onTimeout(()->{ result.setResult(JsonResponse.error("请求超时")); @@ -239,9 +240,11 @@ public class Gb28181DownloadService { result.setResult(JsonResponse.error("设备(通道)不存在")); return result; } - closeExistRequest(deviceCode, device, docking); - requestMap.put(deviceCode, result); - + String existUrl = realtimeVideo.get(deviceCode); + if(Optional.ofNullable(existUrl).isPresent()){ + result.setResult(JsonResponse.success(existUrl)); + return result; + } // 间隔一定时间(200ms) 给设备足够的时间结束前次请求 scheduledExecutorService.schedule(()->{ realtime.realtime(deviceCode).whenComplete((videoInfo, e)->{ @@ -416,7 +419,7 @@ public class Gb28181DownloadService { CallIdHeader callId = provider.getNewCallId(); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); subscribe.getInviteSubscribe().addPublisher(subscribeKey); - Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 60, TimeUnit.SECONDS); + Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 0, TimeUnit.SECONDS); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); RedisUtil.StringOps.set(cacheKey, callId.getCallId()); return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); @@ -575,7 +578,12 @@ public class Gb28181DownloadService { schedule[0].cancel(true); } }; - schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit); + + if(time == 0){ + schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60, unit); + } else { + schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit); + } return subscriber; } @@ -608,11 +616,16 @@ public class Gb28181DownloadService { @Override public void onComplete() { subscribe.getByeSubscribe().delPublisher(key); - schedule[0].cancel(true); + if(time > 0){ + schedule[0].cancel(true); + } zlmMediaService.closeRtpServer(new CloseRtpServer(streamId)); } }; - schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit); + + if(time > 0){ + schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, time, unit); + } return subscriber; } }