From 41f13af88d55d47956aa9b4b37be7a2d840faaf4 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Wed, 7 Feb 2024 09:02:31 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=86=E9=A2=91=E6=8E=A8=E6=B5=81=E9=A1=BA?= =?UTF-8?q?=E5=BA=8F=E8=B0=83=E6=95=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../invite/request/InviteRequestProcessor.java | 4 ---- .../service/device/DeviceProxyService.java | 17 +++++++++++------ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java index c173f7d..a53aa21 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java @@ -36,7 +36,6 @@ import java.time.Duration; import java.util.Date; import java.util.EventObject; import java.util.Vector; -import java.util.concurrent.*; @Slf4j @RequiredArgsConstructor @@ -257,9 +256,6 @@ public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle String callId = request.getCallId().getCallId(); String key = GenericSubscribe.Helper.getKey(Request.ACK, callId); subscribe.getAckSubscribe().addPublisher(key); - ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - final ScheduledFuture[] schedule = new ScheduledFuture[1]; - Flow.Subscriber subscriber; // 发送 sdp 响应 Runnable sendOkResponse = () -> { diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index cff3854..689e276 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -107,11 +107,11 @@ public class DeviceProxyService { void process(SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc); } - private String requestZlmPushStream(ScheduledFuture schedule, Runnable sendOkResponse, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{ + private void requestZlmPushStream(ScheduledFuture schedule, Runnable sendOkResponse, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{ GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); - zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{ +// zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{ Retryer retryer = RetryerBuilder.newBuilder() .retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0) .retryIfException() @@ -146,7 +146,7 @@ public class DeviceProxyService { schedule.cancel(false); // 响应 sdp ok sendOkResponse.run(); - }); +// }); zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{ StopSendRtp stopSendRtp = new StopSendRtp(); stopSendRtp.setApp(DEFAULT_ZLM_APP); @@ -156,7 +156,6 @@ public class DeviceProxyService { zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{ sendBye(request,device,key); }); - return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + DEFAULT_ZLM_APP +"/" + callId; } private Flow.Subscriber ffmpegTask(SIPRequest request,ConcurrentHashMap tasks, String callId, String key, MockingDevice device){ @@ -175,9 +174,10 @@ public class DeviceProxyService { ScheduledFuture schedule = trying(request); Flow.Subscriber task = ffmpegTask(request, callbackTask, callId, key, device); try { - String zlmRtpUrl = requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); + String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule,request, device, key); Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); + requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); callbackTask.put(device.getDeviceCode(), executor); executeResultHandler.waitFor(); @@ -194,9 +194,10 @@ public class DeviceProxyService { ScheduledFuture schedule = trying(request); Flow.Subscriber task = ffmpegTask(request, downloadTask, callId, key, device); try { - String zlmRtpUrl = requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); + String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule, request, device, key); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); + requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); downloadTask.put(device.getDeviceCode(), executor); executeResultHandler.waitFor(); @@ -208,6 +209,10 @@ public class DeviceProxyService { }; } + private String getZlmRtmpUrl(String app, String streamId){ + return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + app +"/" + streamId; + } + private ScheduledFuture trying(SIPRequest request){ return scheduledExecutorService.scheduleAtFixedRate(() -> { InviteResponseBuilder inviteRequestBuilder = InviteResponseBuilder.builder().build();