From 41616f726dafafe7c015bf4f3e02a7aa9488a3a2 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 7 Mar 2022 10:47:06 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=AF=BC=E5=85=A5?= =?UTF-8?q?=E9=80=9A=E9=81=93=E6=97=B6=E6=97=A0=E5=B9=B3=E5=8F=B0=E5=85=B3?= =?UTF-8?q?=E8=81=94=E6=97=B6=E7=9A=84bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transmit/cmd/impl/SIPCommanderFroPlatform.java | 3 +++ .../iot/vmp/service/impl/StreamPushServiceImpl.java | 13 ++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index b55b0448..67cb734f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -563,6 +563,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Override public void streamByeCmd(ParentPlatform platform, String callId) { + if (platform == null) { + return; + } SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId); if (sendRtpItem != null) { String mediaServerId = sendRtpItem.getMediaServerId(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index a13dc29d..39d37c5d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -413,12 +413,15 @@ public class StreamPushServiceImpl implements IStreamPushService { } } - platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); - // 发送通知 - for (String platformId : platformForEvent.keySet()) { - eventPublisher.catalogEventPublishForStream( - platformId, platformForEvent.get(platformId), CatalogEvent.ADD); + if (streamPushItemListFroPlatform.size() > 0) { + platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform); + // 发送通知 + for (String platformId : platformForEvent.keySet()) { + eventPublisher.catalogEventPublishForStream( + platformId, platformForEvent.get(platformId), CatalogEvent.ADD); + } } + } } From ec97c6a7d0ee1b16d08aa5ea85b34776d86af84e Mon Sep 17 00:00:00 2001 From: dengming Date: Thu, 10 Mar 2022 11:33:51 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BA=91=E7=AB=AF?= =?UTF-8?q?=E5=BD=95=E5=83=8F=E6=92=AD=E6=94=BE=E5=92=8C=E4=B8=8B=E8=BD=BD?= =?UTF-8?q?=E8=B7=AF=E5=BE=84=E9=94=99=E8=AF=AF=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- web_src/src/components/CloudRecordDetail.vue | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/web_src/src/components/CloudRecordDetail.vue b/web_src/src/components/CloudRecordDetail.vue index a8d8224a..bee3253b 100644 --- a/web_src/src/components/CloudRecordDetail.vue +++ b/web_src/src/components/CloudRecordDetail.vue @@ -16,7 +16,7 @@ {{ item.substring(0,17)}} - + @@ -265,7 +265,7 @@ this.videoUrl = ""; }else { // TODO 控制列表滚动条 - this.videoUrl = `${this.basePath}/${this.recordFile.app}/${this.recordFile.stream}/${this.chooseDate}/${this.choosedFile}` + this.videoUrl = `${this.basePath}/record/${this.recordFile.app}/${this.recordFile.stream}/${this.chooseDate}/${this.choosedFile}` console.log(this.videoUrl) } From c1d7f867c2ffcb1364334a5e013eb8f208819ef5 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 10 Mar 2022 20:53:56 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=9B=AE=E5=BD=95?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E4=BB=A5=E5=8F=8A=E5=9B=BD=E6=A0=87=E7=BA=A7?= =?UTF-8?q?=E8=81=94=E7=9B=AE=E5=BD=95=E8=AE=A2=E9=98=85=E5=9B=9E=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/MediaConfig.java | 4 +- .../genersoft/iot/vmp/gb28181/SipLayer.java | 5 +- .../gb28181/auth/RegisterLogicHandler.java | 26 ++-- .../iot/vmp/gb28181/bean/SendRtpItem.java | 2 + .../iot/vmp/gb28181/bean/SubscribeHolder.java | 37 +++++ .../iot/vmp/gb28181/bean/SubscribeInfo.java | 26 +++- .../iot/vmp/gb28181/event/EventPublisher.java | 18 ++- .../event/offline/OfflineEventListener.java | 2 +- .../vmp/gb28181/event/online/OnlineEvent.java | 11 +- .../event/online/OnlineEventListener.java | 26 ++-- .../subscribe/catalog/CatalogEventLister.java | 14 +- .../vmp/gb28181/task/GPSSubscribeTask.java | 8 +- .../transmit/SIPProcessorObserver.java | 1 + .../cmd/SIPRequestHeaderPlarformProvider.java | 106 ++++++------- .../cmd/SIPRequestHeaderProvider.java | 3 + .../transmit/cmd/impl/SIPCommander.java | 2 +- .../cmd/impl/SIPCommanderFroPlatform.java | 143 ++++++++++++------ .../request/impl/NotifyRequestProcessor.java | 7 +- .../impl/RegisterRequestProcessor.java | 8 +- .../impl/SubscribeRequestProcessor.java | 55 +++++-- .../impl/message/MessageRequestProcessor.java | 6 + .../cmd/CatalogResponseMessageHandler.java | 75 +++++---- .../vmp/service/impl/DeviceServiceImpl.java | 4 +- .../service/impl/MediaServerServiceImpl.java | 10 ++ .../service/impl/StreamPushServiceImpl.java | 60 ++++++-- 25 files changed, 451 insertions(+), 208 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java index 3f549e5e..bdd15034 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/MediaConfig.java @@ -60,8 +60,8 @@ public class MediaConfig{ @Value("${media.secret}") private String secret; - @Value("${media.stream-none-reader-delay-ms:18000}") - private int streamNoneReaderDelayMS = 18000; + @Value("${media.stream-none-reader-delay-ms:10000}") + private int streamNoneReaderDelayMS = 10000; @Value("${media.rtp.enable}") private boolean rtpEnable; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 2f62287e..1a5cce57 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -47,7 +47,7 @@ public class SipLayer{ Properties properties = new Properties(); properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP"); properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getMonitorIp()); - properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false"); + properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true"); /** * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE = * 0; public static final int TRACE_MESSAGES = 16; public static final int @@ -57,6 +57,7 @@ public class SipLayer{ properties.setProperty("gov.nist.javax.sip.SERVER_LOG", "sip_server_log"); properties.setProperty("gov.nist.javax.sip.DEBUG_LOG", "sip_debug_log"); sipStack = (SipStackImpl) sipFactory.createSipStack(properties); + return sipStack; } @@ -70,6 +71,7 @@ public class SipLayer{ tcpSipProvider = (SipProviderImpl)sipStack.createSipProvider(tcpListeningPoint); tcpSipProvider.setDialogErrorsAutomaticallyHandled(); tcpSipProvider.addSipListener(sipProcessorObserver); +// tcpSipProvider.setAutomaticDialogSupportEnabled(false); logger.info("Sip Server TCP 启动成功 port {" + sipConfig.getMonitorIp() + ":" + sipConfig.getPort() + "}"); } catch (TransportNotSupportedException e) { e.printStackTrace(); @@ -93,6 +95,7 @@ public class SipLayer{ udpListeningPoint = sipStack.createListeningPoint(sipConfig.getMonitorIp(), sipConfig.getPort(), "UDP"); udpSipProvider = (SipProviderImpl)sipStack.createSipProvider(udpListeningPoint); udpSipProvider.addSipListener(sipProcessorObserver); +// udpSipProvider.setAutomaticDialogSupportEnabled(false); } catch (TransportNotSupportedException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java index c6fba3db..82390709 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java @@ -27,18 +27,18 @@ public class RegisterLogicHandler { public void onRegister(Device device) { // 只有第一次注册时调用查询设备信息,如需更新调用更新API接口 - // TODO 此处错误无法获取到通道 - Device device1 = storager.queryVideoDevice(device.getDeviceId()); - if (device.isFirsRegister()) { - logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId()); - try { - Thread.sleep(100); - cmder.deviceInfoQuery(device); - Thread.sleep(100); - cmder.catalogQuery(device, null); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } +// // TODO 此处错误无法获取到通道 +// Device device1 = storager.queryVideoDevice(device.getDeviceId()); +// if (device.isFirsRegister()) { +// logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId()); +// try { +// Thread.sleep(100); +// cmder.deviceInfoQuery(device); +// Thread.sleep(100); +// cmder.catalogQuery(device, null); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 3914fa1f..a2c38acb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -220,4 +220,6 @@ public class SendRtpItem { public void setDialog(byte[] dialog) { this.dialog = dialog; } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java new file mode 100644 index 00000000..a027486c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -0,0 +1,37 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import org.springframework.stereotype.Component; + +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class SubscribeHolder { + + private static ConcurrentHashMap catalogMap = new ConcurrentHashMap<>(); + private static ConcurrentHashMap mobilePositionMap = new ConcurrentHashMap<>(); + + + public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) { + catalogMap.put(platformId, subscribeInfo); + } + + public SubscribeInfo getCatalogSubscribe(String platformId) { + return catalogMap.get(platformId); + } + + public void removeCatalogSubscribe(String platformId) { + catalogMap.remove(platformId); + } + + public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) { + mobilePositionMap.put(platformId, subscribeInfo); + } + + public SubscribeInfo getMobilePositionSubscribe(String platformId) { + return mobilePositionMap.get(platformId); + } + + public void removeMobilePositionSubscribe(String platformId) { + mobilePositionMap.remove(platformId); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index e9d41678..373533a6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -1,13 +1,15 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.utils.SerializeUtils; + +import javax.sip.Dialog; import javax.sip.RequestEvent; +import javax.sip.ServerTransaction; import javax.sip.header.*; import javax.sip.message.Request; public class SubscribeInfo { - public SubscribeInfo() { - } public SubscribeInfo(RequestEvent evt, String id) { this.id = id; @@ -23,6 +25,8 @@ public class SubscribeInfo { this.eventType = eventHeader.getEventType(); ViaHeader viaHeader = (ViaHeader)request.getHeader(ViaHeader.NAME); this.branch = viaHeader.getBranch(); + this.transaction = evt.getServerTransaction(); + this.dialog = evt.getDialog(); } private String id; @@ -33,6 +37,8 @@ public class SubscribeInfo { private String fromTag; private String toTag; private String branch; + private ServerTransaction transaction; + private Dialog dialog; public String getId() { return id; @@ -97,4 +103,20 @@ public class SubscribeInfo { public void setBranch(String branch) { this.branch = branch; } + + public ServerTransaction getTransaction() { + return transaction; + } + + public void setTransaction(ServerTransaction transaction) { + this.transaction = transaction; + } + + public Dialog getDialog() { + return dialog; + } + + public void setDialog(Dialog dialog) { + this.dialog = dialog; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index a9464b76..ffe477f0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -33,12 +33,20 @@ public class EventPublisher { @Autowired private ApplicationEventPublisher applicationEventPublisher; + public void onlineEventPublish(Device device, String from, int expires) { + OnlineEvent onEvent = new OnlineEvent(this); + onEvent.setDevice(device); + onEvent.setFrom(from); + onEvent.setExpires(expires); + applicationEventPublisher.publishEvent(onEvent); + } + public void onlineEventPublish(Device device, String from) { OnlineEvent onEvent = new OnlineEvent(this); onEvent.setDevice(device); onEvent.setFrom(from); - applicationEventPublisher.publishEvent(onEvent); - } + applicationEventPublisher.publishEvent(onEvent); + } public void outlineEventPublish(String deviceId, String from){ OfflineEvent outEvent = new OfflineEvent(this); @@ -107,6 +115,12 @@ public class EventPublisher { } + /** + * + * @param platformId + * @param deviceChannels + * @param type + */ public void catalogEventPublish(String platformId, List deviceChannels, String type) { CatalogEvent outEvent = new CatalogEvent(this); List channels = new ArrayList<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java index 97e480cb..9e67191b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java @@ -91,7 +91,7 @@ public class OfflineEventListener implements ApplicationListener { // 离线释放所有ssrc List ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null); - if (ssrcTransactions.size() > 0) { + if (ssrcTransactions != null && ssrcTransactions.size() > 0) { for (SsrcTransaction ssrcTransaction : ssrcTransactions) { mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEvent.java index 73d7f1f7..9aa9f8d7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEvent.java @@ -23,6 +23,8 @@ public class OnlineEvent extends ApplicationEvent { private String from; + private int expires; + public Device getDevice() { return device; } @@ -38,5 +40,12 @@ public class OnlineEvent extends ApplicationEvent { public void setFrom(String from) { this.from = from; } - + + public int getExpires() { + return expires; + } + + public void setExpires(int expires) { + this.expires = expires; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java index c0de8de3..27bc4bcf 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.storager.dao.dto.User; import org.slf4j.Logger; @@ -51,6 +52,9 @@ public class OnlineEventListener implements ApplicationListener { @Autowired private EventPublisher eventPublisher; + @Autowired + private SIPCommander cmder; + private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override @@ -62,13 +66,21 @@ public class OnlineEventListener implements ApplicationListener { Device device = event.getDevice(); if (device == null) return; String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_" + event.getDevice().getDeviceId(); - + Device deviceInStore = storager.queryVideoDevice(device.getDeviceId()); + device.setOnline(1); + // 处理上线监听 + storager.updateDevice(device); switch (event.getFrom()) { // 注册时触发的在线事件,先在redis中增加超时超时监听 case VideoManagerConstants.EVENT_ONLINE_REGISTER: // 超时时间 redis.set(key, event.getDevice().getDeviceId(), sipConfig.getKeepaliveTimeOut()); device.setRegisterTime(format.format(System.currentTimeMillis())); + if (deviceInStore == null) { //第一次上线 + logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId()); + cmder.deviceInfoQuery(device); + cmder.catalogQuery(device, null); + } break; // 设备主动发送心跳触发的在线事件 case VideoManagerConstants.EVENT_ONLINE_KEEPLIVE: @@ -87,19 +99,11 @@ public class OnlineEventListener implements ApplicationListener { break; } - device.setOnline(1); - Device deviceInStore = storager.queryVideoDevice(device.getDeviceId()); - if (deviceInStore != null && deviceInStore.getOnline() == 0) { - List deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId()); - eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON); - } - // 处理上线监听 - storager.updateDevice(device); - + List deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId()); + eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON); // 上线添加订阅 if (device.getSubscribeCycleForCatalog() > 0) { deviceService.addCatalogSubscribe(device); } - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index f9593633..9e3b352a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -52,6 +52,9 @@ public class CatalogEventLister implements ApplicationListener { @Autowired private IGbStreamService gbStreamService; + @Autowired + private SubscribeHolder subscribeHolder; + @Override public void onApplicationEvent(CatalogEvent event) { SubscribeInfo subscribe = null; @@ -62,7 +65,8 @@ public class CatalogEventLister implements ApplicationListener { parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId()); if (parentPlatform != null && !parentPlatform.isStatus())return; String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + event.getPlatformId(); - subscribe = redisCatchStorage.getSubscribe(key); +// subscribe = redisCatchStorage.getSubscribe(key); + subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); if (subscribe == null) { logger.debug("发送订阅消息时发现订阅信息已经不存在"); @@ -114,7 +118,8 @@ public class CatalogEventLister implements ApplicationListener { if (parentPlatforms != null && parentPlatforms.size() > 0) { for (ParentPlatform platform : parentPlatforms) { String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId(); - SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); +// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); + SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(platform.getServerGBId()); if (subscribeInfo == null) continue; logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); List deviceChannelList = new ArrayList<>(); @@ -153,8 +158,9 @@ public class CatalogEventLister implements ApplicationListener { List parentPlatforms = parentPlatformMap.get(gbId); if (parentPlatforms != null && parentPlatforms.size() > 0) { for (ParentPlatform platform : parentPlatforms) { - String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId(); - SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); +// String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId(); +// SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); + SubscribeInfo subscribeInfo = subscribeHolder.getCatalogSubscribe(event.getPlatformId()); if (subscribeInfo == null) continue; logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); List deviceChannelList = new ArrayList<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java index 25dc75b4..4b21638c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.task; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; @@ -16,25 +17,28 @@ public class GPSSubscribeTask implements Runnable{ private IRedisCatchStorage redisCatchStorage; private IVideoManagerStorager storager; private ISIPCommanderForPlatform sipCommanderForPlatform; + private SubscribeHolder subscribeHolder; private String platformId; private String sn; private String key; private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key) { + public GPSSubscribeTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorager storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) { this.redisCatchStorage = redisCatchStorage; this.storager = storager; this.platformId = platformId; this.sn = sn; this.key = key; this.sipCommanderForPlatform = sipCommanderForPlatform; + this.subscribeHolder = subscribeInfo; } @Override public void run() { - SubscribeInfo subscribe = redisCatchStorage.getSubscribe(key); + SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId); + if (subscribe != null) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); if (parentPlatform == null || parentPlatform.isStatus()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 3cc4456d..30efa204 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -94,6 +94,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { Response response = responseEvent.getResponse(); logger.debug("\n收到响应:\n{}", responseEvent.getResponse()); int status = response.getStatusCode(); + if (((status >= 200) && (status < 300)) || status == 401) { // Success! CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); String method = cseqHeader.getMethod(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index c0e72817..43e26909 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -236,57 +236,57 @@ public class SIPRequestHeaderPlarformProvider { return request; } - public Request createNotifyRequest(ParentPlatform parentPlatform, String content, CallIdHeader callIdHeader, String viaTag, SubscribeInfo subscribeInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException { - Request request = null; - // sipuri - SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort()); - // via - ArrayList viaHeaders = new ArrayList(); - ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()), - parentPlatform.getTransport(), subscribeInfo.getBranch()); - viaHeader.setRPort(); - viaHeaders.add(viaHeader); - // from - SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), - parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort()); - Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); - FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, subscribeInfo.getToTag()); - // to - SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain()); - Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); - ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, subscribeInfo.getFromTag()); - - // Forwards - MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); - // ceq - CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY); - MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); - // 设置编码, 防止中文乱码 - messageFactory.setDefaultContentEncodingCharset("gb2312"); - request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader, - toHeader, viaHeaders, maxForwards); - List agentParam = new ArrayList<>(); - agentParam.add("wvp-pro"); - UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); - request.addHeader(userAgentHeader); - - EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType()); - if (subscribeInfo.getEventId() != null) { - event.setEventId(subscribeInfo.getEventId()); - } - - request.addHeader(event); - - SubscriptionStateHeader active = sipFactory.createHeaderFactory().createSubscriptionStateHeader("active"); - request.setHeader(active); - - String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort(); - Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory() - .createSipURI(parentPlatform.getDeviceGBId(), sipAddress)); - request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); - - ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); - request.setContent(content, contentTypeHeader); - return request; - } +// public Request createNotifyRequest(ParentPlatform parentPlatform, String content, CallIdHeader callIdHeader, String viaTag, String fromTag, SubscribeInfo subscribeInfo) throws PeerUnavailableException, ParseException, InvalidArgumentException { +// Request request = null; +// // sipuri +// SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP()+ ":" + parentPlatform.getServerPort()); +// // via +// ArrayList viaHeaders = new ArrayList(); +// ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(parentPlatform.getDeviceIp(), Integer.parseInt(parentPlatform.getDevicePort()), +// parentPlatform.getTransport(), viaTag); +// viaHeader.setRPort(); +// viaHeaders.add(viaHeader); +// // from +// SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getDeviceGBId(), +// parentPlatform.getDeviceIp() + ":" + parentPlatform.getDevicePort()); +// Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI); +// FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, fromTag); +// // to +// SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerGBDomain()); +// Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI); +// ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, subscribeInfo.getFromTag()); +// +// // Forwards +// MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70); +// // ceq +// CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.NOTIFY), Request.NOTIFY); +// MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory(); +// // 设置编码, 防止中文乱码 +// messageFactory.setDefaultContentEncodingCharset("gb2312"); +// request = messageFactory.createRequest(requestURI, Request.NOTIFY, callIdHeader, cSeqHeader, fromHeader, +// toHeader, viaHeaders, maxForwards); +// List agentParam = new ArrayList<>(); +// agentParam.add("wvp-pro"); +// UserAgentHeader userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); +// request.addHeader(userAgentHeader); +// +// EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType()); +// if (subscribeInfo.getEventId() != null) { +// event.setEventId(subscribeInfo.getEventId()); +// } +// +// request.addHeader(event); +// +// SubscriptionStateHeader active = sipFactory.createHeaderFactory().createSubscriptionStateHeader("active"); +// request.setHeader(active); +// +// String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort(); +// Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory() +// .createSipURI(parentPlatform.getDeviceGBId(), sipAddress)); +// request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); +// +// ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); +// request.setContent(content, contentTypeHeader); +// return request; +// } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index f4a0ec40..1c368bf6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -215,6 +215,9 @@ public class SIPRequestHeaderProvider { // Event EventHeader eventHeader = sipFactory.createHeaderFactory().createEventHeader(event); + + int random = (int)Math.random() * 1000000000; + eventHeader.setEventId(random + ""); request.addHeader(eventHeader); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index a7b67ad6..b38a8c11 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -1518,7 +1518,7 @@ public class SIPCommander implements ISIPCommander { // 有效时间默认为60秒以上 Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm, - "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog() + 60, "Catalog" , + "fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" , callIdHeader); transmitRequest(device, request, errorEvent, okEvent); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 67cb734f..a379f391 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -27,9 +27,7 @@ import org.springframework.util.StringUtils; import javax.sip.*; import javax.sip.address.SipURI; -import javax.sip.header.CallIdHeader; -import javax.sip.header.ViaHeader; -import javax.sip.header.WWWAuthenticateHeader; +import javax.sip.header.*; import javax.sip.message.Request; import java.lang.reflect.Field; import java.text.ParseException; @@ -68,6 +66,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Qualifier(value="udpSipProvider") private SipProviderImpl udpSipProvider; + @Autowired + private SipFactory sipFactory; + @Override public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { return register(parentPlatform, null, null, errorEvent, okEvent, false); @@ -88,7 +89,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain) { try { - Request request = null; + Request request; String tm = Long.toString(System.currentTimeMillis()); if (!registerAgain ) { // //callid @@ -364,16 +365,18 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { : udpSipProvider.getNewCallId(); callIdHeader.setCallId(subscribeInfo.getCallId()); - String tm = Long.toString(System.currentTimeMillis()); +// + sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> { + logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); + }, null); - Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, - deviceStatusXml.toString(),callIdHeader, - "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo); - transmitRequest(parentPlatform, request); - - } catch (SipException | ParseException | InvalidArgumentException e) { + } catch (SipException | ParseException e) { e.printStackTrace(); return false; + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); } return true; } @@ -386,37 +389,89 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { if (index == null) { index = 0; } - + if (index >= deviceChannels.size()) { + return true; + } try { - if (index > deviceChannels.size() - 1) { - return true; - } - Request request = getCatalogNotifyRequestForCatalogAddOrUpdate(parentPlatform, deviceChannels.get(index), deviceChannels.size(), type, subscribeInfo); - index += 1; Integer finalIndex = index; - transmitRequest(parentPlatform, request, null, (eventResult -> { - sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex); + String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, deviceChannels.get(index ), deviceChannels.size(), type, subscribeInfo); + sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { + logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); + }, (eventResult -> { + sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1); })); - } catch (SipException | ParseException | InvalidArgumentException e) { + } catch (SipException | ParseException e) { e.printStackTrace(); return false; + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); } return true; } - private Request getCatalogNotifyRequestForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int size, String type, - SubscribeInfo subscribeInfo) throws ParseException, InvalidArgumentException, - PeerUnavailableException { - String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channel, size, type, subscribeInfo); + private void sendNotify(ParentPlatform parentPlatform, String catalogXmlContent, + SubscribeInfo subscribeInfo, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent ) + throws NoSuchFieldException, IllegalAccessException, SipException, ParseException { + Dialog dialog = subscribeInfo.getDialog(); + Request notifyRequest = dialog.createRequest(Request.NOTIFY); + + ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml"); + + notifyRequest.setContent(catalogXmlContent, contentTypeHeader); + + SubscriptionStateHeader subscriptionState = sipFactory.createHeaderFactory() + .createSubscriptionStateHeader(SubscriptionStateHeader.ACTIVE); + notifyRequest.addHeader(subscriptionState); + + EventHeader event = sipFactory.createHeaderFactory().createEventHeader(subscribeInfo.getEventType()); + if (subscribeInfo.getEventId() != null) { + event.setEventId(subscribeInfo.getEventId()); + } + notifyRequest.addHeader(event); + + SipURI sipURI = (SipURI) notifyRequest.getRequestURI(); + SIPRequest request = (SIPRequest) subscribeInfo.getTransaction().getRequest(); + sipURI.setHost(request.getRemoteAddress().getHostName()); + sipURI.setPort(request.getRemotePort()); + ClientTransaction transaction = null; + if ("TCP".equals(parentPlatform.getTransport())) { + transaction = tcpSipProvider.getNewClientTransaction(notifyRequest); + } else if ("UDP".equals(parentPlatform.getTransport())) { + transaction = udpSipProvider.getNewClientTransaction(notifyRequest); + } + // 添加错误订阅 + if (errorEvent != null) { + sipSubscribe.addErrorSubscribe(subscribeInfo.getCallId(), errorEvent); + } + // 添加订阅 + if (okEvent != null) { + sipSubscribe.addOkSubscribe(subscribeInfo.getCallId(), okEvent); + } + if (transaction == null) { + logger.error("平台{}的Transport错误:{}",parentPlatform.getServerGBId(), parentPlatform.getTransport()); + return; + } + dialog.sendRequest(transaction); - CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() - : udpSipProvider.getNewCallId(); - callIdHeader.setCallId(subscribeInfo.getCallId()); - Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXmlContent, - callIdHeader, "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo); - return request; } +// private Request getCatalogNotifyRequestForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int size, String type, +// SubscribeInfo subscribeInfo) throws ParseException, InvalidArgumentException, +// PeerUnavailableException, NoSuchFieldException, IllegalAccessException { +// String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channel, size, type, subscribeInfo); +// +// CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() +// : udpSipProvider.getNewCallId(); +// callIdHeader.setCallId(subscribeInfo.getCallId()); +// String tm = Long.toString(System.currentTimeMillis()); +// +// Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXmlContent, +// callIdHeader, "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""),"FromRegister" + tm, subscribeInfo); +// return request; +// } + private String getCatalogXmlContentForCatalogAddOrUpdate(ParentPlatform parentPlatform, DeviceChannel channel, int sumNum, String type, SubscribeInfo subscribeInfo) { StringBuffer catalogXml = new StringBuffer(600); if (parentPlatform.getServerGBId().equals(channel.getParentId())) { @@ -465,34 +520,31 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { if (index == null) { index = 0; } - - if (index > deviceChannels.size() - 1) { + if (index >= deviceChannels.size()) { return true; } try { - String catalogXml = getCatalogXmlContentForCatalogOther(deviceChannels.get(index), type, parentPlatform); - CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() - : udpSipProvider.getNewCallId(); - Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXml, - callIdHeader, - "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), subscribeInfo); - index += 1; Integer finalIndex = index; - transmitRequest(parentPlatform, request, null, eventResult -> { - sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex); - }); + String catalogXmlContent = getCatalogXmlContentForCatalogOther(parentPlatform, deviceChannels.get(index), type); + sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { + logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg); + }, (eventResult -> { + sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, finalIndex + 1); + })); } catch (SipException e) { e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); } catch (ParseException e) { e.printStackTrace(); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); } return true; } - private String getCatalogXmlContentForCatalogOther(DeviceChannel channel, String type, ParentPlatform parentPlatform) { + private String getCatalogXmlContentForCatalogOther(ParentPlatform parentPlatform, DeviceChannel channel, String type) { if (parentPlatform.getServerGBId().equals(channel.getParentId())) { channel.setParentId(parentPlatform.getDeviceGBId()); } @@ -594,6 +646,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { byte[] transactionByteArray = sendRtpItem.getTransaction(); ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray); Request byeRequest = dialog.createRequest(Request.BYE); + SipURI byeURI = (SipURI) byeRequest.getRequestURI(); SIPRequest request = (SIPRequest) clientTransaction.getRequest(); byeURI.setHost(request.getRemoteAddress().getHostName()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index c339598e..6ae0f3e7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -233,6 +233,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements */ private void processNotifyCatalogList(RequestEvent evt) { try { + System.out.println(343434); FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); @@ -309,12 +310,6 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements } - // RequestMessage msg = new RequestMessage(); - // msg.setDeviceId(deviceId); - // msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG); - // msg.setData(device); - // deferredResultHolder.invokeResult(msg); - if (offLineDetector.isOnline(deviceId)) { publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index 737f7528..cfc61c40 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -81,7 +81,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort(); logger.info("[{}] 收到注册请求,开始处理", requestAddress); Request request = evt.getRequest(); - + ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME); Response response = null; boolean passwordCorrect = false; // 注册标志 0:未携带授权头或者密码错误 1:注册成功 2:注销成功 @@ -128,7 +128,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen dateHeader.setDate(wvpSipDate); response.addHeader(dateHeader); - ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME); + if (expiresHeader == null) { response = getMessageFactory().createResponse(Response.BAD_REQUEST, request); ServerTransaction serverTransaction = getServerTransaction(evt); @@ -193,9 +193,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen // 保存到redis if (registerFlag == 1 ) { logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress); - publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER); - // 重新注册更新设备和通道,以免设备替换或更新后信息无法更新 - handler.onRegister(device); + publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER, expiresHeader.getExpires()); } else if (registerFlag == 2) { logger.info("[{}] 注销成功! deviceId:" + device.getDeviceId(), requestAddress); publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index a765b3ab..e1e7125e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.CmdType; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; import com.genersoft.iot.vmp.gb28181.task.GPSSubscribeTask; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; @@ -15,18 +16,19 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.gb28181.utils.XmlUtil; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.SerializeUtils; +import gov.nist.javax.sip.SipProviderImpl; import org.dom4j.DocumentException; import org.dom4j.Element; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; -import javax.sip.InvalidArgumentException; -import javax.sip.RequestEvent; -import javax.sip.ServerTransaction; -import javax.sip.SipException; +import javax.sip.*; import javax.sip.header.ExpiresHeader; import javax.sip.header.ToHeader; import javax.sip.message.Request; @@ -54,12 +56,26 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme @Autowired private IVideoManagerStorager storager; + @Lazy + @Autowired + @Qualifier(value="tcpSipProvider") + private SipProviderImpl tcpSipProvider; + + @Lazy + @Autowired + @Qualifier(value="udpSipProvider") + private SipProviderImpl udpSipProvider; + @Autowired private DynamicTask dynamicTask; @Autowired private UserSetup userSetup; + + @Autowired + private SubscribeHolder subscribeHolder; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -136,16 +152,17 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme .append("\r\n"); if (subscribeInfo.getExpires() > 0) { - if (redisCatchStorage.getSubscribe(key) != null) { + if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) { dynamicTask.stop(key); } String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 - dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval)); - - redisCatchStorage.updateSubscribe(key, subscribeInfo); + dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key, subscribeHolder), Integer.parseInt(interval)); + subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo); +// redisCatchStorage.updateSubscribe(key, subscribeInfo); }else if (subscribeInfo.getExpires() == 0) { dynamicTask.stop(key); - redisCatchStorage.delSubscribe(key); +// redisCatchStorage.delSubscribe(key); + subscribeHolder.removeMobilePositionSubscribe(platformId); } try { @@ -168,10 +185,19 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme } - private void processNotifyCatalogList(RequestEvent evt, Element rootElement) { + private void processNotifyCatalogList(RequestEvent evt, Element rootElement) throws SipException { String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); String deviceID = XmlUtil.getText(rootElement, "DeviceID"); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId); SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId); + if (evt.getServerTransaction() == null) { + ServerTransaction serverTransaction = platform.getTransport().equals("TCP") ? tcpSipProvider.getNewServerTransaction(evt.getRequest()) + : udpSipProvider.getNewServerTransaction(evt.getRequest()); + subscribeInfo.setTransaction(serverTransaction); + Dialog dialog = serverTransaction.getDialog(); + dialog.terminateOnBye(false); + subscribeInfo.setDialog(dialog); + } String sn = XmlUtil.getText(rootElement, "SN"); String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platformId; logger.info("接收到{}的Catalog订阅", platformId); @@ -185,9 +211,11 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme .append("\r\n"); if (subscribeInfo.getExpires() > 0) { - redisCatchStorage.updateSubscribe(key, subscribeInfo); +// redisCatchStorage.updateSubscribe(key, subscribeInfo); + subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo); }else if (subscribeInfo.getExpires() == 0) { - redisCatchStorage.delSubscribe(key); +// redisCatchStorage.delSubscribe(key); + subscribeHolder.removeCatalogSubscribe(platformId); } try { @@ -195,7 +223,8 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform); ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME); subscribeInfo.setToTag(toHeader.getTag()); - redisCatchStorage.updateSubscribe(key, subscribeInfo); +// redisCatchStorage.updateSubscribe(key, subscribeInfo); + subscribeHolder.putCatalogSubscribe(platformId, subscribeInfo); } catch (SipException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java index ba95cf6a..5813998b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageRequestProcessor.java @@ -21,6 +21,7 @@ import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; +import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; import java.text.ParseException; @@ -64,6 +65,11 @@ public class MessageRequestProcessor extends SIPRequestProcessorParent implement String deviceId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); // 查询设备是否存在 + CSeqHeader cseqHeader = (CSeqHeader) evt.getRequest().getHeader(CSeqHeader.NAME); + String method = cseqHeader.getMethod(); + if (method.equals("MESSAGE")) { + System.out.println(); + } Device device = redisCatchStorage.getDevice(deviceId); // 查询上级平台是否存在 ParentPlatform parentPlatform = storage.queryParentPlatByServerGBId(deviceId); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index dfd0eb25..62ec5e9b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -85,41 +85,54 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp return; } int sumNum = Integer.parseInt(sumNumElement.getText()); - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - List channelList = new ArrayList<>(); - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - continue; + if (sumNum == 0) { + // 数据已经完整接收 + storager.cleanChannelsForDevice(device.getDeviceId()); + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + WVPResult result = new WVPResult<>(); + result.setCode(0); + result.setData(device); + msg.setData(result); + result.setMsg("更新成功,共0条"); + deferredResultHolder.invokeAllResult(msg); + catalogDataCatch.del(key); + }else { + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + List channelList = new ArrayList<>(); + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + continue; + } + DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice); + deviceChannel.setDeviceId(device.getDeviceId()); + logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), deviceChannel.getName(), deviceChannel.getChannelId()); + channelList.add(deviceChannel); } - DeviceChannel deviceChannel = XmlUtil.channelContentHander(itemDevice); - deviceChannel.setDeviceId(device.getDeviceId()); - logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), deviceChannel.getName(), deviceChannel.getChannelId()); - channelList.add(deviceChannel); - } - catalogDataCatch.put(key, sumNum, device, channelList); - if (catalogDataCatch.get(key).size() == sumNum) { - // 数据已经完整接收 - boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key)); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - WVPResult result = new WVPResult<>(); - result.setCode(0); - result.setData(device); - if (resetChannelsResult) { - result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条"); - }else { - result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条"); + catalogDataCatch.put(key, sumNum, device, channelList); + if (catalogDataCatch.get(key).size() == sumNum) { + // 数据已经完整接收 + boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key)); + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + WVPResult result = new WVPResult<>(); + result.setCode(0); + result.setData(device); + if (resetChannelsResult || sumNum ==0) { + result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条"); + }else { + result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条"); + } + msg.setData(result); + deferredResultHolder.invokeAllResult(msg); + catalogDataCatch.del(key); } - msg.setData(result); - deferredResultHolder.invokeAllResult(msg); - catalogDataCatch.del(key); } - // 回复200 OK responseAck(evt, Response.OK); if (offLineDetector.isOnline(device.getDeviceId())) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 675ed4eb..0fc6f4cf 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -31,8 +31,8 @@ public class DeviceServiceImpl implements IDeviceService { return false; } if (dynamicTask.contains(device.getDeviceId())) { - logger.info("[添加目录订阅] 设备{}的目录订阅以存在", device.getDeviceId()); - return false; + // 存在则停止现有的,开启新的 + dynamicTask.stop(device.getDeviceId()); } logger.info("[添加目录订阅] 设备{}", device.getDeviceId()); // 添加目录订阅 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index e5781657..4b442429 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -513,6 +513,14 @@ public class MediaServerServiceImpl implements IMediaServerService { param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex)); param.put("hook.timeoutSec","20"); param.put("general.streamNoneReaderDelayMS","-1".equals(mediaServerItem.getStreamNoneReaderDelayMS())?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() ); + // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。 + // 置0关闭此特性(推流断开会导致立即断开播放器) + // 此参数不应大于播放器超时时间 + // 优化此消息以更快的收到流注销事件 + param.put("general.continue_push_ms", "3000" ); + // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流, + // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项 + param.put("general.wait_track_ready_ms", "3000" ); JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param); @@ -620,6 +628,8 @@ public class MediaServerServiceImpl implements IMediaServerService { public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) { MediaServerItem mediaServerItem = getOne(mediaServerId); if (mediaServerItem == null) { + // zlm连接重试 + logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息"); return; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 39d37c5d..e59a0901 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -1,14 +1,12 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetup; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.GbStream; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; @@ -23,6 +21,8 @@ import com.genersoft.iot.vmp.storager.dao.*; import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; @@ -33,6 +33,8 @@ import java.util.stream.Collectors; @Service public class StreamPushServiceImpl implements IStreamPushService { + private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class); + @Autowired private GbStreamMapper gbStreamMapper; @@ -158,12 +160,17 @@ public class StreamPushServiceImpl implements IStreamPushService { public boolean removeFromGB(GbStream stream) { // 判断是否需要发送事件 gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); - int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); + int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); - if (mediaList == null) { - streamPushMapper.del(stream.getApp(), stream.getStream()); + if (mediaList != null) { + if (mediaList.getInteger("code") == 0) { + JSONArray data = mediaList.getJSONArray("data"); + if (data == null) { + streamPushMapper.del(stream.getApp(), stream.getStream()); + } + } } return del > 0; } @@ -180,9 +187,9 @@ public class StreamPushServiceImpl implements IStreamPushService { StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); - int delStream = streamPushMapper.del(app, streamId); - gbStreamMapper.del(app, streamId); platformGbStreamMapper.delByAppAndStream(app, streamId); + gbStreamMapper.del(app, streamId); + int delStream = streamPushMapper.del(app, streamId); if (delStream > 0) { MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId()); zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId); @@ -376,6 +383,29 @@ public class StreamPushServiceImpl implements IStreamPushService { .collect(Collectors.toList()); if (streamPushItemsForPlatform.size() > 0) { + // 获取所有平台,平台和目录信息一般不会特别大量。 + List parentPlatformList = parentPlatformMapper.getParentPlatformList(); + Map> platformInfoMap = new HashMap<>(); + if (parentPlatformList.size() == 0) { + return; + } + for (ParentPlatform platform : parentPlatformList) { + Map catalogMap = new HashMap<>(); + + // 创建根节点 + PlatformCatalog platformCatalog = new PlatformCatalog(); + platformCatalog.setId(platform.getServerGBId()); + catalogMap.put(platform.getServerGBId(), platformCatalog); + + // 查询所有节点信息 + List platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId()); + if (platformCatalogs.size() > 0) { + for (PlatformCatalog catalog : platformCatalogs) { + catalogMap.put(catalog.getId(), catalog); + } + } + platformInfoMap.put(platform.getServerGBId(), catalogMap); + } List streamPushItemListFroPlatform = new ArrayList<>(); Map> platformForEvent = new HashMap<>(); // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入 @@ -388,6 +418,12 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId()); if (platFormInfoArray.length > 0) { // 数组 platFormInfoArray 0 为平台ID。 1为目录ID + // 不存在这个平台,则忽略导入此关联关系 + if (platformInfoMap.get(platFormInfoArray[0]) == null + || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) { + logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] ); + continue; + } streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]); List gbStreamList = platformForEvent.get(streamPushItem.getPlatformId()); @@ -406,8 +442,6 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]); } streamPushItemListFroPlatform.add(streamPushItemForPlatform); - - } } @@ -432,9 +466,9 @@ public class StreamPushServiceImpl implements IStreamPushService { } gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL); - int delStream = streamPushMapper.delAllForGbStream(gbStreams); - gbStreamMapper.batchDelForGbStream(gbStreams); platformGbStreamMapper.delByGbStreams(gbStreams); + gbStreamMapper.batchDelForGbStream(gbStreams); + int delStream = streamPushMapper.delAllForGbStream(gbStreams); if (delStream > 0) { for (GbStream gbStream : gbStreams) { MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());