From 7667b527cfff67634088400219098d9e233b1082 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 29 Nov 2022 11:42:09 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=98=9F=E5=88=97=E5=A4=84?= =?UTF-8?q?=E7=90=86=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/gb28181/SipLayer.java | 2 - .../DigestServerAuthenticationHelper.java | 72 +----- .../cmd/impl/SIPCommanderFroPlatform.java | 11 +- .../request/impl/NotifyRequestProcessor.java | 9 +- .../notify/cmd/AlarmNotifyMessageHandler.java | 209 +++++++++--------- .../MobilePositionNotifyMessageHandler.java | 32 +-- .../cmd/CatalogResponseMessageHandler.java | 125 +++++------ .../cmd/RecordInfoResponseMessageHandler.java | 17 +- .../redisMsg/RedisAlarmMsgListener.java | 99 ++++----- .../redisMsg/RedisGbPlayMsgListener.java | 172 +++++++------- .../service/redisMsg/RedisGpsMsgListener.java | 17 +- .../RedisPushStreamResponseListener.java | 30 +-- .../RedisPushStreamStatusListMsgListener.java | 83 +++---- .../RedisPushStreamStatusMsgListener.java | 51 ++--- .../redisMsg/RedisStreamMsgListener.java | 74 +++---- 15 files changed, 458 insertions(+), 545 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 13fa01d3..24b88e55 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -29,8 +29,6 @@ public class SipLayer implements CommandLineRunner { @Autowired private ISIPProcessorObserver sipProcessorObserver; - - private final Map tcpSipProviderMap = new ConcurrentHashMap<>(); private final Map udpSipProviderMap = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java index 7c319efc..fdb05e56 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java @@ -25,10 +25,9 @@ */ package com.genersoft.iot.vmp.gb28181.auth; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.time.Instant; -import java.util.Random; +import gov.nist.core.InternalErrorHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.sip.address.URI; import javax.sip.header.AuthorizationHeader; @@ -36,10 +35,10 @@ import javax.sip.header.HeaderFactory; import javax.sip.header.WWWAuthenticateHeader; import javax.sip.message.Request; import javax.sip.message.Response; - -import gov.nist.core.InternalErrorHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.Instant; +import java.util.Random; /** * Implements the HTTP digest authentication method server side functionality. @@ -201,12 +200,13 @@ public class DigestServerAuthenticationHelper { // String ncStr = new DecimalFormat("00000000").format(Integer.parseInt(nc + "", 16)); String A1 = username + ":" + realm + ":" + pass; + String A2 = request.getMethod().toUpperCase() + ":" + uri.toString(); + byte mdbytes[] = messageDigest.digest(A1.getBytes()); String HA1 = toHexString(mdbytes); logger.debug("A1: " + A1); logger.debug("A2: " + A2); - mdbytes = messageDigest.digest(A2.getBytes()); String HA2 = toHexString(mdbytes); logger.debug("HA1: " + HA1); @@ -238,58 +238,4 @@ public class DigestServerAuthenticationHelper { } -// public static void main(String[] args) throws NoSuchAlgorithmException { -// String realm = "3402000000"; -// String username = "44010000001180008012"; - - -// String nonce = "07cab60999fbf643264ace27d3b7de8b"; -// String uri = "sip:34020000002000000001@3402000000"; -// // qop 保护质量 包含auth(默认的)和auth-int(增加了报文完整性检测)两种策略 -// String qop = "auth"; - -// // 客户端随机数,这是一个不透明的字符串值,由客户端提供,并且客户端和服务器都会使用,以避免用明文文本。 -// // 这使得双方都可以查验对方的身份,并对消息的完整性提供一些保护 -// //String cNonce = authHeader.getCNonce(); - -// // nonce计数器,是一个16进制的数值,表示同一nonce下客户端发送出请求的数量 -// int nc = 1; -// String ncStr = new DecimalFormat("00000000").format(nc); -// // String ncStr = new DecimalFormat("00000000").format(Integer.parseInt(nc + "", 16)); -// MessageDigest messageDigest = MessageDigest.getInstance(DEFAULT_ALGORITHM); -// String A1 = username + ":" + realm + ":" + "12345678"; -// String A2 = "REGISTER" + ":" + uri; -// byte mdbytes[] = messageDigest.digest(A1.getBytes()); -// String HA1 = toHexString(mdbytes); -// System.out.println("A1: " + A1); -// System.out.println("A2: " + A2); - -// mdbytes = messageDigest.digest(A2.getBytes()); -// String HA2 = toHexString(mdbytes); -// System.out.println("HA1: " + HA1); -// System.out.println("HA2: " + HA2); -// String cnonce = "0a4f113b"; -// System.out.println("nonce: " + nonce); -// System.out.println("nc: " + ncStr); -// System.out.println("cnonce: " + cnonce); -// System.out.println("qop: " + qop); -// String KD = HA1 + ":" + nonce; - -// if (qop != null && qop.equals("auth") ) { -// if (nc != -1) { -// KD += ":" + ncStr; -// } -// if (cnonce != null) { -// KD += ":" + cnonce; -// } -// KD += ":" + qop; -// } -// KD += ":" + HA2; -// System.out.println("KD: " + KD); -// mdbytes = messageDigest.digest(KD.getBytes()); -// String mdString = toHexString(mdbytes); -// System.out.println("mdString: " + mdString); -// String response = "4f0507d4b87cdecff04bdaf4c96348f0"; -// System.out.println("response: " + response); -// } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 0f234f54..a641d9be 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson2.JSON; -import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; @@ -9,13 +8,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.SIPSender; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; -import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; +import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sip.message.MessageFactoryImpl; import gov.nist.javax.sip.message.SIPRequest; import org.slf4j.Logger; @@ -26,8 +25,10 @@ import org.springframework.lang.Nullable; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import javax.sip.*; -import javax.sip.header.*; +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import javax.sip.header.CallIdHeader; +import javax.sip.header.WWWAuthenticateHeader; import javax.sip.message.Request; import java.text.ParseException; import java.util.ArrayList; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 4df7d631..db922f98 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -11,7 +11,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; @@ -31,7 +30,6 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -77,8 +75,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IDeviceChannelService deviceChannelService; - private boolean taskQueueHandlerRun = false; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -98,9 +94,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements }catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } + boolean runed = !taskQueue.isEmpty(); taskQueue.offer(new HandlerCatchData(evt, null, null)); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (!runed) { taskExecutor.execute(()-> { while (!taskQueue.isEmpty()) { try { @@ -128,7 +124,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements logger.error("处理NOTIFY消息时错误", e); } } - taskQueueHandlerRun = false; }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index bb149786..09a5ffc2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.service.IDeviceAlarmService; @@ -27,17 +26,15 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; - import java.text.ParseException; import java.util.concurrent.ConcurrentLinkedQueue; -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.*; +import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; /** * 报警事件的处理,参考:9.4 @@ -72,8 +69,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme @Autowired private IDeviceChannelService deviceChannelService; - private boolean taskQueueHandlerRun = false; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -89,128 +84,128 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { logger.info("[收到报警通知]设备:{}", device.getDeviceId()); - + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + // 回复200 OK + try { + responseAck((SIPRequest) evt.getRequest(), Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 报警通知回复: {}", e.getMessage()); + } + if (isEmpty) { taskExecutor.execute(() -> { logger.info("[处理报警通知]待处理数量:{}", taskQueue.size() ); while (!taskQueue.isEmpty()) { - SipMsgInfo sipMsgInfo = taskQueue.poll(); - // 回复200 OK try { - responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[处理报警通知], 回复200OK失败", e); - } + SipMsgInfo sipMsgInfo = taskQueue.poll(); - Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID"); - String channelId = deviceIdElement.getText().toString(); + Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID"); + String channelId = deviceIdElement.getText().toString(); - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setCreateTime(DateUtil.getNow()); - deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); - deviceAlarm.setChannelId(channelId); - deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority")); - deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod")); - String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime"); - if (alarmTime == null) { - continue; - } - deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); - String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription"); - if (alarmDescription == null) { - deviceAlarm.setAlarmDescription(""); - } else { - deviceAlarm.setAlarmDescription(alarmDescription); - } - String longitude = getText(sipMsgInfo.getRootElement(), "Longitude"); - if (longitude != null && NumericUtil.isDouble(longitude)) { - deviceAlarm.setLongitude(Double.parseDouble(longitude)); - } else { - deviceAlarm.setLongitude(0.00); - } - String latitude = getText(sipMsgInfo.getRootElement(), "Latitude"); - if (latitude != null && NumericUtil.isDouble(latitude)) { - deviceAlarm.setLatitude(Double.parseDouble(latitude)); - } else { - deviceAlarm.setLatitude(0.00); - } + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setCreateTime(DateUtil.getNow()); + deviceAlarm.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); + deviceAlarm.setChannelId(channelId); + deviceAlarm.setAlarmPriority(getText(sipMsgInfo.getRootElement(), "AlarmPriority")); + deviceAlarm.setAlarmMethod(getText(sipMsgInfo.getRootElement(), "AlarmMethod")); + String alarmTime = XmlUtil.getText(sipMsgInfo.getRootElement(), "AlarmTime"); + if (alarmTime == null) { + continue; + } + deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); + String alarmDescription = getText(sipMsgInfo.getRootElement(), "AlarmDescription"); + if (alarmDescription == null) { + deviceAlarm.setAlarmDescription(""); + } else { + deviceAlarm.setAlarmDescription(alarmDescription); + } + String longitude = getText(sipMsgInfo.getRootElement(), "Longitude"); + if (longitude != null && NumericUtil.isDouble(longitude)) { + deviceAlarm.setLongitude(Double.parseDouble(longitude)); + } else { + deviceAlarm.setLongitude(0.00); + } + String latitude = getText(sipMsgInfo.getRootElement(), "Latitude"); + if (latitude != null && NumericUtil.isDouble(latitude)) { + deviceAlarm.setLatitude(Double.parseDouble(latitude)); + } else { + deviceAlarm.setLatitude(0.00); + } - if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) { - if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); - mobilePosition.setTime(deviceAlarm.getAlarmTime()); - mobilePosition.setLongitude(deviceAlarm.getLongitude()); - mobilePosition.setLatitude(deviceAlarm.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); + if (!ObjectUtils.isEmpty(deviceAlarm.getAlarmMethod())) { + if ( deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.GPS.getVal() + "")) { + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); + mobilePosition.setTime(deviceAlarm.getAlarmTime()); + mobilePosition.setLongitude(deviceAlarm.getLongitude()); + mobilePosition.setLatitude(deviceAlarm.getLatitude()); + mobilePosition.setReportSource("GPS Alarm"); - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); - deviceChannel.setChannelId(channelId); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + // 更新device channel 的经纬度 + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); + deviceChannel.setChannelId(channelId); + deviceChannel.setLongitude(mobilePosition.getLongitude()); + deviceChannel.setLatitude(mobilePosition.getLatitude()); + deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice()); + deviceChannel = deviceChannelService.updateGps(deviceChannel, sipMsgInfo.getDevice()); - mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); - mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); - mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); - mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); + mobilePosition.setLongitudeWgs84(deviceChannel.getLongitudeWgs84()); + mobilePosition.setLatitudeWgs84(deviceChannel.getLatitudeWgs84()); + mobilePosition.setLongitudeGcj02(deviceChannel.getLongitudeGcj02()); + mobilePosition.setLatitudeGcj02(deviceChannel.getLatitudeGcj02()); - if (userSetting.getSavePositionHistory()) { - storager.insertMobilePosition(mobilePosition); + if (userSetting.getSavePositionHistory()) { + storager.insertMobilePosition(mobilePosition); + } + storager.updateChannelPosition(deviceChannel); + + // 发送redis消息。 通知位置信息的变化 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("time", mobilePosition.getTime()); + jsonObject.put("serial", deviceChannel.getDeviceId()); + jsonObject.put("code", deviceChannel.getChannelId()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + redisCatchStorage.sendMobilePositionMsg(jsonObject); } - storager.updateChannelPosition(deviceChannel); - - // 发送redis消息。 通知位置信息的变化 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("time", mobilePosition.getTime()); - jsonObject.put("serial", deviceChannel.getDeviceId()); - jsonObject.put("code", deviceChannel.getChannelId()); - jsonObject.put("longitude", mobilePosition.getLongitude()); - jsonObject.put("latitude", mobilePosition.getLatitude()); - jsonObject.put("altitude", mobilePosition.getAltitude()); - jsonObject.put("direction", mobilePosition.getDirection()); - jsonObject.put("speed", mobilePosition.getSpeed()); - redisCatchStorage.sendMobilePositionMsg(jsonObject); } - } - if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) { - if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) { - deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType")); + if (!ObjectUtils.isEmpty(deviceAlarm.getDeviceId())) { + if (deviceAlarm.getAlarmMethod().contains(DeviceAlarmMethod.Video.getVal() + "")) { + deviceAlarm.setAlarmType(getText(sipMsgInfo.getRootElement().element("Info"), "AlarmType")); + } + } + logger.info("[收到报警通知]内容:{}", JSON.toJSONString(deviceAlarm)); + if ("7".equals(deviceAlarm.getAlarmMethod()) ) { + // 发送给平台的报警信息。 发送redis通知 + AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage(); + alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod())); + alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription()); + alarmChannelMessage.setGbId(channelId); + redisCatchStorage.sendAlarmMsg(alarmChannelMessage); + continue; } - } - logger.info("[收到报警通知]内容:{}", JSON.toJSONString(deviceAlarm)); - if ("7".equals(deviceAlarm.getAlarmMethod()) ) { - // 发送给平台的报警信息。 发送redis通知 - AlarmChannelMessage alarmChannelMessage = new AlarmChannelMessage(); - alarmChannelMessage.setAlarmSn(Integer.parseInt(deviceAlarm.getAlarmMethod())); - alarmChannelMessage.setAlarmDescription(deviceAlarm.getAlarmDescription()); - alarmChannelMessage.setGbId(channelId); - redisCatchStorage.sendAlarmMsg(alarmChannelMessage); - continue; - } - logger.debug("存储报警信息、报警分类"); - // 存储报警信息、报警分类 - if (sipConfig.isAlarm()) { - deviceAlarmService.add(deviceAlarm); - } + logger.debug("存储报警信息、报警分类"); + // 存储报警信息、报警分类 + if (sipConfig.isAlarm()) { + deviceAlarmService.add(deviceAlarm); + } - if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) { - publisher.deviceAlarmEventPublish(deviceAlarm); + if (redisCatchStorage.deviceIsOnline(sipMsgInfo.getDevice().getDeviceId())) { + publisher.deviceAlarmEventPublish(deviceAlarm); + } + }catch (Exception e) { + logger.warn("[收到报警通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); } } - taskQueueHandlerRun = false; }); } - - } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index 7a0ea1c2..40d1dcc9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -22,7 +22,6 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -57,8 +56,6 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen @Autowired private IDeviceChannelService deviceChannelService; - private boolean taskQueueHandlerRun = false; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -73,21 +70,22 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + // 回复200 OK + try { + responseAck((SIPRequest) evt.getRequest(), Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage()); + } + if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { SipMsgInfo sipMsgInfo = taskQueue.poll(); try { Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset()); if (rootElementAfterCharset == null) { - try { - logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest()); - responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.BAD_REQUEST); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 移动设备位置数据通知 内容为空: {}", e.getMessage()); - } + logger.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); continue; } MobilePosition mobilePosition = new MobilePosition(); @@ -137,12 +135,6 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen storager.insertMobilePosition(mobilePosition); } storager.updateChannelPosition(deviceChannel); - //回复 200 OK - try { - responseAck((SIPRequest) sipMsgInfo.getEvt().getRequest(), Response.OK); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 移动设备位置数据回复200: {}", e.getMessage()); - } // 发送redis消息。 通知位置信息的变化 JSONObject jsonObject = new JSONObject(); @@ -158,14 +150,12 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen } catch (DocumentException e) { e.printStackTrace(); + } catch (Exception e) { + logger.warn("[移动位置通知] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); } - } - taskQueueHandlerRun = false; }); } - - } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 52e0b7f1..761481be 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -1,18 +1,11 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; -import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.CatalogDataCatch; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; @@ -24,7 +17,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -45,12 +37,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); private final String cmdType = "Catalog"; - private boolean taskQueueHandlerRun = false; - @Autowired private ResponseMessageHandler responseMessageHandler; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Autowired private IVideoManagerStorage storager; @@ -69,6 +59,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Override public void handForDevice(RequestEvent evt, Device device, Element element) { + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(new HandlerCatchData(evt, device, element)); // 回复200 OK try { @@ -76,67 +67,71 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } catch (SipException | InvalidArgumentException | ParseException e) { logger.error("[命令发送失败] 目录查询回复: {}", e.getMessage()); } - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + // 如果不为空则说明已经开启消息处理 + if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { - HandlerCatchData take = taskQueue.poll(); - Element rootElement = null; + // 全局异常捕获,保证下一条可以得到处理 try { - rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); - } catch (DocumentException e) { - logger.error("[xml解析] 失败: ", e); - continue; - } - if (rootElement == null) { - logger.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest()); - continue; - } - Element deviceListElement = rootElement.element("DeviceList"); - Element sumNumElement = rootElement.element("SumNum"); - Element snElement = rootElement.element("SN"); - int sumNum = Integer.parseInt(sumNumElement.getText()); - - if (sumNum == 0) { - logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); - // 数据已经完整接收 - storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); - } else { - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - List channelList = new ArrayList<>(); - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; - } - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null); - deviceChannel.setDeviceId(take.getDevice().getDeviceId()); - - channelList.add(deviceChannel); - } - int sn = Integer.parseInt(snElement.getText()); - catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); - logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); - if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { - // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, - // 目前支持设备通道上线通知时和设备上线时向上级通知 - boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); - if (!resetChannelsResult) { - String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); - } else { - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); - } - } + HandlerCatchData take = taskQueue.poll(); + Element rootElement = null; + try { + rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); + } catch (DocumentException e) { + logger.error("[xml解析] 失败: ", e); + continue; } + if (rootElement == null) { + logger.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest()); + continue; + } + Element deviceListElement = rootElement.element("DeviceList"); + Element sumNumElement = rootElement.element("SumNum"); + Element snElement = rootElement.element("SN"); + int sumNum = Integer.parseInt(sumNumElement.getText()); + if (sumNum == 0) { + logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); + // 数据已经完整接收 + storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + } else { + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + List channelList = new ArrayList<>(); + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice, device, null); + deviceChannel.setDeviceId(take.getDevice().getDeviceId()); + + channelList.add(deviceChannel); + } + int sn = Integer.parseInt(snElement.getText()); + catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); + logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 : catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); + if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { + // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, + // 目前支持设备通道上线通知时和设备上线时向上级通知 + boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); + if (!resetChannelsResult) { + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); + } else { + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + } + } + } + + } + }catch (Exception e) { + logger.warn("[收到通道] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); } } - taskQueueHandlerRun = false; }); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 286dd565..11d239ef 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.utils.DateUtil; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import gov.nist.javax.sip.message.SIPRequest; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -21,17 +20,17 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; -import java.util.*; -import java.util.concurrent.BlockingQueue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -46,7 +45,6 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - private boolean taskQueueHandlerRun = false; @Autowired private ResponseMessageHandler responseMessageHandler; @@ -70,6 +68,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { + boolean isEmpty = taskQueue.isEmpty(); try { // 回复200 OK responseAck((SIPRequest) evt.getRequest(), Response.OK); @@ -77,8 +76,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage()); } taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (isEmpty) { taskExecutor.execute(()->{ while (!taskQueue.isEmpty()) { try { @@ -151,9 +149,10 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent } } catch (DocumentException e) { logger.error("xml解析异常: ", e); + } catch (Exception e) { + logger.warn("[国标录像] 发现未处理的异常, {}\r\n{}",e.getMessage(), evt.getRequest()); } } - taskQueueHandlerRun = false; }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java index d68591c4..9bb3bbd2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisAlarmMsgListener.java @@ -38,8 +38,6 @@ public class RedisAlarmMsgListener implements MessageListener { @Autowired private IVideoManagerStorage storage; - private boolean taskQueueHandlerRun = false; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -49,69 +47,68 @@ public class RedisAlarmMsgListener implements MessageListener { @Override public void onMessage(@NotNull Message message, byte[] bytes) { logger.info("收到来自REDIS的ALARM通知: {}", new String(message.getBody())); - + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (isEmpty) { logger.info("[线程池信息]活动线程数:{}, 最大线程数: {}", taskExecutor.getActiveCount(), taskExecutor.getMaxPoolSize()); taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); + try { + AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class); + if (alarmChannelMessage == null) { + logger.warn("[REDIS的ALARM通知]消息解析失败"); + continue; + } + String gbId = alarmChannelMessage.getGbId(); - AlarmChannelMessage alarmChannelMessage = JSON.parseObject(msg.getBody(), AlarmChannelMessage.class); - if (alarmChannelMessage == null) { - logger.warn("[REDIS的ALARM通知]消息解析失败"); - continue; - } - String gbId = alarmChannelMessage.getGbId(); + DeviceAlarm deviceAlarm = new DeviceAlarm(); + deviceAlarm.setCreateTime(DateUtil.getNow()); + deviceAlarm.setChannelId(gbId); + deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); + deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); + deviceAlarm.setAlarmPriority("1"); + deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601()); + deviceAlarm.setAlarmType("1"); + deviceAlarm.setLongitude(0); + deviceAlarm.setLatitude(0); - DeviceAlarm deviceAlarm = new DeviceAlarm(); - deviceAlarm.setCreateTime(DateUtil.getNow()); - deviceAlarm.setChannelId(gbId); - deviceAlarm.setAlarmDescription(alarmChannelMessage.getAlarmDescription()); - deviceAlarm.setAlarmMethod("" + alarmChannelMessage.getAlarmSn()); - deviceAlarm.setAlarmPriority("1"); - deviceAlarm.setAlarmTime(DateUtil.getNowForISO8601()); - deviceAlarm.setAlarmType("1"); - deviceAlarm.setLongitude(0); - deviceAlarm.setLatitude(0); - - if (ObjectUtils.isEmpty(gbId)) { - // 发送给所有的上级 - List parentPlatforms = storage.queryEnableParentPlatformList(true); - if (parentPlatforms.size() > 0) { - for (ParentPlatform parentPlatform : parentPlatforms) { - try { - commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); - } catch (SipException | InvalidArgumentException | ParseException e) { - logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); + if (ObjectUtils.isEmpty(gbId)) { + // 发送给所有的上级 + List parentPlatforms = storage.queryEnableParentPlatformList(true); + if (parentPlatforms.size() > 0) { + for (ParentPlatform parentPlatform : parentPlatforms) { + try { + commanderForPlatform.sendAlarmMessage(parentPlatform, deviceAlarm); + } catch (SipException | InvalidArgumentException | ParseException e) { + logger.error("[命令发送失败] 国标级联 发送报警: {}", e.getMessage()); + } } } - } - }else { - Device device = storage.queryVideoDevice(gbId); - ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId); - if (device != null && platform == null) { - try { - commander.sendAlarmMessage(device, deviceAlarm); - } catch (InvalidArgumentException | SipException | ParseException e) { - logger.error("[命令发送失败] 发送报警: {}", e.getMessage()); - } - }else if (device == null && platform != null){ - try { - commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); - } catch (InvalidArgumentException | SipException | ParseException e) { - logger.error("[命令发送失败] 发送报警: {}", e.getMessage()); - } }else { - logger.warn("无法确定" + gbId + "是平台还是设备"); + Device device = storage.queryVideoDevice(gbId); + ParentPlatform platform = storage.queryParentPlatByServerGBId(gbId); + if (device != null && platform == null) { + try { + commander.sendAlarmMessage(device, deviceAlarm); + } catch (InvalidArgumentException | SipException | ParseException e) { + logger.error("[命令发送失败] 发送报警: {}", e.getMessage()); + } + }else if (device == null && platform != null){ + try { + commanderForPlatform.sendAlarmMessage(platform, deviceAlarm); + } catch (InvalidArgumentException | SipException | ParseException e) { + logger.error("[命令发送失败] 发送报警: {}", e.getMessage()); + } + }else { + logger.warn("无法确定" + gbId + "是平台还是设备"); + } } + }catch (Exception e) { + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); } } - taskQueueHandlerRun = false; }); } - - } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index 1330262c..42cbec5c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -88,8 +88,6 @@ public class RedisGbPlayMsgListener implements MessageListener { @Autowired private ZlmHttpHookSubscribe subscribe; - private boolean taskQueueHandlerRun = false; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -111,107 +109,103 @@ public class RedisGbPlayMsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { - + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); - JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); - WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); - if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { - continue; - } - if (WvpRedisMsg.isRequest(wvpRedisMsg)) { - logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); - - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: - RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); - requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; - requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); - break; - default: - break; + try { + JSONObject msgJSON = JSON.parseObject(msg.getBody(), JSONObject.class); + WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); + if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { + continue; } + if (WvpRedisMsg.isRequest(wvpRedisMsg)) { + logger.info("[收到REDIS通知] 请求: {}", new String(msg.getBody())); - }else { - logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); - switch (wvpRedisMsg.getCmd()){ - case WvpRedisMsgCmd.GET_SEND_ITEM: + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: + RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); + requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; + requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + default: + break; + } - WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + }else { + logger.info("[收到REDIS通知] 回复: {}", new String(msg.getBody())); + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: - String key = wvpRedisMsg.getSerial(); - switch (content.getCode()) { - case 0: - ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); - PlayMsgCallback playMsgCallback = callbacks.get(key); - if (playMsgCallback != null) { - callbacksForError.remove(key); - try { - playMsgCallback.handler(responseSendItemMsg); - } catch (ParseException e) { - logger.error("[REDIS消息处理异常] ", e); + WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + + String key = wvpRedisMsg.getSerial(); + switch (content.getCode()) { + case 0: + ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); + PlayMsgCallback playMsgCallback = callbacks.get(key); + if (playMsgCallback != null) { + callbacksForError.remove(key); + try { + playMsgCallback.handler(responseSendItemMsg); + } catch (ParseException e) { + logger.error("[REDIS消息处理异常] ", e); + } } - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(key); - if (errorCallback != null) { - callbacks.remove(key); - errorCallback.handler(content); - } - break; - default: - break; - } - break; - case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: - WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); - String serial = wvpRedisMsg.getSerial(); - switch (wvpResult.getCode()) { - case 0: - JSONObject jsonObject = (JSONObject)wvpResult.getData(); - PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); - if (playMsgCallback != null) { - callbacksForError.remove(serial); - playMsgCallback.handler(jsonObject); - } - break; - case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: - case ERROR_CODE_OFFLINE: - case ERROR_CODE_TIMEOUT: - PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); - if (errorCallback != null) { - callbacks.remove(serial); - errorCallback.handler(wvpResult); - } - break; - default: - break; - } - break; - default: - break; + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(key); + if (errorCallback != null) { + callbacks.remove(key); + errorCallback.handler(content); + } + break; + default: + break; + } + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + String serial = wvpRedisMsg.getSerial(); + switch (wvpResult.getCode()) { + case 0: + JSONObject jsonObject = (JSONObject)wvpResult.getData(); + PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); + if (playMsgCallback != null) { + callbacksForError.remove(serial); + playMsgCallback.handler(jsonObject); + } + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); + if (errorCallback != null) { + callbacks.remove(serial); + errorCallback.handler(wvpResult); + } + break; + default: + break; + } + break; + default: + break; + } } + }catch (Exception e) { + logger.warn("[RedisGbPlayMsg] 发现未处理的异常, {}",e.getMessage()); } } - taskQueueHandlerRun = false; }); } - - - - - - } /** diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java index c43a9375..0c99707e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java @@ -27,8 +27,6 @@ public class RedisGpsMsgListener implements MessageListener { private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); - private boolean taskQueueHandlerRun = false; - @Autowired private IRedisCatchStorage redisCatchStorage; @@ -44,17 +42,20 @@ public class RedisGpsMsgListener implements MessageListener { @Override public void onMessage(@NotNull Message message, byte[] bytes) { + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); - GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); - // 只是放入redis缓存起来 - redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); + try { + GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class); + // 只是放入redis缓存起来 + redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); + }catch (Exception e) { + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); + } } - taskQueueHandlerRun = false; }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java index 05d662d7..33eae1eb 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,8 +25,6 @@ public class RedisPushStreamResponseListener implements MessageListener { private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); - private boolean taskQueueHandlerRun = false; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -43,24 +40,27 @@ public class RedisPushStreamResponseListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { - logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); + logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); - MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); - if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ - logger.info("[REDIS消息-请求推流结果]:参数不全"); - continue; - } - // 查看正在等待的invite消息 - if (responseEvents.get(response.getApp() + response.getStream()) != null) { - responseEvents.get(response.getApp() + response.getStream()).run(response); + try { + MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class); + if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ + logger.info("[REDIS消息-请求推流结果]:参数不全"); + continue; + } + // 查看正在等待的invite消息 + if (responseEvents.get(response.getApp() + response.getStream()) != null) { + responseEvents.get(response.getApp() + response.getStream()).run(response); + } + }catch (Exception e) { + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); } } - taskQueueHandlerRun = false; }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java index 15e37ecf..d8ed1a01 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusListMsgListener.java @@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,7 +17,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.annotation.Resource; -import java.util.*; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; /** @@ -38,7 +38,6 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { @Resource private IGbStreamService gbStreamService; - private boolean taskQueueHandlerRun = false; private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @@ -49,54 +48,56 @@ public class RedisPushStreamStatusListMsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { logger.info("[REDIS消息-推流设备列表更新]: {}", new String(message.getBody())); - + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); - List streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); - //查询全部的app+stream 用于判断是添加还是修改 - List allAppAndStream = streamPushService.getAllAppAndStream(); + try { + List streamPushItems = JSON.parseArray(new String(msg.getBody()), StreamPushItem.class); + //查询全部的app+stream 用于判断是添加还是修改 + List allAppAndStream = streamPushService.getAllAppAndStream(); - /** - * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 - */ - List streamPushItemForSave = new ArrayList<>(); - List streamPushItemForUpdate = new ArrayList<>(); - for (StreamPushItem streamPushItem : streamPushItems) { - String app = streamPushItem.getApp(); - String stream = streamPushItem.getStream(); - boolean contains = allAppAndStream.contains(app + stream); - //不存在就添加 - if (!contains) { - streamPushItem.setStreamType("push"); - streamPushItem.setCreateTime(DateUtil.getNow()); - streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); - streamPushItem.setOriginType(2); - streamPushItem.setOriginTypeStr("rtsp_push"); - streamPushItem.setTotalReaderCount("0"); - streamPushItemForSave.add(streamPushItem); - } else { - //存在就只修改 name和gbId - streamPushItemForUpdate.add(streamPushItem); + /** + * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表 + */ + List streamPushItemForSave = new ArrayList<>(); + List streamPushItemForUpdate = new ArrayList<>(); + for (StreamPushItem streamPushItem : streamPushItems) { + String app = streamPushItem.getApp(); + String stream = streamPushItem.getStream(); + boolean contains = allAppAndStream.contains(app + stream); + //不存在就添加 + if (!contains) { + streamPushItem.setStreamType("push"); + streamPushItem.setCreateTime(DateUtil.getNow()); + streamPushItem.setMediaServerId(mediaServerService.getDefaultMediaServer().getId()); + streamPushItem.setOriginType(2); + streamPushItem.setOriginTypeStr("rtsp_push"); + streamPushItem.setTotalReaderCount("0"); + streamPushItemForSave.add(streamPushItem); + } else { + //存在就只修改 name和gbId + streamPushItemForUpdate.add(streamPushItem); + } } - } - if (streamPushItemForSave.size() > 0) { + if (streamPushItemForSave.size() > 0) { - logger.info("添加{}条",streamPushItemForSave.size()); - logger.info(JSONObject.toJSONString(streamPushItemForSave)); - streamPushService.batchAdd(streamPushItemForSave); + logger.info("添加{}条",streamPushItemForSave.size()); + logger.info(JSONObject.toJSONString(streamPushItemForSave)); + streamPushService.batchAdd(streamPushItemForSave); - } - if(streamPushItemForUpdate.size()>0){ - logger.info("修改{}条",streamPushItemForUpdate.size()); - logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); - gbStreamService.updateGbIdOrName(streamPushItemForUpdate); + } + if(streamPushItemForUpdate.size()>0){ + logger.info("修改{}条",streamPushItemForUpdate.size()); + logger.info(JSONObject.toJSONString(streamPushItemForUpdate)); + gbStreamService.updateGbIdOrName(streamPushItemForUpdate); + } + }catch (Exception e) { + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); } } - taskQueueHandlerRun = false; }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java index 4fafa1c3..96ff8e83 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamStatusMsgListener.java @@ -29,8 +29,6 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class); - private boolean taskQueueHandlerRun = false; - @Autowired private IRedisCatchStorage redisCatchStorage; @@ -50,37 +48,40 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic @Override public void onMessage(Message message, byte[] bytes) { + boolean isEmpty = taskQueue.isEmpty(); logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody())); taskQueue.offer(message); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); - PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); - if (statusChangeFromPushStream == null) { - logger.warn("[REDIS消息]推流设备状态变化消息解析失败"); - continue; - } - // 取消定时任务 - dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); - if (statusChangeFromPushStream.isSetAllOffline()) { - // 所有设备离线 - streamPushService.allStreamOffline(); - } - if (statusChangeFromPushStream.getOfflineStreams() != null - && statusChangeFromPushStream.getOfflineStreams().size() > 0) { - // 更新部分设备离线 - streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); - } - if (statusChangeFromPushStream.getOnlineStreams() != null && - statusChangeFromPushStream.getOnlineStreams().size() > 0) { - // 更新部分设备上线 - streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); + try { + PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class); + if (statusChangeFromPushStream == null) { + logger.warn("[REDIS消息]推流设备状态变化消息解析失败"); + continue; + } + // 取消定时任务 + dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED); + if (statusChangeFromPushStream.isSetAllOffline()) { + // 所有设备离线 + streamPushService.allStreamOffline(); + } + if (statusChangeFromPushStream.getOfflineStreams() != null + && statusChangeFromPushStream.getOfflineStreams().size() > 0) { + // 更新部分设备离线 + streamPushService.offline(statusChangeFromPushStream.getOfflineStreams()); + } + if (statusChangeFromPushStream.getOnlineStreams() != null && + statusChangeFromPushStream.getOnlineStreams().size() > 0) { + // 更新部分设备上线 + streamPushService.online(statusChangeFromPushStream.getOnlineStreams()); + } + }catch (Exception e) { + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); } } - taskQueueHandlerRun = false; }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java index 3e73fc05..1cdc527a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java @@ -33,8 +33,6 @@ public class RedisStreamMsgListener implements MessageListener { @Autowired private ZLMMediaListManager zlmMediaListManager; - private boolean taskQueueHandlerRun = false; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @@ -43,48 +41,50 @@ public class RedisStreamMsgListener implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { - + boolean isEmpty = taskQueue.isEmpty(); taskQueue.offer(message); - if (!taskQueueHandlerRun) { - taskQueueHandlerRun = true; + if (isEmpty) { taskExecutor.execute(() -> { while (!taskQueue.isEmpty()) { Message msg = taskQueue.poll(); - JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); - if (steamMsgJson == null) { - logger.warn("[收到redis 流变化]消息解析失败"); - continue; - } - String serverId = steamMsgJson.getString("serverId"); + try { + JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class); + if (steamMsgJson == null) { + logger.warn("[收到redis 流变化]消息解析失败"); + continue; + } + String serverId = steamMsgJson.getString("serverId"); - if (userSetting.getServerId().equals(serverId)) { - // 自己发送的消息忽略即可 - continue; - } - logger.info("[收到redis 流变化]: {}", new String(message.getBody())); - String app = steamMsgJson.getString("app"); - String stream = steamMsgJson.getString("stream"); - boolean register = steamMsgJson.getBoolean("register"); - String mediaServerId = steamMsgJson.getString("mediaServerId"); - OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); - onStreamChangedHookParam.setSeverId(serverId); - onStreamChangedHookParam.setApp(app); - onStreamChangedHookParam.setStream(stream); - onStreamChangedHookParam.setRegist(register); - onStreamChangedHookParam.setMediaServerId(mediaServerId); - onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); - onStreamChangedHookParam.setAliveSecond(0L); - onStreamChangedHookParam.setTotalReaderCount("0"); - onStreamChangedHookParam.setOriginType(0); - onStreamChangedHookParam.setOriginTypeStr("0"); - onStreamChangedHookParam.setOriginTypeStr("unknown"); - if (register) { - zlmMediaListManager.addPush(onStreamChangedHookParam); - }else { - zlmMediaListManager.removeMedia(app, stream); + if (userSetting.getServerId().equals(serverId)) { + // 自己发送的消息忽略即可 + continue; + } + logger.info("[收到redis 流变化]: {}", new String(message.getBody())); + String app = steamMsgJson.getString("app"); + String stream = steamMsgJson.getString("stream"); + boolean register = steamMsgJson.getBoolean("register"); + String mediaServerId = steamMsgJson.getString("mediaServerId"); + OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam(); + onStreamChangedHookParam.setSeverId(serverId); + onStreamChangedHookParam.setApp(app); + onStreamChangedHookParam.setStream(stream); + onStreamChangedHookParam.setRegist(register); + onStreamChangedHookParam.setMediaServerId(mediaServerId); + onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000); + onStreamChangedHookParam.setAliveSecond(0L); + onStreamChangedHookParam.setTotalReaderCount("0"); + onStreamChangedHookParam.setOriginType(0); + onStreamChangedHookParam.setOriginTypeStr("0"); + onStreamChangedHookParam.setOriginTypeStr("unknown"); + if (register) { + zlmMediaListManager.addPush(onStreamChangedHookParam); + }else { + zlmMediaListManager.removeMedia(app, stream); + } + }catch (Exception e) { + logger.warn("[REDIS的ALARM通知] 发现未处理的异常, {}",e.getMessage()); } } - taskQueueHandlerRun = false; }); } }