支持 Download 请求 (即 Download 使用最大速率推流)

This commit is contained in:
shikong 2023-09-16 21:09:58 +08:00
parent f835242b52
commit 9cad000b41
5 changed files with 140 additions and 38 deletions

View File

@ -15,8 +15,9 @@ public class FfmpegConfig {
@Data
public static class Rtp {
private String input;
private String output;
private String download = "-i";
private String input = "-re -i";
private String output = "-vcodec h264 -acodec aac -f rtp_mpegts";
private String logLevel = "fatal";
}
}

View File

@ -101,6 +101,7 @@ public class InviteRequestProcessor implements MessageProcessor {
}
} else if (StringUtils.equalsIgnoreCase(type, "Download")) {
log.info("下载请求");
download(request, device, gb28181Description, (MediaDescription) item);
} else {
log.error("未知请求类型: {}", type);
sender.sendResponse(senderIp, transport, unsupported(request));
@ -153,6 +154,18 @@ public class InviteRequestProcessor implements MessageProcessor {
@SneakyThrows
private void playback(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription, TimeField time) {
playback(request, device, gb28181Description, mediaDescription, time, false);
}
@SneakyThrows
private void download(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription) {
TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) gb28181Description.getTimeDescriptions(true).get(0);
TimeField time = (TimeField) timeDescription.getTime();
playback(request, device, gb28181Description, mediaDescription, time, true);
}
@SneakyThrows
private void playback(SIPRequest request, MockingDevice device, GB28181Description gb28181Description, MediaDescription mediaDescription, TimeField time, boolean isDownload) {
Date start = new Date(time.getStartTime() * 1000);
Date stop = new Date(time.getStopTime() * 1000);
log.info("{} ~ {}", start, stop);
@ -199,7 +212,25 @@ public class InviteRequestProcessor implements MessageProcessor {
subscribe.getAckSubscribe().addPublisher(key);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
final ScheduledFuture<?>[] schedule = new ScheduledFuture<?>[1];
Flow.Subscriber<SIPRequest> subscriber = new Flow.Subscriber<>() {
Flow.Subscriber<SIPRequest> subscriber;
if(!isDownload){
subscriber = placbackSubscriber(callId,device,start,stop,address,port,key,schedule);
} else {
subscriber = downloadSubscriber(callId,device,start,stop,address,port,key,schedule);
}
// 60秒超时计时器
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
// 推流 ack 事件订阅
subscribe.getAckSubscribe().addSubscribe(key, subscriber);
scheduledExecutorService.schedule(()->{
// 发送 sdp 响应
sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, description));
}, 1,TimeUnit.SECONDS);
}
public Flow.Subscriber<SIPRequest> placbackSubscriber(String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture<?>[] scheduledFuture){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("创建 ack 订阅 {}", key);
@ -210,7 +241,7 @@ public class InviteRequestProcessor implements MessageProcessor {
public void onNext(SIPRequest item) {
log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流
deviceProxyService.proxyVideo2Rtp(callId, device, start, stop, address, port);
deviceProxyService.proxyVideo2Rtp(callId, device, start, stop, address, port, deviceProxyService.playbackTask());
onComplete();
}
@ -222,17 +253,37 @@ public class InviteRequestProcessor implements MessageProcessor {
@Override
public void onComplete() {
subscribe.getAckSubscribe().delPublisher(key);
schedule[0].cancel(true);
scheduledFuture[0].cancel(true);
}
};
// 60秒超时计时器
schedule[0] = scheduledExecutorService.schedule(subscriber::onComplete, 60 , TimeUnit.SECONDS);
// 推流 ack 事件订阅
subscribe.getAckSubscribe().addSubscribe(key, subscriber);
}
scheduledExecutorService.schedule(()->{
// 发送 sdp 响应
sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.responseSdp(request, description));
}, 1,TimeUnit.SECONDS);
public Flow.Subscriber<SIPRequest> downloadSubscriber(String callId,MockingDevice device,Date start,Date stop,String address,int port,String key,ScheduledFuture<?>[] scheduledFuture){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("创建 ack 订阅 {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
log.info("收到 ack 确认请求: {} 开始推流",key);
// RTP 推流
deviceProxyService.proxyVideo2Rtp(callId, device, start, stop, address, port, deviceProxyService.downloadTask());
onComplete();
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
subscribe.getAckSubscribe().delPublisher(key);
scheduledFuture[0].cancel(true);
}
};
}
}

View File

@ -35,32 +35,45 @@ public class DeviceProxyService {
private final SipSubscribe subscribe;
private final ConcurrentHashMap<String, Executor> task = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Executor> callbackTask = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Executor> downloadTask = new ConcurrentHashMap<>();
private final SipSender sender;
private final FfmpegSupportService ffmpegSupportService;
public synchronized void proxyVideo2Rtp(String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort) {
Optional.ofNullable(task.get(device.getDeviceCode())).ifPresent(task->{
task.getWatchdog().destroyProcess();
});
String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
HashMap<String, String> map = new HashMap<>(3);
String deviceCode = device.getDeviceCode();
map.put("device_id", deviceCode);
map.put("begin_time", DateUtil.format(startTime, DatePattern.PURE_DATETIME_FORMAT));
map.put("end_time", DateUtil.format(endTime, DatePattern.PURE_DATETIME_FORMAT));
String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8);
fromUrl = StringUtils.joinWith("?", fromUrl, query);
log.info("设备: {} 视频 url: {}", deviceCode, fromUrl);
String toUrl = StringUtils.joinWith("", "rtp://", rtpAddr, ":", rtpPort);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
subscribe.getByeSubscribe().addPublisher(key);
Flow.Subscriber<SIPRequest> subscriber = new Flow.Subscriber<>() {
public interface TaskProcessor {
void process(String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time);
}
public TaskProcessor playbackTask(){
return (String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time) -> {
Optional.ofNullable(callbackTask.get(device.getDeviceCode())).ifPresent(task->{
task.getWatchdog().destroyProcess();
});
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, callbackTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
callbackTask.put(device.getDeviceCode(), pushRtpTask( fromUrl, toUrl, time + 60));
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
};
}
public TaskProcessor downloadTask(){
return (String callId,String fromUrl, String toUrl, MockingDevice device, String key, long time)->{
Optional.ofNullable(downloadTask.get(device.getDeviceCode())).ifPresent(task->{
task.getWatchdog().destroyProcess();
});
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, downloadTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
downloadTask.put(device.getDeviceCode(), pushDownload2RtpTask( fromUrl, toUrl, time + 60));
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
};
}
public Flow.Subscriber<SIPRequest> byeSubscriber(String key, MockingDevice device, ConcurrentHashMap<String, Executor> task){
return new Flow.Subscriber<>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("订阅 bye {}", key);
@ -88,17 +101,36 @@ public class DeviceProxyService {
Optional.ofNullable(task.get(device.getDeviceCode())).ifPresent(task -> {
task.getWatchdog().destroyProcess();
});
task.remove(device.getDeviceCode());
downloadTask.remove(device.getDeviceCode());
}
};
subscribe.getByeSubscribe().addSubscribe(key, subscriber);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
task.put(device.getDeviceCode(), pushRtpTask( fromUrl, toUrl, time + 60));
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
}
public synchronized void proxyVideo2Rtp(String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) {
String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
HashMap<String, String> map = new HashMap<>(3);
String deviceCode = device.getDeviceCode();
map.put("device_id", deviceCode);
map.put("begin_time", DateUtil.format(startTime, DatePattern.PURE_DATETIME_FORMAT));
map.put("end_time", DateUtil.format(endTime, DatePattern.PURE_DATETIME_FORMAT));
String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8);
fromUrl = StringUtils.joinWith("?", fromUrl, query);
log.info("设备: {} 视频 url: {}", deviceCode, fromUrl);
String toUrl = StringUtils.joinWith("", "rtp://", rtpAddr, ":", rtpPort);
long time = DateUtil.between(startTime, endTime, DateUnit.SECOND);
String key = GenericSubscribe.Helper.getKey(Request.BYE, callId);
subscribe.getByeSubscribe().addPublisher(key);
taskProcessor.process(callId,fromUrl,toUrl,device,key,time);
}
@SneakyThrows
public Executor pushRtpTask(String fromUrl, String toUrl, long time){
return ffmpegSupportService.pushToRtp(fromUrl, toUrl, time, TimeUnit.SECONDS);
}
@SneakyThrows
public Executor pushDownload2RtpTask(String fromUrl, String toUrl, long time){
return ffmpegSupportService.pushDownload2Rtp(fromUrl, toUrl, time, TimeUnit.SECONDS);
}
}

View File

@ -25,6 +25,24 @@ public class FfmpegSupportService {
String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output);
log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, time, unit);
}
@SneakyThrows
public Executor pushDownload2Rtp(String input, String output, long time, TimeUnit unit){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String inputParam = StringUtils.joinWith(" ", rtp.getDownload(), input);
log.info("视频下载参数 {}", inputParam);
String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output);
log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, time, unit);
}
@SneakyThrows
public Executor ffmpegExecutor(String inputParam,String outputParam, long time, TimeUnit unit){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String logLevelParam = StringUtils.joinWith(" ","-loglevel", rtp.getLogLevel());
String command = StringUtils.joinWith(" ", ffmpegConfig.getFfmpeg(), inputParam, outputParam, logLevelParam);
CommandLine commandLine = CommandLine.parse(command);

View File

@ -78,4 +78,4 @@ ffmpeg-support:
ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe
rtp:
input: -re -i
output: -vcodec copy -acodec aac -f rtp_mpegts
output: -vcodec h264 -acodec aac -f rtp_mpegts