From 723ad67df988c0e1cc8ec7d3eb889c66606627ad Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Thu, 14 Mar 2024 11:18:23 +0800 Subject: [PATCH] =?UTF-8?q?DeviceProxyService=20=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/device/DeviceProxyService.java | 47 +++++++++++-------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index e1c6c52..bac8c49 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -250,9 +250,15 @@ public class DeviceProxyService { public TaskProcessor downloadTask(){ return (Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + log.error("{}", e.getMessage()); + } String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { + private SIPRequest ackRequest; @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(1); @@ -260,6 +266,7 @@ public class DeviceProxyService { @Override public void onNext(SIPRequest item) { + ackRequest = item; subscribe.getAckSubscribe().delPublisher(ackKey); } @@ -271,27 +278,29 @@ public class DeviceProxyService { public void onComplete() { Flow.Subscriber task = ffmpegTask(request, downloadTask, callId, key, device); try { - FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); - if (!ffmpegConfig.getRtp().getUseRtpToDownload()) { - String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); - executor.execute(()->{ - try { - requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + if(ackRequest != null){ + FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); + if (!ffmpegConfig.getRtp().getUseRtpToDownload()) { + String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); + executor.execute(()->{ + try { + requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); - Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); - scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); - downloadTask.put(device.getDeviceCode(), executor); - } else { - String rtpUrl = getRtpUrl(request); - Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler); - scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); - downloadTask.put(device.getDeviceCode(), executor); + Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); + downloadTask.put(device.getDeviceCode(), executor); + } else { + String rtpUrl = getRtpUrl(request); + Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); + downloadTask.put(device.getDeviceCode(), executor); + } + executeResultHandler.waitFor(); } - executeResultHandler.waitFor(); } catch (Exception e) { sendBye(request, device, ""); log.error("{}", e.getMessage());