diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java index cc9ea69..b8f5435 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java @@ -20,4 +20,21 @@ public class FfmpegConfig { private String output = "-vcodec h264 -acodec aac -f rtp_mpegts"; private String logLevel = "fatal"; } + + + private Debug debug; + + @Data + public static class Debug { + private Boolean download = false; + private Boolean input = false; + private Boolean output = false; + } + + private Task task; + + @Data + public static class Task { + private Integer max = 4; + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java index f03ded8..f082975 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java @@ -6,6 +6,7 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.mocking.config.sip.FfmpegConfig; import cn.skcks.docking.gb28181.mocking.core.sip.gb28181.sdp.GB28181DescriptionParser; import cn.skcks.docking.gb28181.mocking.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder; @@ -51,6 +52,8 @@ public class InviteRequestProcessor implements MessageProcessor { private final SipSubscribe subscribe; + private final FfmpegConfig ffmpegConfig; + @PostConstruct @Override public void init() { @@ -178,6 +181,16 @@ public class InviteRequestProcessor implements MessageProcessor { log.info("目标端口号: {}", port); String senderIp = request.getLocalAddress().getHostAddress(); + String transport = request.getTopmostViaHeader().getTransport(); + int taskNum = deviceProxyService.getTaskNum().get(); + log.info("当前任务数 {}", taskNum); + if(ffmpegConfig.getTask().getMax() > 0 && taskNum >= ffmpegConfig.getTask().getMax()){ + log.warn("任务数过多 性能受限, 返回 486"); + // 发送 sdp 响应 + sender.sendResponse(senderIp, transport, (ignore, ignore2, ignore3) -> SipResponseBuilder.response(request, Response.BUSY_HERE, "BUSY_HERE")); + return; + } + SdpFactory sdpFactory = SdpFactory.getInstance(); SessionDescriptionImpl sessionDescription = new SessionDescriptionImpl(); GB28181Description description = GB28181Description.Convertor.convert(sessionDescription); @@ -205,8 +218,6 @@ public class InviteRequestProcessor implements MessageProcessor { }}); description.setSsrcField(gb28181Description.getSsrcField()); - String transport = request.getTopmostViaHeader().getTransport(); - String callId = request.getCallId().getCallId(); String key = GenericSubscribe.Helper.getKey(Request.ACK, callId); subscribe.getAckSubscribe().addPublisher(key); diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index afb9ca1..a976954 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -18,6 +18,8 @@ import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; import cn.skcks.docking.gb28181.mocking.service.ffmpeg.FfmpegSupportService; import gov.nist.javax.sip.message.SIPRequest; +import jakarta.annotation.PreDestroy; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -36,6 +38,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Optional; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; @Slf4j @Service @@ -48,6 +51,9 @@ public class DeviceProxyService { private final ConcurrentHashMap callbackTask = new ConcurrentHashMap<>(); private final ConcurrentHashMap downloadTask = new ConcurrentHashMap<>(); + @Getter + private final AtomicInteger taskNum = new AtomicInteger(0); + private final SipSender sender; private final FfmpegSupportService ffmpegSupportService; @@ -65,6 +71,7 @@ public class DeviceProxyService { }); Flow.Subscriber subscriber = byeSubscriber(key, device, callbackTask); subscribe.getByeSubscribe().addSubscribe(key, subscriber); + taskNum.getAndIncrement(); callbackTask.put(device.getDeviceCode(), pushRtpTask(fromUrl, toUrl, time + 60, mediaStatus(request, device, key))); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); }; @@ -77,6 +84,7 @@ public class DeviceProxyService { }); Flow.Subscriber subscriber = byeSubscriber(key, device, downloadTask); subscribe.getByeSubscribe().addSubscribe(key, subscriber); + taskNum.getAndIncrement(); downloadTask.put(device.getDeviceCode(), pushDownload2RtpTask( fromUrl, toUrl, time + 60, mediaStatus(request,device,key))); scheduledExecutorService.schedule(subscriber::onComplete, time + 60, TimeUnit.SECONDS); }; @@ -116,7 +124,7 @@ public class DeviceProxyService { }; } - public synchronized void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) { + public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, TaskProcessor taskProcessor) { String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video"); HashMap map = new HashMap<>(3); String deviceCode = device.getDeviceCode(); @@ -147,6 +155,7 @@ public class DeviceProxyService { public ExecuteResultHandler mediaStatus(SIPRequest request, MockingDevice device,String key){ return new ExecuteResultHandler() { private void mediaStatus(){ + taskNum.getAndDecrement(); CallIdHeader requestCallId = request.getCallId(); String callId = requestCallId.getCallId(); callbackTask.remove(callId); @@ -172,4 +181,13 @@ public class DeviceProxyService { } }; } + + /** + * 程序退出时全部销毁 + */ + @PreDestroy + private void destroy(){ + callbackTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess()); + downloadTask.values().parallelStream().forEach(executor -> executor.getWatchdog().destroyProcess()); + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java index 2b02c50..9d53767 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/ffmpeg/FfmpegSupportService.java @@ -19,10 +19,11 @@ public class FfmpegSupportService { @SneakyThrows public Executor pushToRtp(String input, String output, long time, TimeUnit unit,ExecuteResultHandler resultHandler){ FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); - String inputParam = StringUtils.joinWith(" ", rtp.getInput(), input); + FfmpegConfig.Debug debug = ffmpegConfig.getDebug(); + String inputParam = debug.getInput() ? rtp.getInput() : StringUtils.joinWith(" ", rtp.getInput(), input); log.info("视频输入参数 {}", inputParam); - String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output); + String outputParam = debug.getOutput()? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), output); log.info("视频输出参数 {}", outputParam); return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler); @@ -31,10 +32,11 @@ public class FfmpegSupportService { @SneakyThrows public Executor pushDownload2Rtp(String input, String output, long time, TimeUnit unit, ExecuteResultHandler resultHandler){ FfmpegConfig.Rtp rtp = ffmpegConfig.getRtp(); - String inputParam = StringUtils.joinWith(" ", rtp.getDownload(), input); + FfmpegConfig.Debug debug = ffmpegConfig.getDebug(); + String inputParam = debug.getDownload()? rtp.getDownload() : StringUtils.joinWith(" ", rtp.getDownload(), input); log.info("视频下载参数 {}", inputParam); - String outputParam = StringUtils.joinWith(" ", rtp.getOutput(), output); + String outputParam = debug.getOutput()? rtp.getOutput() : StringUtils.joinWith(" ", rtp.getOutput(), output); log.info("视频输出参数 {}", outputParam); return ffmpegExecutor(inputParam, outputParam, time, unit, resultHandler); diff --git a/gb28181-mocking-starter/src/main/resources/application-local-linux.yml b/gb28181-mocking-starter/src/main/resources/application-local-linux.yml index c50e88d..f671269 100644 --- a/gb28181-mocking-starter/src/main/resources/application-local-linux.yml +++ b/gb28181-mocking-starter/src/main/resources/application-local-linux.yml @@ -68,8 +68,15 @@ proxy: device: url: http://10.10.10.20:18186 ffmpeg-support: + task: + # 最大同时推流任务数, <= 0 时不做限制 + max: 6 ffmpeg: /usr/bin/ffmpeg/ffmpeg ffprobe: /usr/bin/ffmpeg/ffprobe rtp: input: -re -i output: -vcodec h264 -acodec aac -f rtp_mpegts + debug: + download: false + input: false + output: false diff --git a/gb28181-mocking-starter/src/main/resources/application-local.yml b/gb28181-mocking-starter/src/main/resources/application-local.yml index cb23766..9790bfc 100644 --- a/gb28181-mocking-starter/src/main/resources/application-local.yml +++ b/gb28181-mocking-starter/src/main/resources/application-local.yml @@ -74,8 +74,16 @@ proxy: device: url: http://10.10.10.20:18186 ffmpeg-support: + task: + # 最大同时推流任务数, <= 0 时不做限制 + max: 4 ffmpeg: D:\Soft\Captura\ffmpeg\ffmpeg.exe ffprobe: D:\Soft\Captura\ffmpeg\ffprobe.exe rtp: - input: -re -i - output: -vcodec h264 -acodec aac -f rtp_mpegts + input: -re -i http://10.10.10.200:5080/live/test.live.flv +# input: -re -i + output: -vcodec h264 -acodec aac -f rtp_mpegts # -rtsp_transport tcp + debug: + download: false + input: true + output: false diff --git a/gb28181-mocking-starter/src/main/resources/application.yml b/gb28181-mocking-starter/src/main/resources/application.yml index 07ea16a..812e0fd 100644 --- a/gb28181-mocking-starter/src/main/resources/application.yml +++ b/gb28181-mocking-starter/src/main/resources/application.yml @@ -71,9 +71,16 @@ proxy: device: url: http://192.168.2.3:18183 ffmpeg-support: + task: + # 最大同时推流任务数, <= 0 时不做限制 + max: 4 ffmpeg: /usr/bin/ffmpeg/ffmpeg ffprobe: /usr/bin/ffmpeg/ffprobe rtp: download: -i input: -re -i output: -vcodec h264 -acodec aac -f rtp_mpegts + debug: + download: false + input: false + output: false