diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java index e482809..5981141 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java @@ -22,6 +22,7 @@ public class FfmpegConfig { private String input = "-re -i"; private String output = "-vcodec h264 -acodec aac -f rtp_mpegts"; private String logLevel = "error"; + private Boolean useRtpToDownload = false; } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index e3e9772..32e22ec 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -23,7 +23,10 @@ import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp; import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp; import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus; import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; -import cn.skcks.docking.gb28181.mocking.config.sip.*; +import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; +import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig; +import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig; +import cn.skcks.docking.gb28181.mocking.config.sip.ZlmRtmpConfig; import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; @@ -41,6 +44,7 @@ import com.github.rholder.retry.Retryer; import com.github.rholder.retry.RetryerBuilder; import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.WaitStrategies; +import gov.nist.javax.sdp.MediaDescriptionImpl; import gov.nist.javax.sip.message.SIPRequest; import jakarta.annotation.PreDestroy; import lombok.*; @@ -52,12 +56,14 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import javax.sdp.MediaDescription; +import javax.sdp.SdpException; import javax.sip.SipProvider; import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.nio.charset.StandardCharsets; +import java.text.ParseException; import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; @@ -233,19 +239,26 @@ public class DeviceProxyService { public void onComplete() { Flow.Subscriber task = ffmpegTask(request, downloadTask, callId, key, device); try { - String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); - scheduledExecutorService.submit(()->{ - try { - requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); - Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); + if(!ffmpegConfig.getRtp().getUseRtpToDownload()){ + String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); + scheduledExecutorService.submit(()->{ + try { + requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); - scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); - downloadTask.put(device.getDeviceCode(), executor); + Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); + downloadTask.put(device.getDeviceCode(), executor); + } else { + String rtpUrl = getRtpUrl(request); + Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler); + scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); + downloadTask.put(device.getDeviceCode(), executor); + } executeResultHandler.waitFor(); } catch (Exception e) { sendBye(request, device, ""); @@ -257,6 +270,15 @@ public class DeviceProxyService { }; } + private static String getRtpUrl(SIPRequest request) throws ParseException, SdpException { + String contentString = new String(request.getRawContent()); + GB28181DescriptionParser gb28181DescriptionParser = new GB28181DescriptionParser(contentString); + GB28181Description sdp = gb28181DescriptionParser.parse(); + String rtpIp = sdp.getConnection().getAddress(); + MediaDescriptionImpl media = (MediaDescriptionImpl) sdp.getMediaDescriptions(true).get(0); + return "rtp://" + rtpIp + ":" + media.getMedia().getMediaPort(); + } + private String getZlmRtmpUrl(String app, String streamId){ return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + app +"/" + streamId; }