ffmpeg 执行失败时 发送 bye 结束

This commit is contained in:
shikong 2024-01-23 08:55:00 +08:00
parent fbe45e25e1
commit 16ea4dc890

View File

@ -19,6 +19,7 @@ import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxyResp;
import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse;
import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtp;
import cn.skcks.docking.gb28181.media.dto.rtp.StartSendRtpResp;
import cn.skcks.docking.gb28181.media.dto.rtp.StopSendRtp;
import cn.skcks.docking.gb28181.media.dto.status.ResponseStatus;
import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
@ -136,9 +137,12 @@ public class DeviceProxyService {
throw new RuntimeException(e);
}
});
// zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
// sendBye(request,device,key);
// });
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{
StopSendRtp stopSendRtp = new StopSendRtp();
stopSendRtp.setApp(DEFAULT_ZLM_APP);
stopSendRtp.setStream(callId);
stopSendRtp.setSsrc(ssrc);
});
zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).put(callId,()->{
sendBye(request,device,key);
});
@ -376,6 +380,15 @@ public class DeviceProxyService {
}
});
zlmStreamChangeHookService.getUnregistHandler(ZLM_FFMPEG_PROXY_APP).put(callId, ()->{
StopSendRtp stopSendRtp = new StopSendRtp();
stopSendRtp.setApp(DEFAULT_ZLM_APP);
stopSendRtp.setStream(callId);
stopSendRtp.setSsrc(ssrc);
zlmMediaService.stopSendRtp(stopSendRtp);
});
Flow.Subscriber<SIPRequest> subscriber = zlmFfmpegByeSubscriber(key,request,device);
subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
@ -441,7 +454,15 @@ public class DeviceProxyService {
}
});
// zlmStreamChangeHookService.getUnregistHandler().put(callId,()-> sendBye(request,device,key));
zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()-> {
StopSendRtp stopSendRtp = new StopSendRtp();
stopSendRtp.setApp(DEFAULT_ZLM_APP);
stopSendRtp.setStream(callId);
stopSendRtp.setSsrc(ssrc);
zlmMediaService.stopSendRtp(stopSendRtp);
});
Flow.Subscriber<SIPRequest> subscriber = zlmByeSubscriber(key,request,device);
subscribe.getByeSubscribe().addPublisher(key);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
@ -494,32 +515,34 @@ public class DeviceProxyService {
private final MockingDevice device;
private final String key;
private void close(){
CallIdHeader requestCallId = request.getCallId();
String callId = requestCallId.getCallId();
callbackTask.remove(callId);
Optional<ZlmStreamChangeHookService.ZlmStreamChangeHookHandler> optionalZlmStreamChangeHookHandler =
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId));
Optional<ZlmStreamNoneReaderHookService.ZlmStreamNoneReaderHookHandler> optionalZlmStreamNoneReaderHandler =
Optional.ofNullable(zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).remove(callId));
// 如果取消注册已完成就直接结束, 否则发送 bye请求 结束
if(optionalZlmStreamChangeHookHandler.isEmpty() && optionalZlmStreamNoneReaderHandler.isEmpty()){
return;
}
optionalZlmStreamChangeHookHandler.ifPresent(handler -> {
log.warn("流改变事件未结束 ZlmStreamChange {} {}, 强制结束", DEFAULT_ZLM_APP,callId);
});
optionalZlmStreamNoneReaderHandler.ifPresent(handler -> {
log.warn("流无人观看事件未结束 ZlmStreamNoneReader {} {}, 强制结束", DEFAULT_ZLM_APP, callId);
});
sendBye(request,device,key);
}
@SneakyThrows
private void mediaStatus(){
int num = taskNum.decrementAndGet();
log.info("当前任务数 {}", num);
// 等待zlm推流结束, 如果 ffmpeg 结束 3分钟内 未能推流完成就主动结束
scheduledExecutorService.schedule(()->{
CallIdHeader requestCallId = request.getCallId();
String callId = requestCallId.getCallId();
callbackTask.remove(callId);
Optional<ZlmStreamChangeHookService.ZlmStreamChangeHookHandler> optionalZlmStreamChangeHookHandler =
Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId));
Optional<ZlmStreamNoneReaderHookService.ZlmStreamNoneReaderHookHandler> optionalZlmStreamNoneReaderHandler =
Optional.ofNullable(zlmStreamNoneReaderHookService.getHandler(DEFAULT_ZLM_APP).remove(callId));
// 如果取消注册已完成就直接结束, 否则发送 bye请求 结束
if(optionalZlmStreamChangeHookHandler.isEmpty() && optionalZlmStreamNoneReaderHandler.isEmpty()){
return;
}
optionalZlmStreamChangeHookHandler.ifPresent(handler -> {
log.warn("流改变事件未结束 ZlmStreamChange {} {}, 强制结束", DEFAULT_ZLM_APP,callId);
});
optionalZlmStreamNoneReaderHandler.ifPresent(handler -> {
log.warn("流无人观看事件未结束 ZlmStreamNoneReader {} {}, 强制结束", DEFAULT_ZLM_APP, callId);
});
sendBye(request,device,key);
},3,TimeUnit.MINUTES);
scheduledExecutorService.schedule(this::close,5,TimeUnit.MINUTES);
}
public boolean hasResult() {
@ -542,7 +565,8 @@ public class DeviceProxyService {
@Override
public void onProcessFailed(ExecuteException e) {
hasResult = true;
mediaStatus();
log.error("ffmpeg 执行失败", e);
close();
}
}