DeviceProxyService 调整

This commit is contained in:
shikong 2024-03-14 10:53:57 +08:00
parent 28dfe97a5a
commit 39af43f7aa

View File

@ -198,44 +198,48 @@ public class DeviceProxyService {
public TaskProcessor playbackTask(){ public TaskProcessor playbackTask(){
return (Runnable sendOkResponse, SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> { return (Runnable sendOkResponse, SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc) -> {
scheduledExecutorService.schedule(() -> { try {
trying(request); TimeUnit.SECONDS.sleep(1);
sendOkResponse.run(); } catch (InterruptedException e) {
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); log.error("{}", e.getMessage());
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); }
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1);
}
@Override String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
public void onNext(SIPRequest item) { subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
subscribe.getAckSubscribe().delPublisher(ackKey); subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
} @Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1);
}
@Override @Override
public void onError(Throwable throwable) { public void onNext(SIPRequest item) {
} subscribe.getAckSubscribe().delPublisher(ackKey);
}
@Override @Override
public void onComplete() { public void onError(Throwable throwable) {
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device); }
try {
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); @Override
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); public void onComplete() {
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); Flow.Subscriber<SIPRequest> task = ffmpegTask(request, callbackTask, callId, key, device);
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); try {
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
callbackTask.put(device.getDeviceCode(), executor); requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
executeResultHandler.waitFor(); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
} catch (Exception e) { Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
sendBye(request,device,""); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
log.error("{}", e.getMessage()); callbackTask.put(device.getDeviceCode(), executor);
} executeResultHandler.waitFor();
} catch (Exception e) {
sendBye(request, device, "");
log.error("{}", e.getMessage());
} }
}); }
}, 1, TimeUnit.SECONDS); });
trying(request);
sendOkResponse.run();
}; };
} }
@ -247,57 +251,56 @@ public class DeviceProxyService {
log.error("{}", e.getMessage()); log.error("{}", e.getMessage());
} }
trying(request); String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
sendOkResponse.run(); subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); @Override
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { public void onSubscribe(Flow.Subscription subscription) {
@Override subscription.request(1);
public void onSubscribe(Flow.Subscription subscription) { }
subscription.request(1);
}
@Override @Override
public void onNext(SIPRequest item) { public void onNext(SIPRequest item) {
subscribe.getAckSubscribe().delPublisher(ackKey); subscribe.getAckSubscribe().delPublisher(ackKey);
} }
@Override @Override
public void onError(Throwable throwable) { public void onError(Throwable throwable) {
} }
@Override @Override
public void onComplete() { public void onComplete() {
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device); Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
try { try {
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
if(!ffmpegConfig.getRtp().getUseRtpToDownload()){ if (!ffmpegConfig.getRtp().getUseRtpToDownload()) {
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
scheduledExecutorService.submit(()->{ scheduledExecutorService.submit(() -> {
try { try {
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor); downloadTask.put(device.getDeviceCode(), executor);
} else { } else {
String rtpUrl = getRtpUrl(request); String rtpUrl = getRtpUrl(request);
Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler); Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor); downloadTask.put(device.getDeviceCode(), executor);
}
executeResultHandler.waitFor();
} catch (Exception e) {
sendBye(request, device, "");
log.error("{}", e.getMessage());
} }
executeResultHandler.waitFor();
} catch (Exception e) {
sendBye(request, device, "");
log.error("{}", e.getMessage());
} }
}); }
});
trying(request);
sendOkResponse.run();
}; };
} }