优化启动后清理过期信息的逻辑

This commit is contained in:
648540858 2021-12-10 11:08:04 +08:00
parent 41d3a7c240
commit ff66c356ca
5 changed files with 56 additions and 47 deletions

View File

@ -30,7 +30,7 @@ public class ZLMKeepliveTimeoutListener extends KeyExpirationEventMessageListene
public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
// 配置springboot默认Config为空即不让应用去修改redis的默认配置因为Redis服务出于安全会禁用CONFIG命令给远程用户使用
setKeyspaceNotificationsConfigParameter("");
// setKeyspaceNotificationsConfigParameter("");
}
/**

View File

@ -31,10 +31,8 @@ public class ZLMOfflineEventListener implements ApplicationListener<ZLMOfflineEv
@Override
public void onApplicationEvent(ZLMOfflineEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("ZLM离线事件触发ID" + event.getMediaServerId());
}
logger.info("ZLM离线事件触发ID" + event.getMediaServerId());
// 处理ZLM离线
mediaServerService.zlmServerOffline(event.getMediaServerId());
streamProxyService.zlmServerOffline(event.getMediaServerId());

View File

@ -37,10 +37,8 @@ public class ZLMOnlineEventListener implements ApplicationListener<ZLMOnlineEven
@Override
public void onApplicationEvent(ZLMOnlineEvent event) {
if (logger.isDebugEnabled()) {
logger.debug("ZLM上线事件触发ID" + event.getMediaServerId());
}
logger.info("ZLM上线事件触发ID" + event.getMediaServerId());
streamPushService.zlmServerOnline(event.getMediaServerId());
streamProxyService.zlmServerOnline(event.getMediaServerId());

View File

@ -565,7 +565,6 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
redisUtil.zRemove(VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetup.getServerId(), id);
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetup.getServerId() + "_" + id;
redisUtil.del(key);
mediaServerMapper.delOne(id);
}
@Override

View File

@ -154,47 +154,61 @@ public class StreamPushServiceImpl implements IStreamPushService {
if (mediaServerItem == null) {
return;
}
// 数据库记录
List<StreamPushItem> pushList = getPushList(mediaServerId);
Map<String, StreamPushItem> pushItemMap = new HashMap<>();
// redis记录
List<StreamInfo> streamInfoPushList = redisCatchStorage.getStreams(mediaServerId, "PUSH");
Map<String, StreamInfo> streamInfoPushItemMap = new HashMap<>();
if (pushList.size() > 0) {
Map<String, StreamPushItem> pushItemMap = new HashMap<>();
for (StreamPushItem streamPushItem : pushList) {
pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
}
zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
if (mediaList == null) return;
String dataStr = mediaList.getString("data");
Integer code = mediaList.getInteger("code");
List<StreamPushItem> streamPushItems = null;
if (code == 0 ) {
if (dataStr != null) {
streamPushItems = handleJSON(dataStr, mediaServerItem);
}
}
if (streamPushItems != null) {
for (StreamPushItem streamPushItem : streamPushItems) {
pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
}
}
Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
if (offlinePushItems.size() > 0) {
String type = "PUSH";
streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
for (StreamPushItem offlinePushItem : offlinePushItems) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetup.getServerId());
jsonObject.put("app", offlinePushItem.getApp());
jsonObject.put("stream", offlinePushItem.getStream());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlinePushItem.getApp(), offlinePushItem.getStream());
}
}
}));
}
if (streamInfoPushList.size() > 0) {
for (StreamInfo streamInfo : streamInfoPushList) {
streamInfoPushItemMap.put(streamInfo.getApp() + streamInfo.getStreamId(), streamInfo);
}
}
zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
if (mediaList == null) return;
String dataStr = mediaList.getString("data");
Integer code = mediaList.getInteger("code");
List<StreamPushItem> streamPushItems = null;
if (code == 0 ) {
if (dataStr != null) {
streamPushItems = handleJSON(dataStr, mediaServerItem);
}
}
if (streamPushItems != null) {
for (StreamPushItem streamPushItem : streamPushItems) {
pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
}
}
Collection<StreamPushItem> offlinePushItems = pushItemMap.values();
if (offlinePushItems.size() > 0) {
String type = "PUSH";
streamPushMapper.delAll(new ArrayList<>(offlinePushItems));
}
Collection<StreamInfo> offlineStreamInfoItems = streamInfoPushItemMap.values();
if (offlineStreamInfoItems.size() > 0) {
String type = "PUSH";
for (StreamInfo offlineStreamInfoItem : offlineStreamInfoItems) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetup.getServerId());
jsonObject.put("app", offlineStreamInfoItem.getApp());
jsonObject.put("stream", offlineStreamInfoItem.getStreamId());
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineStreamInfoItem.getApp(), offlineStreamInfoItem.getStreamId());
}
}
}));
}
@Override
@ -211,6 +225,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
List<StreamInfo> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
if (streamInfoList.size() > 0) {
for (StreamInfo streamInfo : streamInfoList) {
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
JSONObject jsonObject = new JSONObject();
jsonObject.put("serverId", userSetup.getServerId());
jsonObject.put("app", streamInfo.getApp());
@ -218,8 +234,6 @@ public class StreamPushServiceImpl implements IStreamPushService {
jsonObject.put("register", false);
jsonObject.put("mediaServerId", mediaServerId);
redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
// 移除redis内流的信息
redisCatchStorage.removeStream(mediaServerId, type, streamInfo.getApp(), streamInfo.getStreamId());
}
}
}