From e5620f237b084177c45612c057d48873d111d115 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Sun, 8 Oct 2023 09:34:42 +0800 Subject: [PATCH] =?UTF-8?q?ffmpeg=20=E6=8E=A8=E6=B5=81=E7=BB=93=E6=9D=9F?= =?UTF-8?q?=E5=90=8E=20=E7=AD=89=E5=BE=8530=E7=A7=92=20=E8=8B=A530?= =?UTF-8?q?=E7=A7=92=E5=86=85=20zlm=E6=8E=A8=E6=B5=81=E5=AE=8C=E6=AF=95=20?= =?UTF-8?q?=E5=B0=B1=E5=8F=91=E9=80=81bye=20=E8=8B=A5=E6=9C=AA=E8=83=BD?= =?UTF-8?q?=E6=94=B6=E5=88=B0zlm=E6=8E=A8=E6=B5=81=E5=AE=8C=E6=AF=95?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6(=E8=B6=85=E6=97=B6)=20=E5=86=8D=E4=B8=BB?= =?UTF-8?q?=E5=8A=A8=E5=8F=91=E9=80=81bye?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/mocking/api/zlm/ZlmHookApi.java | 2 +- .../service/device/DeviceProxyService.java | 77 ++++++++++++------- .../zlm/hook/ZlmStreamChangeHookService.java | 37 +++++---- 3 files changed, 72 insertions(+), 44 deletions(-) diff --git a/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java index e2b5ef9..93a3121 100644 --- a/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java +++ b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java @@ -21,6 +21,6 @@ public class ZlmHookApi { @PostJson("/on_stream_changed") public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){ - zlmStreamChangeHookService.processEvent(dto.getStream(), dto.getRegist()); + zlmStreamChangeHookService.processEvent(dto.getStream(),dto.getStream(), dto.getRegist()); } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index d3c819d..44658b4 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -92,7 +92,7 @@ public class DeviceProxyService { GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); - zlmStreamChangeHookService.handlerMap.put(callId,()->{ + zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ StartSendRtp startSendRtp = new StartSendRtp(); startSendRtp.setApp("live"); startSendRtp.setStream(callId); @@ -104,6 +104,9 @@ public class DeviceProxyService { StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); log.info("startSendRtpResp {}",startSendRtpResp); }); + zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{ + sendBye(request,device,key); + }); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); @@ -129,7 +132,7 @@ public class DeviceProxyService { GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); - zlmStreamChangeHookService.handlerMap.put(callId,()->{ + zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ StartSendRtp startSendRtp = new StartSendRtp(); startSendRtp.setApp("live"); startSendRtp.setStream(callId); @@ -141,6 +144,9 @@ public class DeviceProxyService { StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); log.info("startSendRtpResp {}",startSendRtpResp); }); + zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{ + sendBye(request,device,key); + }); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); @@ -216,7 +222,7 @@ public class DeviceProxyService { } @RequiredArgsConstructor - public static class FfmpegExecuteResultHandler implements ExecuteResultHandler { + public class FfmpegExecuteResultHandler implements ExecuteResultHandler { private final static long SLEEP_TIME_MS = 50; @Setter(AccessLevel.PRIVATE) private boolean hasResult = false; @@ -224,39 +230,23 @@ public class DeviceProxyService { private final SIPRequest request; private final MockingDevice device; private final String key; - private final SipSender sender; + @SneakyThrows private void mediaStatus(){ int num = taskNum.decrementAndGet(); log.info("当前任务数 {}", num); + // 等待zlm推流结束, 如果 ffmpeg 结束 30秒 未能推流完成就主动结束 + Thread.sleep(30 * 1000); CallIdHeader requestCallId = request.getCallId(); String callId = requestCallId.getCallId(); callbackTask.remove(callId); - log.info("{} 推流结束, 发送媒体通知", key); - MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder() - .sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) - .deviceId(device.getGbChannelId()) - .build(); - - String tag = request.getFromHeader().getTag(); - sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device, - ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId))); - - String ip = request.getLocalAddress().getHostAddress(); - SipURI targetUri = (SipURI) request.getFromHeader().getAddress().getURI(); - String targetId = targetUri.getUser(); - String targetIp = request.getRemoteAddress().getHostAddress(); - int targetPort = request.getTopmostViaHeader().getPort(); - String transport = request.getTopmostViaHeader().getTransport(); - long seqNumber = request.getCSeq().getSeqNumber() + 1; - SipProvider provider = sender.getProvider(transport, ip); - CallIdHeader newCallId = request.getCallId(); - Request byeRequest = SipRequestBuilder.createByeRequest(targetIp, targetPort, seqNumber, targetId, SipUtil.generateFromTag(), null, newCallId.getCallId()); - try{ - provider.sendRequest(byeRequest); - }catch (Exception e){ - log.error("bye 请求发送失败 {}",e.getMessage()); + Optional optionalZlmStreamChangeHookHandler = + Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)); + // 如果取消注册已完成就直接结束, 否则发送 bye请求 结束 + if(optionalZlmStreamChangeHookHandler.isEmpty()){ + return; } + sendBye(request,device,key); } public boolean hasResult() { @@ -284,7 +274,7 @@ public class DeviceProxyService { } public FfmpegExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){ - return new FfmpegExecuteResultHandler(request,device,key,sender); + return new FfmpegExecuteResultHandler(request,device,key); } /** @@ -295,4 +285,33 @@ public class DeviceProxyService { callbackTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess()); downloadTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess()); } + + private void sendBye(SIPRequest request, MockingDevice device, String key){ + CallIdHeader requestCallId = request.getCallId(); + log.info("{} 推流结束, 发送媒体通知", key); + MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder() + .sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) + .deviceId(device.getGbChannelId()) + .build(); + + String tag = request.getFromHeader().getTag(); + sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device, + ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId))); + + String ip = request.getLocalAddress().getHostAddress(); + SipURI targetUri = (SipURI) request.getFromHeader().getAddress().getURI(); + String targetId = targetUri.getUser(); + String targetIp = request.getRemoteAddress().getHostAddress(); + int targetPort = request.getTopmostViaHeader().getPort(); + String transport = request.getTopmostViaHeader().getTransport(); + long seqNumber = request.getCSeq().getSeqNumber() + 1; + SipProvider provider = sender.getProvider(transport, ip); + CallIdHeader newCallId = request.getCallId(); + Request byeRequest = SipRequestBuilder.createByeRequest(targetIp, targetPort, seqNumber, targetId, SipUtil.generateFromTag(), null, newCallId.getCallId()); + try{ + provider.sendRequest(byeRequest); + }catch (Exception e){ + log.error("bye 请求发送失败 {}",e.getMessage()); + } + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java index 4cd176d..1d5cc58 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java @@ -20,21 +20,30 @@ public class ZlmStreamChangeHookService { void handler(); } - public ConcurrentMap handlerMap = new ConcurrentHashMap<>(); + public ConcurrentMap registHandler = new ConcurrentHashMap<>(); + public ConcurrentMap unregistHandler = new ConcurrentHashMap<>(); - public void processEvent(String streamId, Boolean regist){ - log.debug("stream {}, regist {}", streamId, regist); - if(!regist){ - return; + public void processEvent(String stream,String streamId, Boolean regist){ + log.debug("stream {}, streamId {}, regist {}", stream,streamId, regist); + + if(regist){ + Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{ + try { + Thread.sleep(zlmHookConfig.getDelay().toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + handler.handler(); + }); + } else { + Optional.ofNullable(unregistHandler.remove(streamId)).ifPresent((handler)->{ + try { + Thread.sleep(zlmHookConfig.getDelay().toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + handler.handler(); + }); } - - Optional.ofNullable(handlerMap.remove(streamId)).ifPresent((handler)->{ - try { - Thread.sleep(zlmHookConfig.getDelay().toMillis()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - handler.handler(); - }); } }