diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index 4f54944..0b4fec1 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -198,44 +198,48 @@ public class DeviceProxyService { public TaskProcessor playbackTask(){ return (Runnable sendOkResponse, SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { - scheduledExecutorService.schedule(() -> { - trying(request); - sendOkResponse.run(); - String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); - subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); - subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscription.request(1); - } + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException e) { + log.error("{}", e.getMessage()); + } - @Override - public void onNext(SIPRequest item) { - subscribe.getAckSubscribe().delPublisher(ackKey); - } + String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); + subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); + subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(1); + } - @Override - public void onError(Throwable throwable) { - } + @Override + public void onNext(SIPRequest item) { + subscribe.getAckSubscribe().delPublisher(ackKey); + } - @Override - public void onComplete() { - Flow.Subscriber task = ffmpegTask(request, callbackTask, callId, key, device); - try { - String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); - requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); - FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); - Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); - scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); - callbackTask.put(device.getDeviceCode(), executor); - executeResultHandler.waitFor(); - } catch (Exception e) { - sendBye(request,device,""); - log.error("{}", e.getMessage()); - } + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onComplete() { + Flow.Subscriber task = ffmpegTask(request, callbackTask, callId, key, device); + try { + String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); + requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); + FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); + Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); + callbackTask.put(device.getDeviceCode(), executor); + executeResultHandler.waitFor(); + } catch (Exception e) { + sendBye(request, device, ""); + log.error("{}", e.getMessage()); } - }); - }, 1, TimeUnit.SECONDS); + } + }); + trying(request); + sendOkResponse.run(); }; } @@ -247,57 +251,56 @@ public class DeviceProxyService { log.error("{}", e.getMessage()); } - trying(request); - sendOkResponse.run(); - String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); - subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); - subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { - @Override - public void onSubscribe(Flow.Subscription subscription) { - subscription.request(1); - } + String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); + subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); + subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscription.request(1); + } - @Override - public void onNext(SIPRequest item) { - subscribe.getAckSubscribe().delPublisher(ackKey); - } + @Override + public void onNext(SIPRequest item) { + subscribe.getAckSubscribe().delPublisher(ackKey); + } - @Override - public void onError(Throwable throwable) { - } + @Override + public void onError(Throwable throwable) { + } - @Override - public void onComplete() { - Flow.Subscriber task = ffmpegTask(request, downloadTask, callId, key, device); - try { - FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); - if(!ffmpegConfig.getRtp().getUseRtpToDownload()){ - String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); - scheduledExecutorService.submit(()->{ - try { - requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + @Override + public void onComplete() { + Flow.Subscriber task = ffmpegTask(request, downloadTask, callId, key, device); + try { + FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); + if (!ffmpegConfig.getRtp().getUseRtpToDownload()) { + String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); + scheduledExecutorService.submit(() -> { + try { + requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); - Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); - scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); - downloadTask.put(device.getDeviceCode(), executor); - } else { - String rtpUrl = getRtpUrl(request); - Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler); - scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); - downloadTask.put(device.getDeviceCode(), executor); - } - executeResultHandler.waitFor(); - } catch (Exception e) { - sendBye(request, device, ""); - log.error("{}", e.getMessage()); + Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); + downloadTask.put(device.getDeviceCode(), executor); + } else { + String rtpUrl = getRtpUrl(request); + Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); + downloadTask.put(device.getDeviceCode(), executor); } + executeResultHandler.waitFor(); + } catch (Exception e) { + sendBye(request, device, ""); + log.error("{}", e.getMessage()); } - }); - + } + }); + trying(request); + sendOkResponse.run(); }; }