ffmpeg 推流结束后 等待30秒

若30秒内 zlm推流完毕 就发送bye
若未能收到zlm推流完毕事件(超时) 再主动发送bye
This commit is contained in:
shikong 2023-10-08 09:34:42 +08:00
parent fbd51a2b91
commit e5620f237b
3 changed files with 72 additions and 44 deletions

View File

@ -21,6 +21,6 @@ public class ZlmHookApi {
@PostJson("/on_stream_changed") @PostJson("/on_stream_changed")
public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){ public void onStreamChanged(@RequestBody ZlmStreamChangeDTO dto){
zlmStreamChangeHookService.processEvent(dto.getStream(), dto.getRegist()); zlmStreamChangeHookService.processEvent(dto.getStream(),dto.getStream(), dto.getRegist());
} }
} }

View File

@ -92,7 +92,7 @@ public class DeviceProxyService {
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.handlerMap.put(callId,()->{ zlmStreamChangeHookService.getRegistHandler().put(callId,()->{
StartSendRtp startSendRtp = new StartSendRtp(); StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("live"); startSendRtp.setApp("live");
startSendRtp.setStream(callId); startSendRtp.setStream(callId);
@ -104,6 +104,9 @@ public class DeviceProxyService {
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp); log.info("startSendRtpResp {}",startSendRtpResp);
}); });
zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{
sendBye(request,device,key);
});
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushRtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
@ -129,7 +132,7 @@ public class DeviceProxyService {
GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse();
MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0);
boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP");
zlmStreamChangeHookService.handlerMap.put(callId,()->{ zlmStreamChangeHookService.getRegistHandler().put(callId,()->{
StartSendRtp startSendRtp = new StartSendRtp(); StartSendRtp startSendRtp = new StartSendRtp();
startSendRtp.setApp("live"); startSendRtp.setApp("live");
startSendRtp.setStream(callId); startSendRtp.setStream(callId);
@ -141,6 +144,9 @@ public class DeviceProxyService {
StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp);
log.info("startSendRtpResp {}",startSendRtpResp); log.info("startSendRtpResp {}",startSendRtpResp);
}); });
zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{
sendBye(request,device,key);
});
FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key); FfmpegExecuteResultHandler executeResultHandler = mediaStatus(request, device, key);
String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; String zlmRtpUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId;
Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler); Executor executor = pushDownload2RtpTask(fromUrl, zlmRtpUrl, time + 60, executeResultHandler);
@ -216,7 +222,7 @@ public class DeviceProxyService {
} }
@RequiredArgsConstructor @RequiredArgsConstructor
public static class FfmpegExecuteResultHandler implements ExecuteResultHandler { public class FfmpegExecuteResultHandler implements ExecuteResultHandler {
private final static long SLEEP_TIME_MS = 50; private final static long SLEEP_TIME_MS = 50;
@Setter(AccessLevel.PRIVATE) @Setter(AccessLevel.PRIVATE)
private boolean hasResult = false; private boolean hasResult = false;
@ -224,39 +230,23 @@ public class DeviceProxyService {
private final SIPRequest request; private final SIPRequest request;
private final MockingDevice device; private final MockingDevice device;
private final String key; private final String key;
private final SipSender sender;
@SneakyThrows
private void mediaStatus(){ private void mediaStatus(){
int num = taskNum.decrementAndGet(); int num = taskNum.decrementAndGet();
log.info("当前任务数 {}", num); log.info("当前任务数 {}", num);
// 等待zlm推流结束, 如果 ffmpeg 结束 30秒 未能推流完成就主动结束
Thread.sleep(30 * 1000);
CallIdHeader requestCallId = request.getCallId(); CallIdHeader requestCallId = request.getCallId();
String callId = requestCallId.getCallId(); String callId = requestCallId.getCallId();
callbackTask.remove(callId); callbackTask.remove(callId);
log.info("{} 推流结束, 发送媒体通知", key); Optional<ZlmStreamChangeHookService.ZlmStreamChangeHookHandler> optionalZlmStreamChangeHookHandler =
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder() Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId));
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000))) // 如果取消注册已完成就直接结束, 否则发送 bye请求 结束
.deviceId(device.getGbChannelId()) if(optionalZlmStreamChangeHookHandler.isEmpty()){
.build(); return;
String tag = request.getFromHeader().getTag();
sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
String ip = request.getLocalAddress().getHostAddress();
SipURI targetUri = (SipURI) request.getFromHeader().getAddress().getURI();
String targetId = targetUri.getUser();
String targetIp = request.getRemoteAddress().getHostAddress();
int targetPort = request.getTopmostViaHeader().getPort();
String transport = request.getTopmostViaHeader().getTransport();
long seqNumber = request.getCSeq().getSeqNumber() + 1;
SipProvider provider = sender.getProvider(transport, ip);
CallIdHeader newCallId = request.getCallId();
Request byeRequest = SipRequestBuilder.createByeRequest(targetIp, targetPort, seqNumber, targetId, SipUtil.generateFromTag(), null, newCallId.getCallId());
try{
provider.sendRequest(byeRequest);
}catch (Exception e){
log.error("bye 请求发送失败 {}",e.getMessage());
} }
sendBye(request,device,key);
} }
public boolean hasResult() { public boolean hasResult() {
@ -284,7 +274,7 @@ public class DeviceProxyService {
} }
public FfmpegExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){ public FfmpegExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){
return new FfmpegExecuteResultHandler(request,device,key,sender); return new FfmpegExecuteResultHandler(request,device,key);
} }
/** /**
@ -295,4 +285,33 @@ public class DeviceProxyService {
callbackTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess()); callbackTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess());
downloadTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess()); downloadTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess());
} }
private void sendBye(SIPRequest request, MockingDevice device, String key){
CallIdHeader requestCallId = request.getCallId();
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
String tag = request.getFromHeader().getTag();
sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
String ip = request.getLocalAddress().getHostAddress();
SipURI targetUri = (SipURI) request.getFromHeader().getAddress().getURI();
String targetId = targetUri.getUser();
String targetIp = request.getRemoteAddress().getHostAddress();
int targetPort = request.getTopmostViaHeader().getPort();
String transport = request.getTopmostViaHeader().getTransport();
long seqNumber = request.getCSeq().getSeqNumber() + 1;
SipProvider provider = sender.getProvider(transport, ip);
CallIdHeader newCallId = request.getCallId();
Request byeRequest = SipRequestBuilder.createByeRequest(targetIp, targetPort, seqNumber, targetId, SipUtil.generateFromTag(), null, newCallId.getCallId());
try{
provider.sendRequest(byeRequest);
}catch (Exception e){
log.error("bye 请求发送失败 {}",e.getMessage());
}
}
} }

View File

@ -20,15 +20,23 @@ public class ZlmStreamChangeHookService {
void handler(); void handler();
} }
public ConcurrentMap<String, ZlmStreamChangeHookHandler> handlerMap = new ConcurrentHashMap<>(); public ConcurrentMap<String, ZlmStreamChangeHookHandler> registHandler = new ConcurrentHashMap<>();
public ConcurrentMap<String, ZlmStreamChangeHookHandler> unregistHandler = new ConcurrentHashMap<>();
public void processEvent(String streamId, Boolean regist){ public void processEvent(String stream,String streamId, Boolean regist){
log.debug("stream {}, regist {}", streamId, regist); log.debug("stream {}, streamId {}, regist {}", stream,streamId, regist);
if(!regist){
return; if(regist){
Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{
try {
Thread.sleep(zlmHookConfig.getDelay().toMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
handler.handler();
Optional.ofNullable(handlerMap.remove(streamId)).ifPresent((handler)->{ });
} else {
Optional.ofNullable(unregistHandler.remove(streamId)).ifPresent((handler)->{
try { try {
Thread.sleep(zlmHookConfig.getDelay().toMillis()); Thread.sleep(zlmHookConfig.getDelay().toMillis());
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -38,3 +46,4 @@ public class ZlmStreamChangeHookService {
}); });
} }
} }
}