整合冗余代码

This commit is contained in:
shikong 2024-01-12 09:46:30 +08:00
parent 9b7f05ca34
commit 588d321fbe

View File

@ -126,22 +126,26 @@ public class DeviceProxyService {
return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
} }
public TaskProcessor playbackTask(){ private Flow.Subscriber<SIPRequest> task(ConcurrentHashMap<String, Executor> tasks, String callId, String key, MockingDevice device){
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { Optional.ofNullable(tasks.get(callId)).ifPresent(task->{
Optional.ofNullable(callbackTask.get(callId)).ifPresent(task->{
task.getWatchdog().destroyProcess(); task.getWatchdog().destroyProcess();
}); });
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, callbackTask); Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, tasks);
subscribe.getByeSubscribe().addSubscribe(key, subscriber); subscribe.getByeSubscribe().addSubscribe(key, subscriber);
int num = taskNum.incrementAndGet(); int num = taskNum.incrementAndGet();
log.info("当前任务数 {}", num); log.info("当前任务数 {}", num);
return subscriber;
}
public TaskProcessor playbackTask(){
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
Flow.Subscriber<SIPRequest> task = task(callbackTask, callId, key, device);
ScheduledFuture<?> schedule = trying(request); ScheduledFuture<?> schedule = trying(request);
try { try {
String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
callbackTask.put(device.getDeviceCode(), executor); callbackTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor(); executeResultHandler.waitFor();
} catch (Exception e) { } catch (Exception e) {
@ -153,19 +157,13 @@ public class DeviceProxyService {
public TaskProcessor downloadTask(){ public TaskProcessor downloadTask(){
return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ return (SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
Optional.ofNullable(downloadTask.get(callId)).ifPresent(task->{ Flow.Subscriber<SIPRequest> task = task(downloadTask, callId, key, device);
task.getWatchdog().destroyProcess();
});
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, downloadTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
int num = taskNum.incrementAndGet();
log.info("当前任务数 {}", num);
ScheduledFuture<?> schedule = trying(request); ScheduledFuture<?> schedule = trying(request);
try { try {
String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); String zlmRtpUrl = requestZlmPushStream(schedule, request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor); downloadTask.put(device.getDeviceCode(), executor);
executeResultHandler.waitFor(); executeResultHandler.waitFor();
} catch (Exception e) { } catch (Exception e) {