From 81f69eb6f47b69cd89da7621889629f4f456dce1 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 12 Jul 2022 17:33:17 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E4=BB=8Eredis=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E6=9B=B4=E6=96=B0=E6=8E=A8=E6=B5=81=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 4 + .../genersoft/iot/vmp/conf/RedisConfig.java | 9 +- .../subscribe/catalog/CatalogEventLister.java | 2 +- .../vmp/media/zlm/ZLMRTPServerFactory.java | 2 +- .../iot/vmp/service/IStreamPushService.java | 35 ++++++-- .../RedisPushStreamStatusMsgListener.java | 89 +++++++++++++++++++ .../service/impl/RedisStreamMsgListener.java | 7 +- .../service/impl/StreamPushServiceImpl.java | 32 ++++++- .../iot/vmp/storager/IRedisCatchStorage.java | 5 ++ .../vmp/storager/dao/StreamPushMapper.java | 42 +++++++++ 10 files changed, 210 insertions(+), 17 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 702e3579..40a73521 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -104,6 +104,10 @@ public class VideoManagerConstants { // 设备状态订阅的通知 public static final String VM_MSG_SUBSCRIBE_DEVICE_STATUS = "device"; + + + + //************************** 第三方 **************************************** public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java index ec1f9bac..6a862ae3 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java @@ -1,10 +1,7 @@ package com.genersoft.iot.vmp.conf; import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.service.impl.RedisAlarmMsgListener; -import com.genersoft.iot.vmp.service.impl.RedisGpsMsgListener; -import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; -import com.genersoft.iot.vmp.service.impl.RedisStreamMsgListener; +import com.genersoft.iot.vmp.service.impl.*; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -60,6 +57,9 @@ public class RedisConfig extends CachingConfigurerSupport { @Autowired private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Autowired + private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; + @Bean public JedisPool jedisPool() { if (StringUtils.isBlank(password)) { @@ -108,6 +108,7 @@ public class RedisConfig extends CachingConfigurerSupport { container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); + container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); return container; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index e38733d5..ac304504 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -58,7 +58,7 @@ public class CatalogEventLister implements ApplicationListener { ParentPlatform parentPlatform = null; Map> parentPlatformMap = new HashMap<>(); - if (event.getPlatformId() != null) { + if (!StringUtils.isEmpty(event.getPlatformId())) { parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId()); if (parentPlatform != null && !parentPlatform.isStatus()) { return; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 366ed220..cbef9ce2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -292,7 +292,7 @@ public class ZLMRTPServerFactory { logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg")); return -1; } - if ( code == 0 && ! mediaInfo.getBoolean("online")) { + if ( code == 0 && mediaInfo.getBoolean("online") != null && !mediaInfo.getBoolean("online")) { logger.warn("查询流({}/{})是否有其它观看者时得到: {}", app, streamId, mediaInfo.getString("msg")); return -1; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index acf0d274..5dd45ef9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageInfo; @@ -44,31 +45,55 @@ public interface IStreamPushService { * 停止一路推流 * @param app 应用名 * @param streamId 流ID - * @return */ boolean stop(String app, String streamId); /** * 新的节点加入 - * @param mediaServerId - * @return */ void zlmServerOnline(String mediaServerId); /** * 节点离线 - * @param mediaServerId - * @return */ void zlmServerOffline(String mediaServerId); + /** + * 清空 + */ void clean(); + boolean saveToRandomGB(); + /** + * 批量添加 + */ void batchAdd(List streamPushExcelDtoList); + /** + * 中止多个推流 + */ boolean batchStop(List streamPushItems); + /** + * 导入时批量增加 + */ void batchAddForUpload(List streamPushItems, Map> streamPushItemsForAll); + + /** + * 全部离线 + */ + void allStreamOffline(); + + /** + * 推流离线 + */ + void offline(List offlineStreams); + + /** + * 推流上线 + */ + void online(List onlineStreams); + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java new file mode 100644 index 00000000..a34315bc --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java @@ -0,0 +1,89 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import java.util.List; + + +/** + * 接收redis发送的推流设备上线下线通知 + * @author lin + */ +@Component +public class RedisPushStreamStatusMsgListener implements MessageListener, ApplicationRunner { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class); + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IStreamPushService streamPushService; + + @Autowired + private EventPublisher eventPublisher; + + @Autowired + private UserSetting userSetting; + + @Autowired + private DynamicTask dynamicTask; + + @Override + public void onMessage(Message message, byte[] bytes) { + + PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(message.getBody(), PushStreamStatusChangeFromRedisDto.class); + if (statusChangeFromPushStream == null) { + logger.warn("[REDIS 消息]推流设备状态变化消息解析失败"); + return; + } + if (statusChangeFromPushStream.isSetAllOffline()) { + // 所有设备离线 + streamPushService.allStreamOffline(); + } + if (statusChangeFromPushStream.getOfflineStreams().size() > 0) { + // 更新部分设备离线 + streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); + } + if (statusChangeFromPushStream.getOnlineStreams().size() > 0) { + // 更新部分设备上线 + streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); + } + } + + @Override + public void run(ApplicationArguments args) throws Exception { + // 启动时设置所有推流通道离线,发起查询请求 + redisCatchStorage.sendStreamPushRequestedMsgForStatus(); + dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{ + logger.info("[REDIS 消息]未收到redis回复推流设备状态,执行推流设备离线"); + // 五秒收不到请求就设置通道离线,然后通知上级离线 + streamPushService.allStreamOffline(); + }, 5000); + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java index 07fffdcc..83116f3a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java @@ -3,16 +3,12 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; + import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -22,6 +18,7 @@ import org.springframework.stereotype.Component; /** + * 接收其他wvp发送流变化通知 * @author lin */ @Component diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 837e135c..646d287d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.utils.DateUtil; @@ -181,7 +182,6 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public StreamPushItem getPush(String app, String streamId) { - return streamPushMapper.selectOne(app, streamId); } @@ -481,4 +481,34 @@ public class StreamPushServiceImpl implements IStreamPushService { } return true; } + + @Override + public void allStreamOffline() { + List onlinePushers = streamPushMapper.getOnlinePusherForGb(); + if (onlinePushers.size() == 0) { + return; + } + streamPushMapper.allStreamOffline(); + + // 发送通知 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); + } + + @Override + public void offline(List offlineStreams) { + // 更新部分设备离线 + List onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams); + streamPushMapper.offline(offlineStreams); + // 发送通知 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF); + } + + @Override + public void online(List onlineStreams) { + // 更新部分设备上线streamPushService + List onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams); + streamPushMapper.online(onlineStreams); + // 发送通知 + eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 79e6b26a..b9811da1 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -233,4 +233,9 @@ public interface IRedisCatchStorage { * @return */ StreamAuthorityInfo getStreamAuthorityInfo(String app, String stream); + + /** + * 发送redis消息 查询所有推流设备的状态 + */ + void sendStreamPushRequestedMsgForStatus(); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index 6c1e72d2..bcf57a6f 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.dao; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis; import org.apache.ibatis.annotations.*; // import org.omg.PortableInterceptor.INACTIVE; import org.springframework.stereotype.Repository; @@ -117,4 +118,45 @@ public interface StreamPushMapper { "SET status=#{status} " + "WHERE mediaServerId=#{mediaServerId}") void updateStatusByMediaServerId(String mediaServerId, boolean status); + + + @Select("") + List getOnlinePusherForGbInList(List offlineStreams); + + @Update("") + void offline(List offlineStreams); + + @Select("") + List getOfflinePusherForGbInList(List onlineStreams); + + @Update("") + void online(List onlineStreams); + + @Select("SELECT gs.* FROM stream_push sp left join gb_stream gs on sp.app = gs.app AND sp.stream = gs.stream") + List getOnlinePusherForGb(); + + @Update("UPDATE stream_push SET status=0") + void allStreamOffline(); }