添加 rtp 推流配置

This commit is contained in:
shikong 2024-03-06 13:19:12 +08:00
parent 704bf42073
commit 0fc5de87c8
2 changed files with 35 additions and 12 deletions

View File

@ -22,6 +22,7 @@ public class FfmpegConfig {
private String input = "-re -i"; private String input = "-re -i";
private String output = "-vcodec h264 -acodec aac -f rtp_mpegts"; private String output = "-vcodec h264 -acodec aac -f rtp_mpegts";
private String logLevel = "error"; private String logLevel = "error";
private Boolean useRtpToDownload = false;
} }

View File

@ -23,7 +23,10 @@ import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp;
import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp; import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp;
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus; import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService; import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.mocking.config.sip.*; import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig;
import cn.skcks.docking.gb28181.mocking.config.sip.ZlmRtmpConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO; import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
@ -41,6 +44,7 @@ import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder; import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies; import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies; import com.github.rholder.retry.WaitStrategies;
import gov.nist.javax.sdp.MediaDescriptionImpl;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PreDestroy; import jakarta.annotation.PreDestroy;
import lombok.*; import lombok.*;
@ -52,12 +56,14 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.sdp.MediaDescription; import javax.sdp.MediaDescription;
import javax.sdp.SdpException;
import javax.sip.SipProvider; import javax.sip.SipProvider;
import javax.sip.address.SipURI; import javax.sip.address.SipURI;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.time.Duration; import java.time.Duration;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
@ -233,19 +239,26 @@ public class DeviceProxyService {
public void onComplete() { public void onComplete() {
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device); Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
try { try {
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
scheduledExecutorService.submit(()->{
try {
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); if(!ffmpegConfig.getRtp().getUseRtpToDownload()){
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
scheduledExecutorService.submit(()->{
try {
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
downloadTask.put(device.getDeviceCode(), executor); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor);
} else {
String rtpUrl = getRtpUrl(request);
Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor);
}
executeResultHandler.waitFor(); executeResultHandler.waitFor();
} catch (Exception e) { } catch (Exception e) {
sendBye(request, device, ""); sendBye(request, device, "");
@ -257,6 +270,15 @@ public class DeviceProxyService {
}; };
} }
private static String getRtpUrl(SIPRequest request) throws ParseException, SdpException {
String contentString = new String(request.getRawContent());
GB28181DescriptionParser gb28181DescriptionParser = new GB28181DescriptionParser(contentString);
GB28181Description sdp = gb28181DescriptionParser.parse();
String rtpIp = sdp.getConnection().getAddress();
MediaDescriptionImpl media = (MediaDescriptionImpl) sdp.getMediaDescriptions(true).get(0);
return "rtp://" + rtpIp + ":" + media.getMedia().getMediaPort();
}
private String getZlmRtmpUrl(String app, String streamId){ private String getZlmRtmpUrl(String app, String streamId){
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + app +"/" + streamId; return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + app +"/" + streamId;
} }