From 588d321fbe5b1acb0e8a03dddeb36e3c62af25e2 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Fri, 12 Jan 2024 09:46:30 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E5=90=88=E5=86=97=E4=BD=99=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/device/DeviceProxyService.java | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index 2bae097..7272017 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -126,22 +126,26 @@ public class DeviceProxyService { return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; } + private Flow.Subscriber task(ConcurrentHashMap tasks, String callId, String key, MockingDevice device){ + Optional.ofNullable(tasks.get(callId)).ifPresent(task->{ + task.getWatchdog().destroyProcess(); + }); + Flow.Subscriber subscriber = byeSubscriber(key, device, tasks); + subscribe.getByeSubscribe().addSubscribe(key, subscriber); + int num = taskNum.incrementAndGet(); + log.info("当前任务数 {}", num); + return subscriber; + } + public TaskProcessor playbackTask(){ return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { - Optional.ofNullable(callbackTask.get(callId)).ifPresent(task->{ - task.getWatchdog().destroyProcess(); - }); - Flow.Subscriber subscriber = byeSubscriber(key, device, callbackTask); - subscribe.getByeSubscribe().addSubscribe(key, subscriber); - int num = taskNum.incrementAndGet(); - log.info("当前任务数 {}", num); - + Flow.Subscriber task = task(callbackTask, callId, key, device); ScheduledFuture schedule = trying(request); try { String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); - scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); callbackTask.put(device.getDeviceCode(), executor); executeResultHandler.waitFor(); } catch (Exception e) { @@ -153,19 +157,13 @@ public class DeviceProxyService { public TaskProcessor downloadTask(){ return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ - Optional.ofNullable(downloadTask.get(callId)).ifPresent(task->{ - task.getWatchdog().destroyProcess(); - }); - Flow.Subscriber subscriber = byeSubscriber(key, device, downloadTask); - subscribe.getByeSubscribe().addSubscribe(key, subscriber); - int num = taskNum.incrementAndGet(); - log.info("当前任务数 {}", num); + Flow.Subscriber task = task(downloadTask, callId, key, device); ScheduledFuture schedule = trying(request); try { String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); - scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); downloadTask.put(device.getDeviceCode(), executor); executeResultHandler.waitFor(); } catch (Exception e) {