From 2da052331062010a0e01fbf0addb4a29e7823752 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Mon, 15 Jan 2024 11:43:07 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A8=8B=E5=BA=8F=E9=80=80=E5=87=BA=E6=97=B6?= =?UTF-8?q?=20=E5=9C=A85=E7=A7=92=E5=86=85=20=E5=85=B3=E9=97=AD=E6=89=80?= =?UTF-8?q?=E6=9C=89=20ack=20=E5=92=8C=20bye=20=E8=AE=A2=E9=98=85=20?= =?UTF-8?q?=E5=B9=B6=20=E5=85=B3=E9=97=AD=E6=89=80=E6=9C=89=E6=8E=A8?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/InviteRequestProcessor.java | 33 +++++++++++-- .../service/device/DeviceProxyService.java | 46 +++++++++++++------ .../gb28181/mocking/starter/SipStarter.java | 5 ++ 3 files changed, 66 insertions(+), 18 deletions(-) diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java index b0ddca4..d8cb179 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java @@ -25,12 +25,14 @@ import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.springframework.context.SmartLifecycle; import org.springframework.stereotype.Component; import javax.sdp.*; import javax.sip.RequestEvent; import javax.sip.message.Request; import javax.sip.message.Response; +import java.time.Duration; import java.util.Date; import java.util.EventObject; import java.util.Vector; @@ -40,7 +42,7 @@ import java.util.concurrent.*; @RequiredArgsConstructor @Component @SuppressWarnings("Duplicates") -public class InviteRequestProcessor implements MessageProcessor { +public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle { private final SipListener sipListener; private final SipSender sender; @@ -55,6 +57,8 @@ public class InviteRequestProcessor implements MessageProcessor { private final DeviceProxyConfig deviceProxyConfig; + private boolean running; + @PostConstruct @Override public void init() { @@ -99,8 +103,6 @@ public class InviteRequestProcessor implements MessageProcessor { if (StringUtils.equalsAnyIgnoreCase(type, "Play", "PlayBack")) { log.info("点播/回放请求"); if (StringUtils.equalsIgnoreCase(type, "Play")) { - // 暂不支持实时 -// sender.sendResponse(senderIp, transport, unsupported(request)); play(request, device, gb28181Description, (MediaDescription) item); } else { playback(request, device, gb28181Description, (MediaDescription) item); @@ -368,4 +370,29 @@ public class InviteRequestProcessor implements MessageProcessor { } }; } + + @Override + public void start() { + running = true; + } + + @SneakyThrows + @Override + public void stop() { + subscribe.getAckSubscribe().close(); + subscribe.getByeSubscribe().close(); + log.info("关闭所有 ack 和 bye 订阅 并 关闭所有推流"); + Thread.sleep(Duration.ofSeconds(5).toMillis()); + running = false; + } + + @Override + public boolean isRunning() { + return running; + } + + @Override + public int getPhase() { + return Integer.MAX_VALUE; + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index ad52678..148969f 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -132,11 +132,11 @@ public class DeviceProxyService { return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; } - private Flow.Subscriber ffmpegTask(ConcurrentHashMap tasks, String callId, String key, MockingDevice device){ + private Flow.Subscriber ffmpegTask(SIPRequest request,ConcurrentHashMap tasks, String callId, String key, MockingDevice device){ Optional.ofNullable(tasks.get(callId)).ifPresent(task->{ task.getWatchdog().destroyProcess(); }); - Flow.Subscriber subscriber = ffmpegByeSubscriber(key, device, tasks); + Flow.Subscriber subscriber = ffmpegByeSubscriber(request, key, device, tasks); subscribe.getByeSubscribe().addSubscribe(key, subscriber); int num = taskNum.incrementAndGet(); log.info("当前任务数 {}", num); @@ -145,7 +145,7 @@ public class DeviceProxyService { public TaskProcessor playbackTask(){ return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { - Flow.Subscriber task = ffmpegTask(callbackTask, callId, key, device); + Flow.Subscriber task = ffmpegTask(request, callbackTask, callId, key, device); ScheduledFuture schedule = trying(request); try { String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); @@ -156,6 +156,7 @@ public class DeviceProxyService { executeResultHandler.waitFor(); } catch (Exception e) { schedule.cancel(true); + sendBye(request,device,""); throw new RuntimeException(e); } }; @@ -163,7 +164,7 @@ public class DeviceProxyService { public TaskProcessor downloadTask(){ return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ - Flow.Subscriber task = ffmpegTask(downloadTask, callId, key, device); + Flow.Subscriber task = ffmpegTask(request, downloadTask, callId, key, device); ScheduledFuture schedule = trying(request); try { String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); @@ -174,6 +175,7 @@ public class DeviceProxyService { executeResultHandler.waitFor(); } catch (Exception e) { schedule.cancel(true); + sendBye(request,device,""); throw new RuntimeException(e); } }; @@ -189,8 +191,9 @@ public class DeviceProxyService { }, 200, TimeUnit.MILLISECONDS); } - public Flow.Subscriber ffmpegByeSubscriber(String key, MockingDevice device, ConcurrentHashMap task){ + public Flow.Subscriber ffmpegByeSubscriber(SIPRequest inviteRequest,String key, MockingDevice device, ConcurrentHashMap task){ return new Flow.Subscriber<>() { + SIPRequest request; @Override public void onSubscribe(Flow.Subscription subscription) { log.info("订阅 bye {}", key); @@ -199,10 +202,7 @@ public class DeviceProxyService { @Override public void onNext(SIPRequest item) { - String ip = item.getLocalAddress().getHostAddress(); - String transPort = item.getTopmostViaHeader().getTransport(); - sender.sendResponse(ip, transPort, ((provider, ip1, port) -> - SipResponseBuilder.response(item, Response.OK, "OK"))); + request = item; onComplete(); } @@ -214,6 +214,15 @@ public class DeviceProxyService { @Override public void onComplete() { log.info("bye 订阅结束 {}", key); + if(request == null){ + sendBye(inviteRequest,device,""); + } else { + String ip = request.getLocalAddress().getHostAddress(); + String transPort = request.getTopmostViaHeader().getTransport(); + sender.sendResponse(ip, transPort, ((provider, ip1, port) -> + SipResponseBuilder.response(request, Response.OK, "OK"))); + } + subscribe.getByeSubscribe().delPublisher(key); Optional.ofNullable(task.get(device.getDeviceCode())).ifPresent(task -> { task.getWatchdog().destroyProcess(); @@ -223,8 +232,9 @@ public class DeviceProxyService { }; } - public Flow.Subscriber zlmByeSubscriber(String key, MockingDevice device){ + public Flow.Subscriber zlmByeSubscriber(String key, SIPRequest inviteRequest,MockingDevice device){ return new Flow.Subscriber<>() { + private SIPRequest request; @Override public void onSubscribe(Flow.Subscription subscription) { log.info("订阅 bye {}", key); @@ -233,10 +243,7 @@ public class DeviceProxyService { @Override public void onNext(SIPRequest item) { - String ip = item.getLocalAddress().getHostAddress(); - String transPort = item.getTopmostViaHeader().getTransport(); - sender.sendResponse(ip, transPort, ((provider, ip1, port) -> - SipResponseBuilder.response(item, Response.OK, "OK"))); + request = item; subscribe.getByeSubscribe().delPublisher(key); } @@ -246,6 +253,15 @@ public class DeviceProxyService { @Override public void onComplete() { log.info("bye 订阅结束 {}", key); + if(request == null){ + sendBye(inviteRequest,device,""); + } else { + String ip = request.getLocalAddress().getHostAddress(); + String transPort = request.getTopmostViaHeader().getTransport(); + sender.sendResponse(ip, transPort, ((provider, ip1, port) -> + SipResponseBuilder.response(request, Response.OK, "OK"))); + } + String cacheKey = CacheUtil.getKey("INVITE", "PROXY", key); String proxyKey = RedisUtil.StringOps.get(cacheKey); log.info("关闭拉流代理 {}", zlmMediaService.delStreamProxy(proxyKey)); @@ -323,7 +339,7 @@ public class DeviceProxyService { }); // zlmStreamChangeHookService.getUnregistHandler().put(callId,()-> sendBye(request,device,key)); - Flow.Subscriber subscriber = zlmByeSubscriber(key,device); + Flow.Subscriber subscriber = zlmByeSubscriber(key,request,device); subscribe.getByeSubscribe().addPublisher(key); subscribe.getByeSubscribe().addSubscribe(key, subscriber); } catch (Exception e) { diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/starter/SipStarter.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/starter/SipStarter.java index 3f27cf5..d062222 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/starter/SipStarter.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/starter/SipStarter.java @@ -38,6 +38,11 @@ public class SipStarter implements SmartLifecycle { isRunning = false; } + @Override + public int getPhase() { + return 0; + } + @Override public boolean isRunning() { return isRunning;