diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index 3b021de4..ade2e622 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -103,12 +103,12 @@ public class DynamicTask { public void stop(String key) { if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { - futureMap.get(key).cancel(true); - Runnable runnable = runnableMap.get(key); - if (runnable instanceof ISubscribeTask) { - ISubscribeTask subscribeTask = (ISubscribeTask) runnable; - subscribeTask.stop(); - } +// Runnable runnable = runnableMap.get(key); +// if (runnable instanceof ISubscribeTask) { +// ISubscribeTask subscribeTask = (ISubscribeTask) runnable; +// subscribeTask.stop(); +// } + futureMap.get(key).cancel(false); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/HandlerCatchData.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/HandlerCatchData.java new file mode 100644 index 00000000..97da8630 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/HandlerCatchData.java @@ -0,0 +1,44 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import org.dom4j.Element; + +import javax.sip.RequestEvent; + +/** + * @author lin + */ +public class HandlerCatchData { + private RequestEvent evt; + private Device device; + private Element rootElement; + + public HandlerCatchData(RequestEvent evt, Device device, Element rootElement) { + this.evt = evt; + this.device = device; + this.rootElement = rootElement; + } + + public RequestEvent getEvt() { + return evt; + } + + public void setEvt(RequestEvent evt) { + this.evt = evt; + } + + public Device getDevice() { + return device; + } + + public void setDevice(Device device) { + this.device = device; + } + + public Element getRootElement() { + return rootElement; + } + + public void setRootElement(Element rootElement) { + this.rootElement = rootElement; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index f191c005..4a900c16 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.bean; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -38,7 +39,6 @@ public class SubscribeHolder { catalogMap.put(platformId, subscribeInfo); // 添加订阅到期 String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; - dynamicTask.stop(taskOverdueKey); // 添加任务处理订阅过期 dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()), subscribeInfo.getExpires() * 1000); @@ -49,10 +49,17 @@ public class SubscribeHolder { } public void removeCatalogSubscribe(String platformId) { + catalogMap.remove(platformId); String taskOverdueKey = taskOverduePrefix + "catalog_" + platformId; + Runnable runnable = dynamicTask.get(taskOverdueKey); + if (runnable instanceof ISubscribeTask) { + ISubscribeTask subscribeTask = (ISubscribeTask) runnable; + subscribeTask.stop(); + } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); + } public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) { @@ -63,7 +70,6 @@ public class SubscribeHolder { storager, platformId, subscribeInfo.getSn(), key, this, dynamicTask), subscribeInfo.getGpsInterval() * 1000); String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; - dynamicTask.stop(taskOverdueKey); // 添加任务处理订阅过期 dynamicTask.startDelay(taskOverdueKey, () -> { removeMobilePositionSubscribe(subscribeInfo.getId()); @@ -81,6 +87,11 @@ public class SubscribeHolder { // 结束任务处理GPS定时推送 dynamicTask.stop(key); String taskOverdueKey = taskOverduePrefix + "MobilePosition_" + platformId; + Runnable runnable = dynamicTask.get(taskOverdueKey); + if (runnable instanceof ISubscribeTask) { + ISubscribeTask subscribeTask = (ISubscribeTask) runnable; + subscribeTask.stop(); + } // 添加任务处理订阅过期 dynamicTask.stop(taskOverdueKey); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 7e5ecb49..e38733d5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -66,7 +66,7 @@ public class CatalogEventLister implements ApplicationListener { subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); if (subscribe == null) { - logger.info("发送订阅消息时发现订阅信息已经不存在"); + logger.info("发送订阅消息时发现订阅信息已经不存在: {}", event.getPlatformId()); return; } }else { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index a06a73d0..a2fab813 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -150,30 +150,24 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { public void processTimeout(TimeoutEvent timeoutEvent) { logger.info("[消息发送超时]"); ClientTransaction clientTransaction = timeoutEvent.getClientTransaction(); - eventPublisher.requestTimeOut(timeoutEvent); + if (clientTransaction != null) { + logger.info("[发送错误订阅] clientTransaction != null"); Request request = clientTransaction.getRequest(); if (request != null) { + logger.info("[发送错误订阅] request != null"); CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); if (callIdHeader != null) { + logger.info("[发送错误订阅]"); SipSubscribe.Event subscribe = sipSubscribe.getErrorSubscribe(callIdHeader.getCallId()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(timeoutEvent); subscribe.response(eventResult); + sipSubscribe.removeOkSubscribe(callIdHeader.getCallId()); sipSubscribe.removeErrorSubscribe(callIdHeader.getCallId()); } } } - -// Timeout timeout = timeoutEvent.getTimeout(); -// ServerTransaction serverTransaction = timeoutEvent.getServerTransaction(); -// if (serverTransaction != null) { -// Request request = serverTransaction.getRequest(); -// URI requestURI = request.getRequestURI(); -// Header header = request.getHeader(FromHeader.NAME); -// } -// if(timeoutProcessor != null) { -// timeoutProcessor.process(timeoutEvent); -// } + eventPublisher.requestTimeOut(timeoutEvent); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 9786ea47..2bb0307c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1487,7 +1487,6 @@ public class SIPCommander implements ISIPCommander { Request request; if (dialog != null) { - logger.info("发送移动位置订阅消息时 dialog的状态为: {}", dialog.getState()); request = dialog.createRequest(Request.SUBSCRIBE); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); request.setContent(subscribePostitionXml.toString(), contentTypeHeader); @@ -1583,12 +1582,12 @@ public class SIPCommander implements ISIPCommander { Request request; if (dialog != null) { - logger.info("发送目录订阅消息时 dialog的状态为: {}", dialog.getState()); request = dialog.createRequest(Request.SUBSCRIBE); + ExpiresHeader expiresHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForCatalog()); + request.setExpires(expiresHeader); + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); request.setContent(cmdXml.toString(), contentTypeHeader); - ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition()); - request.addHeader(expireHeader); }else { String tm = Long.toString(System.currentTimeMillis()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 40b9a9ae..38a72465 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; @@ -26,6 +25,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -36,6 +37,7 @@ import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.text.ParseException; import java.util.Iterator; +import java.util.concurrent.ConcurrentLinkedQueue; /** * SIP命令类型: NOTIFY请求 @@ -64,11 +66,19 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Autowired private EventPublisher publisher; - private String method = "NOTIFY"; + private final String method = "NOTIFY"; @Autowired private SIPProcessorObserver sipProcessorObserver; + private boolean taskQueueHandlerRun = false; + + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -78,23 +88,40 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { try { - Element rootElement = getRootElement(evt); - String cmd = XmlUtil.getText(rootElement, "CmdType"); - if (CmdType.CATALOG.equals(cmd)) { - logger.info("接收到Catalog通知"); - processNotifyCatalogList(evt); - } else if (CmdType.ALARM.equals(cmd)) { - logger.info("接收到Alarm通知"); - processNotifyAlarm(evt); - } else if (CmdType.MOBILE_POSITION.equals(cmd)) { - logger.info("接收到MobilePosition通知"); - processNotifyMobilePosition(evt); - } else { - logger.info("接收到消息:" + cmd); - responseAck(evt, Response.OK); + taskQueue.offer(new HandlerCatchData(evt, null, null)); + responseAck(evt, Response.OK); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()-> { + while (!taskQueue.isEmpty()) { + try { + HandlerCatchData take = taskQueue.poll(); + Element rootElement = getRootElement(take.getEvt()); + String cmd = XmlUtil.getText(rootElement, "CmdType"); + + if (CmdType.CATALOG.equals(cmd)) { + logger.info("接收到Catalog通知"); + processNotifyCatalogList(take.getEvt()); + } else if (CmdType.ALARM.equals(cmd)) { + logger.info("接收到Alarm通知"); + processNotifyAlarm(take.getEvt()); + } else if (CmdType.MOBILE_POSITION.equals(cmd)) { + logger.info("接收到MobilePosition通知"); + processNotifyMobilePosition(take.getEvt()); + } else { + logger.info("接收到消息:" + cmd); + } + } catch (DocumentException e) { + throw new RuntimeException(e); + } + } + taskQueueHandlerRun = false; + }); } - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + + + } catch (SipException | InvalidArgumentException | ParseException e) { e.printStackTrace(); } } @@ -167,8 +194,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements jsonObject.put("direction", mobilePosition.getDirection()); jsonObject.put("speed", mobilePosition.getSpeed()); redisCatchStorage.sendMobilePositionMsg(jsonObject); - responseAck(evt, Response.OK); - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + } catch (DocumentException e) { e.printStackTrace(); } } @@ -189,7 +215,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { - responseAck(evt, Response.NOT_FOUND, "device is not found"); + logger.warn("[ NotifyAlarm ] 未找到设备:{}", deviceId); return; } rootElement = getRootElement(evt, device.getCharset()); @@ -199,7 +225,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod")); String alarmTime = XmlUtil.getText(rootElement, "AlarmTime"); if (alarmTime == null) { - responseAck(evt, Response.BAD_REQUEST, "AlarmTime cannot be null"); + logger.warn("[ NotifyAlarm ] AlarmTime cannot be null"); return; } deviceAlarm.setAlarmTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(alarmTime)); @@ -219,7 +245,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements deviceAlarm.setLatitude(0.00); } logger.info("[收到Notify-Alarm]:{}/{}", device.getDeviceId(), deviceAlarm.getChannelId()); - if (deviceAlarm.getAlarmMethod().equals("4")) { + if ("4".equals(deviceAlarm.getAlarmMethod())) { MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setDeviceId(deviceAlarm.getDeviceId()); mobilePosition.setTime(deviceAlarm.getAlarmTime()); @@ -240,11 +266,10 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements // TODO: 需要实现存储报警信息、报警分类 // 回复200 OK - responseAck(evt, Response.OK); if (redisCatchStorage.deviceIsOnline(deviceId)) { publisher.deviceAlarmEventPublish(deviceAlarm); } - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + } catch (DocumentException e) { e.printStackTrace(); } } @@ -280,64 +305,60 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements continue; } Element eventElement = itemDevice.element("Event"); + String event; + if (eventElement == null) { + logger.warn("[收到 目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" )); + event = CatalogEvent.ADD; + }else { + event = eventElement.getText().toUpperCase(); + } DeviceChannel channel = XmlUtil.channelContentHander(itemDevice); channel.setDeviceId(device.getDeviceId()); logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId()); - switch (eventElement.getText().toUpperCase()) { + switch (event) { case CatalogEvent.ON: // 上线 logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOnline(deviceId, channel.getChannelId()); - // 回复200 OK - responseAck(evt, Response.OK); break; case CatalogEvent.OFF : // 离线 logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); - // 回复200 OK - responseAck(evt, Response.OK); break; case CatalogEvent.VLOST: // 视频丢失 logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); - // 回复200 OK - responseAck(evt, Response.OK); break; case CatalogEvent.DEFECT: // 故障 - // 回复200 OK - responseAck(evt, Response.OK); break; case CatalogEvent.ADD: // 增加 logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId()); storager.updateChannel(deviceId, channel); - responseAck(evt, Response.OK); break; case CatalogEvent.DEL: // 删除 logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId()); storager.delChannel(deviceId, channel.getChannelId()); - responseAck(evt, Response.OK); break; case CatalogEvent.UPDATE: // 更新 logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId()); storager.updateChannel(deviceId, channel); - responseAck(evt, Response.OK); break; default: - responseAck(evt, Response.BAD_REQUEST, "event not found"); + logger.warn("[ NotifyCatalog ] event not found : {}", event ); } // 转发变化信息 - eventPublisher.catalogEventPublish(null, channel, eventElement.getText().toUpperCase()); + eventPublisher.catalogEventPublish(null, channel, event); } } - } catch (DocumentException | SipException | InvalidArgumentException | ParseException e) { + } catch (DocumentException e) { e.printStackTrace(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 891b21da..0fe317ae 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -20,6 +20,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -31,6 +33,7 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; @Component public class CatalogResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { @@ -38,9 +41,13 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp private Logger logger = LoggerFactory.getLogger(CatalogResponseMessageHandler.class); private final String cmdType = "Catalog"; + private boolean taskQueueHandlerRun = false; + @Autowired private ResponseMessageHandler responseMessageHandler; + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + @Autowired private IVideoManagerStorage storager; @@ -63,6 +70,10 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Autowired private IRedisCatchStorage redisCatchStorage; + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -70,68 +81,88 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Override public void handForDevice(RequestEvent evt, Device device, Element element) { - String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + device.getDeviceId(); - Element rootElement = null; + taskQueue.offer(new HandlerCatchData(evt, device, element)); + // 回复200 OK try { - rootElement = getRootElement(evt, device.getCharset()); - Element deviceListElement = rootElement.element("DeviceList"); - Element sumNumElement = rootElement.element("SumNum"); - Element snElement = rootElement.element("SN"); - if (snElement == null || sumNumElement == null || deviceListElement == null) { - responseAck(evt, Response.BAD_REQUEST, "xml error"); - return; - } - int sumNum = Integer.parseInt(sumNumElement.getText()); - - if (sumNum == 0) { - // 数据已经完整接收 - storager.cleanChannelsForDevice(device.getDeviceId()); - catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); - }else { - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - List channelList = new ArrayList<>(); - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; + responseAck(evt, Response.OK); + } catch (SipException e) { + throw new RuntimeException(e); + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()-> { + while (!taskQueue.isEmpty()) { + HandlerCatchData take = taskQueue.poll(); + String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + take.getDevice().getDeviceId(); + Element rootElement = null; + try { + rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); + Element deviceListElement = rootElement.element("DeviceList"); + Element sumNumElement = rootElement.element("SumNum"); + Element snElement = rootElement.element("SN"); + if (snElement == null || sumNumElement == null || deviceListElement == null) { + responseAck(take.getEvt(), Response.BAD_REQUEST, "xml error"); + return; } - //by brewswang -// if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {//如果包含位置信息,就更新一下位置 -// processNotifyMobilePosition(evt, itemDevice); -// } - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice); - deviceChannel.setDeviceId(device.getDeviceId()); + int sumNum = Integer.parseInt(sumNumElement.getText()); - channelList.add(deviceChannel); - } - int sn = Integer.parseInt(snElement.getText()); - catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList); - logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(device.getDeviceId()) == null ? 0 :catalogDataCatch.get(device.getDeviceId()).size(), sumNum); - if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) { - // 数据已经完整接收 - boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId())); - if (!resetChannelsResult) { - String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条"; - catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg); + if (sumNum == 0) { + // 数据已经完整接收 + storager.cleanChannelsForDevice(take.getDevice().getDeviceId()); + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); }else { - catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null); + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + List channelList = new ArrayList<>(); + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + //by brewswang + // if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {//如果包含位置信息,就更新一下位置 + // processNotifyMobilePosition(evt, itemDevice); + // } + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice); + deviceChannel.setDeviceId(take.getDevice().getDeviceId()); + + channelList.add(deviceChannel); + } + int sn = Integer.parseInt(snElement.getText()); + catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList); + logger.info("收到来自设备【{}】的通道: {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum); + if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) { + // 数据已经完整接收 + boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId())); + if (!resetChannelsResult) { + String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(take.getDevice().getDeviceId()).size() + "条"; + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), errorMsg); + }else { + catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null); + } + } + } + } + } catch (DocumentException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } catch (SipException e) { + e.printStackTrace(); } } - // 回复200 OK - responseAck(evt, Response.OK); - } - } catch (DocumentException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } catch (SipException e) { - e.printStackTrace(); + taskQueueHandlerRun = false; + }); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 1f701715..87adc3ef 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -1,9 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; -import com.genersoft.iot.vmp.gb28181.bean.RecordItem; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.session.RecordDataCatch; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; @@ -19,6 +16,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -28,6 +27,9 @@ import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; import java.util.*; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @@ -42,6 +44,9 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent private final String cmdType = "RecordInfo"; private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; + private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + + private boolean taskQueueHandlerRun = false; @Autowired private ResponseMessageHandler responseMessageHandler; @@ -51,11 +56,13 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent @Autowired private DeferredResultHolder deferredResultHolder; - - @Autowired private EventPublisher eventPublisher; + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + @Override public void afterPropertiesSet() throws Exception { responseMessageHandler.addHandler(cmdType, this); @@ -67,75 +74,89 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent // 回复200 OK try { responseAck(evt, Response.OK); + taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); + if (!taskQueueHandlerRun) { + taskQueueHandlerRun = true; + taskExecutor.execute(()->{ + try { + while (!taskQueue.isEmpty()) { + HandlerCatchData take = taskQueue.poll(); + Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset()); + String sn = getText(rootElementForCharset, "SN"); + String channelId = getText(rootElementForCharset, "DeviceID"); + RecordInfo recordInfo = new RecordInfo(); + recordInfo.setChannelId(channelId); + recordInfo.setDeviceId(take.getDevice().getDeviceId()); + recordInfo.setSn(sn); + recordInfo.setName(getText(rootElementForCharset, "Name")); + String sumNumStr = getText(rootElementForCharset, "SumNum"); + int sumNum = 0; + if (!StringUtils.isEmpty(sumNumStr)) { + sumNum = Integer.parseInt(sumNumStr); + } + recordInfo.setSumNum(sumNum); + Element recordListElement = rootElementForCharset.element("RecordList"); + if (recordListElement == null || sumNum == 0) { + logger.info("无录像数据"); + eventPublisher.recordEndEventPush(recordInfo); + recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>()); + releaseRequest(take.getDevice().getDeviceId(), sn); + } else { + Iterator recordListIterator = recordListElement.elementIterator(); + if (recordListIterator != null) { + List recordList = new ArrayList<>(); + // 遍历DeviceList + while (recordListIterator.hasNext()) { + Element itemRecord = recordListIterator.next(); + Element recordElement = itemRecord.element("DeviceID"); + if (recordElement == null) { + logger.info("记录为空,下一个..."); + continue; + } + RecordItem record = new RecordItem(); + record.setDeviceId(getText(itemRecord, "DeviceID")); + record.setName(getText(itemRecord, "Name")); + record.setFilePath(getText(itemRecord, "FilePath")); + record.setFileSize(getText(itemRecord, "FileSize")); + record.setAddress(getText(itemRecord, "Address")); - rootElement = getRootElement(evt, device.getCharset()); - String sn = getText(rootElement, "SN"); - RecordInfo recordInfo = new RecordInfo(); - recordInfo.setDeviceId(device.getDeviceId()); - recordInfo.setSn(sn); - recordInfo.setName(getText(rootElement, "Name")); - String sumNumStr = getText(rootElement, "SumNum"); - int sumNum = 0; - if (!StringUtils.isEmpty(sumNumStr)) { - sumNum = Integer.parseInt(sumNumStr); - } - recordInfo.setSumNum(sumNum); - Element recordListElement = rootElement.element("RecordList"); - if (recordListElement == null || sumNum == 0) { - logger.info("无录像数据"); - eventPublisher.recordEndEventPush(recordInfo); - recordDataCatch.put(device.getDeviceId(), sn, sumNum, new ArrayList<>()); - releaseRequest(device.getDeviceId(), sn); - } else { - Iterator recordListIterator = recordListElement.elementIterator(); - if (recordListIterator != null) { - List recordList = new ArrayList<>(); - // 遍历DeviceList - while (recordListIterator.hasNext()) { - Element itemRecord = recordListIterator.next(); - Element recordElement = itemRecord.element("DeviceID"); - if (recordElement == null) { - logger.info("记录为空,下一个..."); - continue; + String startTimeStr = getText(itemRecord, "StartTime"); + record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); + + String endTimeStr = getText(itemRecord, "EndTime"); + record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); + + record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 + : Integer.parseInt(getText(itemRecord, "Secrecy"))); + record.setType(getText(itemRecord, "Type")); + record.setRecorderId(getText(itemRecord, "RecorderID")); + recordList.add(record); + } + recordInfo.setRecordList(recordList); + // 发送消息,如果是上级查询此录像,则会通过这里通知给上级 + eventPublisher.recordEndEventPush(recordInfo); + int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList); + logger.info("[国标录像], {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum); + } + + if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){ + releaseRequest(take.getDevice().getDeviceId(), sn); + } + } } - RecordItem record = new RecordItem(); - record.setDeviceId(getText(itemRecord, "DeviceID")); - record.setName(getText(itemRecord, "Name")); - record.setFilePath(getText(itemRecord, "FilePath")); - record.setFileSize(getText(itemRecord, "FileSize")); - record.setAddress(getText(itemRecord, "Address")); - - String startTimeStr = getText(itemRecord, "StartTime"); - record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr)); - - String endTimeStr = getText(itemRecord, "EndTime"); - record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr)); - - record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 - : Integer.parseInt(getText(itemRecord, "Secrecy"))); - record.setType(getText(itemRecord, "Type")); - record.setRecorderId(getText(itemRecord, "RecorderID")); - recordList.add(record); + taskQueueHandlerRun = false; + }catch (DocumentException e) { + throw new RuntimeException(e); } - recordInfo.setRecordList(recordList); - // 发送消息,如果是上级查询此录像,则会通过这里通知给上级 - eventPublisher.recordEndEventPush(recordInfo); - int count = recordDataCatch.put(device.getDeviceId(), sn, sumNum, recordList); - logger.info("[国标录像], {}->{}: {}/{}", device.getDeviceId(), sn, count, sumNum); - } - - if (recordDataCatch.isComplete(device.getDeviceId(), sn)){ - releaseRequest(device.getDeviceId(), sn); - } + }); } + } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); - } catch (DocumentException e) { - e.printStackTrace(); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 77c7c617..3359b00f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.service.IDeviceService; @@ -95,7 +96,6 @@ public class DeviceServiceImpl implements IDeviceService { } // 刷新过期任务 String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId(); - dynamicTask.stop(registerExpireTaskKey); dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId()), device.getExpires() * 1000); } @@ -144,8 +144,16 @@ public class DeviceServiceImpl implements IDeviceService { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } - logger.info("移除目录订阅: {}", device.getDeviceId()); - dynamicTask.stop(device.getDeviceId() + "catalog"); + logger.info("[移除目录订阅]: {}", device.getDeviceId()); + String taskKey = device.getDeviceId() + "catalog"; + if (device.getOnline() == 1) { + Runnable runnable = dynamicTask.get(taskKey); + if (runnable instanceof ISubscribeTask) { + ISubscribeTask subscribeTask = (ISubscribeTask) runnable; + subscribeTask.stop(); + } + } + dynamicTask.stop(taskKey); return true; } @@ -169,8 +177,16 @@ public class DeviceServiceImpl implements IDeviceService { if (device == null || device.getSubscribeCycleForCatalog() < 0) { return false; } - logger.info("移除移动位置订阅: {}", device.getDeviceId()); - dynamicTask.stop(device.getDeviceId() + "mobile_position"); + logger.info("[移除移动位置订阅]: {}", device.getDeviceId()); + String taskKey = device.getDeviceId() + "mobile_position"; + if (device.getOnline() == 1) { + Runnable runnable = dynamicTask.get(taskKey); + if (runnable instanceof ISubscribeTask) { + ISubscribeTask subscribeTask = (ISubscribeTask) runnable; + subscribeTask.stop(); + } + } + dynamicTask.stop(taskKey); return true; } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index cd576669..d313c6b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -206,6 +206,11 @@ public class DeviceQuery { Set allKeys = dynamicTask.getAllKeys(); for (String key : allKeys) { if (key.startsWith(deviceId)) { + Runnable runnable = dynamicTask.get(key); + if (runnable instanceof ISubscribeTask) { + ISubscribeTask subscribeTask = (ISubscribeTask) runnable; + subscribeTask.stop(); + } dynamicTask.stop(key); } }