视频推流顺序调整

This commit is contained in:
shikong 2024-02-07 09:02:31 +08:00
parent dead56fc66
commit 41f13af88d
2 changed files with 11 additions and 10 deletions

View File

@ -36,7 +36,6 @@ import java.time.Duration;
import java.util.Date; import java.util.Date;
import java.util.EventObject; import java.util.EventObject;
import java.util.Vector; import java.util.Vector;
import java.util.concurrent.*;
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
@ -257,9 +256,6 @@ public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle
String callId = request.getCallId().getCallId(); String callId = request.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.ACK, callId); String key = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(key); subscribe.getAckSubscribe().addPublisher(key);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPRequest> subscriber;
// 发送 sdp 响应 // 发送 sdp 响应
Runnable sendOkResponse = () -> { Runnable sendOkResponse = () -> {

View File

@ -107,11 +107,11 @@ public class DeviceProxyService {
void process(SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc); void process(SIPRequest request,Runnable sendOkResponse,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
} }
private String requestZlmPushStream(ScheduledFuture<?> schedule, Runnable sendOkResponse, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{ private void requestZlmPushStream(ScheduledFuture<?> schedule, Runnable sendOkResponse, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{ // zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder() Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0) .retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
.retryIfException() .retryIfException()
@ -146,7 +146,7 @@ public class DeviceProxyService {
schedule.cancel(false); schedule.cancel(false);
// 响应 sdp ok // 响应 sdp ok
sendOkResponse.run(); sendOkResponse.run();
}); // });
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{ zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
StopSendRtp stopSendRtp = new StopSendRtp(); StopSendRtp stopSendRtp = new StopSendRtp();
stopSendRtp.setApp(DEFAULT_ZLM_APP); stopSendRtp.setApp(DEFAULT_ZLM_APP);
@ -156,7 +156,6 @@ public class DeviceProxyService {
zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{ zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{
sendBye(request,device,key); sendBye(request,device,key);
}); });
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + DEFAULT_ZLM_APP +"/" + callId;
} }
private Flow.Subscriber<SIPRequest> ffmpegTask(SIPRequest request,ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){ private Flow.Subscriber<SIPRequest> ffmpegTask(SIPRequest request,ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){
@ -175,9 +174,10 @@ public class DeviceProxyService {
ScheduledFuture<?> schedule = trying(request); ScheduledFuture<?> schedule = trying(request);
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device); Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
try { try {
String zlmRtpUrl = requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule,request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule,request, device, key);
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
callbackTask.put(device.getDeviceCode(), executor); callbackTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor(); executeResultHandler.waitFor();
@ -194,9 +194,10 @@ public class DeviceProxyService {
ScheduledFuture<?> schedule = trying(request); ScheduledFuture<?> schedule = trying(request);
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device); Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
try { try {
String zlmRtpUrl = requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule, request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(schedule, request, device, key);
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
requestZlmPushStream(schedule, sendOkResponse, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor); downloadTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor(); executeResultHandler.waitFor();
@ -208,6 +209,10 @@ public class DeviceProxyService {
}; };
} }
private String getZlmRtmpUrl(String app, String streamId){
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + app +"/" + streamId;
}
private ScheduledFuture<?> trying(SIPRequest request){ private ScheduledFuture<?> trying(SIPRequest request){
return scheduledExecutorService.scheduleAtFixedRate(() -> { return scheduledExecutorService.scheduleAtFixedRate(() -> {
InviteResponseBuilder inviteRequestBuilder = InviteResponseBuilder.builder().build(); InviteResponseBuilder inviteRequestBuilder = InviteResponseBuilder.builder().build();