From ac1a4a027a7bd88efb32e9da666bdba4b5fa166f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Fri, 14 Jan 2022 17:04:26 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=9B=BD=E6=A0=87=E7=BA=A7?= =?UTF-8?q?=E8=81=94=E7=9A=84=E7=9B=AE=E5=BD=95=E8=AE=A2=E9=98=85=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/SubscribeInfo.java | 20 +-- .../iot/vmp/gb28181/event/EventPublisher.java | 54 ++++++ .../KeepaliveTimeoutListenerForPlatform.java | 12 ++ .../event/offline/OfflineEventListener.java | 10 ++ .../event/online/OnlineEventListener.java | 13 ++ .../event/subscribe/catalog/CatalogEvent.java | 58 ++++++ .../subscribe/catalog/CatalogEventLister.java | 167 ++++++++++++++++++ .../vmp/gb28181/task/GPSSubscribeTask.java | 4 +- .../transmit/SIPProcessorObserver.java | 19 ++ .../cmd/ISIPCommanderForPlatform.java | 18 +- .../cmd/impl/SIPCommanderFroPlatform.java | 117 +++++++++++- .../request/impl/InviteRequestProcessor.java | 3 +- .../request/impl/NotifyRequestProcessor.java | 20 ++- .../impl/SubscribeRequestProcessor.java | 39 +++- .../vmp/media/zlm/ZLMHttpHookListener.java | 51 ++++-- .../vmp/media/zlm/ZLMMediaListManager.java | 4 +- .../iot/vmp/service/IGbStreamService.java | 8 +- .../vmp/service/impl/GbStreamServiceImpl.java | 74 +++++++- .../service/impl/StreamProxyServiceImpl.java | 12 ++ .../service/impl/StreamPushServiceImpl.java | 29 ++- .../iot/vmp/storager/IRedisCatchStorage.java | 4 + .../vmp/storager/IVideoManagerStorager.java | 9 +- .../vmp/storager/dao/DeviceChannelMapper.java | 56 +++--- .../iot/vmp/storager/dao/GbStreamMapper.java | 4 +- .../storager/dao/PlatformChannelMapper.java | 15 ++ .../storager/dao/PlatformGbStreamMapper.java | 36 +++- .../storager/impl/RedisCatchStorageImpl.java | 28 ++- .../impl/VideoManagerStoragerImpl.java | 155 ++++++++++++++-- .../gb28181/gbStream/GbStreamController.java | 2 +- web_src/package-lock.json | 3 +- .../dialog/chooseChannelForStream.vue | 1 + 31 files changed, 951 insertions(+), 94 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java index 66d67bfa..60418f2c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java @@ -18,13 +18,13 @@ public class SubscribeInfo { this.fromTag = fromHeader.getTag(); ExpiresHeader expiresHeader = (ExpiresHeader)request.getHeader(ExpiresHeader.NAME); this.expires = expiresHeader.getExpires(); - this.event = (EventHeader)request.getHeader(EventHeader.NAME); + this.event = ((EventHeader)request.getHeader(EventHeader.NAME)).getName(); } private String id; private int expires; private String callId; - private EventHeader event; + private String event; private String fromTag; private String toTag; @@ -40,10 +40,6 @@ public class SubscribeInfo { return callId; } - public EventHeader getEvent() { - return event; - } - public String getFromTag() { return fromTag; } @@ -68,11 +64,15 @@ public class SubscribeInfo { this.callId = callId; } - public void setEvent(EventHeader event) { - this.event = event; - } - public void setFromTag(String fromTag) { this.fromTag = fromTag; } + + public String getEvent() { + return event; + } + + public void setEvent(String event) { + this.event = event; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 9495e9de..76b44271 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -1,19 +1,28 @@ package com.genersoft.iot.vmp.gb28181.event; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent; import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent; import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent; import com.genersoft.iot.vmp.media.zlm.event.ZLMOnlineEvent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + /** * @description:Event事件通知推送器,支持推送在线事件、离线事件 * @author: swwheihei @@ -80,4 +89,49 @@ public class EventPublisher { outEvent.setMediaServerId(mediaServerId); applicationEventPublisher.publishEvent(outEvent); } + + @Async + public void catalogEventPublish(String platformId, DeviceChannel deviceChannel, String type) { + List deviceChannelList = new ArrayList<>(); + deviceChannelList.add(deviceChannel); + catalogEventPublish(platformId, deviceChannelList, type); + } + + @Async + public void catalogEventPublish(String platformId, List deviceChannels, String type) { + CatalogEvent outEvent = new CatalogEvent(this); + List channels = new ArrayList<>(); + if (deviceChannels.size() > 1) { + // 数据去重 + Set gbIdSet = new HashSet<>(); + for (DeviceChannel deviceChannel : deviceChannels) { + if (!gbIdSet.contains(deviceChannel.getChannelId())) { + gbIdSet.add(deviceChannel.getChannelId()); + channels.add(deviceChannel); + } + } + }else { + channels = deviceChannels; + } + outEvent.setDeviceChannels(channels); + outEvent.setType(type); + outEvent.setPlatformId(platformId); + applicationEventPublisher.publishEvent(outEvent); + } + + @Async + public void catalogEventPublishForStream(String platformId, List gbStreams, String type) { + CatalogEvent outEvent = new CatalogEvent(this); + outEvent.setGbStreams(gbStreams); + outEvent.setType(type); + outEvent.setPlatformId(platformId); + applicationEventPublisher.publishEvent(outEvent); + } + + @Async + public void catalogEventPublishForStream(String platformId, GbStream gbStream, String type) { + List gbStreamList = new ArrayList<>(); + gbStreamList.add(gbStream); + catalogEventPublishForStream(platformId, gbStreamList, type); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java index 937e555b..ea322d12 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.event.offline; import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -35,6 +36,9 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent @Autowired private UserSetup userSetup; + @Autowired + private SipSubscribe sipSubscribe; + public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) { super(listenerContainer, userSetup); } @@ -54,6 +58,7 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.PLATFORM_KEEPALIVE_PREFIX + userSetup.getServerId() + "_"; String PLATFORM_REGISTER_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_PREFIX + userSetup.getServerId() + "_"; String KEEPLIVEKEY_PREFIX = VideoManagerConstants.KEEPLIVEKEY_PREFIX + userSetup.getServerId() + "_"; + String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_"; if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { String platformGBId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); @@ -65,6 +70,13 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent }else if (expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){ String deviceId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX); + }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) { + String callid = expiredKey.substring(REGISTER_INFO_PREFIX.length()); + SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); + eventResult.callId = callid; + eventResult.msg = "注册超时"; + eventResult.type = "register timeout"; + sipSubscribe.getErrorSubscribe(callid).response(eventResult); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java index 9751915e..aa87728d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java @@ -1,6 +1,9 @@ package com.genersoft.iot.vmp.gb28181.event.offline; import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -13,6 +16,8 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import java.util.List; + /** * @description: 离线事件监听器,监听到离线后,修改设备离在线状态。 设备离线有两个来源: * 1、设备主动注销,发送注销指令,{@link com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.RegisterRequestProcessor} @@ -34,6 +39,9 @@ public class OfflineEventListener implements ApplicationListener { @Autowired private UserSetup userSetup; + @Autowired + private EventPublisher eventPublisher; + @Override public void onApplicationEvent(OfflineEvent event) { @@ -58,6 +66,8 @@ public class OfflineEventListener implements ApplicationListener { } } + List deviceChannelList = storager.queryOnlineChannelsByDeviceId(event.getDeviceId()); + eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.OFF); // 处理离线监听 storager.outline(event.getDeviceId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java index a62c76d5..12a50e1d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java @@ -3,6 +3,10 @@ package com.genersoft.iot.vmp.gb28181.event.online; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.service.IDeviceService; import com.genersoft.iot.vmp.storager.dao.dto.User; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,6 +19,7 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import java.text.SimpleDateFormat; +import java.util.List; /** * @description: 在线事件监听器,监听到离线后,修改设备离在线状态。 设备在线有两个来源: @@ -40,6 +45,9 @@ public class OnlineEventListener implements ApplicationListener { @Autowired private UserSetup userSetup; + @Autowired + private EventPublisher eventPublisher; + private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override @@ -76,6 +84,11 @@ public class OnlineEventListener implements ApplicationListener { } device.setOnline(1); + Device deviceInstore = storager.queryVideoDevice(device.getDeviceId()); + if (deviceInstore.getOnline() == 0) { + List deviceChannelList = storager.queryOnlineChannelsByDeviceId(device.getDeviceId()); + eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON); + } // 处理上线监听 storager.updateDevice(device); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java new file mode 100644 index 00000000..c035b808 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEvent.java @@ -0,0 +1,58 @@ +package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; + +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import org.springframework.context.ApplicationEvent; + +import java.util.List; + +public class CatalogEvent extends ApplicationEvent { + public CatalogEvent(Object source) { + super(source); + } + + public static final String ON = "ON"; // 上线 + public static final String OFF = "OFF"; // 离线 + public static final String VLOST = "VLOST"; // 视频丢失 + public static final String DEFECT = "DEFECT"; // 故障 + public static final String ADD = "ADD"; // 增加 + public static final String DEL = "DEL"; // 删除 + public static final String UPDATE = "UPDATE"; // 更新 + + private List deviceChannels; + private List gbStreams; + private String type; + private String platformId; + + public List getDeviceChannels() { + return deviceChannels; + } + + public void setDeviceChannels(List deviceChannels) { + this.deviceChannels = deviceChannels; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getPlatformId() { + return platformId; + } + + public void setPlatformId(String platformId) { + this.platformId = platformId; + } + + public List getGbStreams() { + return gbStreams; + } + + public void setGbStreams(List gbStreams) { + this.gbStreams = gbStreams; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java new file mode 100644 index 00000000..4cbe5fbb --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -0,0 +1,167 @@ +package com.genersoft.iot.vmp.gb28181.event.subscribe.catalog; + +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IGbStreamService; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; + +import java.util.*; + +/** + * catalog事件 + */ +@Component +public class CatalogEventLister implements ApplicationListener { + + private final static Logger logger = LoggerFactory.getLogger(CatalogEventLister.class); + + @Autowired + private IVideoManagerStorager storager; + @Autowired + private IRedisCatchStorage redisCatchStorage; + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private SIPCommanderFroPlatform sipCommanderFroPlatform; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private SipConfig config; + + @Autowired + private UserSetup userSetup; + + @Autowired + private IGbStreamService gbStreamService; + + @Override + public void onApplicationEvent(CatalogEvent event) { + SubscribeInfo subscribe = null; + ParentPlatform parentPlatform = null; + + Map> parentPlatformMap = new HashMap<>(); + if (event.getPlatformId() != null) { + parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId()); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + event.getPlatformId(); + subscribe = redisCatchStorage.getSubscribe(key); + }else { + // 获取所用订阅 + List platforms = redisCatchStorage.getAllSubscribePlatform(); + if (event.getDeviceChannels() != null) { + if (platforms.size() > 0) { + for (DeviceChannel deviceChannel : event.getDeviceChannels()) { + List parentPlatformsForGB = storager.queryPlatFormListForGBWithGBId(deviceChannel.getChannelId(), platforms); + parentPlatformMap.put(deviceChannel.getChannelId(), parentPlatformsForGB); + } + } + }else if (event.getGbStreams() != null) { + if (platforms.size() > 0) { + for (GbStream gbStream : event.getGbStreams()) { + List parentPlatformsForGB = storager.queryPlatFormListForStreamWithGBId(gbStream.getApp(),gbStream.getStream(), platforms); + parentPlatformMap.put(gbStream.getGbId(), parentPlatformsForGB); + } + } + } + + } + switch (event.getType()) { + case CatalogEvent.ON: + case CatalogEvent.OFF: + case CatalogEvent.DEL: + + if (parentPlatform != null || subscribe != null) { + List deviceChannelList = new ArrayList<>(); + if (event.getDeviceChannels() != null) { + deviceChannelList.addAll(event.getDeviceChannels()); + } + if (event.getGbStreams().size() > 0){ + for (GbStream gbStream : event.getGbStreams()) { + DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId()); + deviceChannelList.add(deviceChannelByStream); + } + } + if (deviceChannelList.size() > 0) { + logger.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), event.getPlatformId(), deviceChannelList.size()); + sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), parentPlatform, deviceChannelList, subscribe); + } + }else if (parentPlatformMap.keySet().size() > 0) { + for (String gbId : parentPlatformMap.keySet()) { + List parentPlatforms = parentPlatformMap.get(gbId); + if (parentPlatforms != null && parentPlatforms.size() > 0) { + for (ParentPlatform platform : parentPlatforms) { + logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId(); + SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); + List deviceChannelList = new ArrayList<>(); + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(gbId); + deviceChannelList.add(deviceChannel); + sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, deviceChannelList, subscribeInfo); + } + } + } + } + break; + case CatalogEvent.VLOST: + break; + case CatalogEvent.DEFECT: + break; + case CatalogEvent.ADD: + case CatalogEvent.UPDATE: + if (parentPlatform != null || subscribe != null) { + List deviceChannelList = new ArrayList<>(); + if (event.getDeviceChannels() != null) { + deviceChannelList.addAll(event.getDeviceChannels()); + } + if (event.getGbStreams().size() > 0){ + for (GbStream gbStream : event.getGbStreams()) { + DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), parentPlatform.getDeviceGBId()); + deviceChannelList.add(deviceChannelByStream); + } + } + if (deviceChannelList.size() > 0) { + logger.info("[Catalog事件: {}]平台:{},影响通道{}个", event.getType(), event.getPlatformId(), deviceChannelList.size()); + sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), parentPlatform, deviceChannelList, subscribe); + } + }else if (parentPlatformMap.keySet().size() > 0) { + for (String gbId : parentPlatformMap.keySet()) { + List parentPlatforms = parentPlatformMap.get(gbId); + if (parentPlatforms != null && parentPlatforms.size() > 0) { + for (ParentPlatform platform : parentPlatforms) { + logger.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), gbId); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platform.getServerGBId(); + SubscribeInfo subscribeInfo = redisCatchStorage.getSubscribe(key); + List deviceChannelList = new ArrayList<>(); + DeviceChannel deviceChannel = storager.queryChannelInParentPlatform(platform.getServerGBId(), gbId); + deviceChannelList.add(deviceChannel); + GbStream gbStream = storager.queryStreamInParentPlatform(platform.getServerGBId(), gbId); + DeviceChannel deviceChannelByStream = gbStreamService.getDeviceChannelListByStream(gbStream, gbStream.getCatalogId(), platform.getDeviceGBId()); + deviceChannelList.add(deviceChannelByStream); + sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, deviceChannelList, subscribeInfo); + } + } + } + } + break; + default: + break; + } + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java index fc3d0277..0d56bd58 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -48,7 +48,7 @@ public class GPSSubscribeTask implements Runnable{ if (gbStream.isStatus()) { if (gpsMsgInfo != null) { // 发送GPS消息 - sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); + sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); }else { // 没有在redis找到新的消息就使用数据库的消息 gpsMsgInfo = new GPSMsgInfo(); @@ -56,7 +56,7 @@ public class GPSSubscribeTask implements Runnable{ gpsMsgInfo.setLat(gbStream.getLongitude()); gpsMsgInfo.setLng(gbStream.getLongitude()); // 发送GPS消息 - sipCommanderForPlatform.sendMobilePosition(parentPlatform, gpsMsgInfo, subscribe); + sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index be369aed..71025c00 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -14,6 +14,7 @@ import org.springframework.stereotype.Component; import javax.sip.*; import javax.sip.header.CSeqHeader; import javax.sip.header.CallIdHeader; +import javax.sip.header.Header; import javax.sip.message.Response; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -140,6 +141,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { */ @Override public void processTimeout(TimeoutEvent timeoutEvent) { + System.out.println("processTimeout"); if(timeoutProcessor != null) { timeoutProcessor.process(timeoutEvent); } @@ -147,14 +149,31 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { @Override public void processIOException(IOExceptionEvent exceptionEvent) { + System.out.println("processIOException"); } @Override public void processTransactionTerminated(TransactionTerminatedEvent transactionTerminatedEvent) { +// Transaction transaction = null; +// System.out.println("processTransactionTerminated"); +// if (transactionTerminatedEvent.isServerTransaction()) { +// transaction = transactionTerminatedEvent.getServerTransaction(); +// }else { +// transaction = transactionTerminatedEvent.getClientTransaction(); +// } +// +// System.out.println(transaction.getBranchId()); +// System.out.println(transaction.getState()); +// System.out.println(transaction.getRequest().getMethod()); +// CallIdHeader header = (CallIdHeader)transaction.getRequest().getHeader(CallIdHeader.NAME); +// SipSubscribe.EventResult terminatedEventEventResult = new SipSubscribe.EventResult<>(transactionTerminatedEvent); + +// sipSubscribe.getErrorSubscribe(header.getCallId()).response(terminatedEventEventResult); } @Override public void processDialogTerminated(DialogTerminatedEvent dialogTerminatedEvent) { + System.out.println("processDialogTerminated"); CallIdHeader callId = dialogTerminatedEvent.getDialog().getCallId(); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index e8b41247..7325889f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import javax.sip.header.WWWAuthenticateHeader; +import java.util.List; public interface ISIPCommanderForPlatform { @@ -70,5 +71,20 @@ public interface ISIPCommanderForPlatform { * @param subscribeInfo 订阅相关的信息 * @return */ - boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo); + boolean sendNotifyMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo); + + /** + * 回复catalog事件-增加/更新 + * @param parentPlatform + * @param deviceChannels + */ + boolean sendNotifyForCatalogAddOrUpdate(String type, ParentPlatform parentPlatform, List deviceChannels, SubscribeInfo subscribeInfo); + + /** + * 回复catalog事件-删除 + * @param parentPlatform + * @param deviceChannels + */ + boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, SubscribeInfo subscribeInfo); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 1707bde8..4216bb88 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -23,6 +23,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.header.WWWAuthenticateHeader; import javax.sip.message.Request; import java.text.ParseException; +import java.util.List; import java.util.UUID; @Component @@ -96,7 +97,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm, null, callIdHeader); // 将 callid 写入缓存, 等注册成功可以更新状态 - redisCatchStorage.updatePlatformRegisterInfo(callIdHeader.getCallId(), parentPlatform.getServerGBId()); + String callIdFromHeader = callIdHeader.getCallId(); + redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, parentPlatform.getServerGBId()); sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{ if (event != null) { @@ -104,6 +106,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { parentPlatform.getServerGBId(), event.msg); } + redisCatchStorage.delPlatformRegisterInfo(callIdFromHeader); if (errorEvent != null ) { errorEvent.response(event); } @@ -219,8 +222,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { catalogXml.append("" + channel.getOwner() + "\r\n"); catalogXml.append("" + channel.getCivilCode() + "\r\n"); catalogXml.append("
" + channel.getAddress() + "
\r\n"); - catalogXml.append("" + channel.getParental() + "\r\n");// TODO 当前不能添加分组, 所以暂时没有父节点 - catalogXml.append("" + channel.getParentId() + "\r\n"); // TODO 当前不能添加分组, 所以暂时没有父节点 + catalogXml.append("" + channel.getParental() + "\r\n"); + catalogXml.append("" + channel.getParentId() + "\r\n"); catalogXml.append("" + channel.getSecrecy() + "\r\n"); catalogXml.append("" + channel.getRegisterWay() + "\r\n"); catalogXml.append("" + (channel.getStatus() == 0?"OFF":"ON") + "\r\n"); @@ -329,7 +332,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public boolean sendMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) { + public boolean sendNotifyMobilePosition(ParentPlatform parentPlatform, GPSMsgInfo gpsMsgInfo, SubscribeInfo subscribeInfo) { if (parentPlatform == null) { return false; } @@ -364,4 +367,110 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } return true; } + + @Override + public boolean sendNotifyForCatalogAddOrUpdate(String type, ParentPlatform parentPlatform, List deviceChannels, SubscribeInfo subscribeInfo) { + if (parentPlatform == null) { + return false; + } + if (deviceChannels == null || deviceChannels.size() == 0) { + return false; + } + for (DeviceChannel channel : deviceChannels) { + try { + StringBuffer catalogXml = new StringBuffer(600); + catalogXml.append("\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("Catalog\r\n"); + catalogXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); + catalogXml.append("" + deviceChannels.size() + "\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("" + channel.getChannelId() + "\r\n"); + catalogXml.append("" + type + "\r\n"); + catalogXml.append("" + channel.getName() + "\r\n"); + catalogXml.append("" + channel.getManufacture() + "\r\n"); + catalogXml.append("" + channel.getModel() + "\r\n"); + catalogXml.append("" + channel.getOwner() + "\r\n"); + catalogXml.append("" + channel.getCivilCode() + "\r\n"); + catalogXml.append("
" + channel.getAddress() + "
\r\n"); + catalogXml.append("" + channel.getParental() + "\r\n"); + catalogXml.append("" + channel.getParentId() + "\r\n"); + catalogXml.append("" + channel.getSecrecy() + "\r\n"); + catalogXml.append("" + channel.getRegisterWay() + "\r\n"); + catalogXml.append("" + (channel.getStatus() == 0 ? "OFF" : "ON") + "\r\n"); + catalogXml.append("" + channel.getLongitude() + "\r\n"); + catalogXml.append("" + channel.getLatitude() + "\r\n"); + catalogXml.append("" + channel.getIpAddress() + "\r\n"); + catalogXml.append("" + channel.getPort() + "\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("" + channel.getPTZType() + "\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("
\r\n"); + catalogXml.append("
\r\n"); + catalogXml.append("
\r\n"); + + CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + callIdHeader.setCallId(subscribeInfo.getCallId()); + + String tm = Long.toString(System.currentTimeMillis()); + + Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader); + transmitRequest(parentPlatform, request); + Thread.sleep(100); + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + return false; + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + + @Override + public boolean sendNotifyForCatalogOther(String type, ParentPlatform parentPlatform, List deviceChannels, SubscribeInfo subscribeInfo) { + if (parentPlatform == null) { + return false; + } + if (deviceChannels == null || deviceChannels.size() == 0) { + return false; + } + + for (DeviceChannel channel : deviceChannels) { + try { + StringBuffer catalogXml = new StringBuffer(600); + catalogXml.append("\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("Catalog\r\n"); + catalogXml.append("" + (int) ((Math.random() * 9 + 1) * 100000) + "\r\n"); + catalogXml.append("" + deviceChannels.size() + "\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("" + channel.getChannelId() + "\r\n"); + catalogXml.append("" + type + "\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("\r\n"); + catalogXml.append("\r\n"); + + CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() + : udpSipProvider.getNewCallId(); + callIdHeader.setCallId(subscribeInfo.getCallId()); + + String tm = Long.toString(System.currentTimeMillis()); + + Request request = headerProviderPlarformProvider.createNotifyRequest(parentPlatform, catalogXml.toString(), subscribeInfo.getToTag(), subscribeInfo.getFromTag(), callIdHeader); + transmitRequest(parentPlatform, request); + Thread.sleep(100); + } catch (SipException | ParseException | InvalidArgumentException e) { + e.printStackTrace(); + return false; + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return true; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 2a9abad5..ae2819ca 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -106,9 +106,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (platform != null) { // 查询平台下是否有该通道 DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); - List gbStreams = storager.queryStreamInParentPlatform(requesterId, channelId); + GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); PlatformCatalog catalog = storager.getCatalog(channelId); - GbStream gbStream = gbStreams.size() > 0? gbStreams.get(0):null; MediaServerItem mediaServerItem = null; // 不是通道可能是直播流 if (channel != null && gbStream == null ) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index d6ceca3d..c339598e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; @@ -50,6 +51,9 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IVideoManagerStorager storager; + @Autowired + private EventPublisher eventPublisher; + @Autowired private SipConfig sipConfig; @@ -259,39 +263,39 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements channel.setDeviceId(device.getDeviceId()); logger.debug("收到来自设备【{}】的通道: {}【{}】", device.getDeviceId(), channel.getName(), channel.getChannelId()); switch (eventElement.getText().toUpperCase()) { - case "ON" : // 上线 + case CatalogEvent.ON: // 上线 logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOnline(deviceId, channel.getChannelId()); // 回复200 OK responseAck(evt, Response.OK); break; - case "OFF" : // 离线 + case CatalogEvent.OFF : // 离线 logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); // 回复200 OK responseAck(evt, Response.OK); break; - case "VLOST" : // 视频丢失 + case CatalogEvent.VLOST: // 视频丢失 logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId()); storager.deviceChannelOffline(deviceId, channel.getChannelId()); // 回复200 OK responseAck(evt, Response.OK); break; - case "DEFECT" : // 故障 + case CatalogEvent.DEFECT: // 故障 // 回复200 OK responseAck(evt, Response.OK); break; - case "ADD" : // 增加 + case CatalogEvent.ADD: // 增加 logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId()); storager.updateChannel(deviceId, channel); responseAck(evt, Response.OK); break; - case "DEL" : // 删除 + case CatalogEvent.DEL: // 删除 logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId()); storager.delChannel(deviceId, channel.getChannelId()); responseAck(evt, Response.OK); break; - case "UPDATE" : // 更新 + case CatalogEvent.UPDATE: // 更新 logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId()); storager.updateChannel(deviceId, channel); responseAck(evt, Response.OK); @@ -300,6 +304,8 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements responseAck(evt, Response.BAD_REQUEST, "event not found"); } + // 转发变化信息 + eventPublisher.catalogEventPublish(null, channel, eventElement.getText().toUpperCase()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index 9c5be8e4..13335d60 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -85,9 +85,9 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme // } else if (CmdType.ALARM.equals(cmd)) { // logger.info("接收到Alarm订阅"); // processNotifyAlarm(evt, rootElement); -// } else if (CmdType.CATALOG.equals(cmd)) { -// logger.info("接收到Catalog订阅"); -// processNotifyCatalogList(evt, rootElement); + } else if (CmdType.CATALOG.equals(cmd)) { + logger.info("接收到Catalog订阅"); + processNotifyCatalogList(evt, rootElement); } else { logger.info("接收到消息:" + cmd); // responseAck(evt, Response.OK); @@ -177,7 +177,40 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme } private void processNotifyCatalogList(RequestEvent evt, Element rootElement) { + String platformId = SipUtils.getUserIdFromFromHeader(evt.getRequest()); + String deviceID = XmlUtil.getText(rootElement, "DeviceID"); + SubscribeInfo subscribeInfo = new SubscribeInfo(evt, platformId); + String sn = XmlUtil.getText(rootElement, "SN"); + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_" + platformId; + StringBuilder resultXml = new StringBuilder(200); + resultXml.append("\r\n") + .append("\r\n") + .append("Catalog\r\n") + .append("" + sn + "\r\n") + .append("" + deviceID + "\r\n") + .append("OK\r\n") + .append("\r\n"); + + if (subscribeInfo.getExpires() > 0) { + redisCatchStorage.updateSubscribe(key, subscribeInfo); + }else if (subscribeInfo.getExpires() == 0) { + redisCatchStorage.delSubscribe(key); + } + + try { + Response response = responseXmlAck(evt, resultXml.toString()); + ToHeader toHeader = (ToHeader)response.getHeader(ToHeader.NAME); + subscribeInfo.setToTag(toHeader.getTag()); + redisCatchStorage.updateSubscribe(key, subscribeInfo); + + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index c1f473df..5919619f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.media.zlm; +import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -8,6 +9,9 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.*; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -65,7 +69,7 @@ public class ZLMHttpHookListener { private IMediaService mediaService; @Autowired - private ZLMRESTfulUtils zlmresTfulUtils; + private EventPublisher eventPublisher; @Autowired private ZLMMediaListManager zlmMediaListManager; @@ -341,29 +345,52 @@ public class ZLMHttpHookListener { if (!"rtp".equals(app)){ String type = OriginType.values()[item.getOriginType()].getType(); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem != null){ if (regist) { + StreamPushItem streamPushItem = null; redisCatchStorage.addStream(mediaServerItem, type, app, streamId, item); if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - zlmMediaListManager.addPush(item); + streamPushItem = zlmMediaListManager.addPush(item); } + List gbStreams = new ArrayList<>(); + if (streamPushItem == null || streamPushItem.getGbId() == null) { + GbStream gbStream = storager.getGbStream(app, streamId); + gbStreams.add(gbStream); + }else { + if (streamPushItem.getGbId() != null) { + gbStreams.add(streamPushItem); + } + } + if (gbStreams.size() > 0) { + eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON); + } + }else { // 兼容流注销时类型从redis记录获取 MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, streamId, mediaServerId); - type = OriginType.values()[mediaItem.getOriginType()].getType(); + if (mediaItem != null) { + type = OriginType.values()[mediaItem.getOriginType()].getType(); + redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId); + } + GbStream gbStream = storager.getGbStream(app, streamId); + if (gbStream != null) { + eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF); + } zlmMediaListManager.removeMedia(app, streamId); - redisCatchStorage.removeStream(mediaServerItem.getId(), type, app, streamId); } - // 发送流变化redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("serverId", userSetup.getServerId()); - jsonObject.put("app", app); - jsonObject.put("stream", streamId); - jsonObject.put("register", regist); - jsonObject.put("mediaServerId", mediaServerId); - redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + if (type != null) { + // 发送流变化redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("serverId", userSetup.getServerId()); + jsonObject.put("app", app); + jsonObject.put("stream", streamId); + jsonObject.put("register", regist); + jsonObject.put("mediaServerId", mediaServerId); + redisCatchStorage.sendStreamChangeMsg(type, jsonObject); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 5b7ba1cc..a6c30cf9 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -105,7 +105,7 @@ public class ZLMMediaListManager { updateMedia(mediaServerItem, app, streamId); } - public void addPush(MediaItem mediaItem) { + public StreamPushItem addPush(MediaItem mediaItem) { // 查找此直播流是否存在redis预设gbId StreamPushItem transform = streamPushService.transform(mediaItem); // 从streamId取出查询关键值 @@ -130,7 +130,6 @@ public class ZLMMediaListManager { for (GbStream gbStream : gbStreams) { // 出现使用相同国标Id的视频流时,使用新流替换旧流, gbStreamMapper.del(gbStream.getApp(), gbStream.getStream()); - platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream()); if (!gbStream.isStatus()) { streamPushMapper.del(gbStream.getApp(), gbStream.getStream()); } @@ -142,6 +141,7 @@ public class ZLMMediaListManager { gbStreamMapper.add(transform); } } + return transform; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java index 9c5c32f1..3bb964b6 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IGbStreamService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.github.pagehelper.PageInfo; @@ -35,6 +36,11 @@ public interface IGbStreamService { /** * 移除国标关联 * @param gbStreams + * @param platformId */ - boolean delPlatformInfo(List gbStreams); + boolean delPlatformInfo(String platformId, List gbStreams); + + DeviceChannel getDeviceChannelListByStream(GbStream gbStream, String catalogId, String deviceGBId); + + void sendCatalogMsg(GbStream gbStream, String type); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java index 21c666fd..35c262fe 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java @@ -1,7 +1,14 @@ package com.genersoft.iot.vmp.service.impl; +import com.genersoft.iot.vmp.conf.SipConfig; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.storager.dao.GbStreamMapper; +import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper; import com.genersoft.iot.vmp.service.IGbStreamService; import com.github.pagehelper.PageHelper; @@ -14,6 +21,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.TransactionDefinition; import org.springframework.transaction.TransactionStatus; +import java.util.ArrayList; import java.util.List; @Service @@ -33,6 +41,15 @@ public class GbStreamServiceImpl implements IGbStreamService { @Autowired private PlatformGbStreamMapper platformGbStreamMapper; + @Autowired + private ParentPlatformMapper platformMapper; + + @Autowired + private SipConfig sipConfig; + + @Autowired + private EventPublisher eventPublisher; + @Override public PageInfo getAll(Integer page, Integer count) { PageHelper.startPage(page, count); @@ -51,32 +68,62 @@ public class GbStreamServiceImpl implements IGbStreamService { // 放在事务内执行 boolean result = false; TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); + ParentPlatform parentPlatform = platformMapper.getParentPlatByServerGBId(platformId); try { + List deviceChannelList = new ArrayList<>(); for (GbStream gbStream : gbStreams) { gbStream.setCatalogId(catalogId); gbStream.setPlatformId(platformId); // TODO 修改为批量提交 platformGbStreamMapper.add(gbStream); + DeviceChannel deviceChannelListByStream = getDeviceChannelListByStream(gbStream, catalogId, parentPlatform.getDeviceGBId()); + deviceChannelList.add(deviceChannelListByStream); } dataSourceTransactionManager.commit(transactionStatus); //手动提交 + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD); result = true; }catch (Exception e) { logger.error("批量保存流与平台的关系时错误", e); dataSourceTransactionManager.rollback(transactionStatus); } return result; - } @Override - public boolean delPlatformInfo(List gbStreams) { + public DeviceChannel getDeviceChannelListByStream(GbStream gbStream, String catalogId, String deviceGBId) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(gbStream.getGbId()); + deviceChannel.setName(gbStream.getName()); + deviceChannel.setLongitude(gbStream.getLongitude()); + deviceChannel.setLatitude(gbStream.getLatitude()); + deviceChannel.setDeviceId(deviceGBId); + deviceChannel.setManufacture("wvp-pro"); + deviceChannel.setStatus(gbStream.isStatus()?1:0); + deviceChannel.setParentId(catalogId ==null?gbStream.getCatalogId():catalogId); + deviceChannel.setRegisterWay(1); + deviceChannel.setCivilCode(sipConfig.getDomain()); + deviceChannel.setModel("live"); + deviceChannel.setOwner("wvp-pro"); + deviceChannel.setParental(0); + deviceChannel.setSecrecy("0"); + return deviceChannel; + } + + @Override + public boolean delPlatformInfo(String platformId, List gbStreams) { // 放在事务内执行 boolean result = false; TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition); try { + List deviceChannelList = new ArrayList<>(); for (GbStream gbStream : gbStreams) { platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream()); + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(gbStream.getGbId()); + deviceChannelList.add(deviceChannel); + eventPublisher.catalogEventPublish(platformId, deviceChannel, CatalogEvent.DEL); } + dataSourceTransactionManager.commit(transactionStatus); //手动提交 result = true; }catch (Exception e) { @@ -85,4 +132,27 @@ public class GbStreamServiceImpl implements IGbStreamService { } return result; } + + @Override + public void sendCatalogMsg(GbStream gbStream, String type) { + List gbStreams = new ArrayList<>(); + if (gbStream.getGbId() != null) { + gbStreams.add(gbStream); + }else { + StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(gbStream.getApp(), gbStream.getStream()); + if (streamProxyItem != null && streamProxyItem.getGbId() != null){ + gbStreams.add(streamProxyItem); + } + } + if (gbStreams.size() > 0) { + for (GbStream gs : gbStreams) { + List parentPlatforms = platformGbStreamMapper.selectByAppAndStream(gs.getApp(), gs.getStream()); + if (parentPlatforms.size() > 0) { + for (ParentPlatform parentPlatform : parentPlatforms) { + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), gs, type); + } + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java index 1efc6145..1c8f191f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java @@ -2,9 +2,13 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; @@ -57,12 +61,18 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Autowired private UserSetup userSetup; + @Autowired + private SipConfig sipConfig; + @Autowired private GbStreamMapper gbStreamMapper; @Autowired private PlatformGbStreamMapper platformGbStreamMapper; + @Autowired + private EventPublisher eventPublisher; + @Autowired private ParentPlatformMapper parentPlatformMapper; @@ -146,6 +156,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(param.getApp(), stream, parentPlatform.getServerGBId()); if (streamProxyItems == null) { platformGbStreamMapper.add(param); + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), param, CatalogEvent.ADD); } } } @@ -194,6 +205,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { public void del(String app, String stream) { StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream); if (streamProxyItem != null) { + gbStreamService.sendCatalogMsg(streamProxyItem, CatalogEvent.DEL); videoManagerStorager.deleteStreamProxy(app, stream); JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem); if (jsonObject != null && jsonObject.getInteger("code") == 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 3fac37ad..c8bf1915 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -5,12 +5,16 @@ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.GbStream; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.*; +import com.genersoft.iot.vmp.service.IGbStreamService; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -41,6 +45,12 @@ public class StreamPushServiceImpl implements IStreamPushService { @Autowired private PlatformGbStreamMapper platformGbStreamMapper; + @Autowired + private IGbStreamService gbStreamService; + + @Autowired + private EventPublisher eventPublisher; + @Autowired private ZLMRESTfulUtils zlmresTfulUtils; @@ -115,6 +125,7 @@ public class StreamPushServiceImpl implements IStreamPushService { stream.setStreamType("push"); stream.setStatus(true); int add = gbStreamMapper.add(stream); + // 查找开启了全部直播流共享的上级平台 List parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); if (parentPlatforms.size() > 0) { @@ -122,18 +133,30 @@ public class StreamPushServiceImpl implements IStreamPushService { stream.setCatalogId(parentPlatform.getCatalogId()); stream.setPlatformId(parentPlatform.getServerGBId()); String streamId = stream.getStream(); - StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); - if (streamProxyItems == null) { + StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId()); + if (streamProxyItem == null) { platformGbStreamMapper.add(stream); + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); + }else { + if (!streamProxyItem.getGbId().equals(stream.getGbId())) { + // 此流使用另一个国标Id已经与该平台关联,移除此记录 + platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId()); + platformGbStreamMapper.add(stream); + eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD); + } } } } + return add > 0; } @Override public boolean removeFromGB(GbStream stream) { + // 判断是否需要发送事件 + gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL); int del = gbStreamMapper.del(stream.getApp(), stream.getStream()); + platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream()); MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId()); JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream()); if (mediaList == null) { @@ -152,6 +175,8 @@ public class StreamPushServiceImpl implements IStreamPushService { @Override public boolean stop(String app, String streamId) { StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId); + gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL); + int delStream = streamPushMapper.del(app, streamId); gbStreamMapper.del(app, streamId); platformGbStreamMapper.delByAppAndStream(app, streamId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 8dc21ff3..bf2104c7 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -210,4 +210,8 @@ public interface IRedisCatchStorage { void delSubscribe(String key); MediaItem getStreamInfo(String app, String streamId, String mediaServerId); + + List getAllSubscribe(); + + List getAllSubscribePlatform(); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java index 229ab8cb..a89fc562 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java @@ -100,6 +100,7 @@ public interface IVideoManagerStorager { * @return */ public List queryChannelsByDeviceId(String deviceId); + public List queryOnlineChannelsByDeviceId(String deviceId); /** * 获取某个设备的通道 @@ -341,7 +342,7 @@ public interface IVideoManagerStorager { * @param channelId * @return */ - List queryStreamInParentPlatform(String platformId, String channelId); + GbStream queryStreamInParentPlatform(String platformId, String channelId); /** * 获取平台关联的直播流 @@ -459,4 +460,10 @@ public interface IVideoManagerStorager { int delRelation(PlatformCatalog platformCatalog); int updateStreamGPS(List gpsMsgInfo); + + List queryPlatFormListForGBWithGBId(String channelId, List platforms); + + List queryPlatFormListForStreamWithGBId(String app, String stream, List platforms); + + GbStream getGbStream(String app, String streamId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index 3a4f466e..4a12ce4b 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -54,17 +54,22 @@ public interface DeviceChannelMapper { int update(DeviceChannel channel); @Select(value = {" "}) List queryChannels(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, Boolean online); @@ -170,19 +175,30 @@ public interface DeviceChannelMapper { ""}) int batchUpdate(List updateChannels); + @Select(value = {" "}) - List queryChannelsByDeviceIdWithStartAndLimit(String deviceId, String parentChannelId, String query, Boolean hasSubChannel, Boolean online, int start, int limit); + List queryChannelsByDeviceIdWithStartAndLimit(String deviceId, String parentChannelId, String query, + Boolean hasSubChannel, Boolean online, int start, int limit); + + @Select("SELECT * FROM device_channel WHERE deviceId=#{deviceId} AND status=1") + List queryOnlineChannelsByDeviceId(String deviceId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java index a51f3dc3..82df3318 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java @@ -13,7 +13,7 @@ import java.util.List; @Repository public interface GbStreamMapper { - @Insert("INSERT INTO gb_stream (app, stream, gbId, name, " + + @Insert("REPLACE INTO gb_stream (app, stream, gbId, name, " + "longitude, latitude, streamType, mediaServerId, status) VALUES" + "('${app}', '${stream}', '${gbId}', '${name}', " + "'${longitude}', '${latitude}', '${streamType}', " + @@ -48,7 +48,7 @@ public interface GbStreamMapper { @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " + "LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " + "WHERE gs.gbId = '${gbId}' AND pgs.platformId = '${platformId}'") - List queryStreamInPlatform(String platformId, String gbId); + GbStream queryStreamInPlatform(String platformId, String gbId); @Select("SELECT gs.*, pgs.platformId as platformId, pgs.catalogId as catalogId FROM gb_stream gs " + "LEFT JOIN platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream " + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java index da38cb00..a9e2c626 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformChannelMapper.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.storager.dao; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; import org.apache.ibatis.annotations.Delete; @@ -73,4 +74,18 @@ public interface PlatformChannelMapper { "DELETE FROM platform_gb_channel WHERE catalogId=#{parentId} AND platformId=#{platformId} AND channelId=#{id}" + "") int delByCatalogIdAndChannelIdAndPlatformId(PlatformCatalog platformCatalog); + + @Select(" ") + List queryPlatFormListForGBWithGBId(String channelId, List platforms); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java index 06486c92..248b37a0 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformGbStreamMapper.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.storager.dao; import com.genersoft.iot.vmp.gb28181.bean.GbStream; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.PlatformGbStream; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; @@ -14,7 +15,7 @@ import java.util.List; @Repository public interface PlatformGbStreamMapper { - @Insert("INSERT INTO platform_gb_stream (app, stream, platformId, catalogId) VALUES" + + @Insert("REPLACE INTO platform_gb_stream (app, stream, platformId, catalogId) VALUES" + "('${app}', '${stream}', '${platformId}', '${catalogId}')") int add(PlatformGbStream platformGbStream); @@ -24,10 +25,20 @@ public interface PlatformGbStreamMapper { @Delete("DELETE FROM platform_gb_stream WHERE platformId=#{platformId}") int delByPlatformId(String platformId); - @Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream}") - List selectByAppAndStream(String app, String stream); + @Select("SELECT " + + "pp.* " + + "FROM " + + "platform_gb_stream pgs " + + "LEFT JOIN parent_platform pp ON pp.serverGBId = pgs.platformId " + + "WHERE " + + "pgs.app =#{app} " + + "AND pgs.stream =#{stream} " + + "GROUP BY pp.serverGBId") + List selectByAppAndStream(String app, String stream); - @Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream} AND platformId=#{serverGBId}") + @Select("SELECT pgs.*, gs.gbId FROM platform_gb_stream pgs " + + "LEFT JOIN gb_stream gs ON pgs.app = gs.app AND pgs.stream = gs.stream " + + "WHERE pgs.app=#{app} AND pgs.stream=#{stream} AND pgs.platformId=#{serverGBId}") StreamProxyItem selectOne(String app, String stream, String serverGBId); @Select("select gs.* \n" + @@ -47,4 +58,21 @@ public interface PlatformGbStreamMapper { @Delete("DELETE FROM platform_gb_stream WHERE catalogId=#{id}") int delByCatalogId(String id); + @Select(" ") + List queryPlatFormListForGBWithGBId(String app, String stream, List platforms); + + @Select("SELECT * FROM platform_gb_stream WHERE app=#{app} AND stream=#{stream} AND platformId=#{platformId}") + int delByAppAndStreamAndPlatform(String app, String streamId, String platformId); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 1d689434..56789d92 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -250,7 +250,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void updatePlatformRegisterInfo(String callId, String platformGbId) { String key = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_" + callId; - redis.set(key, platformGbId); + redis.set(key, platformGbId, 30); } @@ -508,4 +508,30 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return result; } + + @Override + public List getAllSubscribe() { + String scanKey = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_*"; + List result = new ArrayList<>(); + List keys = redis.scan(scanKey); + for (int i = 0; i < keys.size(); i++) { + String key = (String) keys.get(i); + SubscribeInfo subscribeInfo = (SubscribeInfo) redis.get(key); + result.add(subscribeInfo); + } + return result; + } + + @Override + public List getAllSubscribePlatform() { + String scanKey = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_Catalog_*"; + List result = new ArrayList<>(); + List keys = redis.scan(scanKey); + for (int i = 0; i < keys.size(); i++) { + String key = (String) keys.get(i); + String platformId = key.substring(scanKey.length() - 1); + result.add(platformId); + } + return result; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 2f4c9664..e0b955ff 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -1,6 +1,9 @@ package com.genersoft.iot.vmp.storager.impl; +import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; @@ -27,9 +30,9 @@ import java.text.SimpleDateFormat; import java.util.*; /** - * @description:视频设备数据存储-jdbc实现 - * @author: swwheihei - * @date: 2020年5月6日 下午2:31:42 + * 视频设备数据存储-jdbc实现 + * swwheihei + * 2020年5月6日 下午2:31:42 */ @SuppressWarnings("rawtypes") @Component @@ -37,6 +40,12 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { private Logger logger = LoggerFactory.getLogger(VideoManagerStoragerImpl.class); + @Autowired + EventPublisher eventPublisher; + + @Autowired + SipConfig sipConfig; + @Autowired DataSourceTransactionManager dataSourceTransactionManager; @@ -134,6 +143,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { return deviceMapper.add(device) > 0; }else { redisCatchStorage.updateDevice(device); + return deviceMapper.update(device) > 0; } @@ -408,6 +418,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { device.setOnline(1); logger.info("更新设备在线: " + deviceId); redisCatchStorage.updateDevice(device); + List deviceChannelList = deviceChannelMapper.queryOnlineChannelsByDeviceId(deviceId); + eventPublisher.catalogEventPublish(null, deviceChannelList, CatalogEvent.ON); return deviceMapper.update(device) > 0; } @@ -514,7 +526,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { if (parentPlatform.isShareAllLiveStream()) { gbStreamService.addPlatformInfo(gbStreams, parentPlatform.getServerGBId(), parentPlatform.getCatalogId()); }else { - gbStreamService.delPlatformInfo(gbStreams); + gbStreamService.delPlatformInfo(parentPlatform.getServerGBId(), gbStreams); } } } @@ -590,6 +602,9 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { int result = 0; if (channelReducesToAdd.size() > 0) { result = platformChannelMapper.addChannels(platformId, channelReducesToAdd); + // TODO 后续给平台增加控制开关以控制是否响应目录订阅 + List deviceChannelList = getDeviceChannelListByChannelReduceList(channelReducesToAdd, catalogId); + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.ADD); } return result; @@ -600,7 +615,13 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { public int delChannelForGB(String platformId, List channelReduces) { int result = platformChannelMapper.delChannelForGB(platformId, channelReduces); - + List deviceChannelList = new ArrayList<>(); + for (ChannelReduce channelReduce : channelReduces) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(channelReduce.getChannelId()); + deviceChannelList.add(deviceChannel); + } + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.DEL); return result; } @@ -739,7 +760,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { * @return */ @Override - public List queryStreamInParentPlatform(String platformId, String gbId) { + public GbStream queryStreamInParentPlatform(String platformId, String gbId) { return gbStreamMapper.queryStreamInPlatform(platformId, gbId); } @@ -771,7 +792,11 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { streamPushMapper.addAll(streamPushItems); // TODO 待优化 for (int i = 0; i < streamPushItems.size(); i++) { - gbStreamMapper.setStatus(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream(), true); + int onlineResult = gbStreamMapper.setStatus(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream(), true); + if (onlineResult > 0) { + // 发送上线通知 + eventPublisher.catalogEventPublishForStream(null, streamPushItems.get(i), CatalogEvent.ON); + } } } @@ -780,6 +805,7 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream()); streamPushMapper.add(streamPushItem); gbStreamMapper.setStatus(streamPushItem.getApp(), streamPushItem.getStream(), true); + if(!StringUtils.isEmpty(streamPushItem.getGbId() )){ // 查找开启了全部直播流共享的上级平台 List parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream(); @@ -858,7 +884,12 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @Override public int addCatalog(PlatformCatalog platformCatalog) { - return catalogMapper.add(platformCatalog); + int result = catalogMapper.add(platformCatalog); + if (result > 0) { + DeviceChannel deviceChannel = getDeviceChannelByCatalog(platformCatalog); + eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.ADD); + } + return result; } @Override @@ -873,23 +904,56 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { List platformCatalogList = catalogMapper.selectByParentId(platformCatalog.getPlatformId(), platformCatalog.getId()); for (PlatformCatalog catalog : platformCatalogList) { if (catalog.getChildrenCount() == 0) { - catalogMapper.del(catalog.getId()); - platformGbStreamMapper.delByCatalogId(catalog.getId()); - platformChannelMapper.delByCatalogId(catalog.getId()); + delCatalogExecute(catalog.getId(), catalog.getPlatformId()); }else { delCatalog(catalog.getId()); } } } - int delresult = catalogMapper.del(id); - int delStreamresult = platformGbStreamMapper.delByCatalogId(id); - int delChanneresult = platformChannelMapper.delByCatalogId(id); - return delresult + delChanneresult + delStreamresult; + return delCatalogExecute(id, platformCatalog.getPlatformId()); } + private int delCatalogExecute(String id, String platformId) { + int delresult = catalogMapper.del(id); + DeviceChannel deviceChannelForCatalog = new DeviceChannel(); + if (delresult > 0){ + deviceChannelForCatalog.setChannelId(id); + eventPublisher.catalogEventPublish(platformId, deviceChannelForCatalog, CatalogEvent.DEL); + } + + List gbStreams = platformGbStreamMapper.queryChannelInParentPlatformAndCatalog(platformId, id); + if (gbStreams.size() > 0){ + List deviceChannelList = new ArrayList<>(); + for (GbStream gbStream : gbStreams) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(gbStream.getGbId()); + deviceChannelList.add(deviceChannel); + } + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.DEL); + } + int delStreamresult = platformGbStreamMapper.delByCatalogId(id); + List platformCatalogs = platformChannelMapper.queryChannelInParentPlatformAndCatalog(platformId, id); + if (platformCatalogs.size() > 0){ + List deviceChannelList = new ArrayList<>(); + for (PlatformCatalog platformCatalog : platformCatalogs) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(platformCatalog.getId()); + deviceChannelList.add(deviceChannel); + } + eventPublisher.catalogEventPublish(platformId, deviceChannelList, CatalogEvent.DEL); + } + int delChannelresult = platformChannelMapper.delByCatalogId(id); + return delresult + delChannelresult + delStreamresult; + } + @Override public int updateCatalog(PlatformCatalog platformCatalog) { - return catalogMapper.update(platformCatalog); + int result = catalogMapper.update(platformCatalog); + if (result > 0) { + DeviceChannel deviceChannel = getDeviceChannelByCatalog(platformCatalog); + eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.UPDATE); + } + return result; } @Override @@ -905,11 +969,17 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { @Override public int delRelation(PlatformCatalog platformCatalog) { if (platformCatalog.getType() == 1) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(platformCatalog.getId()); + eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.DEL); return platformChannelMapper.delByCatalogIdAndChannelIdAndPlatformId(platformCatalog); }else if (platformCatalog.getType() == 2) { List gbStreams = platformGbStreamMapper.queryChannelInParentPlatformAndCatalog(platformCatalog.getPlatformId(), platformCatalog.getParentId()); for (GbStream gbStream : gbStreams) { if (gbStream.getGbId().equals(platformCatalog.getId())) { + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(gbStream.getGbId()); + eventPublisher.catalogEventPublish(platformCatalog.getPlatformId(), deviceChannel, CatalogEvent.DEL); return platformGbStreamMapper.delByAppAndStream(gbStream.getApp(), gbStream.getStream()); } } @@ -921,4 +991,57 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { public int updateStreamGPS(List gpsMsgInfos) { return gbStreamMapper.updateStreamGPS(gpsMsgInfos); } + + private List getDeviceChannelListByChannelReduceList(List channelReduces, String catalogId) { + List deviceChannelList = new ArrayList<>(); + if (channelReduces.size() > 0){ + for (ChannelReduce channelReduce : channelReduces) { + DeviceChannel deviceChannel = queryChannel(channelReduce.getDeviceId(), channelReduce.getChannelId()); + deviceChannel.setParental(1); + deviceChannel.setParentId(catalogId); + deviceChannelList.add(deviceChannel); + } + } + return deviceChannelList; + } + + private DeviceChannel getDeviceChannelByCatalog(PlatformCatalog catalog) { + ParentPlatform parentPlatByServerGBId = platformMapper.getParentPlatByServerGBId(catalog.getPlatformId()); + DeviceChannel deviceChannel = new DeviceChannel(); + deviceChannel.setChannelId(catalog.getId()); + deviceChannel.setName(catalog.getName()); + deviceChannel.setLongitude(0.0); + deviceChannel.setLatitude(0.0); + deviceChannel.setDeviceId(parentPlatByServerGBId.getDeviceGBId()); + deviceChannel.setManufacture("wvp-pro"); + deviceChannel.setStatus(1); + deviceChannel.setParental(1); + deviceChannel.setParentId(catalog.getParentId()); + deviceChannel.setRegisterWay(1); + deviceChannel.setCivilCode(sipConfig.getDomain()); + deviceChannel.setModel("live"); + deviceChannel.setOwner("wvp-pro"); + deviceChannel.setSecrecy("0"); + return deviceChannel; + } + + @Override + public List queryOnlineChannelsByDeviceId(String deviceId) { + return deviceChannelMapper.queryOnlineChannelsByDeviceId(deviceId); + } + + @Override + public List queryPlatFormListForGBWithGBId(String channelId, List platforms) { + return platformChannelMapper.queryPlatFormListForGBWithGBId(channelId, platforms); + } + + @Override + public List queryPlatFormListForStreamWithGBId(String app, String stream, List platforms) { + return platformGbStreamMapper.queryPlatFormListForGBWithGBId(app, stream, platforms); + } + + @Override + public GbStream getGbStream(String app, String streamId) { + return gbStreamMapper.selectOne(app, streamId); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java index 69492a79..a2fcf15f 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/gbStream/GbStreamController.java @@ -62,7 +62,7 @@ public class GbStreamController { @DeleteMapping(value = "/del") @ResponseBody public Object del(@RequestBody GbStreamParam gbStreamParam){ - if (gbStreamService.delPlatformInfo(gbStreamParam.getGbStreams())) { + if (gbStreamService.delPlatformInfo(gbStreamParam.getPlatformId(), gbStreamParam.getGbStreams())) { return "success"; }else { return "fail"; diff --git a/web_src/package-lock.json b/web_src/package-lock.json index 1fc922b6..9a0ce7f6 100644 --- a/web_src/package-lock.json +++ b/web_src/package-lock.json @@ -5094,7 +5094,8 @@ }, "js-yaml": { "version": "3.7.0", - "resolved": "", + "resolved": "https://registry.npmjs.org/js-yaml/-/js-yaml-3.7.0.tgz", + "integrity": "sha1-XJZ93YN6m/3KXy3oQlOr6KHAO4A=", "dev": true, "requires": { "argparse": "^1.0.7", diff --git a/web_src/src/components/dialog/chooseChannelForStream.vue b/web_src/src/components/dialog/chooseChannelForStream.vue index 341c22f7..20c42efb 100644 --- a/web_src/src/components/dialog/chooseChannelForStream.vue +++ b/web_src/src/components/dialog/chooseChannelForStream.vue @@ -147,6 +147,7 @@ export default { method:"delete", url:"/api/gbStream/del", data:{ + platformId: that.platformId, gbStreams: delData, } }).then((res)=>{