DeviceProxyService 调整

This commit is contained in:
shikong 2024-03-14 11:18:23 +08:00
parent d64bba2c9c
commit 723ad67df9

View File

@ -250,9 +250,15 @@ public class DeviceProxyService {
public TaskProcessor downloadTask(){ public TaskProcessor downloadTask(){
return (Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{ return (Runnable sendOkResponse,SIPRequest request,String callId,String fromUrl, String toAddr,int toPort, MockingDevice device, String key, long time,String ssrc)->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
log.error("{}", e.getMessage());
}
String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId); String ackKey = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES); subscribe.getAckSubscribe().addPublisher(ackKey, 1, TimeUnit.MINUTES);
subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() { subscribe.getAckSubscribe().addSubscribe(ackKey, new Flow.Subscriber<>() {
private SIPRequest ackRequest;
@Override @Override
public void onSubscribe(Flow.Subscription subscription) { public void onSubscribe(Flow.Subscription subscription) {
subscription.request(1); subscription.request(1);
@ -260,6 +266,7 @@ public class DeviceProxyService {
@Override @Override
public void onNext(SIPRequest item) { public void onNext(SIPRequest item) {
ackRequest = item;
subscribe.getAckSubscribe().delPublisher(ackKey); subscribe.getAckSubscribe().delPublisher(ackKey);
} }
@ -271,27 +278,29 @@ public class DeviceProxyService {
public void onComplete() { public void onComplete() {
Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device); Flow.Subscriber<SIPRequest> task = ffmpegTask(request, downloadTask, callId, key, device);
try { try {
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); if(ackRequest != null){
if (!ffmpegConfig.getRtp().getUseRtpToDownload()) { FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId); if (!ffmpegConfig.getRtp().getUseRtpToDownload()) {
executor.execute(()->{ String zlmRtpUrl = getZlmRtmpUrl(DEFAULT_ZLM_APP, callId);
try { executor.execute(()->{
requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc); try {
} catch (Exception e) { requestZlmPushStream(request, callId, fromUrl, toAddr, toPort, device, key, time, ssrc);
throw new RuntimeException(e); } catch (Exception e) {
} throw new RuntimeException(e);
}); }
});
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor); downloadTask.put(device.getDeviceCode(), executor);
} else { } else {
String rtpUrl = getRtpUrl(request); String rtpUrl = getRtpUrl(request);
Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler); Executor executor = pushDownload2RtpTask(fromUrl, rtpUrl, time + 60, executeResultHandler);
scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(task::onComplete, time + 60, TimeUnit.SECONDS);
downloadTask.put(device.getDeviceCode(), executor); downloadTask.put(device.getDeviceCode(), executor);
}
executeResultHandler.waitFor();
} }
executeResultHandler.waitFor();
} catch (Exception e) { } catch (Exception e) {
sendBye(request, device, ""); sendBye(request, device, "");
log.error("{}", e.getMessage()); log.error("{}", e.getMessage());