根据redis消息更新推流列表

This commit is contained in:
jiang 2022-08-18 16:17:23 +08:00
parent 36a803f816
commit fd79d37fe6
9 changed files with 130 additions and 2 deletions

View File

@ -91,6 +91,10 @@ public class VideoManagerConstants {
* 接收推流设备的GPS变化通知
*/
public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE";
/**
* 接收推流设备列表更新变化通知
*/
public static final String VM_MSG_PUSH_STREAM_LIST_CHANGE = "VM_MSG_PUSH_STREAM_LIST_CHANGE";
/**
* redis 消息通知设备推流到平台

View File

@ -43,6 +43,9 @@ public class RedisConfig extends CachingConfigurerSupport {
@Autowired
private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
@Autowired
private RedisPushStreamListMsgListener redisPushStreamListMsgListener;
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
@ -80,6 +83,7 @@ public class RedisConfig extends CachingConfigurerSupport {
container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY));
container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
return container;
}

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.GbStream;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.github.pagehelper.PageInfo;
import java.util.List;
@ -45,4 +46,11 @@ public interface IGbStreamService {
void sendCatalogMsg(GbStream gbStream, String type);
void sendCatalogMsgs(List<GbStream> gbStreams, String type);
/**
* 修改gbId或name
* @param streamPushItemForUpdate
* @return
*/
int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
}

View File

@ -100,4 +100,10 @@ public interface IStreamPushService {
* 增加推流
*/
boolean add(StreamPushItem stream);
/**
* 获取全部的app+Streanm 用于判断推流列表是新增还是修改
* @return
*/
List<String> getAllAppAndStream();
}

View File

@ -1,10 +1,9 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
import com.genersoft.iot.vmp.storager.dao.PlatformCatalogMapper;
@ -183,4 +182,9 @@ public class GbStreamServiceImpl implements IGbStreamService {
}
}
}
@Override
public int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate) {
return gbStreamMapper.updateGbIdOrName(streamPushItemForUpdate);
}
}

View File

@ -0,0 +1,83 @@
package com.genersoft.iot.vmp.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IGbStreamService;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
/**
* @Auther: JiangFeng
* @Date: 2022/8/16 11:32
* @Description: 接收redis发送的推流设备列表更新通知
*/
@Component
public class RedisPushStreamListMsgListener implements MessageListener {
private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class);
@Resource
private IMediaServerService mediaServerService;
@Resource
private IStreamPushService streamPushService;
@Resource
private IGbStreamService gbStreamService;
@Override
public void onMessage(Message message, byte[] bytes) {
//
logger.warn("[REDIS消息-推流设备列表更新] {}", new String(message.getBody()));
List<StreamPushItem> streamPushItems = JSON.parseArray(new String(message.getBody()), StreamPushItem.class);
//查询全部的app+stream 用于判断是添加还是修改
List<String> allAppAndStream = streamPushService.getAllAppAndStream();
/**
* 用于存储更具APP+Stream过滤后的数据可以直接存入stream_push表与gb_stream表
*/
List<StreamPushItem> streamPushItemForSave = new ArrayList<>();
List<StreamPushItem> streamPushItemForUpdate = new ArrayList<>();
for (StreamPushItem streamPushItem : streamPushItems) {
String app = streamPushItem.getApp();
String stream = streamPushItem.getStream();
boolean contains = allAppAndStream.contains(app + stream);
//不存在就添加
if (!contains) {
streamPushItem.setStatus(false);
streamPushItem.setStreamType("push");
streamPushItem.setCreateTime(DateUtil.getNow());
streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId());
streamPushItem.setOriginType(2);
streamPushItem.setOriginTypeStr("rtsp_push");
streamPushItem.setTotalReaderCount("0");
streamPushItemForSave.add(streamPushItem);
} else {
//存在就只修改 name和gbId
streamPushItemForUpdate.add(streamPushItem);
}
}
if (streamPushItemForSave.size() > 0) {
logger.info("添加{}条",streamPushItemForSave.size());
logger.info(JSONObject.toJSONString(streamPushItemForSave));
streamPushService.batchAdd(streamPushItemForSave);
}
if(streamPushItemForUpdate.size()>0){
logger.info("修改{}条",streamPushItemForUpdate.size());
logger.info(JSONObject.toJSONString(streamPushItemForUpdate));
gbStreamService.updateGbIdOrName(streamPushItemForUpdate);
}
}
}

View File

@ -340,6 +340,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
gbStreamMapper.batchAdd(streamPushItems);
}
@Override
public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
// 存储数据到stream_push表
@ -503,4 +504,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
}
return result;
}
@Override
public List<String> getAllAppAndStream() {
return streamPushMapper.getAllAppAndStream();
}
}

View File

@ -148,4 +148,14 @@ public interface GbStreamMapper {
"SET mediaServerId=#{mediaServerId}" +
"WHERE app=#{app} AND stream=#{stream}")
void updateMediaServer(String app, String stream, String mediaServerId);
@Update("<script> "+
" <foreach collection='list' item='item' index='index' separator=';'>"+
"UPDATE gb_stream " +
" SET name=#{item.name},"+
" gbId=#{item.gbId}"+
" WHERE app=#{item.app} and stream=#{item.stream}"+
"</foreach>"+
"</script>")
int updateGbIdOrName(List<StreamPushItem> streamPushItemForUpdate);
}

View File

@ -168,4 +168,7 @@ public interface StreamPushMapper {
@Update("UPDATE stream_push SET status=0")
void setAllStreamOffline();
@Select("SELECT CONCAT(app,stream) FROM gb_stream")
List<String> getAllAppAndStream();
}