From e3bbb4a066b3a0c0f898dfa068d50d7a76df9d8c Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 29 Aug 2022 11:50:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4zlm=E4=BD=BF=E7=94=A8redis?= =?UTF-8?q?=E8=BF=87=E6=9C=9F=E4=BD=9C=E4=B8=BA=E5=BF=83=E8=B7=B3=E8=B6=85?= =?UTF-8?q?=E6=97=B6=E7=9A=84=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 2 - ...edisKeyExpirationEventMessageListener.java | 3 +- .../KeepaliveTimeoutListenerForPlatform.java | 81 +++++++++++++++++++ .../zlm/event/ZLMKeepliveTimeoutListener.java | 72 ----------------- .../service/impl/MediaServerServiceImpl.java | 40 +++++++-- 5 files changed, 118 insertions(+), 80 deletions(-) rename src/main/java/com/genersoft/iot/vmp/conf/{ => redis}/RedisKeyExpirationEventMessageListener.java (94%) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index bbbfce97..7a122c77 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -14,8 +14,6 @@ public class VideoManagerConstants { public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER_"; - public static final String MEDIA_SERVER_KEEPALIVE_PREFIX = "VMP_MEDIA_SERVER_KEEPALIVE_"; - public static final String MEDIA_SERVERS_ONLINE_PREFIX = "VMP_MEDIA_ONLINE_SERVERS_"; public static final String MEDIA_STREAM_PREFIX = "VMP_MEDIA_STREAM"; diff --git a/src/main/java/com/genersoft/iot/vmp/conf/RedisKeyExpirationEventMessageListener.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisKeyExpirationEventMessageListener.java similarity index 94% rename from src/main/java/com/genersoft/iot/vmp/conf/RedisKeyExpirationEventMessageListener.java rename to src/main/java/com/genersoft/iot/vmp/conf/redis/RedisKeyExpirationEventMessageListener.java index ef4a6172..b3adab52 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/RedisKeyExpirationEventMessageListener.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisKeyExpirationEventMessageListener.java @@ -1,5 +1,6 @@ -package com.genersoft.iot.vmp.conf; +package com.genersoft.iot.vmp.conf.redis; +import com.genersoft.iot.vmp.conf.UserSetting; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java new file mode 100644 index 00000000..ead82464 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java @@ -0,0 +1,81 @@ +package com.genersoft.iot.vmp.gb28181.event.offline; + +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.conf.redis.RedisKeyExpirationEventMessageListener; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; + +/** + * 设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件 + * @author swwheihei + */ +@Component +public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEventMessageListener { + + private Logger logger = LoggerFactory.getLogger(KeepaliveTimeoutListenerForPlatform.class); + + @Autowired + private EventPublisher publisher; + + @Autowired + private UserSetting userSetting; + + @Autowired + private SipSubscribe sipSubscribe; + + @Autowired + private IVideoManagerStorage storager; + + public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) { + super(listenerContainer, userSetting); + } + + + /** + * 监听失效的key + * @param message + * @param pattern + */ + @Override + public void onMessage(Message message, byte[] pattern) { + // 获取失效的key + String expiredKey = message.toString(); + // 平台心跳到期,需要重发, 判断是否已经多次未收到心跳回复, 多次未收到,则重新发起注册, 注册尝试多次未得到回复,则认为平台离线 + String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.PLATFORM_KEEPALIVE_PREFIX + userSetting.getServerId() + "_"; + String PLATFORM_REGISTER_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_PREFIX + userSetting.getServerId() + "_"; + String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_"; + if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { + String platformGbId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length()); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGbId); + if (platform != null) { + publisher.platformKeepaliveExpireEventPublish(platformGbId); + } + }else if (expiredKey.startsWith(PLATFORM_REGISTER_PREFIX)) { + String platformGbId = expiredKey.substring(PLATFORM_REGISTER_PREFIX.length(),expiredKey.length()); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGbId); + if (platform != null) { + publisher.platformRegisterCycleEventPublish(platformGbId); + } + }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) { + String callId = expiredKey.substring(REGISTER_INFO_PREFIX.length()); + if (sipSubscribe.getErrorSubscribe(callId) != null) { + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); + eventResult.callId = callId; + eventResult.msg = "注册超时"; + eventResult.type = "register timeout"; + sipSubscribe.getErrorSubscribe(callId).response(eventResult); + } + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java deleted file mode 100644 index d3af23c0..00000000 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.genersoft.iot.vmp.media.zlm.event; - -import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IMediaServerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.stereotype.Component; - -/** - * @description:设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件 - * @author: swwheihei - * @date: 2020年5月6日 上午11:35:46 - */ -@Component -public class ZLMKeepliveTimeoutListener extends RedisKeyExpirationEventMessageListener { - - private Logger logger = LoggerFactory.getLogger(ZLMKeepliveTimeoutListener.class); - - @Autowired - private EventPublisher publisher; - - @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; - - @Autowired - private UserSetting userSetting; - - @Autowired - private IMediaServerService mediaServerService; - - public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) { - super(listenerContainer, userSetting); - } - - - /** - * 监听失效的key,key格式为keeplive_deviceId - * @param message - * @param pattern - */ - @Override - public void onMessage(Message message, byte[] pattern) { - // 获取失效的key - String expiredKey = message.toString(); - String KEEPLIVEKEY_PREFIX = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_"; - if(!expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){ - return; - } - - String mediaServerId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); - logger.info("[zlm心跳到期]:" + mediaServerId); - // 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理 - MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); - JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(mediaServerItem); - if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) { - logger.info("[zlm心跳到期]:{}验证后zlm仍在线,恢复心跳信息", mediaServerId); - // 添加zlm信息 - mediaServerService.updateMediaServerKeepalive(mediaServerId, mediaServerConfig); - }else { - publisher.zlmOfflineEventPublish(mediaServerId); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 55b32d90..385dd5e4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import org.slf4j.Logger; @@ -53,6 +54,8 @@ public class MediaServerServiceImpl implements IMediaServerService { private final static Logger logger = LoggerFactory.getLogger(MediaServerServiceImpl.class); + private final String zlmKeepaliveKeyPrefix = "zlm-keepalive_"; + @Autowired private SipConfig sipConfig; @@ -83,10 +86,12 @@ public class MediaServerServiceImpl implements IMediaServerService { @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; - @Autowired private EventPublisher publisher; + @Autowired + private DynamicTask dynamicTask; + /** * 初始化 */ @@ -398,11 +403,37 @@ public class MediaServerServiceImpl implements IMediaServerService { if (serverItem.isAutoConfig()) { setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable())); } + final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId(); + dynamicTask.stop(zlmKeepaliveKey); + dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), serverItem.getHookAliveInterval() * 1000); publisher.zlmOnlineEventPublish(serverItem.getId()); logger.info("[ZLM] 连接成功 {} - {}:{} ", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); } + class KeepAliveTimeoutRunnable implements Runnable{ + + private MediaServerItem serverItem; + + public KeepAliveTimeoutRunnable(MediaServerItem serverItem) { + this.serverItem = serverItem; + } + + @Override + public void run() { + logger.info("[zlm心跳到期]:" + serverItem.getId()); + // 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理 + JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(serverItem); + if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) { + logger.info("[zlm心跳到期]:{}验证后zlm仍在线,恢复心跳信息,请检查zlm是否可以正常向wvp发送心跳", serverItem.getId()); + // 添加zlm信息 + updateMediaServerKeepalive(serverItem.getId(), mediaServerConfig); + }else { + publisher.zlmOfflineEventPublish(serverItem.getId()); + } + } + } + @Override public void zlmServerOffline(String mediaServerId) { @@ -429,7 +460,6 @@ public class MediaServerServiceImpl implements IMediaServerService { }else { clearRTPServer(serverItem); } - } @@ -625,9 +655,9 @@ public class MediaServerServiceImpl implements IMediaServerService { return; } } - String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId; - int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2; - RedisUtil.set(key, data, hookAliveInterval); + final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerItem.getId(); + dynamicTask.stop(zlmKeepaliveKey); + dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), mediaServerItem.getHookAliveInterval() * 1000); } private MediaServerItem getOneFromDatabase(String mediaServerId) {