diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 51c2ab10..a06c6d08 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -106,6 +106,11 @@ public class VideoManagerConstants { */ public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE"; + /** + * redis 通知平台关闭推流 + */ + public static final String VM_MSG_STREAM_PUSH_CLOSE = "VM_MSG_STREAM_PUSH_CLOSE"; + /** * redis 消息请求所有的在线通道 */ diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java index 9f484266..7e1cc1d6 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java @@ -43,6 +43,9 @@ public class RedisMsgListenConfig { @Autowired private RedisPushStreamResponseListener redisPushStreamResponseListener; + @Autowired + private RedisCloseStreamMsgListener redisCloseStreamMsgListener; + /** * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 @@ -63,6 +66,7 @@ public class RedisMsgListenConfig { container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); + container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE)); return container; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 0a03c660..dcaab9e3 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -183,6 +183,7 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public boolean stop(String app, String streamId) { + logger.info("[推流 ] 停止流: {}/{}", app, streamId); StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); if (streamPushItem != null) { gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java new file mode 100644 index 00000000..d0104756 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisCloseStreamMsgListener.java @@ -0,0 +1,59 @@ +package com.genersoft.iot.vmp.service.redisMsg; + +import com.alibaba.fastjson2.JSON; +import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.service.IStreamPushService; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * 接收来自redis的关闭流更新通知 + * @author lin + */ +@Component +public class RedisCloseStreamMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisCloseStreamMsgListener.class); + + + @Autowired + private IStreamPushService pushService; + + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Override + public void onMessage(@NotNull Message message, byte[] bytes) { + boolean isEmpty = taskQueue.isEmpty(); + taskQueue.offer(message); + if (isEmpty) { + taskExecutor.execute(() -> { + while (!taskQueue.isEmpty()) { + Message msg = taskQueue.poll(); + try { + JSONObject jsonObject = JSON.parseObject(msg.getBody()); + String app = jsonObject.getString("app"); + String stream = jsonObject.getString("stream"); + pushService.stop(app, stream); + + }catch (Exception e) { + logger.warn("[REDIS的关闭推流通知] 发现未处理的异常, \r\n{}", JSON.toJSONString(message)); + logger.error("[REDIS的关闭推流通知] 异常内容: ", e); + } + } + }); + } + } +}