From fd79d37fe6469637f7992426893a0375833ab6f1 Mon Sep 17 00:00:00 2001 From: jiang <893224616@qq.com> Date: Thu, 18 Aug 2022 16:17:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=B9=E6=8D=AEredis=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E6=8E=A8=E6=B5=81=E5=88=97=E8=A1=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 4 + .../genersoft/iot/vmp/conf/RedisConfig.java | 4 + .../iot/vmp/service/IGbStreamService.java | 8 ++ .../iot/vmp/service/IStreamPushService.java | 6 ++ .../vmp/service/impl/GbStreamServiceImpl.java | 8 +- .../impl/RedisPushStreamListMsgListener.java | 83 +++++++++++++++++++ .../service/impl/StreamPushServiceImpl.java | 6 ++ .../iot/vmp/storager/dao/GbStreamMapper.java | 10 +++ .../vmp/storager/dao/StreamPushMapper.java | 3 + 9 files changed, 130 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 510b5b22..bbbfce97 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -91,6 +91,10 @@ public class VideoManagerConstants { * 接收推流设备的GPS变化通知 */ public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE"; + /** + * 接收推流设备列表更新变化通知 + */ + public static final String VM_MSG_PUSH_STREAM_LIST_CHANGE = "VM_MSG_PUSH_STREAM_LIST_CHANGE"; /** * redis 消息通知设备推流到平台 diff --git a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java index d2e1347e..449a0181 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java @@ -43,6 +43,9 @@ public class RedisConfig extends CachingConfigurerSupport { @Autowired private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; + @Autowired + private RedisPushStreamListMsgListener redisPushStreamListMsgListener; + @Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate<>(); @@ -80,6 +83,7 @@ public class RedisConfig extends CachingConfigurerSupport { container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); + container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); return container; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java index 0a392060..61f94c25 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.github.pagehelper.PageInfo; import java.util.List; @@ -45,4 +46,11 @@ public interface IGbStreamService { void sendCatalogMsg(GbStream gbStream, String type); void sendCatalogMsgs(List gbStreams, String type); + + /** + * 修改gbId或name + * @param streamPushItemForUpdate + * @return + */ + int updateGbIdOrName(List streamPushItemForUpdate); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java index b95ec486..5dbba926 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java @@ -100,4 +100,10 @@ public interface IStreamPushService { * 增加推流 */ boolean add(StreamPushItem stream); + + /** + * 获取全部的app+Streanm 用于判断推流列表是新增还是修改 + * @return + */ + List getAllAppAndStream(); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java index 8734882f..0ce898e8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java @@ -1,10 +1,9 @@ package com.genersoft.iot.vmp.service.impl; -import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper; @@ -183,4 +182,9 @@ public class GbStreamServiceImpl implements IGbStreamService { } } } + + @Override + public int updateGbIdOrName(List streamPushItemForUpdate) { + return gbStreamMapper.updateGbIdOrName(streamPushItemForUpdate); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java new file mode 100644 index 00000000..d70ddf13 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java @@ -0,0 +1,83 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.IGbStreamService; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.*; + +/** + * @Auther: JiangFeng + * @Date: 2022/8/16 11:32 + * @Description: 接收redis发送的推流设备列表更新通知 + */ +@Component +public class RedisPushStreamListMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class); + @Resource + private IMediaServerService mediaServerService; + + @Resource + private IStreamPushService streamPushService; + @Resource + private IGbStreamService gbStreamService; + + @Override + public void onMessage(Message message, byte[] bytes) { + // + logger.warn("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody())); + List streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class); + //查询全部的app+stream 用于判断是添加还是修改 + List allAppAndStream = streamPushService.getAllAppAndStream(); + + /** + * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 + */ + List streamPushItemForSave = new ArrayList<>(); + List streamPushItemForUpdate = new ArrayList<>(); + for (StreamPushItem streamPushItem : streamPushItems) { + String app = streamPushItem.getApp(); + String stream = streamPushItem.getStream(); + boolean contains = allAppAndStream.contains(app + stream); + //不存在就添加 + if (!contains) { + streamPushItem.setStatus(false); + streamPushItem.setStreamType("push"); + streamPushItem.setCreateTime(DateUtil.getNow()); + streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); + streamPushItem.setOriginType(2); + streamPushItem.setOriginTypeStr("rtsp_push"); + streamPushItem.setTotalReaderCount("0"); + streamPushItemForSave.add(streamPushItem); + } else { + //存在就只修改 name和gbId + streamPushItemForUpdate.add(streamPushItem); + } + } + if (streamPushItemForSave.size() > 0) { + + logger.info("添加{}条",streamPushItemForSave.size()); + logger.info(JSONObject.toJSONString(streamPushItemForSave)); + streamPushService.batchAdd(streamPushItemForSave); + + } + if(streamPushItemForUpdate.size()>0){ + logger.info("修改{}条",streamPushItemForUpdate.size()); + logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); + gbStreamService.updateGbIdOrName(streamPushItemForUpdate); + } + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 6c6c04b1..ed592307 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -340,6 +340,7 @@ public class StreamPushServiceImpl implements IStreamPushService { gbStreamMapper.batchAdd(streamPushItems); } + @Override public void batchAddForUpload(List streamPushItems, Map> streamPushItemsForAll ) { // 存储数据到stream_push表 @@ -503,4 +504,9 @@ public class StreamPushServiceImpl implements IStreamPushService { } return result; } + + @Override + public List getAllAppAndStream() { + return streamPushMapper.getAllAppAndStream(); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index 7ed6b5ab..df9143da 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -148,4 +148,14 @@ public interface GbStreamMapper { "SET mediaServerId=#{mediaServerId}" + "WHERE app=#{app} AND stream=#{stream}") void updateMediaServer(String app, String stream, String mediaServerId); + + @Update("") + int updateGbIdOrName(List streamPushItemForUpdate); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index b4ee81ee..706de93e 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -168,4 +168,7 @@ public interface StreamPushMapper { @Update("UPDATE stream_push SET status=0") void setAllStreamOffline(); + + @Select("SELECT CONCAT(app,stream) FROM gb_stream") + List getAllAppAndStream(); }