diff --git a/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java index e2b5ef9..93a3121 100644 --- a/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java +++ b/gb28181-mocking-api/src/main/java/cn/skcks/docking/gb28181/mocking/api/zlm/ZlmHookApi.java @@ -21,6 +21,6 @@ public class ZlmHookApi { @PostJson("/on_stream_changed") public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){ - zlmStreamChangeHookService.processEvent(dto.getStream(), dto.getRegist()); + zlmStreamChangeHookService.processEvent(dto.getStream(),dto.getStream(), dto.getRegist()); } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index d3c819d..44658b4 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -92,7 +92,7 @@ public class DeviceProxyService { GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); - zlmStreamChangeHookService.handlerMap.put(callId,()->{ + zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ StartSendRtp startSendRtp = new StartSendRtp(); startSendRtp.setApp("live"); startSendRtp.setStream(callId); @@ -104,6 +104,9 @@ public class DeviceProxyService { StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); log.info("startSendRtpResp {}",startSendRtpResp); }); + zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{ + sendBye(request,device,key); + }); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); @@ -129,7 +132,7 @@ public class DeviceProxyService { GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); - zlmStreamChangeHookService.handlerMap.put(callId,()->{ + zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ StartSendRtp startSendRtp = new StartSendRtp(); startSendRtp.setApp("live"); startSendRtp.setStream(callId); @@ -141,6 +144,9 @@ public class DeviceProxyService { StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); log.info("startSendRtpResp {}",startSendRtpResp); }); + zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{ + sendBye(request,device,key); + }); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); @@ -216,7 +222,7 @@ public class DeviceProxyService { } @RequiredArgsConstructor - public static class FfmpegExecuteResultHandler implements ExecuteResultHandler { + public class FfmpegExecuteResultHandler implements ExecuteResultHandler { private final static long SLEEP_TIME_MS = 50; @Setter(AccessLevel.PRIVATE) private boolean hasResult = false; @@ -224,39 +230,23 @@ public class DeviceProxyService { private final SIPRequest request; private final MockingDevice device; private final String key; - private final SipSender sender; + @SneakyThrows private void mediaStatus(){ int num = taskNum.decrementAndGet(); log.info("当前任务数 {}", num); + // 等待zlm推流结束, 如果 ffmpeg 结束 30秒 未能推流完成就主动结束 + Thread.sleep(30 * 1000); CallIdHeader requestCallId = request.getCallId(); String callId = requestCallId.getCallId(); callbackTask.remove(callId); - log.info("{} 推流结束, 发送媒体通知", key); - MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder() - .sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) - .deviceId(device.getGbChannelId()) - .build(); - - String tag = request.getFromHeader().getTag(); - sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device, - ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId))); - - String ip = request.getLocalAddress().getHostAddress(); - SipURI targetUri = (SipURI) request.getFromHeader().getAddress().getURI(); - String targetId = targetUri.getUser(); - String targetIp = request.getRemoteAddress().getHostAddress(); - int targetPort = request.getTopmostViaHeader().getPort(); - String transport = request.getTopmostViaHeader().getTransport(); - long seqNumber = request.getCSeq().getSeqNumber() + 1; - SipProvider provider = sender.getProvider(transport, ip); - CallIdHeader newCallId = request.getCallId(); - Request byeRequest = SipRequestBuilder.createByeRequest(targetIp, targetPort, seqNumber, targetId, SipUtil.generateFromTag(), null, newCallId.getCallId()); - try{ - provider.sendRequest(byeRequest); - }catch (Exception e){ - log.error("bye 请求发送失败 {}",e.getMessage()); + Optional optionalZlmStreamChangeHookHandler = + Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)); + // 如果取消注册已完成就直接结束, 否则发送 bye请求 结束 + if(optionalZlmStreamChangeHookHandler.isEmpty()){ + return; } + sendBye(request,device,key); } public boolean hasResult() { @@ -284,7 +274,7 @@ public class DeviceProxyService { } public FfmpegExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){ - return new FfmpegExecuteResultHandler(request,device,key,sender); + return new FfmpegExecuteResultHandler(request,device,key); } /** @@ -295,4 +285,33 @@ public class DeviceProxyService { callbackTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess()); downloadTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess()); } + + private void sendBye(SIPRequest request, MockingDevice device, String key){ + CallIdHeader requestCallId = request.getCallId(); + log.info("{} 推流结束, 发送媒体通知", key); + MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder() + .sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) + .deviceId(device.getGbChannelId()) + .build(); + + String tag = request.getFromHeader().getTag(); + sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device, + ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId))); + + String ip = request.getLocalAddress().getHostAddress(); + SipURI targetUri = (SipURI) request.getFromHeader().getAddress().getURI(); + String targetId = targetUri.getUser(); + String targetIp = request.getRemoteAddress().getHostAddress(); + int targetPort = request.getTopmostViaHeader().getPort(); + String transport = request.getTopmostViaHeader().getTransport(); + long seqNumber = request.getCSeq().getSeqNumber() + 1; + SipProvider provider = sender.getProvider(transport, ip); + CallIdHeader newCallId = request.getCallId(); + Request byeRequest = SipRequestBuilder.createByeRequest(targetIp, targetPort, seqNumber, targetId, SipUtil.generateFromTag(), null, newCallId.getCallId()); + try{ + provider.sendRequest(byeRequest); + }catch (Exception e){ + log.error("bye 请求发送失败 {}",e.getMessage()); + } + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java index 4cd176d..1d5cc58 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java @@ -20,21 +20,30 @@ public class ZlmStreamChangeHookService { void handler(); } - public ConcurrentMap handlerMap = new ConcurrentHashMap<>(); + public ConcurrentMap registHandler = new ConcurrentHashMap<>(); + public ConcurrentMap unregistHandler = new ConcurrentHashMap<>(); - public void processEvent(String streamId, Boolean regist){ - log.debug("stream {}, regist {}", streamId, regist); - if(!regist){ - return; + public void processEvent(String stream,String streamId, Boolean regist){ + log.debug("stream {}, streamId {}, regist {}", stream,streamId, regist); + + if(regist){ + Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{ + try { + Thread.sleep(zlmHookConfig.getDelay().toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + handler.handler(); + }); + } else { + Optional.ofNullable(unregistHandler.remove(streamId)).ifPresent((handler)->{ + try { + Thread.sleep(zlmHookConfig.getDelay().toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + handler.handler(); + }); } - - Optional.ofNullable(handlerMap.remove(streamId)).ifPresent((handler)->{ - try { - Thread.sleep(zlmHookConfig.getDelay().toMillis()); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - handler.handler(); - }); } }