From e9820fdbcd5a0398b06ec0b5789f04b58ceb60ad Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Tue, 16 Jan 2024 17:28:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E6=AD=A5=20=E6=B7=BB=E5=8A=A0=20zlm?= =?UTF-8?q?=20ffmpegSource=20=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mocking/config/sip/FfmpegConfig.java | 2 + .../request/InviteRequestProcessor.java | 8 +- .../service/device/DeviceProxyService.java | 164 +++++++++++++++--- .../zlm/hook/ZlmStreamChangeHookService.java | 20 ++- .../src/main/resources/application.yml | 3 + 5 files changed, 169 insertions(+), 28 deletions(-) diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java index fe29592..5593f64 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/FfmpegConfig.java @@ -11,6 +11,8 @@ public class FfmpegConfig { private String ffmpeg; private String ffprobe; + private Boolean useZlmFfmpeg = false; + private Rtp rtp; @Data diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java index d8cb179..f06a7a4 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/invite/request/InviteRequestProcessor.java @@ -324,8 +324,12 @@ public class InviteRequestProcessor implements MessageProcessor, SmartLifecycle @Override public void onNext(SIPRequest item) { log.info("收到 ack 确认请求: {} 开始推流",key); - // RTP 推流 - deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port,ssrc, deviceProxyService.playbackTask()); + if(ffmpegConfig.getUseZlmFfmpeg()){ + deviceProxyService.pullStreamByZlmFfmpegSource(request,callId,device, start, stop, address, port,ssrc); + } else { + // RTP 推流 + deviceProxyService.proxyVideo2Rtp(request, callId, device, start, stop, address, port,ssrc, deviceProxyService.playbackTask()); + } onComplete(); } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java index ef4efd4..de90327 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/device/DeviceProxyService.java @@ -12,6 +12,8 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; +import cn.skcks.docking.gb28181.media.dto.proxy.AddFFmpegSource; +import cn.skcks.docking.gb28181.media.dto.proxy.AddFFmpegSourceResp; import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxy; import cn.skcks.docking.gb28181.media.dto.proxy.AddStreamProxyResp; import cn.skcks.docking.gb28181.media.dto.response.ZlmResponse; @@ -54,6 +56,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.message.Request; import javax.sip.message.Response; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.Date; @@ -84,6 +87,10 @@ public class DeviceProxyService { private final ZlmMediaConfig zlmMediaConfig; private final ZlmStreamChangeHookService zlmStreamChangeHookService; private final ZlmRtmpConfig zlmRtmpConfig; + + private final String DEFAULT_ZLM_APP = "live"; + private final String ZLM_FFMPEG_PROXY_APP = "ffmpeg_proxy"; + ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); public interface TaskProcessor { @@ -94,7 +101,7 @@ public class DeviceProxyService { GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); - zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ + zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{ schedule.cancel(false); Retryer retryer = RetryerBuilder.newBuilder() .retryIfResult(resp -> resp.getLocalPort() == null || resp.getLocalPort() <= 0) @@ -108,7 +115,7 @@ public class DeviceProxyService { try { retryer.call(()->{ StartSendRtp startSendRtp = new StartSendRtp(); - startSendRtp.setApp("live"); + startSendRtp.setApp(DEFAULT_ZLM_APP); startSendRtp.setStream(callId); startSendRtp.setSsrc(ssrc); startSendRtp.setDstUrl(toAddr); @@ -121,15 +128,15 @@ public class DeviceProxyService { }); } catch (Exception e) { schedule.cancel(true); - Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)) + Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId)) .ifPresent(ZlmStreamChangeHookService.ZlmStreamChangeHookHandler::handler); throw new RuntimeException(e); } }); - zlmStreamChangeHookService.getUnregistHandler().put(callId,()->{ + zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).put(callId,()->{ sendBye(request,device,key); }); - return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/live/" + callId; + return "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + DEFAULT_ZLM_APP +"/" + callId; } private Flow.Subscriber ffmpegTask(SIPRequest request,ConcurrentHashMap tasks, String callId, String key, MockingDevice device){ @@ -232,6 +239,45 @@ public class DeviceProxyService { }; } + public Flow.Subscriber zlmFfmpegByeSubscriber(String key, SIPRequest inviteRequest,MockingDevice device){ + return new Flow.Subscriber<>() { + private SIPRequest request; + @Override + public void onSubscribe(Flow.Subscription subscription) { + log.info("订阅 bye {}", key); + subscription.request(1); + } + + @Override + public void onNext(SIPRequest item) { + request = item; + subscribe.getByeSubscribe().delPublisher(key); + } + + @Override + public void onError(Throwable throwable) {} + + @Override + public void onComplete() { + log.info("bye 订阅结束 {}", key); + if(request == null){ + sendBye(inviteRequest,device,""); + } else { + String ip = request.getLocalAddress().getHostAddress(); + String transPort = request.getTopmostViaHeader().getTransport(); + sender.sendResponse(ip, transPort, ((provider, ip1, port) -> + SipResponseBuilder.response(request, Response.OK, "OK"))); + } + + String cacheKey = CacheUtil.getKey("ZLM","FFMPEG", "PROXY", key); + String proxyKey = RedisUtil.StringOps.get(cacheKey); + log.info("关闭拉流代理 {}", zlmMediaService.delFfmpegSource(proxyKey)); + RedisUtil.KeyOps.delete(cacheKey); + } + }; + } + + public Flow.Subscriber zlmByeSubscriber(String key, SIPRequest inviteRequest,MockingDevice device){ return new Flow.Subscriber<>() { private SIPRequest request; @@ -270,6 +316,69 @@ public class DeviceProxyService { }; } + public void pullStreamByZlmFfmpegSource(SIPRequest request,String callId, MockingDevice device, Date start, Date stop,String rtpAddr, int rtpPort, String ssrc){ + Retryer> retryer = RetryerBuilder.>newBuilder() + .retryIfResult(resp -> { + log.info("resp {}", resp); + return !resp.getCode().equals(ResponseStatus.Success); + }) + .retryIfException() + .retryIfRuntimeException() + // 重试间隔 + .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS)) + // 重试次数 + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build(); + + String toUrl = "rtmp://" + zlmMediaConfig.getIp() + ":" + zlmRtmpConfig.getPort() + "/" + ZLM_FFMPEG_PROXY_APP +"/" + callId; + String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); + try { + ZlmResponse sourceResp = retryer.call(() -> zlmMediaService.addFfmpegSource(AddFFmpegSource.builder() + .srcUrl(getProxyUrl(device,start,stop)) + .dstUrl(toUrl) + .enableHls(false) + .enableMp4(false) + .timeoutMs(Duration.ofSeconds(30).toMillis()) + .build())); + String proxyKey = sourceResp.getData().getKey(); + String cacheKey = CacheUtil.getKey("ZLM","FFMPEG", "PROXY", key); + RedisUtil.StringOps.set(cacheKey, proxyKey); + + GB28181Description gb28181Description = new GB28181DescriptionParser(new String(request.getRawContent())).parse(); + MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); + boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); + + Retryer rtpRetryer = rtpRetryer(); + zlmStreamChangeHookService.getRegistHandler(ZLM_FFMPEG_PROXY_APP).put(callId,()->{ + try { + rtpRetryer.call(()->{ + StartSendRtp startSendRtp = new StartSendRtp(); + startSendRtp.setApp(DEFAULT_ZLM_APP); + startSendRtp.setStream(callId); + startSendRtp.setSsrc(ssrc); + startSendRtp.setDstUrl(rtpAddr); + startSendRtp.setDstPort(rtpPort); + startSendRtp.setUdp(!tcp); + log.info("startSendRtp {}",startSendRtp); + StartSendRtpResp startSendRtpResp = zlmMediaService.startSendRtp(startSendRtp); + log.info("startSendRtpResp {}",startSendRtpResp); + return startSendRtpResp; + }); + } catch (Exception e){ + log.error("zlm rtp 推流失败",e); + sendBye(request, device, ""); + } + }); + + Flow.Subscriber subscriber = zlmFfmpegByeSubscriber(key,request,device); + subscribe.getByeSubscribe().addPublisher(key); + subscribe.getByeSubscribe().addSubscribe(key, subscriber); + }catch (Exception e){ + log.error("zlm ffmpeg 拉/推流失败",e); + sendBye(request, device, ""); + } + } + @SneakyThrows public void pullLiveStream2Rtp(SIPRequest request,String callId, MockingDevice device, String rtpAddr, int rtpPort, String ssrc){ Retryer> retryer = RetryerBuilder.>newBuilder() @@ -290,7 +399,7 @@ public class DeviceProxyService { try { ZlmResponse proxy = retryer.call(() -> zlmMediaService.addStreamProxy(AddStreamProxy.builder() .url(liveUrl) - .app("live") + .app(DEFAULT_ZLM_APP) .stream(callId) .build())); @@ -304,24 +413,12 @@ public class DeviceProxyService { MediaDescription mediaDescription = (MediaDescription)gb28181Description.getMediaDescriptions(true).get(0); boolean tcp = StringUtils.containsIgnoreCase(mediaDescription.getMedia().getProtocol(), "TCP"); - Retryer rtpRetryer = RetryerBuilder.newBuilder() - .retryIfResult(resp -> { - log.info("resp {}", resp); - return resp.getLocalPort() == null || resp.getLocalPort() <= 0; - }) - .retryIfException() - .retryIfRuntimeException() - // 重试间隔 - .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS)) - // 重试次数 - .withStopStrategy(StopStrategies.stopAfterAttempt(3)) - .build(); - - zlmStreamChangeHookService.getRegistHandler().put(callId,()->{ + Retryer rtpRetryer = rtpRetryer(); + zlmStreamChangeHookService.getRegistHandler(DEFAULT_ZLM_APP).put(callId,()->{ try { rtpRetryer.call(()->{ StartSendRtp startSendRtp = new StartSendRtp(); - startSendRtp.setApp("live"); + startSendRtp.setApp(DEFAULT_ZLM_APP); startSendRtp.setStream(callId); startSendRtp.setSsrc(ssrc); startSendRtp.setDstUrl(rtpAddr); @@ -348,7 +445,7 @@ public class DeviceProxyService { } } - public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) { + private String getProxyUrl(MockingDevice device, Date startTime, Date endTime){ String fromUrl = URLUtil.completeUrl(proxyConfig.getUrl(), "/video"); HashMap map = new HashMap<>(3); String deviceCode = device.getDeviceCode(); @@ -360,10 +457,14 @@ public class DeviceProxyService { String query = URLUtil.buildQuery(map, StandardCharsets.UTF_8); fromUrl = StringUtils.joinWith("?", fromUrl, query); log.info("设备: {} 视频 url: {}", deviceCode, fromUrl); - long time = DateUtil.between(startTime, endTime, DateUnit.SECOND); + return fromUrl; + } + public void proxyVideo2Rtp(SIPRequest request,String callId, MockingDevice device, Date startTime, Date endTime, String rtpAddr, int rtpPort, String ssrc, TaskProcessor taskProcessor) { + String fromUrl = getProxyUrl(device, startTime, endTime); String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); subscribe.getByeSubscribe().addPublisher(key); + long time = DateUtil.between(startTime, endTime, DateUnit.SECOND); taskProcessor.process(request, callId,fromUrl,rtpAddr, rtpPort,device,key,time, ssrc); } @@ -397,7 +498,7 @@ public class DeviceProxyService { String callId = requestCallId.getCallId(); callbackTask.remove(callId); Optional optionalZlmStreamChangeHookHandler = - Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler().remove(callId)); + Optional.ofNullable(zlmStreamChangeHookService.getUnregistHandler(DEFAULT_ZLM_APP).remove(callId)); // 如果取消注册已完成就直接结束, 否则发送 bye请求 结束 if(optionalZlmStreamChangeHookHandler.isEmpty()){ return; @@ -471,4 +572,19 @@ public class DeviceProxyService { log.error("bye 请求发送失败 {}",e.getMessage()); } } + + private Retryer rtpRetryer(){ + return RetryerBuilder.newBuilder() + .retryIfResult(resp -> { + log.info("resp {}", resp); + return resp.getLocalPort() == null || resp.getLocalPort() <= 0; + }) + .retryIfException() + .retryIfRuntimeException() + // 重试间隔 + .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS)) + // 重试次数 + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build(); + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java index 56ea172..52ffbd2 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/service/zlm/hook/ZlmStreamChangeHookService.java @@ -1,7 +1,9 @@ package cn.skcks.docking.gb28181.mocking.service.zlm.hook; import cn.skcks.docking.gb28181.mocking.config.sip.ZlmHookConfig; +import lombok.AccessLevel; import lombok.Data; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -20,13 +22,26 @@ public class ZlmStreamChangeHookService { void handler(); } - public ConcurrentMap registHandler = new ConcurrentHashMap<>(); - public ConcurrentMap unregistHandler = new ConcurrentHashMap<>(); + @Getter(AccessLevel.PRIVATE) + private ConcurrentMap> registHandler = new ConcurrentHashMap<>(); + @Getter(AccessLevel.PRIVATE) + private ConcurrentMap> unregistHandler = new ConcurrentHashMap<>(); + + public ConcurrentMap getRegistHandler(String app){ + this.registHandler.putIfAbsent(app,new ConcurrentHashMap<>()); + return this.registHandler.get(app); + } + + public ConcurrentMap getUnregistHandler(String app){ + this.unregistHandler.putIfAbsent(app,new ConcurrentHashMap<>()); + return this.unregistHandler.get(app); + } public void processEvent(String app,String streamId, Boolean regist){ log.debug("app {}, streamId {}, regist {}", app,streamId, regist); if(regist){ + ConcurrentMap registHandler = getRegistHandler(app); Optional.ofNullable(registHandler.remove(streamId)).ifPresent((handler)->{ try { Thread.sleep(zlmHookConfig.getDelay().toMillis()); @@ -36,6 +51,7 @@ public class ZlmStreamChangeHookService { handler.handler(); }); } else { + ConcurrentMap unregistHandler = getUnregistHandler(app); Optional.ofNullable(unregistHandler.remove(streamId)).ifPresent((handler)->{ try { Thread.sleep(zlmHookConfig.getDelay().toMillis()); diff --git a/gb28181-mocking-starter/src/main/resources/application.yml b/gb28181-mocking-starter/src/main/resources/application.yml index ca35361..9190a50 100644 --- a/gb28181-mocking-starter/src/main/resources/application.yml +++ b/gb28181-mocking-starter/src/main/resources/application.yml @@ -78,6 +78,7 @@ proxy: # 参数 device_id, begin_time, end_time #url: http://192.168.2.3:18183 url: http://10.10.10.20:18183 + #url: http://127.0.0.1:18183 ffmpeg-support: task: # 最大同时推流任务数, <= 0 时不做限制 @@ -112,6 +113,8 @@ ffmpeg-support: download: false input: false output: false + # 是否通过 zlm 调用 ffmpeg + use-zlm-ffmpeg: false # [可选] 日志配置, 一般不需要改 logging: