From b20e48c911ecac644ee42ffe1dc6185ff4b429b7 Mon Sep 17 00:00:00 2001 From: zxb <919411476@qq.com> Date: Thu, 14 Mar 2024 03:13:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20=E6=8E=A8=E6=B5=81?= =?UTF-8?q?=E9=89=B4=E6=9D=83=E4=BA=8B=E4=BB=B6=E8=AE=A2=E9=98=85=20+=20ac?= =?UTF-8?q?k=20=E4=BA=8B=E4=BB=B6=E5=85=B3=E8=81=94=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/AckRequestProcessor.java | 53 ++++++++++++------- .../vmp/media/zlm/ZLMHttpHookListener.java | 10 +++- .../vmp/service/ZlmPublishHookService.java | 43 +++++++++++++++ 3 files changed, 86 insertions(+), 20 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/ZlmPublishHookService.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index 393b170d..0b947967 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.ZlmPublishHookService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener; @@ -43,6 +44,8 @@ import java.text.ParseException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** @@ -99,6 +102,11 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private SipSubscribe sipSubscribe; + @Autowired + private ZlmPublishHookService zlmPublishHookService; + + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + /** * 处理 ACK请求 * @@ -207,24 +215,33 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); }); }else { - logger.debug("sendRtpItem {}", JSONObject.toJSONString(sendRtpItem)); - JSONObject startSendRtpStreamResult; - Retryer retryer = RetryerBuilder.newBuilder() - .retryIfResult(resp -> resp == null || resp.getInteger("code") != 0) - .retryIfException() - .retryIfRuntimeException() - // 重试间隔 - .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS)) - // 重试次数 - .withStopStrategy(StopStrategies.stopAfterAttempt(5 * 1000)) - .build(); - try { - startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param)); - } catch (ExecutionException | RetryException e) { - logger.error(e.getMessage()); - startSendRtpStreamResult = null; - } - startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); + logger.debug("sendRtpItem {} {}", JSONObject.toJSONString(sendRtpItem), JSONObject.toJSONString(param)); + + zlmPublishHookService.getHandler(sendRtpItem.getApp()).put(sendRtpItem.getStreamId(),()->{ + scheduledExecutorService.submit(()->{ + JSONObject startSendRtpStreamResult; + Retryer retryer = RetryerBuilder.newBuilder() + .retryIfResult(resp -> resp == null || resp.getInteger("code") != 0) + .retryIfException() + .retryIfRuntimeException() + // 重试间隔 + .withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.MILLISECONDS)) + // 重试次数 + .withStopStrategy(StopStrategies.stopAfterAttempt(5 * 1000)) + .build(); + try { + startSendRtpStreamResult = retryer.call(() -> zlmServerFactory.startSendRtpStream(mediaInfo, param)); + } catch (ExecutionException | RetryException e) { + logger.error(e.getMessage()); + startSendRtpStreamResult = null; + } + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, startSendRtpStreamResult, param, callIdHeader); + }); + }); + + scheduledExecutorService.schedule(()->{ + zlmPublishHookService.getHandler(sendRtpItem.getApp()).remove(sendRtpItem.getStreamId()); + }, 1, TimeUnit.MINUTES); } } private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 114a81b2..4fcfedc8 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -129,11 +129,14 @@ public class ZLMHttpHookListener { @Autowired private RedisTemplate redisTemplate; + @Autowired + private ZlmPublishHookService zlmPublishHookService; + /** * 服务器定时上报时间,上报间隔可配置,默认10s上报一次 */ @ResponseBody - + @PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8") public HookResult onServerKeepalive(@RequestBody OnServerKeepaliveHookParam param) { @@ -155,7 +158,7 @@ public class ZLMHttpHookListener { * 播放器鉴权事件,rtsp/rtmp/http-flv/ws-flv/hls的播放都将触发此鉴权事件。 */ @ResponseBody - + @PostMapping(value = "/on_play", produces = "application/json;charset=UTF-8") public HookResult onPlay(@RequestBody OnPlayHookParam param) { if (logger.isDebugEnabled()) { @@ -253,6 +256,9 @@ public class ZLMHttpHookListener { } }); + + zlmPublishHookService.processEvent(param); + // 是否录像 if ("rtp".equals(param.getApp())) { result.setEnable_mp4(userSetting.getRecordSip()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/ZlmPublishHookService.java b/src/main/java/com/genersoft/iot/vmp/service/ZlmPublishHookService.java new file mode 100644 index 00000000..542a2e99 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/ZlmPublishHookService.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.service; + +import com.genersoft.iot.vmp.media.zlm.dto.hook.OnPublishHookParam; +import lombok.AccessLevel; +import lombok.Data; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Slf4j +@Data +@Service +@RequiredArgsConstructor +public class ZlmPublishHookService { + + public interface ZlmPublishHookHandler { + void handler(); + } + + @Getter(AccessLevel.PRIVATE) + private ConcurrentMap> handler = new ConcurrentHashMap<>(); + + public ConcurrentMap getHandler(String app) { + this.handler.putIfAbsent(app, new ConcurrentHashMap<>()); + return this.handler.get(app); + } + + + public void processEvent(OnPublishHookParam dto) { + String app = dto.getApp(); + String streamId = dto.getStream(); + String ip = dto.getIp(); + log.debug("推流鉴权: app {}, streamId {}, ip {}", app, streamId, ip); + + ConcurrentMap handlers = getHandler(app); + Optional.ofNullable(handlers.remove(streamId)).ifPresent(ZlmPublishHookHandler::handler); + } +}