From 16ea4dc8905f4b5e68a7bbe78e9c235dc3c69dfc Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Tue, 23 Jan 2024 08:55:00 +0800 Subject: [PATCH] =?UTF-8?q?ffmpeg=20=E6=89=A7=E8=A1=8C=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E6=97=B6=20=E5=8F=91=E9=80=81=20bye=20=E7=BB=93=E6=9D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/device/DeviceProxyService.java | 76 ++++++++++++------- 1 file changed, 50 insertions(+), 26 deletions(-) diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index 9cb2f16..6379064 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -19,6 +19,7 @@ import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxyResp; import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse; import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtp; import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp; +import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp; import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus; import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; @@ -136,9 +137,12 @@ public class DeviceProxyService { throw new RuntimeException(e); } }); -// zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{ -// sendBye(request,device,key); -// }); + zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{ + StopSendRtp stopSendRtp = new StopSendRtp(); + stopSendRtp.setApp(DEFAULT_ZLM_APP); + stopSendRtp.setStream(callId); + stopSendRtp.setSsrc(ssrc); + }); zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{ sendBye(request,device,key); }); @@ -376,6 +380,15 @@ public class DeviceProxyService { } }); + zlmStreamChangeHookService.getUnregistHandler(ZLM_FFMPEG_PROXY_APP).put(callId, ()->{ + StopSendRtp stopSendRtp = new StopSendRtp(); + stopSendRtp.setApp(DEFAULT_ZLM_APP); + stopSendRtp.setStream(callId); + stopSendRtp.setSsrc(ssrc); + + zlmMediaService.stopSendRtp(stopSendRtp); + }); + Flow.Subscriber subscriber = zlmFfmpegByeSubscriber(key,request,device); subscribe.getByeSubscribe().addPublisher(key); subscribe.getByeSubscribe().addSubscribe(key, subscriber); @@ -441,7 +454,15 @@ public class DeviceProxyService { } }); - // zlmStreamChangeHookService.getUnregistHandler().put(callId,()-> sendBye(request,device,key)); + zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()-> { + StopSendRtp stopSendRtp = new StopSendRtp(); + stopSendRtp.setApp(DEFAULT_ZLM_APP); + stopSendRtp.setStream(callId); + stopSendRtp.setSsrc(ssrc); + + zlmMediaService.stopSendRtp(stopSendRtp); + }); + Flow.Subscriber subscriber = zlmByeSubscriber(key,request,device); subscribe.getByeSubscribe().addPublisher(key); subscribe.getByeSubscribe().addSubscribe(key, subscriber); @@ -494,32 +515,34 @@ public class DeviceProxyService { private final MockingDevice device; private final String key; + private void close(){ + CallIdHeader requestCallId = request.getCallId(); + String callId = requestCallId.getCallId(); + callbackTask.remove(callId); + Optional optionalZlmStreamChangeHookHandler = + Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId)); + Optional optionalZlmStreamNoneReaderHandler = + Optional.ofNullable(zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).remove(callId)); + // 如果取消注册已完成就直接结束, 否则发送 bye请求 结束 + if(optionalZlmStreamChangeHookHandler.isEmpty() && optionalZlmStreamNoneReaderHandler.isEmpty()){ + return; + } + + optionalZlmStreamChangeHookHandler.ifPresent(handler -> { + log.warn("流改变事件未结束 ZlmStreamChange {} {}, 强制结束", DEFAULT_ZLM_APP,callId); + }); + optionalZlmStreamNoneReaderHandler.ifPresent(handler -> { + log.warn("流无人观看事件未结束 ZlmStreamNoneReader {} {}, 强制结束", DEFAULT_ZLM_APP, callId); + }); + sendBye(request,device,key); + } + @SneakyThrows private void mediaStatus(){ int num = taskNum.decrementAndGet(); log.info("当前任务数 {}", num); // 等待zlm推流结束, 如果 ffmpeg 结束 3分钟内 未能推流完成就主动结束 - scheduledExecutorService.schedule(()->{ - CallIdHeader requestCallId = request.getCallId(); - String callId = requestCallId.getCallId(); - callbackTask.remove(callId); - Optional optionalZlmStreamChangeHookHandler = - Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId)); - Optional optionalZlmStreamNoneReaderHandler = - Optional.ofNullable(zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).remove(callId)); - // 如果取消注册已完成就直接结束, 否则发送 bye请求 结束 - if(optionalZlmStreamChangeHookHandler.isEmpty() && optionalZlmStreamNoneReaderHandler.isEmpty()){ - return; - } - - optionalZlmStreamChangeHookHandler.ifPresent(handler -> { - log.warn("流改变事件未结束 ZlmStreamChange {} {}, 强制结束", DEFAULT_ZLM_APP,callId); - }); - optionalZlmStreamNoneReaderHandler.ifPresent(handler -> { - log.warn("流无人观看事件未结束 ZlmStreamNoneReader {} {}, 强制结束", DEFAULT_ZLM_APP, callId); - }); - sendBye(request,device,key); - },3,TimeUnit.MINUTES); + scheduledExecutorService.schedule(this::close,5,TimeUnit.MINUTES); } public boolean hasResult() { @@ -542,7 +565,8 @@ public class DeviceProxyService { @Override public void onProcessFailed(ExecuteException e) { hasResult = true; - mediaStatus(); + log.error("ffmpeg 执行失败", e); + close(); } }