推流结束 发送媒体通知

This commit is contained in:
shikong 2023-09-18 15:00:30 +08:00
parent dc80074447
commit 1ddacfeb88
3 changed files with 40 additions and 48 deletions

View File

@ -1,17 +1,13 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.invite.request; package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.invite.request;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser; import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.notify.dto.MediaStatusRequestDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
@ -33,7 +29,6 @@ import org.springframework.stereotype.Component;
import javax.sdp.*; import javax.sdp.*;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.util.Arrays; import java.util.Arrays;
@ -247,17 +242,6 @@ public class InviteRequestProcessor implements MessageProcessor {
log.info("收到 ack 确认请求: {} 开始推流",key); log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流 // RTP 推流
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.playbackTask()); deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.playbackTask());
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
String tag = request.getFromHeader().getTag();
CallIdHeader requestCallId = request.getCallId();
sender.sendRequest(((provider, ip, port1) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
onComplete(); onComplete();
} }
@ -288,16 +272,6 @@ public class InviteRequestProcessor implements MessageProcessor {
// RTP 推流 // RTP 推流
deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.downloadTask()); deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port, deviceProxyService.downloadTask());
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
String tag = request.getFromHeader().getTag();
CallIdHeader requestCallId = request.getCallId();
sender.sendRequest(((provider, ip, port1) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
onComplete(); onComplete();
} }

View File

@ -21,6 +21,8 @@ import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.exec.ExecuteException;
import org.apache.commons.exec.ExecuteResultHandler;
import org.apache.commons.exec.Executor; import org.apache.commons.exec.Executor;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -60,16 +62,6 @@ public class DeviceProxyService {
return (SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time) -> { return (SIPRequest request,String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time) -> {
Optional.ofNullable(callbackTask.get(device.getDeviceCode())).ifPresent(task->{ Optional.ofNullable(callbackTask.get(device.getDeviceCode())).ifPresent(task->{
task.getWatchdog().destroyProcess(); task.getWatchdog().destroyProcess();
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
String tag = request.getFromHeader().getTag();
CallIdHeader requestCallId = request.getCallId();
sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
}); });
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, callbackTask); Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, callbackTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber); subscribe.getByeSubscribe().addSubscribe(key, subscriber);
@ -95,7 +87,7 @@ public class DeviceProxyService {
}); });
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, downloadTask); Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, downloadTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber); subscribe.getByeSubscribe().addSubscribe(key, subscriber);
downloadTask.put(device.getDeviceCode(), pushDownload2RtpTask( fromUrl, toUrl, time + 60)); downloadTask.put(device.getDeviceCode(), pushDownload2RtpTask( fromUrl, toUrl, time + 60, mediaStatus(request,device,key)));
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
}; };
} }
@ -153,12 +145,39 @@ public class DeviceProxyService {
} }
@SneakyThrows @SneakyThrows
public Executor pushRtpTask(String fromUrl, String toUrl, long time){ public Executor pushRtpTask(String fromUrl, String toUrl, long time, ExecuteResultHandler resultHandler) {
return ffmpegSupportService.pushToRtp(fromUrl, toUrl, time, TimeUnit.SECONDS); return ffmpegSupportService.pushToRtp(fromUrl, toUrl, time, TimeUnit.SECONDS, resultHandler);
} }
@SneakyThrows @SneakyThrows
public Executor pushDownload2RtpTask(String fromUrl, String toUrl, long time){ public Executor pushDownload2RtpTask(String fromUrl, String toUrl, long time, ExecuteResultHandler resultHandler) {
return ffmpegSupportService.pushDownload2Rtp(fromUrl, toUrl, time, TimeUnit.SECONDS); return ffmpegSupportService.pushDownload2Rtp(fromUrl, toUrl, time, TimeUnit.SECONDS, resultHandler);
}
public ExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){
return new ExecuteResultHandler() {
private void mediaStatus(){
log.info("{} 推流结束, 发送媒体通知", key);
MediaStatusRequestDTO mediaStatusRequestDTO = MediaStatusRequestDTO.builder()
.sn(String.valueOf((int) ((Math.random() * 9 + 1) * 100000)))
.deviceId(device.getGbChannelId())
.build();
String tag = request.getFromHeader().getTag();
CallIdHeader requestCallId = request.getCallId();
sender.sendRequest(((provider, ip, port) -> SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(mediaStatusRequestDTO), SipUtil.generateViaTag(), tag, requestCallId)));
}
@Override
public void onProcessComplete(int exitValue) {
mediaStatus();
}
@Override
public void onProcessFailed(ExecuteException e) {
mediaStatus();
}
};
} }
} }

View File

@ -17,7 +17,7 @@ public class FfmpegSupportService {
private final FfmpegConfig ffmpegConfig; private final FfmpegConfig ffmpegConfig;
@SneakyThrows @SneakyThrows
public Executor pushToRtp(String input, String output, long time, TimeUnit unit){ public Executor pushToRtp(String input, String output, long time, TimeUnit unit,ExecuteResultHandler resultHandler){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String inputParam = StringUtils.joinWith(" ", rtp.getInput(), input); String inputParam = StringUtils.joinWith(" ", rtp.getInput(), input);
log.info("视频输入参数 {}", inputParam); log.info("视频输入参数 {}", inputParam);
@ -25,11 +25,11 @@ public class FfmpegSupportService {
String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output); String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output);
log.info("视频输出参数 {}", outputParam); log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, time, unit); return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler);
} }
@SneakyThrows @SneakyThrows
public Executor pushDownload2Rtp(String input, String output, long time, TimeUnit unit){ public Executor pushDownload2Rtp(String input, String output, long time, TimeUnit unit, ExecuteResultHandler resultHandler){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String inputParam = StringUtils.joinWith(" ", rtp.getDownload(), input); String inputParam = StringUtils.joinWith(" ", rtp.getDownload(), input);
log.info("视频下载参数 {}", inputParam); log.info("视频下载参数 {}", inputParam);
@ -37,16 +37,15 @@ public class FfmpegSupportService {
String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output); String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output);
log.info("视频输出参数 {}", outputParam); log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, time, unit); return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler);
} }
@SneakyThrows @SneakyThrows
public Executor ffmpegExecutor(String inputParam,String outputParam, long time, TimeUnit unit){ public Executor ffmpegExecutor(String inputParam,String outputParam, long time, TimeUnit unit,ExecuteResultHandler resultHandler){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel()); String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel());
String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), inputParam, outputParam, logLevelParam); String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), inputParam, outputParam, logLevelParam);
CommandLine commandLine = CommandLine.parse(command); CommandLine commandLine = CommandLine.parse(command);
DefaultExecuteResultHandler resultHandler = new DefaultExecuteResultHandler();
Executor executor = new DefaultExecutor(); Executor executor = new DefaultExecutor();
ExecuteWatchdog watchdog = new ExecuteWatchdog(unit.toMillis(time)); ExecuteWatchdog watchdog = new ExecuteWatchdog(unit.toMillis(time));
executor.setExitValue(0); executor.setExitValue(0);