整合冗余代码

This commit is contained in:
shikong 2024-01-11 17:13:14 +08:00
parent 2d84b17fe5
commit 9b7f05ca34

View File

@ -84,6 +84,48 @@ public class DeviceProxyService {
void process(SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc); void process(SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc);
} }
private String requestZlmPushStream(ScheduledFuture<?> schedule, SIPRequest request, String callId, String fromUrl, String toAddr, int toPort, MockingDevice device, String key, long time, String ssrc) throws Exception{
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.getRegistHandler().put(callId,()->{
schedule.cancel(false);
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
try {
retryer.call(()->{
StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("live");
startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr);
startSendRtp.setDstPort(toPort);
startSendRtp.setUdp(!tcp);
log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp);
return startSendRtpResp;
});
} catch (Exception e) {
schedule.cancel(true);
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId))
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
throw new RuntimeException(e);
}
});
zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{
sendBye(request,device,key);
});
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
}
public TaskProcessor playbackTask(){ public TaskProcessor playbackTask(){
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
Optional.ofNullable(callbackTask.get(callId)).ifPresent(task->{ Optional.ofNullable(callbackTask.get(callId)).ifPresent(task->{
@ -96,46 +138,8 @@ public class DeviceProxyService {
ScheduledFuture<?> schedule = trying(request); ScheduledFuture<?> schedule = trying(request);
try { try {
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.getRegistHandler().put(callId,()->{
schedule.cancel(false);
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
try {
retryer.call(()->{
StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("live");
startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr);
startSendRtp.setDstPort(toPort);
startSendRtp.setUdp(!tcp);
log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp);
return startSendRtpResp;
});
} catch (Exception e) {
schedule.cancel(true);
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId))
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
throw new RuntimeException(e);
}
});
zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{
sendBye(request,device,key);
});
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
callbackTask.put(device.getDeviceCode(), executor); callbackTask.put(device.getDeviceCode(), executor);
@ -158,46 +162,8 @@ public class DeviceProxyService {
log.info("当前任务数 {}", num); log.info("当前任务数 {}", num);
ScheduledFuture<?> schedule = trying(request); ScheduledFuture<?> schedule = trying(request);
try { try {
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.getRegistHandler().put(callId,()->{
schedule.cancel(false);
Retryer<StartSendRtpResp> retryer = RetryerBuilder.<StartSendRtpResp>newBuilder()
.retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0)
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();
try {
retryer.call(()->{
StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("live");
startSendRtp.setStream(callId);
startSendRtp.setSsrc(ssrc);
startSendRtp.setDstUrl(toAddr);
startSendRtp.setDstPort(toPort);
startSendRtp.setUdp(!tcp);
log.info("startSendRtp {}",startSendRtp);
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp);
return startSendRtpResp;
});
} catch (Exception e) {
schedule.cancel(true);
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId))
.ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler);
throw new RuntimeException(e);
}
});
zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{
sendBye(request,device,key);
});
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor); downloadTask.put(device.getDeviceCode(), executor);