同时推流任务数限制

input/output debug选项
This commit is contained in:
shikong 2023-09-19 08:50:10 +08:00
parent dca238eb4e
commit 8ad7c5a15f
7 changed files with 79 additions and 9 deletions

View File

@ -20,4 +20,21 @@ public class FfmpegConfig {
private String output = "-vcodec h264 -acodec aac -f rtp_mpegts"; private String output = "-vcodec h264 -acodec aac -f rtp_mpegts";
private String logLevel = "fatal"; private String logLevel = "fatal";
} }
private Debug debug;
@Data
public static class Debug {
private Boolean download = false;
private Boolean input = false;
private Boolean output = false;
}
private Task task;
@Data
public static class Task {
private Integer max = 4;
}
} }

View File

@ -6,6 +6,7 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper;
import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.listener.SipListener;
import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser; import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser;
import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder;
@ -51,6 +52,8 @@ public class InviteRequestProcessor implements MessageProcessor {
private final SipSubscribe subscribe; private final SipSubscribe subscribe;
private final FfmpegConfig ffmpegConfig;
@PostConstruct @PostConstruct
@Override @Override
public void init() { public void init() {
@ -178,6 +181,16 @@ public class InviteRequestProcessor implements MessageProcessor {
log.info("目标端口号: {}", port); log.info("目标端口号: {}", port);
String senderIp = request.getLocalAddress().getHostAddress(); String senderIp = request.getLocalAddress().getHostAddress();
String transport = request.getTopmostViaHeader().getTransport();
int taskNum = deviceProxyService.getTaskNum().get();
log.info("当前任务数 {}", taskNum);
if(ffmpegConfig.getTask().getMax() > 0 && taskNum >= ffmpegConfig.getTask().getMax()){
log.warn("任务数过多 性能受限, 返回 486");
// 发送 sdp 响应
sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.response(request, Response.BUSY_HERE, "BUSY_HERE"));
return;
}
SdpFactory sdpFactory = SdpFactory.getInstance(); SdpFactory sdpFactory = SdpFactory.getInstance();
SessionDescriptionImpl sessionDescription = new SessionDescriptionImpl(); SessionDescriptionImpl sessionDescription = new SessionDescriptionImpl();
GB28181Description description = GB28181Description.Convertor.convert(sessionDescription); GB28181Description description = GB28181Description.Convertor.convert(sessionDescription);
@ -205,8 +218,6 @@ public class InviteRequestProcessor implements MessageProcessor {
}}); }});
description.setSsrcField(gb28181Description.getSsrcField()); description.setSsrcField(gb28181Description.getSsrcField());
String transport = request.getTopmostViaHeader().getTransport();
String callId = request.getCallId().getCallId(); String callId = request.getCallId().getCallId();
String key = GenericSubscribe.Helper.getKey(Request.ACK, callId); String key = GenericSubscribe.Helper.getKey(Request.ACK, callId);
subscribe.getAckSubscribe().addPublisher(key); subscribe.getAckSubscribe().addPublisher(key);

View File

@ -18,6 +18,8 @@ import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService; import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -36,6 +38,7 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Slf4j
@Service @Service
@ -48,6 +51,9 @@ public class DeviceProxyService {
private final ConcurrentHashMap<String, Executor> callbackTask = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Executor> callbackTask = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Executor> downloadTask = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, Executor> downloadTask = new ConcurrentHashMap<>();
@Getter
private final AtomicInteger taskNum = new AtomicInteger(0);
private final SipSender sender; private final SipSender sender;
private final FfmpegSupportService ffmpegSupportService; private final FfmpegSupportService ffmpegSupportService;
@ -65,6 +71,7 @@ public class DeviceProxyService {
}); });
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, callbackTask); Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, callbackTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber); subscribe.getByeSubscribe().addSubscribe(key, subscriber);
taskNum.getAndIncrement();
callbackTask.put(device.getDeviceCode(), pushRtpTask(fromUrl, toUrl, time + 60, mediaStatus(request, device, key))); callbackTask.put(device.getDeviceCode(), pushRtpTask(fromUrl, toUrl, time + 60, mediaStatus(request, device, key)));
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
}; };
@ -77,6 +84,7 @@ public class DeviceProxyService {
}); });
Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, downloadTask); Flow.Subscriber<SIPRequest> subscriber = byeSubscriber(key, device, downloadTask);
subscribe.getByeSubscribe().addSubscribe(key, subscriber); subscribe.getByeSubscribe().addSubscribe(key, subscriber);
taskNum.getAndIncrement();
downloadTask.put(device.getDeviceCode(), pushDownload2RtpTask( fromUrl, toUrl, time + 60, mediaStatus(request,device,key))); downloadTask.put(device.getDeviceCode(), pushDownload2RtpTask( fromUrl, toUrl, time + 60, mediaStatus(request,device,key)));
scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS);
}; };
@ -116,7 +124,7 @@ public class DeviceProxyService {
}; };
} }
public synchronized void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) { public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) {
String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video"); String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video");
HashMap<String, String> map = new HashMap<>(3); HashMap<String, String> map = new HashMap<>(3);
String deviceCode = device.getDeviceCode(); String deviceCode = device.getDeviceCode();
@ -147,6 +155,7 @@ public class DeviceProxyService {
public ExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){ public ExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){
return new ExecuteResultHandler() { return new ExecuteResultHandler() {
private void mediaStatus(){ private void mediaStatus(){
taskNum.getAndDecrement();
CallIdHeader requestCallId = request.getCallId(); CallIdHeader requestCallId = request.getCallId();
String callId = requestCallId.getCallId(); String callId = requestCallId.getCallId();
callbackTask.remove(callId); callbackTask.remove(callId);
@ -172,4 +181,13 @@ public class DeviceProxyService {
} }
}; };
} }
/**
* 程序退出时全部销毁
*/
@PreDestroy
private void destroy(){
callbackTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess());
downloadTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess());
}
} }

View File

@ -19,10 +19,11 @@ public class FfmpegSupportService {
@SneakyThrows @SneakyThrows
public Executor pushToRtp(String input, String output, long time, TimeUnit unit,ExecuteResultHandler resultHandler){ public Executor pushToRtp(String input, String output, long time, TimeUnit unit,ExecuteResultHandler resultHandler){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String inputParam = StringUtils.joinWith(" ", rtp.getInput(), input); FfmpegConfig.Debug debug = ffmpegConfig.getDebug();
String inputParam = debug.getInput() ? rtp.getInput() : StringUtils.joinWith(" ", rtp.getInput(), input);
log.info("视频输入参数 {}", inputParam); log.info("视频输入参数 {}", inputParam);
String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output); String outputParam = debug.getOutput()? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), output);
log.info("视频输出参数 {}", outputParam); log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler); return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler);
@ -31,10 +32,11 @@ public class FfmpegSupportService {
@SneakyThrows @SneakyThrows
public Executor pushDownload2Rtp(String input, String output, long time, TimeUnit unit, ExecuteResultHandler resultHandler){ public Executor pushDownload2Rtp(String input, String output, long time, TimeUnit unit, ExecuteResultHandler resultHandler){
FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp();
String inputParam = StringUtils.joinWith(" ", rtp.getDownload(), input); FfmpegConfig.Debug debug = ffmpegConfig.getDebug();
String inputParam = debug.getDownload()? rtp.getDownload() : StringUtils.joinWith(" ", rtp.getDownload(), input);
log.info("视频下载参数 {}", inputParam); log.info("视频下载参数 {}", inputParam);
String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output); String outputParam = debug.getOutput()? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), output);
log.info("视频输出参数 {}", outputParam); log.info("视频输出参数 {}", outputParam);
return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler); return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler);

View File

@ -68,8 +68,15 @@ proxy:
device: device:
url: http://10.10.10.20:18186 url: http://10.10.10.20:18186
ffmpeg-support: ffmpeg-support:
task:
# 最大同时推流任务数, <= 0 时不做限制
max: 6
ffmpeg: /usr/bin/ffmpeg/ffmpeg ffmpeg: /usr/bin/ffmpeg/ffmpeg
ffprobe: /usr/bin/ffmpeg/ffprobe ffprobe: /usr/bin/ffmpeg/ffprobe
rtp: rtp:
input: -re -i input: -re -i
output: -vcodec h264 -acodec aac -f rtp_mpegts output: -vcodec h264 -acodec aac -f rtp_mpegts
debug:
download: false
input: false
output: false

View File

@ -74,8 +74,16 @@ proxy:
device: device:
url: http://10.10.10.20:18186 url: http://10.10.10.20:18186
ffmpeg-support: ffmpeg-support:
task:
# 最大同时推流任务数, <= 0 时不做限制
max: 4
ffmpeg: D:\Soft\Captura\ffmpeg\ffmpeg.exe ffmpeg: D:\Soft\Captura\ffmpeg\ffmpeg.exe
ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe
rtp: rtp:
input: -re -i input: -re -i http://10.10.10.200:5080/live/test.live.flv
output: -vcodec h264 -acodec aac -f rtp_mpegts # input: -re -i
output: -vcodec h264 -acodec aac -f rtp_mpegts # -rtsp_transport tcp
debug:
download: false
input: true
output: false

View File

@ -71,9 +71,16 @@ proxy:
device: device:
url: http://192.168.2.3:18183 url: http://192.168.2.3:18183
ffmpeg-support: ffmpeg-support:
task:
# 最大同时推流任务数, <= 0 时不做限制
max: 4
ffmpeg: /usr/bin/ffmpeg/ffmpeg ffmpeg: /usr/bin/ffmpeg/ffmpeg
ffprobe: /usr/bin/ffmpeg/ffprobe ffprobe: /usr/bin/ffmpeg/ffprobe
rtp: rtp:
download: -i download: -i
input: -re -i input: -re -i
output: -vcodec h264 -acodec aac -f rtp_mpegts output: -vcodec h264 -acodec aac -f rtp_mpegts
debug:
download: false
input: false
output: false