diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index ade7ee8..2bae097 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -84,6 +84,48 @@ public class DeviceProxyService { void process(SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc); } + private String requestZlmPushStream(ScheduledFuture schedule, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{ + GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); + MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); + boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); + zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ + schedule.cancel(false); + Retryer retryer = RetryerBuilder.newBuilder() + .retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0) + .retryIfException() + .retryIfRuntimeException() + // 重试间隔 + .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS)) + // 重试次数 + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build(); + try { + retryer.call(()->{ + StartSendRtp startSendRtp = new StartSendRtp(); + startSendRtp.setApp("live"); + startSendRtp.setStream(callId); + startSendRtp.setSsrc(ssrc); + startSendRtp.setDstUrl(toAddr); + startSendRtp.setDstPort(toPort); + startSendRtp.setUdp(!tcp); + log.info("startSendRtp {}",startSendRtp); + StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); + log.info("startSendRtpResp {}",startSendRtpResp); + return startSendRtpResp; + }); + } catch (Exception e) { + schedule.cancel(true); + Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)) + .ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler); + throw new RuntimeException(e); + } + }); + zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{ + sendBye(request,device,key); + }); + return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; + } + public TaskProcessor playbackTask(){ return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { Optional.ofNullable(callbackTask.get(callId)).ifPresent(task->{ @@ -96,46 +138,8 @@ public class DeviceProxyService { ScheduledFuture schedule = trying(request); try { - GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); - MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); - boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); - zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ - schedule.cancel(false); - Retryer retryer = RetryerBuilder.newBuilder() - .retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0) - .retryIfException() - .retryIfRuntimeException() - // 重试间隔 - .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS)) - // 重试次数 - .withStopStrategy(StopStrategies.stopAfterAttempt(3)) - .build(); - try { - retryer.call(()->{ - StartSendRtp startSendRtp = new StartSendRtp(); - startSendRtp.setApp("live"); - startSendRtp.setStream(callId); - startSendRtp.setSsrc(ssrc); - startSendRtp.setDstUrl(toAddr); - startSendRtp.setDstPort(toPort); - startSendRtp.setUdp(!tcp); - log.info("startSendRtp {}",startSendRtp); - StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); - log.info("startSendRtpResp {}",startSendRtpResp); - return startSendRtpResp; - }); - } catch (Exception e) { - schedule.cancel(true); - Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)) - .ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler); - throw new RuntimeException(e); - } - }); - zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{ - sendBye(request,device,key); - }); + String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); - String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); callbackTask.put(device.getDeviceCode(), executor); @@ -158,46 +162,8 @@ public class DeviceProxyService { log.info("当前任务数 {}", num); ScheduledFuture schedule = trying(request); try { - GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); - MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); - boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); - zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ - schedule.cancel(false); - Retryer retryer = RetryerBuilder.newBuilder() - .retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0) - .retryIfException() - .retryIfRuntimeException() - // 重试间隔 - .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS)) - // 重试次数 - .withStopStrategy(StopStrategies.stopAfterAttempt(3)) - .build(); - try { - retryer.call(()->{ - StartSendRtp startSendRtp = new StartSendRtp(); - startSendRtp.setApp("live"); - startSendRtp.setStream(callId); - startSendRtp.setSsrc(ssrc); - startSendRtp.setDstUrl(toAddr); - startSendRtp.setDstPort(toPort); - startSendRtp.setUdp(!tcp); - log.info("startSendRtp {}",startSendRtp); - StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); - log.info("startSendRtpResp {}",startSendRtpResp); - return startSendRtpResp; - }); - } catch (Exception e) { - schedule.cancel(true); - Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)) - .ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler); - throw new RuntimeException(e); - } - }); - zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{ - sendBye(request,device,key); - }); + String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); - String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); downloadTask.put(device.getDeviceCode(), executor);