diff --git a/doc/_content/ability/_media/img_16.png b/doc/_content/ability/_media/img_16.png index f7ce9e7a..b09e8cd3 100644 Binary files a/doc/_content/ability/_media/img_16.png and b/doc/_content/ability/_media/img_16.png differ diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index d181a4af..31fe7a4f 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -62,6 +62,8 @@ public class UserSetting { private List allowedOrigins = new ArrayList<>(); + private int maxNotifyCountQueue = 10000; + public Boolean getSavePositionHistory() { return savePositionHistory; } @@ -257,4 +259,12 @@ public class UserSetting { public void setRecordPath(String recordPath) { this.recordPath = recordPath; } + + public int getMaxNotifyCountQueue() { + return maxNotifyCountQueue; + } + + public void setMaxNotifyCountQueue(int maxNotifyCountQueue) { + this.maxNotifyCountQueue = maxNotifyCountQueue; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java new file mode 100644 index 00000000..56fa187e --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForCatalogProcessor.java @@ -0,0 +1,264 @@ +package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; + +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; +import com.genersoft.iot.vmp.service.IDeviceChannelService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import org.dom4j.DocumentException; +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.sip.RequestEvent; +import javax.sip.header.FromHeader; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * SIP命令类型: NOTIFY请求中的目录请求处理 + */ +@Component +public class NotifyRequestForCatalogProcessor extends SIPRequestProcessorParent { + + + private final static Logger logger = LoggerFactory.getLogger(NotifyRequestForCatalogProcessor.class); + + private final List updateChannelOnlineList = new CopyOnWriteArrayList<>(); + private final List updateChannelOfflineList = new CopyOnWriteArrayList<>(); + private final Map updateChannelMap = new ConcurrentHashMap<>(); + + private final Map addChannelMap = new ConcurrentHashMap<>(); + private final List deleteChannelList = new CopyOnWriteArrayList<>(); + + + @Autowired + private UserSetting userSetting; + + @Autowired + private EventPublisher eventPublisher; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IDeviceChannelService deviceChannelService; + + @Autowired + private DynamicTask dynamicTask; + + private final static String talkKey = "notify-request-for-catalog-task"; + + public void process(RequestEvent evt) { + try { + long start = System.currentTimeMillis(); + FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); + String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); + + Device device = redisCatchStorage.getDevice(deviceId); + if (device == null || device.getOnline() == 0) { + logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" )); + return; + } + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + logger.warn("[ 收到目录订阅 ] content cannot be null, {}", evt.getRequest()); + return; + } + Element deviceListElement = rootElement.element("DeviceList"); + if (deviceListElement == null) { + return; + } + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + Element eventElement = itemDevice.element("Event"); + String event; + if (eventElement == null) { + logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); + event = CatalogEvent.ADD; + }else { + event = eventElement.getText().toUpperCase(); + } + DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event); + + channel.setDeviceId(device.getDeviceId()); + logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); + switch (event) { + case CatalogEvent.ON: + // 上线 + logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + updateChannelOnlineList.add(channel); + if (updateChannelOnlineList.size() > 300) { + executeSaveForOnline(); + } + break; + case CatalogEvent.OFF : + // 离线 + logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + updateChannelOfflineList.add(channel); + if (updateChannelOfflineList.size() > 300) { + executeSaveForOffline(); + } + }else { + logger.info("[收到通道离线通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } + break; + case CatalogEvent.VLOST: + // 视频丢失 + logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + updateChannelOfflineList.add(channel); + if (updateChannelOfflineList.size() > 300) { + executeSaveForOffline(); + } + }else { + logger.info("[收到通道视频丢失通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } + break; + case CatalogEvent.DEFECT: + // 故障 + logger.info("[收到通道视频故障通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + if (userSetting.getRefuseChannelStatusChannelFormNotify()) { + updateChannelOfflineList.add(channel); + if (updateChannelOfflineList.size() > 300) { + executeSaveForOffline(); + } + }else { + logger.info("[收到通道视频故障通知] 但是平台已配置拒绝此消息,来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + } + break; + case CatalogEvent.ADD: + // 增加 + logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + // 判断此通道是否存在 + DeviceChannel deviceChannel = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannel != null) { + channel.setId(deviceChannel.getId()); + updateChannelMap.put(channel.getChannelId(), channel); + if (updateChannelMap.keySet().size() > 300) { + executeSaveForUpdate(); + } + }else { + addChannelMap.put(channel.getChannelId(), channel); + if (addChannelMap.keySet().size() > 300) { + executeSaveForAdd(); + } + } + + break; + case CatalogEvent.DEL: + // 删除 + logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + deleteChannelList.add(channel); + if (deleteChannelList.size() > 300) { + executeSaveForDelete(); + } + break; + case CatalogEvent.UPDATE: + // 更新 + logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId()); + // 判断此通道是否存在 + DeviceChannel deviceChannelForUpdate = deviceChannelService.getOne(deviceId, channel.getChannelId()); + if (deviceChannelForUpdate != null) { + channel.setId(deviceChannelForUpdate.getId()); + updateChannelMap.put(channel.getChannelId(), channel); + if (updateChannelMap.keySet().size() > 300) { + executeSaveForUpdate(); + } + }else { + addChannelMap.put(channel.getChannelId(), channel); + if (addChannelMap.keySet().size() > 300) { + executeSaveForAdd(); + } + } + break; + default: + logger.warn("[ NotifyCatalog ] event not found : {}", event ); + + } + // 转发变化信息 + eventPublisher.catalogEventPublish(null, channel, event); + + if (updateChannelMap.keySet().size() > 0 + || addChannelMap.keySet().size() > 0 + || updateChannelOnlineList.size() > 0 + || updateChannelOfflineList.size() > 0 + || deleteChannelList.size() > 0) { + + if (!dynamicTask.contains(talkKey)) { + dynamicTask.startDelay(talkKey, this::executeSave, 1000); + } + } + } + } + } catch (DocumentException e) { + logger.error("未处理的异常 ", e); + } + } + + private void executeSave(){ + System.out.println("定时存储数据"); + executeSaveForUpdate(); + executeSaveForDelete(); + executeSaveForOnline(); + executeSaveForOffline(); + dynamicTask.stop(talkKey); + } + + private void executeSaveForUpdate(){ + if (updateChannelMap.values().size() > 0) { + ArrayList deviceChannels = new ArrayList<>(updateChannelMap.values()); + updateChannelMap.clear(); + deviceChannelService.batchUpdateChannel(deviceChannels); + } + + } + + private void executeSaveForAdd(){ + if (addChannelMap.values().size() > 0) { + ArrayList deviceChannels = new ArrayList<>(addChannelMap.values()); + addChannelMap.clear(); + deviceChannelService.batchAddChannel(deviceChannels); + } + } + + private void executeSaveForDelete(){ + if (deleteChannelList.size() > 0) { + deviceChannelService.deleteChannels(deleteChannelList); + deleteChannelList.clear(); + } + } + + private void executeSaveForOnline(){ + if (updateChannelOnlineList.size() > 0) { + deviceChannelService.channelsOnline(updateChannelOnlineList); + updateChannelOnlineList.clear(); + } + } + + private void executeSaveForOffline(){ + if (updateChannelOfflineList.size() > 0) { + deviceChannelService.channelsOffline(updateChannelOfflineList); + updateChannelOfflineList.clear(); + } + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 42606411..5dae8261 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -76,12 +76,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IDeviceChannelService deviceChannelService; + @Autowired + private NotifyRequestForCatalogProcessor notifyRequestForCatalogProcessor; + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; + private int maxQueueCount = 30000; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -91,7 +96,15 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { try { - responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + responseAck((SIPRequest) evt.getRequest(), Response.BUSY_HERE, null, null); + logger.error("[notify] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); + return; + }else { + responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + } + }catch (SipException | InvalidArgumentException | ParseException e) { logger.error("未处理的异常 ", e); } @@ -103,6 +116,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements while (!taskQueue.isEmpty()) { try { HandlerCatchData take = taskQueue.poll(); + if (take == null) { + continue; + } Element rootElement = getRootElement(take.getEvt()); if (rootElement == null) { logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest()); @@ -112,7 +128,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements if (CmdType.CATALOG.equals(cmd)) { logger.info("接收到Catalog通知"); - processNotifyCatalogList(take.getEvt()); +// processNotifyCatalogList(take.getEvt()); + notifyRequestForCatalogProcessor.process(take.getEvt()); } else if (CmdType.ALARM.equals(cmd)) { logger.info("接收到Alarm通知"); processNotifyAlarm(take.getEvt()); @@ -132,7 +149,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements /** * 处理MobilePosition移动位置Notify - * + * * @param evt */ private void processNotifyMobilePosition(RequestEvent evt) { @@ -236,7 +253,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements /*** * 处理alarm设备报警Notify - * + * * @param evt */ private void processNotifyAlarm(RequestEvent evt) { @@ -346,7 +363,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements /*** * 处理catalog设备目录列表Notify - * + * * @param evt */ private void processNotifyCatalogList(RequestEvent evt) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java index c192dd5a..66dbe077 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IDeviceChannelService.java @@ -56,4 +56,35 @@ public interface IDeviceChannelService { * 查询通道所属的设备 */ List getDeviceByChannelId(String channelId); + + /** + * 批量删除通道 + * @param deleteChannelList 待删除的通道列表 + */ + int deleteChannels(List deleteChannelList); + + /** + * 批量上线 + */ + int channelsOnline(List channels); + + /** + * 批量下线 + */ + int channelsOffline(List channels); + + /** + * 获取一个通道 + */ + DeviceChannel getOne(String deviceId, String channelId); + + /** + * 直接批量更新通道 + */ + void batchUpdateChannel(List channels); + + /** + * 直接批量添加 + */ + void batchAddChannel(List deviceChannels); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java index 9223ced0..229bc0d2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceChannelServiceImpl.java @@ -209,6 +209,47 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { @Override public List getDeviceByChannelId(String channelId) { + return channelMapper.getDeviceByChannelId(channelId); } + + @Override + public int deleteChannels(List deleteChannelList) { + return channelMapper.batchDel(deleteChannelList); + } + + @Override + public int channelsOnline(List channels) { + return channelMapper.batchOnline(channels); + } + + @Override + public int channelsOffline(List channels) { + return channelMapper.batchOffline(channels); + } + + @Override + public DeviceChannel getOne(String deviceId, String channelId){ + return channelMapper.queryChannel(deviceId, channelId); + } + + @Override + public void batchUpdateChannel(List channels) { + channelMapper.batchUpdate(channels); + for (DeviceChannel channel : channels) { + if (channel.getParentId() != null) { + channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); + } + } + } + + @Override + public void batchAddChannel(List channels) { + channelMapper.batchAdd(channels); + for (DeviceChannel channel : channels) { + if (channel.getParentId() != null) { + channelMapper.updateChannelSubCount(channel.getDeviceId(), channel.getParentId()); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index a3dd6a7e..48de5d2b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -644,4 +644,6 @@ public class DeviceServiceImpl implements IDeviceService { public List getAll() { return deviceMapper.getAll(); } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 93f2a097..3f4d804c 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -197,6 +197,60 @@ public interface DeviceChannelMapper { @Update(value = {"UPDATE device_channel SET status=0 WHERE deviceId=#{deviceId}"}) void offlineByDeviceId(String deviceId); +// @Insert("") +// int batchAdd(List addChannels); + + @Insert("") int batchAdd(List addChannels); @@ -264,7 +285,7 @@ public interface DeviceChannelMapper { ", owner=#{item.owner}" + ", civilCode=#{item.civilCode}" + ", block=#{item.block}" + - ", block=#{item.subCount}" + + ", subCount=#{item.subCount}" + ", address=#{item.address}" + ", parental=#{item.parental}" + ", parentId=#{item.parentId}" + @@ -289,7 +310,8 @@ public interface DeviceChannelMapper { ", latitudeWgs84=#{item.latitudeWgs84}" + ", businessGroupId=#{item.businessGroupId}" + ", gpsTime=#{item.gpsTime}" + - "WHERE deviceId=#{item.deviceId} AND channelId=#{item.channelId}"+ + "WHERE id=#{item.id}" + + "WHERE deviceId=#{item.deviceId} AND channelId=#{item.channelId}" + "" + ""}) int batchUpdate(List updateChannels); @@ -403,4 +425,26 @@ public interface DeviceChannelMapper { @Select("select de.* from device de left join device_channel dc on de.deviceId = dc.deviceId where dc.channelId=#{channelId}") List getDeviceByChannelId(String channelId); + + + @Delete({""}) + int batchDel(List deleteChannelList); + + @Update({""}) + int batchOnline(List channels); + + @Update({""}) + int batchOffline(List channels); } diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index d0a82895..bae99045 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -178,6 +178,8 @@ user-settings: send-to-platforms-when-id-lost: true # 保持通道状态,不接受notify通道状态变化, 兼容海康平台发送错误消息 refuse-channel-status-channel-form-notify: false + # 设置notify缓存队列最大长度,超过此长度的数据将返回486 BUSY_HERE,消息丢弃, 默认10000 + max-notify-count-queue: 10000 # 跨域配置,配置你访问前端页面的地址即可, 可以配置多个 allowed-origins: - http://localhost:8008