From e4bd61860d45ab1d506b70fb7458cf26c45aad86 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 31 Aug 2022 11:29:13 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E5=AF=B9redis=20key=E8=BF=87?= =?UTF-8?q?=E6=9C=9F=E4=BA=8B=E4=BB=B6=E7=9A=84=E4=BD=BF=E7=94=A8=EF=BC=9B?= =?UTF-8?q?=E9=87=8D=E6=9E=84=E5=9B=BD=E6=A0=87=E7=BA=A7=E8=81=94=E7=9A=84?= =?UTF-8?q?=E6=B3=A8=E5=86=8C=E4=BF=9D=E6=B4=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/SipPlatformRunner.java | 35 ++- .../genersoft/iot/vmp/conf/UserSetting.java | 10 - ...edisKeyExpirationEventMessageListener.java | 41 ---- .../genersoft/iot/vmp/gb28181/SipLayer.java | 15 +- .../iot/vmp/gb28181/bean/ParentPlatform.java | 6 +- .../vmp/gb28181/bean/ParentPlatformCatch.java | 4 +- .../iot/vmp/gb28181/bean/SubscribeHolder.java | 3 + .../iot/vmp/gb28181/event/EventPublisher.java | 33 --- .../iot/vmp/gb28181/event/SipSubscribe.java | 28 ++- .../KeepaliveTimeoutListenerForPlatform.java | 81 ------ .../PlatformKeepaliveExpireEvent.java | 28 --- .../PlatformKeepaliveExpireEventLister.java | 87 ------- .../PlatformCycleRegisterEvent.java | 24 -- .../PlatformCycleRegisterEventLister.java | 46 ---- .../PlatformNotRegisterEvent.java | 25 -- .../PlatformNotRegisterEventLister.java | 90 ------- .../subscribe/catalog/CatalogEventLister.java | 13 - .../MobilePositionSubscribeHandlerTask.java | 1 - .../cmd/ISIPCommanderForPlatform.java | 4 +- .../cmd/SIPRequestHeaderPlarformProvider.java | 11 +- .../cmd/SIPRequestHeaderProvider.java | 11 +- .../transmit/cmd/impl/SIPCommander.java | 18 +- .../cmd/impl/SIPCommanderFroPlatform.java | 24 +- .../request/SIPRequestProcessorParent.java | 9 + .../impl/InviteResponseProcessor.java | 11 +- .../impl/RegisterResponseProcessor.java | 54 ++-- .../iot/vmp/gb28181/utils/HeaderUtils.java | 22 ++ .../iot/vmp/media/zlm/AssistRESTfulUtils.java | 2 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 39 ++- .../iot/vmp/service/IPlatformService.java | 45 ++++ .../service/impl/MediaServerServiceImpl.java | 13 +- .../vmp/service/impl/PlatformServiceImpl.java | 232 ++++++++++++++++++ .../iot/vmp/service/impl/PlayServiceImpl.java | 4 +- .../iot/vmp/storager/IRedisCatchStorage.java | 9 +- .../vmp/storager/IVideoManagerStorage.java | 9 - .../dao/dto/PlatformRegisterInfo.java | 41 ++++ .../storager/impl/RedisCatchStorageImpl.java | 21 +- .../impl/VideoManagerStorageImpl.java | 7 - .../iot/vmp/utils/redis/RedisUtil.java | 1 + .../gb28181/platform/PlatformController.java | 39 +-- src/main/resources/all-application.yml | 2 - .../src/components/dialog/MediaServerEdit.vue | 2 +- .../src/components/dialog/StreamProxyEdit.vue | 2 +- .../src/components/dialog/platformEdit.vue | 2 +- .../src/components/dialog/pushStreamEdit.vue | 2 +- 45 files changed, 513 insertions(+), 693 deletions(-) delete mode 100644 src/main/java/com/genersoft/iot/vmp/conf/redis/RedisKeyExpirationEventMessageListener.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEvent.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEvent.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEventLister.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEvent.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/utils/HeaderUtils.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java create mode 100644 src/main/java/com/genersoft/iot/vmp/storager/dao/dto/PlatformRegisterInfo.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java index cf16f864..93674f61 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.springframework.beans.factory.annotation.Autowired; @@ -15,6 +16,7 @@ import java.util.List; /** * 系统启动时控制上级平台重新注册 + * @author lin */ @Component @Order(value=3) @@ -27,7 +29,7 @@ public class SipPlatformRunner implements CommandLineRunner { private IRedisCatchStorage redisCatchStorage; @Autowired - private EventPublisher publisher; + private IPlatformService platformService; @Autowired private ISIPCommanderForPlatform sipCommanderForPlatform; @@ -35,33 +37,26 @@ public class SipPlatformRunner implements CommandLineRunner { @Override public void run(String... args) throws Exception { - // 设置所有平台离线 - storager.outlineForAllParentPlatform(); - - // 清理所有平台注册缓存 - redisCatchStorage.cleanPlatformRegisterInfos(); - - // 停止所有推流 -// zlmrtpServerFactory.closeAllSendRtpStream(); - + // 获取所有启用的平台 List parentPlatforms = storager.queryEnableParentPlatformList(true); for (ParentPlatform parentPlatform : parentPlatforms) { - redisCatchStorage.updatePlatformRegister(parentPlatform); - - redisCatchStorage.updatePlatformKeepalive(parentPlatform); - + // 更新缓存 ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch(); - parentPlatformCatch.setParentPlatform(parentPlatform); parentPlatformCatch.setId(parentPlatform.getServerGBId()); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + if (parentPlatform.isStatus()) { + // 设置所有平台离线 + platformService.offline(parentPlatform); + // 取消订阅 + sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ + platformService.login(parentPlatform); + }); + }else { + platformService.login(parentPlatform); + } - // 取消订阅 - sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ - // 发送平台未注册消息 - publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId()); - }); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index d28ddebc..017b39db 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -31,8 +31,6 @@ public class UserSetting { private Boolean logInDatebase = Boolean.TRUE; - private Boolean redisConfig = Boolean.TRUE; - private String serverId = "000000"; private String thirdPartyGBIdReg = "[\\s\\S]*"; @@ -123,14 +121,6 @@ public class UserSetting { this.thirdPartyGBIdReg = thirdPartyGBIdReg; } - public Boolean getRedisConfig() { - return redisConfig; - } - - public void setRedisConfig(Boolean redisConfig) { - this.redisConfig = redisConfig; - } - public Boolean getRecordSip() { return recordSip; } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisKeyExpirationEventMessageListener.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisKeyExpirationEventMessageListener.java deleted file mode 100644 index b3adab52..00000000 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisKeyExpirationEventMessageListener.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.genersoft.iot.vmp.conf.redis; - -import com.genersoft.iot.vmp.conf.UserSetting; -import org.springframework.data.redis.connection.RedisConnection; -import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; - -import java.util.Properties; - -public class RedisKeyExpirationEventMessageListener extends KeyExpirationEventMessageListener { - - private UserSetting userSetting; - private RedisMessageListenerContainer listenerContainer; - private String keyspaceNotificationsConfigParameter = "EA"; - - public RedisKeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) { - super(listenerContainer); - this.listenerContainer = listenerContainer; - this.userSetting = userSetting; - } - - @Override - public void init() { - if (!userSetting.getRedisConfig()) { - // 配置springboot默认Config为空,即不让应用去修改redis的默认配置,因为Redis服务出于安全会禁用CONFIG命令给远程用户使用 - setKeyspaceNotificationsConfigParameter(""); - }else { - - RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection(); - Properties config = connection.getConfig("notify-keyspace-events"); - try { - if (!keyspaceNotificationsConfigParameter.equals(config.getProperty("notify-keyspace-events"))) { - connection.setConfig("notify-keyspace-events", keyspaceNotificationsConfigParameter); - } - } finally { - connection.close(); - } - } - super.init(); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java index 29253e7a..afc56718 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java @@ -56,7 +56,7 @@ public class SipLayer{ * gov/nist/javax/sip/SipStackImpl.class */ if (logger.isDebugEnabled()) { - properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true"); + properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "false"); } // 接收所有notify请求,即使没有订阅 properties.setProperty("gov.nist.javax.sip.DELIVER_UNSOLICITED_NOTIFY", "true"); @@ -68,14 +68,13 @@ public class SipLayer{ properties.setProperty("gov.nist.javax.sip.RELIABLE_CONNECTION_KEEP_ALIVE_TIMEOUT", "60"); /** - * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE = - * 0; public static final int TRACE_MESSAGES = 16; public static final int - * TRACE_EXCEPTION = 17; public static final int TRACE_DEBUG = 32; + * sip_server_log.log 和 sip_debug_log.log ERROR, INFO, WARNING, OFF, DEBUG, TRACE */ - if (logger.isDebugEnabled()) { - properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "DEBUG"); - } - properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "INFO"); + properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "ERROR"); +// if (logger.isDebugEnabled()) { +// properties.setProperty("gov.nist.javax.sip.TRACE_LEVEL", "DEBUG"); +// } + sipStack = (SipStackImpl) sipFactory.createSipStack(properties); return sipStack; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java index ef2eecd5..ade5d0ee 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatform.java @@ -84,7 +84,7 @@ public class ParentPlatform { * 注册周期 (秒) */ @Schema(description = "注册周期 (秒)") - private String expires; + private int expires; /** * 心跳周期(秒) @@ -286,11 +286,11 @@ public class ParentPlatform { this.password = password; } - public String getExpires() { + public int getExpires() { return expires; } - public void setExpires(String expires) { + public void setExpires(int expires) { this.expires = expires; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java index 6c429f26..a53d26e4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ParentPlatformCatch.java @@ -4,7 +4,9 @@ public class ParentPlatformCatch { private String id; - // 心跳未回复次数 + /** + * 心跳未回复次数 + */ private int keepAliveReply; // 注册未回复次数 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 4a900c16..441dff3f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -14,6 +14,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +/** + * @author lin + */ @Component public class SubscribeHolder { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 8a4dd3da..26ababd4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -2,9 +2,6 @@ package com.genersoft.iot.vmp.gb28181.event; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.device.RequestTimeoutEvent; -import com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire.PlatformKeepaliveExpireEvent; -import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformCycleRegisterEvent; -import com.genersoft.iot.vmp.gb28181.event.platformNotRegister.PlatformNotRegisterEvent; import com.genersoft.iot.vmp.gb28181.event.record.RecordEndEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.media.zlm.event.ZLMOfflineEvent; @@ -31,36 +28,6 @@ public class EventPublisher { @Autowired private ApplicationEventPublisher applicationEventPublisher; - - /** - * 平台心跳到期事件 - * @param platformGbId - */ - public void platformKeepaliveExpireEventPublish(String platformGbId){ - PlatformKeepaliveExpireEvent platformKeepaliveExpireEvent = new PlatformKeepaliveExpireEvent(this); - platformKeepaliveExpireEvent.setPlatformGbID(platformGbId); - applicationEventPublisher.publishEvent(platformKeepaliveExpireEvent); - } - - /** - * 平台未注册事件 - * @param platformGbId - */ - public void platformNotRegisterEventPublish(String platformGbId){ - PlatformNotRegisterEvent platformNotRegisterEvent = new PlatformNotRegisterEvent(this); - platformNotRegisterEvent.setPlatformGbID(platformGbId); - applicationEventPublisher.publishEvent(platformNotRegisterEvent); - } - - /** - * 平台周期注册事件 - * @param paltformGbId - */ - public void platformRegisterCycleEventPublish(String paltformGbId) { - PlatformCycleRegisterEvent platformCycleRegisterEvent = new PlatformCycleRegisterEvent(this); - platformCycleRegisterEvent.setPlatformGbID(paltformGbId); - applicationEventPublisher.publishEvent(platformCycleRegisterEvent); - } /** * 设备报警事件 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java index c6cfc7a0..b3fd82e8 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java @@ -59,9 +59,25 @@ public class SipSubscribe { void response(EventResult eventResult); } + /** + * + */ + public enum EventResultType{ + // 超时 + timeout, + // 回复 + response, + // 事务已结束 + transactionTerminated, + // 会话已结束 + dialogTerminated, + // 设备未找到 + deviceNotFoundEvent + } + public static class EventResult{ public int statusCode; - public String type; + public EventResultType type; public String msg; public String callId; public Dialog dialog; @@ -76,7 +92,7 @@ public class SipSubscribe { ResponseEvent responseEvent = (ResponseEvent)event; Response response = responseEvent.getResponse(); this.dialog = responseEvent.getDialog(); - this.type = "response"; + this.type = EventResultType.response; if (response != null) { this.msg = response.getReasonPhrase(); this.statusCode = response.getStatusCode(); @@ -85,28 +101,28 @@ public class SipSubscribe { }else if (event instanceof TimeoutEvent) { TimeoutEvent timeoutEvent = (TimeoutEvent)event; - this.type = "timeout"; + this.type = EventResultType.timeout; this.msg = "消息超时未回复"; this.statusCode = -1024; this.dialog = timeoutEvent.getClientTransaction().getDialog(); this.callId = this.dialog != null?timeoutEvent.getClientTransaction().getDialog().getCallId().getCallId(): null; }else if (event instanceof TransactionTerminatedEvent) { TransactionTerminatedEvent transactionTerminatedEvent = (TransactionTerminatedEvent)event; - this.type = "transactionTerminated"; + this.type = EventResultType.transactionTerminated; this.msg = "事务已结束"; this.statusCode = -1024; this.callId = transactionTerminatedEvent.getClientTransaction().getDialog().getCallId().getCallId(); this.dialog = transactionTerminatedEvent.getClientTransaction().getDialog(); }else if (event instanceof DialogTerminatedEvent) { DialogTerminatedEvent dialogTerminatedEvent = (DialogTerminatedEvent)event; - this.type = "dialogTerminated"; + this.type = EventResultType.dialogTerminated; this.msg = "会话已结束"; this.statusCode = -1024; this.callId = dialogTerminatedEvent.getDialog().getCallId().getCallId(); this.dialog = dialogTerminatedEvent.getDialog(); }else if (event instanceof DeviceNotFoundEvent) { DeviceNotFoundEvent deviceNotFoundEvent = (DeviceNotFoundEvent)event; - this.type = "deviceNotFoundEvent"; + this.type = EventResultType.deviceNotFoundEvent; this.msg = "设备未找到"; this.statusCode = -1024; this.dialog = deviceNotFoundEvent.getDialog(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java deleted file mode 100644 index ead82464..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java +++ /dev/null @@ -1,81 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.offline; - -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.conf.redis.RedisKeyExpirationEventMessageListener; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.connection.Message; -import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.stereotype.Component; - -import com.genersoft.iot.vmp.common.VideoManagerConstants; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; - -/** - * 设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件 - * @author swwheihei - */ -@Component -public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEventMessageListener { - - private Logger logger = LoggerFactory.getLogger(KeepaliveTimeoutListenerForPlatform.class); - - @Autowired - private EventPublisher publisher; - - @Autowired - private UserSetting userSetting; - - @Autowired - private SipSubscribe sipSubscribe; - - @Autowired - private IVideoManagerStorage storager; - - public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) { - super(listenerContainer, userSetting); - } - - - /** - * 监听失效的key - * @param message - * @param pattern - */ - @Override - public void onMessage(Message message, byte[] pattern) { - // 获取失效的key - String expiredKey = message.toString(); - // 平台心跳到期,需要重发, 判断是否已经多次未收到心跳回复, 多次未收到,则重新发起注册, 注册尝试多次未得到回复,则认为平台离线 - String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.PLATFORM_KEEPALIVE_PREFIX + userSetting.getServerId() + "_"; - String PLATFORM_REGISTER_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_PREFIX + userSetting.getServerId() + "_"; - String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_"; - if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { - String platformGbId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length()); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGbId); - if (platform != null) { - publisher.platformKeepaliveExpireEventPublish(platformGbId); - } - }else if (expiredKey.startsWith(PLATFORM_REGISTER_PREFIX)) { - String platformGbId = expiredKey.substring(PLATFORM_REGISTER_PREFIX.length(),expiredKey.length()); - ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGbId); - if (platform != null) { - publisher.platformRegisterCycleEventPublish(platformGbId); - } - }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) { - String callId = expiredKey.substring(REGISTER_INFO_PREFIX.length()); - if (sipSubscribe.getErrorSubscribe(callId) != null) { - SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); - eventResult.callId = callId; - eventResult.msg = "注册超时"; - eventResult.type = "register timeout"; - sipSubscribe.getErrorSubscribe(callId).response(eventResult); - } - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEvent.java deleted file mode 100644 index 1e9a2c4b..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEvent.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire; - -import org.springframework.context.ApplicationEvent; - -/** - * 平台心跳超时事件 - */ -public class PlatformKeepaliveExpireEvent extends ApplicationEvent { - - /** - * Add default serial version ID - */ - private static final long serialVersionUID = 1L; - - private String platformGbID; - - public PlatformKeepaliveExpireEvent(Object source) { - super(source); - } - - public String getPlatformGbID() { - return platformGbID; - } - - public void setPlatformGbID(String platformGbID) { - this.platformGbID = platformGbID; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java deleted file mode 100644 index 67b297c3..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformKeepaliveExpire/PlatformKeepaliveExpireEventLister.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.platformKeepaliveExpire; - -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; - -import javax.sip.message.Response; - -/** - * @description: 平台心跳超时事件 - * @author: panll - * @date: 2020年11月5日 10:00 - */ -@Component -public class PlatformKeepaliveExpireEventLister implements ApplicationListener { - - - private final static Logger logger = LoggerFactory.getLogger(PlatformKeepaliveExpireEventLister.class); - - @Autowired - private IVideoManagerStorage storager; - - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private ISIPCommanderForPlatform sipCommanderForPlatform; - - @Autowired - private SipSubscribe sipSubscribe; - - @Autowired - private EventPublisher publisher; - - @Override - public void onApplicationEvent(@NotNull PlatformKeepaliveExpireEvent event) { - - if (logger.isDebugEnabled()) { - logger.debug("平台心跳到期事件事件触发,平台国标ID:" + event.getPlatformGbID()); - } - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformGbID()); - ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(event.getPlatformGbID()); - if (parentPlatformCatch == null) { - return; - } - if (parentPlatform == null) { - logger.debug("平台心跳到期事件事件触发,但平台已经删除!!! 平台国标ID:" + event.getPlatformGbID()); - return; - } - parentPlatformCatch.setParentPlatform(parentPlatform); - // 发送心跳 - if (parentPlatformCatch.getKeepAliveReply() >= 3) { - // 有3次未收到心跳回复, 设置平台状态为离线, 开始重新注册 - logger.warn("有3次未收到心跳回复,标记设置平台状态为离线, 并重新注册 平台国标ID:" + event.getPlatformGbID()); - storager.updateParentPlatformStatus(event.getPlatformGbID(), false); - publisher.platformNotRegisterEventPublish(event.getPlatformGbID()); - parentPlatformCatch.setKeepAliveReply(0); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - }else { - // 再次发送心跳 - String callId = sipCommanderForPlatform.keepalive(parentPlatform); - - parentPlatformCatch.setKeepAliveReply( parentPlatformCatch.getKeepAliveReply() + 1); - // 存储心跳信息, 并设置状态为未回复, 如果多次过期仍未收到回复,则认为上级平台已经离线 - redisCatchStorage.updatePlatformKeepalive(parentPlatform); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - - sipSubscribe.addOkSubscribe(callId, (SipSubscribe.EventResult eventResult) ->{ - if (eventResult.statusCode == Response.OK) { - // 收到心跳响应信息, - parentPlatformCatch.setKeepAliveReply(0); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - } - } ); - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEvent.java deleted file mode 100644 index c2ff61f3..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEvent.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.platformNotRegister; - -import org.springframework.context.ApplicationEvent; - -public class PlatformCycleRegisterEvent extends ApplicationEvent { - /** - * Add default serial version ID - */ - private static final long serialVersionUID = 1L; - - private String platformGbID; - - public String getPlatformGbID() { - return platformGbID; - } - - public void setPlatformGbID(String platformGbID) { - this.platformGbID = platformGbID; - } - - public PlatformCycleRegisterEvent(Object source) { - super(source); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEventLister.java deleted file mode 100644 index d2a9246f..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformCycleRegisterEventLister.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.platformNotRegister; - -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; - -import java.util.Timer; -import java.util.TimerTask; - -@Component -public class PlatformCycleRegisterEventLister implements ApplicationListener { - - private final static Logger logger = LoggerFactory.getLogger(PlatformCycleRegisterEventLister.class); - - @Autowired - private IVideoManagerStorage storager; - @Autowired - private ISIPCommanderForPlatform sipCommanderFroPlatform; - @Autowired - private DynamicTask dynamicTask; - - @Override - public void onApplicationEvent(PlatformCycleRegisterEvent event) { - logger.info("上级平台周期注册事件"); - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformGbID()); - if (parentPlatform == null) { - logger.info("[ 平台未注册事件 ] 平台已经删除!!! 平台国标ID:" + event.getPlatformGbID()); - return; - } - String taskKey = "platform-cycle-register" + parentPlatform.getServerGBId();; - SipSubscribe.Event okEvent = (responseEvent)->{ - dynamicTask.stop(taskKey); - }; - dynamicTask.startCron(taskKey, ()->{ - logger.info("[平台注册]再次向平台注册,平台国标ID:" + event.getPlatformGbID()); - sipCommanderFroPlatform.register(parentPlatform, null, okEvent); - }, Integer.parseInt(parentPlatform.getExpires())* 1000); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEvent.java deleted file mode 100644 index c9369754..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEvent.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.platformNotRegister; - -import org.springframework.context.ApplicationEvent; - -public class PlatformNotRegisterEvent extends ApplicationEvent { - - /** - * Add default serial version ID - */ - private static final long serialVersionUID = 1L; - - private String platformGbID; - - public PlatformNotRegisterEvent(Object source) { - super(source); - } - - public String getPlatformGbID() { - return platformGbID; - } - - public void setPlatformGbID(String platformGbID) { - this.platformGbID = platformGbID; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java deleted file mode 100644 index 56bdeb58..00000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.platformNotRegister; - -import com.genersoft.iot.vmp.conf.DynamicTask; -import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; -import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; -import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; -import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.service.IMediaServerService; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.storager.IVideoManagerStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; - -import java.util.*; - -/** - * @description: 平台未注册事件,来源有二: - * 1、平台新添加 - * 2、平台心跳超时 - * @author: panll - * @date: 2020年11月24日 10:00 - */ -@Component -public class PlatformNotRegisterEventLister implements ApplicationListener { - - private final static Logger logger = LoggerFactory.getLogger(PlatformNotRegisterEventLister.class); - - @Autowired - private IVideoManagerStorage storager; - @Autowired - private IRedisCatchStorage redisCatchStorage; - @Autowired - private IMediaServerService mediaServerService; - - @Autowired - private SIPCommanderFroPlatform sipCommanderFroPlatform; - - @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; - - @Autowired - private SipConfig config; - - @Autowired - private DynamicTask dynamicTask; - - // @Autowired - // private RedisUtil redis; - - @Override - public void onApplicationEvent(PlatformNotRegisterEvent event) { - - logger.info("[ 平台未注册事件 ]平台国标ID:" + event.getPlatformGbID()); - - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformGbID()); - if (parentPlatform == null) { - logger.info("[ 平台未注册事件 ] 平台已经删除!!! 平台国标ID:" + event.getPlatformGbID()); - return; - } - // 查询是否有推流, 如果有则都停止 - List sendRtpItems = redisCatchStorage.querySendRTPServer(event.getPlatformGbID()); - if (sendRtpItems != null && sendRtpItems.size() > 0) { - logger.info("[ 平台未注册事件 ] 停止[ {} ]的所有推流", event.getPlatformGbID()); - for (SendRtpItem sendRtpItem : sendRtpItems) { - redisCatchStorage.deleteSendRTPServer(event.getPlatformGbID(), sendRtpItem.getChannelId(), null, null); - MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - Map param = new HashMap<>(); - param.put("vhost", "__defaultVhost__"); - param.put("app", sendRtpItem.getApp()); - param.put("stream", sendRtpItem.getStreamId()); - zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); - } - - } - String taskKey = "platform-not-register-" + parentPlatform.getServerGBId(); - SipSubscribe.Event okEvent = (responseEvent)->{ - dynamicTask.stop(taskKey); - }; - dynamicTask.startCron(taskKey, ()->{ - logger.info("[平台注册]再次向平台注册,平台国标ID:" + event.getPlatformGbID()); - sipCommanderFroPlatform.register(parentPlatform, null, okEvent); - }, config.getRegisterTimeInterval()* 1000); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 79bb4cad..734d6007 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -30,23 +30,10 @@ public class CatalogEventLister implements ApplicationListener { @Autowired private IVideoManagerStorage storager; - @Autowired - private IRedisCatchStorage redisCatchStorage; - @Autowired - private IMediaServerService mediaServerService; @Autowired private SIPCommanderFroPlatform sipCommanderFroPlatform; - @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; - - @Autowired - private SipConfig config; - - @Autowired - private UserSetting userSetting; - @Autowired private IGbStreamService gbStreamService; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index 7edee4dd..2ee10376 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -60,7 +60,6 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持 List gbStreams = storager.queryGbStreamListInPlatform(platform.getServerGBId()); if (gbStreams.size() == 0) { - logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platform.getServerGBId()); return; } for (DeviceChannel deviceChannel : gbStreams) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index d000f5af..351505f3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -15,7 +15,7 @@ public interface ISIPCommanderForPlatform { * @return */ boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent); - boolean register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain); + boolean register(ParentPlatform parentPlatform, String callId, WWWAuthenticateHeader www, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister); /** * 向上级平台注销 @@ -30,7 +30,7 @@ public interface ISIPCommanderForPlatform { * @param parentPlatform * @return callId(作为接受回复的判定) */ - String keepalive(ParentPlatform parentPlatform); + String keepalive(ParentPlatform parentPlatform,SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent); /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java index ad8043f1..a75e806c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderPlarformProvider.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; +import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import gov.nist.javax.sip.message.MessageFactoryImpl; import org.springframework.beans.factory.annotation.Autowired; @@ -75,7 +76,7 @@ public class SIPRequestHeaderPlarformProvider { } - public Request createRegisterRequest(@NotNull ParentPlatform platform, long CSeq, String fromTag, String viaTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException { + public Request createRegisterRequest(@NotNull ParentPlatform platform, long CSeq, String fromTag, String viaTag, CallIdHeader callIdHeader, boolean isRegister) throws ParseException, InvalidArgumentException, PeerUnavailableException { Request request = null; String sipAddress = sipConfig.getIp() + ":" + sipConfig.getPort(); //请求行 @@ -109,18 +110,20 @@ public class SIPRequestHeaderPlarformProvider { .createSipURI(platform.getDeviceGBId(), sipAddress)); request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); - ExpiresHeader expires = sipFactory.createHeaderFactory().createExpiresHeader(Integer.parseInt(platform.getExpires())); + ExpiresHeader expires = sipFactory.createHeaderFactory().createExpiresHeader(isRegister ? platform.getExpires() : 0); request.addHeader(expires); + UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory); + request.addHeader(userAgentHeader); return request; } public Request createRegisterRequest(@NotNull ParentPlatform parentPlatform, String fromTag, String viaTag, - String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException { + String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader, boolean isRegister) throws ParseException, PeerUnavailableException, InvalidArgumentException { - Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader); + Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader, isRegister); SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort()); if (www == null) { AuthorizationHeader authorizationHeader = sipFactory.createHeaderFactory().createAuthorizationHeader("Digest"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java index b89fd8ea..aee6d4e0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java @@ -12,6 +12,7 @@ import javax.sip.message.Request; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import gov.nist.javax.sip.SipProviderImpl; import gov.nist.javax.sip.SipStackImpl; @@ -266,15 +267,7 @@ public class SIPRequestHeaderProvider { Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory() .createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort())); infoRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); - List agentParam = new ArrayList<>(); - agentParam.add("wvp-pro"); - // TODO 添加版本信息以及日期 - UserAgentHeader userAgentHeader = null; - try { - userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); - } catch (ParseException e) { - throw new RuntimeException(e); - } + UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory); infoRequest.addHeader(userAgentHeader); ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 3f6fa0ce..e9d80466 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider; +import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.HookType; @@ -740,15 +741,7 @@ public class SIPCommander implements ISIPCommander { // 增加Contact header Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort())); byeRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); - List agentParam = new ArrayList<>(); - agentParam.add("wvp-pro"); - // TODO 添加版本信息以及日期 - UserAgentHeader userAgentHeader = null; - try { - userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); - } catch (ParseException e) { - throw new RuntimeException(e); - } + UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory); byeRequest.addHeader(userAgentHeader); ClientTransaction clientTransaction = null; if("TCP".equals(protocol)) { @@ -1677,14 +1670,11 @@ public class SIPCommander implements ISIPCommander { clientTransaction = udpSipProvider.getNewClientTransaction(request); } if (request.getHeader(UserAgentHeader.NAME) == null) { - List agentParam = new ArrayList<>(); - agentParam.add("wvp-pro"); - // TODO 添加版本信息以及日期 UserAgentHeader userAgentHeader = null; try { - userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); + userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory); } catch (ParseException e) { - throw new RuntimeException(e); + logger.error("添加UserAgentHeader失败", e); } request.addHeader(userAgentHeader); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index 1d8b6058..2d6d582b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; +import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -75,28 +76,21 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Override public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { - return register(parentPlatform, null, null, errorEvent, okEvent, false); + return register(parentPlatform, null, null, errorEvent, okEvent, false, true); } @Override public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { - ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); - parentPlatform.setExpires("0"); - if (parentPlatformCatch != null) { - parentPlatformCatch.setParentPlatform(parentPlatform); - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - } - return register(parentPlatform, null, null, errorEvent, okEvent, false); + return register(parentPlatform, null, null, errorEvent, okEvent, false, false); } @Override public boolean register(ParentPlatform parentPlatform, @Nullable String callId, @Nullable WWWAuthenticateHeader www, - SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain) { + SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent, boolean registerAgain, boolean isRegister) { try { Request request; String tm = Long.toString(System.currentTimeMillis()); if (!registerAgain ) { - // //callid CallIdHeader callIdHeader = null; if(parentPlatform.getTransport().equals("TCP")) { callIdHeader = tcpSipProvider.getNewCallId(); @@ -107,10 +101,10 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), "FromRegister" + tm, - "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader); + "z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader, isRegister); // 将 callid 写入缓存, 等注册成功可以更新状态 String callIdFromHeader = callIdHeader.getCallId(); - redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, parentPlatform.getServerGBId()); + redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister)); sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{ if (event != null) { @@ -127,7 +121,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { }else { CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, "FromRegister" + tm, null, callId, www, callIdHeader); + request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform, "FromRegister" + tm, null, callId, www, callIdHeader, isRegister); } transmitRequest(parentPlatform, request, null, okEvent); @@ -145,7 +139,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } @Override - public String keepalive(ParentPlatform parentPlatform) { + public String keepalive(ParentPlatform parentPlatform,SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { String callId = null; try { String characterSet = parentPlatform.getCharacterSet(); @@ -168,7 +162,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { UUID.randomUUID().toString().replace("-", ""), null, callIdHeader); - transmitRequest(parentPlatform, request); + transmitRequest(parentPlatform, request, errorEvent, okEvent); callId = callIdHeader.getCallId(); } catch (ParseException | InvalidArgumentException | SipException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index 8f3ba0a2..8977d8a7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -59,6 +59,9 @@ public abstract class SIPRequestProcessorParent { public ServerTransaction getServerTransaction(RequestEvent evt) { Request request = evt.getRequest(); ServerTransaction serverTransaction = evt.getServerTransaction(); + if (serverTransaction != null) { + System.out.println(serverTransaction.getState().toString()); + } // 判断TCP还是UDP boolean isTcp = false; ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME); @@ -86,6 +89,8 @@ public abstract class SIPRequestProcessorParent { logger.error(e.getMessage()); } catch (TransactionUnavailableException e) { logger.error(e.getMessage()); + }finally { + } } return serverTransaction; @@ -182,6 +187,10 @@ public abstract class SIPRequestProcessorParent { sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort() )); response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); + ServerTransaction serverTransaction = getServerTransaction(evt); + if (serverTransaction == null) { + + } getServerTransaction(evt).sendResponse(response); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index 04a11b93..1a396353 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; +import com.genersoft.iot.vmp.gb28181.utils.HeaderUtils; import gov.nist.javax.sip.ResponseEventExt; import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.stack.SIPDialog; @@ -103,15 +104,7 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract { } requestURI.setPort(event.getRemotePort()); reqAck.setRequestURI(requestURI); - List agentParam = new ArrayList<>(); - agentParam.add("wvp-pro"); - // TODO 添加版本信息以及日期 - UserAgentHeader userAgentHeader = null; - try { - userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); - } catch (ParseException e) { - throw new RuntimeException(e); - } + UserAgentHeader userAgentHeader = HeaderUtils.createUserAgentHeader(sipFactory); reqAck.addHeader(userAgentHeader); Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort())); reqAck.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress)); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index a48dd203..a5cddaed 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -6,8 +6,10 @@ import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract; +import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -44,6 +46,9 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { @Autowired private SubscribeHolder subscribeHolder; + @Autowired + private IPlatformService platformService; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -60,48 +65,39 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { Response response = evt.getResponse(); CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME); String callId = callIdHeader.getCallId(); - - String platformGBId = redisCatchStorage.queryPlatformRegisterInfo(callId); - if (platformGBId == null) { - logger.info(String.format("未找到callId: %s 的注册/注销平台id", callId )); + PlatformRegisterInfo platformRegisterInfo = redisCatchStorage.queryPlatformRegisterInfo(callId); + if (platformRegisterInfo == null) { + logger.info(String.format("[国标级联]未找到callId: %s 的注册/注销平台id", callId )); return; } - ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId); + ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformRegisterInfo.getPlatformId()); if (parentPlatformCatch == null) { - logger.warn(String.format("[收到注册/注销%S请求]平台:%s,但是平台缓存信息未查询到!!!", response.getStatusCode(),platformGBId)); + logger.warn(String.format("[国标级联]收到注册/注销%S请求,平台:%s,但是平台缓存信息未查询到!!!", response.getStatusCode(),platformRegisterInfo.getPlatformId())); return; } - String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册"; - logger.info(String.format("[%s %S响应]%s ", action, response.getStatusCode(), platformGBId )); + + String action = platformRegisterInfo.isRegister() ? "注册" : "注销"; + logger.info(String.format("[国标级联]%s %S响应,%s ", action, response.getStatusCode(), platformRegisterInfo.getPlatformId() )); ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform(); if (parentPlatform == null) { - logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode())); + logger.warn(String.format("[国标级联]收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformRegisterInfo.getPlatformId(), action, response.getStatusCode())); return; } - if (response.getStatusCode() == 401) { + if (response.getStatusCode() == Response.UNAUTHORIZED) { WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME); - sipCommanderForPlatform.register(parentPlatform, callId, www, null, null, true); - }else if (response.getStatusCode() == 200){ - // 注册/注销成功 - logger.info(String.format("%s %s成功", platformGBId, action)); + sipCommanderForPlatform.register(parentPlatform, callId, www, null, null, true, platformRegisterInfo.isRegister()); + }else if (response.getStatusCode() == Response.OK){ + + if (platformRegisterInfo.isRegister()) { + platformService.online(parentPlatform); + }else { + platformService.offline(parentPlatform); + } + + // 注册/注销成功移除缓存的信息 redisCatchStorage.delPlatformRegisterInfo(callId); - redisCatchStorage.delPlatformCatchInfo(platformGBId); - // 取回Expires设置,避免注销过程中被置为0 - ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); - if (parentPlatformTmp != null) { - parentPlatformTmp.setStatus("注册".equals(action)); - redisCatchStorage.updatePlatformRegister(parentPlatformTmp); - redisCatchStorage.updatePlatformKeepalive(parentPlatformTmp); - parentPlatformCatch.setParentPlatform(parentPlatformTmp); - } - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); - storager.updateParentPlatformStatus(platformGBId, "注册".equals(action)); - if ("注销".equals(action)) { - subscribeHolder.removeCatalogSubscribe(platformGBId); - subscribeHolder.removeMobilePositionSubscribe(platformGBId); - } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/HeaderUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/HeaderUtils.java new file mode 100644 index 00000000..86112672 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/HeaderUtils.java @@ -0,0 +1,22 @@ +package com.genersoft.iot.vmp.gb28181.utils; + +import javax.sip.PeerUnavailableException; +import javax.sip.SipFactory; +import javax.sip.header.UserAgentHeader; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; + +/** + * 生成header的工具类 + * @author lin + */ +public class HeaderUtils { + + public static UserAgentHeader createUserAgentHeader(SipFactory sipFactory) throws PeerUnavailableException, ParseException { + List agentParam = new ArrayList<>(); + agentParam.add("WVP PRO"); + // TODO 添加版本信息以及日期 + return sipFactory.createHeaderFactory().createUserAgentHeader(agentParam); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java index 36ae1b81..2d117543 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/AssistRESTfulUtils.java @@ -50,7 +50,7 @@ public class AssistRESTfulUtils { if (mediaServerItem == null) { return null; } - if (ObjectUtils.isEmpty(mediaServerItem.getRecordAssistPort())) { + if (mediaServerItem.getRecordAssistPort() > 0) { logger.warn("未启用Assist服务"); return null; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index e7c32ef8..7afe0fd1 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -18,8 +18,11 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.PostMapping; @@ -92,6 +95,10 @@ public class ZLMHttpHookListener { @Autowired private AssistRESTfulUtils assistRESTfulUtils; + @Qualifier("taskExecutor") + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + /** * 服务器定时上报时间,上报间隔可配置,默认10s上报一次 * @@ -238,9 +245,12 @@ public class ZLMHttpHookListener { // 鉴权通过 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo); // 通知assist新的callId - if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) { - assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null); - } + taskExecutor.execute(()->{ + if (mediaInfo != null && mediaInfo.getRecordAssistPort() > 0) { + assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null); + } + }); + }else { zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId()); } @@ -416,18 +426,23 @@ public class ZLMHttpHookListener { String schema = item.getSchema(); List tracks = item.getTracks(); boolean regist = item.isRegist(); - if (regist) { - StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); - if (streamAuthorityInfo == null) { - streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item); + if (item.getOriginType() == OriginType.RTMP_PUSH.ordinal() + || item.getOriginType() == OriginType.RTSP_PUSH.ordinal() + || item.getOriginType() == OriginType.RTC_PUSH.ordinal()) { + if (regist) { + StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); + if (streamAuthorityInfo == null) { + streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(item); + }else { + streamAuthorityInfo.setOriginType(item.getOriginType()); + streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr()); + } + redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo); }else { - streamAuthorityInfo.setOriginType(item.getOriginType()); - streamAuthorityInfo.setOriginTypeStr(item.getOriginTypeStr()); + redisCatchStorage.removeStreamAuthorityInfo(app, stream); } - redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo); - }else { - redisCatchStorage.removeStreamAuthorityInfo(app, stream); } + if ("rtsp".equals(schema)){ logger.info("on_stream_changed:注册->{}, app->{}, stream->{}", regist, app, stream); if (regist) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java new file mode 100644 index 00000000..b5f3c5b7 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlatformService.java @@ -0,0 +1,45 @@ +package com.genersoft.iot.vmp.service; + +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.github.pagehelper.PageInfo; + +/** + * 国标平台的业务类 + * @author lin + */ +public interface IPlatformService { + + ParentPlatform queryPlatformByServerGBId(String platformGbId); + + /** + * 分页获取上级平台 + * @param page + * @param count + * @return + */ + PageInfo queryParentPlatformList(int page, int count); + + /** + * 添加级联平台 + * @param parentPlatform 级联平台 + */ + boolean add(ParentPlatform parentPlatform); + + /** + * 平台上线 + * @param parentPlatform 平台信息 + */ + void online(ParentPlatform parentPlatform); + + /** + * 平台离线 + * @param parentPlatform 平台信息 + */ + void offline(ParentPlatform parentPlatform); + + /** + * 向上级平台发起注册 + * @param parentPlatform + */ + void login(ParentPlatform parentPlatform); +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index d923755a..37aeca0d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -513,10 +513,7 @@ public class MediaServerServiceImpl implements IMediaServerService { mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); String protocol = sslEnabled ? "https" : "http"; String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort); - String recordHookPrex = null; - if (mediaServerItem.getRecordAssistPort() != 0) { - recordHookPrex = String.format("http://127.0.0.1:%s/api/record", mediaServerItem.getRecordAssistPort()); - } + Map param = new HashMap<>(); param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline param.put("ffmpeg.cmd","%s -fflags nobuffer -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s"); @@ -525,7 +522,6 @@ public class MediaServerServiceImpl implements IMediaServerService { param.put("hook.on_play",String.format("%s/on_play", hookPrex)); param.put("hook.on_http_access",String.format("%s/on_http_access", hookPrex)); param.put("hook.on_publish", String.format("%s/on_publish", hookPrex)); - param.put("hook.on_record_mp4",recordHookPrex != null? String.format("%s/on_record_mp4", recordHookPrex): ""); param.put("hook.on_record_ts",String.format("%s/on_record_ts", hookPrex)); param.put("hook.on_rtsp_auth",String.format("%s/on_rtsp_auth", hookPrex)); param.put("hook.on_rtsp_realm",String.format("%s/on_rtsp_realm", hookPrex)); @@ -535,6 +531,11 @@ public class MediaServerServiceImpl implements IMediaServerService { param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrex)); param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex)); param.put("hook.on_server_keepalive",String.format("%s/on_server_keepalive", hookPrex)); + if (mediaServerItem.getRecordAssistPort() > 0) { + param.put("hook.on_record_mp4",String.format("http://127.0.0.1:%s/api/record/on_record_mp4", mediaServerItem.getRecordAssistPort())); + }else { + param.put("hook.on_record_mp4",""); + } param.put("hook.timeoutSec","20"); param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS()==-1?"3600000":mediaServerItem.getStreamNoneReaderDelayMS() ); // 推流断开后可以在超时时间内重新连接上继续推流,这样播放器会接着播放。 @@ -544,7 +545,7 @@ public class MediaServerServiceImpl implements IMediaServerService { param.put("general.continue_push_ms", "3000" ); // 最多等待未初始化的Track时间,单位毫秒,超时之后会忽略未初始化的Track, 设置此选项优化那些音频错误的不规范流, // 等zlm支持给每个rtpServer设置关闭音频的时候可以不设置此选项 - param.put("general.wait_track_ready_ms", "3000" ); +// param.put("general.wait_track_ready_ms", "3000" ); if (mediaServerItem.isRtpEnable() && !ObjectUtils.isEmpty(mediaServerItem.getRtpPortRange())) { param.put("rtp_proxy.port_range", mediaServerItem.getRtpPortRange().replace(",", "-")); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java new file mode 100644 index 00000000..3a5d19a6 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -0,0 +1,232 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; +import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlatformService; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper; +import com.github.pagehelper.PageHelper; +import com.github.pagehelper.PageInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.sip.TimeoutEvent; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author lin + */ +@Service +public class PlatformServiceImpl implements IPlatformService { + + private final static String REGISTER_KEY_PREFIX = "platform_register_"; + private final static String KEEPALIVE_KEY_PREFIX = "platform_keepalive_"; + + private final static Logger logger = LoggerFactory.getLogger(PlatformServiceImpl.class); + + @Autowired + private ParentPlatformMapper platformMapper; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private SIPCommanderFroPlatform commanderForPlatform; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private SubscribeHolder subscribeHolder; + + + + @Override + public ParentPlatform queryPlatformByServerGBId(String platformGbId) { + return platformMapper.getParentPlatByServerGBId(platformGbId); + } + + @Override + public PageInfo queryParentPlatformList(int page, int count) { + PageHelper.startPage(page, count); + List all = platformMapper.getParentPlatformList(); + return new PageInfo<>(all); + } + + @Override + public boolean add(ParentPlatform parentPlatform) { + + if (parentPlatform.getCatalogGroup() == 0) { + // 每次发送目录的数量默认为1 + parentPlatform.setCatalogGroup(1); + } + if (parentPlatform.getAdministrativeDivision() == null) { + // 行政区划默认去编号的前6位 + parentPlatform.setAdministrativeDivision(parentPlatform.getServerGBId().substring(0,6)); + } + parentPlatform.setCatalogId(parentPlatform.getDeviceGBId()); + int result = platformMapper.addParentPlatform(parentPlatform); + // 添加缓存 + ParentPlatformCatch parentPlatformCatch = new ParentPlatformCatch(); + parentPlatformCatch.setParentPlatform(parentPlatform); + parentPlatformCatch.setId(parentPlatform.getServerGBId()); + parentPlatformCatch.setParentPlatform(parentPlatform); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + if (parentPlatform.isEnable()) { + // 保存时启用就发送注册 + // 注册成功时由程序直接调用了online方法 + commanderForPlatform.register(parentPlatform, eventResult -> { + logger.info("[国标级联] {},添加向上级注册失败,请确定上级平台可用时重新保存", parentPlatform.getServerGBId()); + }, null); + } + return result > 0; + } + + @Override + public void online(ParentPlatform parentPlatform) { + logger.info("[国标级联]:{}, 平台上线", parentPlatform.getServerGBId()); + platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), true); + ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); + if (parentPlatformCatch != null) { + parentPlatformCatch.getParentPlatform().setStatus(true); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + }else { + parentPlatformCatch = new ParentPlatformCatch(); + parentPlatformCatch.setParentPlatform(parentPlatform); + parentPlatformCatch.setId(parentPlatform.getServerGBId()); + parentPlatform.setStatus(true); + parentPlatformCatch.setParentPlatform(parentPlatform); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + } + + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); + if (dynamicTask.contains(registerTaskKey)) { + dynamicTask.stop(registerTaskKey); + } + // 添加注册任务 + dynamicTask.startDelay(registerTaskKey, + // 注册失败(注册成功时由程序直接调用了online方法) + ()->commanderForPlatform.register(parentPlatform, eventResult -> offline(parentPlatform),null), + parentPlatform.getExpires()*1000); + + final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); + if (!dynamicTask.contains(keepaliveTaskKey)) { + // 添加心跳任务 + dynamicTask.startCron(keepaliveTaskKey, + ()-> commanderForPlatform.keepalive(parentPlatform, eventResult -> { + // 心跳失败 + if (eventResult.type == SipSubscribe.EventResultType.timeout) { + // 心跳超时 + ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); + // 此时是第三次心跳超时, 平台离线 + if (platformCatch.getKeepAliveReply() == 2) { + // 设置平台离线,并重新注册 + offline(parentPlatform); + logger.info("[国标级联] {},三次心跳超时后再次发起注册", parentPlatform.getServerGBId()); + commanderForPlatform.register(parentPlatform, eventResult1 -> { + logger.info("[国标级联] {},三次心跳超时后再次发起注册仍然失败,开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId()); + // 添加注册任务 + dynamicTask.startCron(registerTaskKey, + // 注册失败(注册成功时由程序直接调用了online方法) + ()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()), + 60*1000); + }, null); + } + + }else { + logger.warn("[国标级联]发送心跳收到错误,code: {}, msg: {}", eventResult.statusCode, eventResult.msg); + } + + }, eventResult -> { + // 心跳成功 + // 清空之前的心跳超时计数 + ParentPlatformCatch platformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); + if (platformCatch.getKeepAliveReply() > 0) { + platformCatch.setKeepAliveReply(0); + redisCatchStorage.updatePlatformCatchInfo(platformCatch); + } + }), + parentPlatform.getExpires()*1000); + } + } + + @Override + public void offline(ParentPlatform parentPlatform) { + logger.info("[平台离线]:{}", parentPlatform.getServerGBId()); + ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); + parentPlatformCatch.setKeepAliveReply(0); + parentPlatformCatch.setRegisterAliveReply(0); + ParentPlatform parentPlatformInCatch = parentPlatformCatch.getParentPlatform(); + parentPlatformInCatch.setStatus(false); + parentPlatformCatch.setParentPlatform(parentPlatformInCatch); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + platformMapper.updateParentPlatformStatus(parentPlatform.getServerGBId(), false); + + // 停止所有推流 + logger.info("[平台离线] {}, 停止所有推流", parentPlatform.getServerGBId()); + stopAllPush(parentPlatform.getServerGBId()); + // 清除注册定时 + logger.info("[平台离线] {}, 停止定时注册任务", parentPlatform.getServerGBId()); + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); + if (dynamicTask.contains(registerTaskKey)) { + dynamicTask.stop(registerTaskKey); + } + // 清除心跳定时 + logger.info("[平台离线] {}, 停止定时发送心跳任务", parentPlatform.getServerGBId()); + final String keepaliveTaskKey = KEEPALIVE_KEY_PREFIX + parentPlatform.getServerGBId(); + if (dynamicTask.contains(keepaliveTaskKey)) { + // 添加心跳任务 + dynamicTask.stop(keepaliveTaskKey); + } + // 停止目录订阅回复 + logger.info("[平台离线] {}, 停止订阅回复", parentPlatform.getServerGBId()); + subscribeHolder.removeAllSubscribe(parentPlatform.getServerGBId()); + } + + private void stopAllPush(String platformId) { + List sendRtpItems = redisCatchStorage.querySendRTPServer(platformId); + if (sendRtpItems != null && sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null); + MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); + Map param = new HashMap<>(3); + param.put("vhost", "__defaultVhost__"); + param.put("app", sendRtpItem.getApp()); + param.put("stream", sendRtpItem.getStreamId()); + zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); + } + + } + } + + @Override + public void login(ParentPlatform parentPlatform) { + final String registerTaskKey = REGISTER_KEY_PREFIX + parentPlatform.getServerGBId(); + commanderForPlatform.register(parentPlatform, eventResult1 -> { + logger.info("[国标级联] {},开始定时发起注册,间隔为1分钟", parentPlatform.getServerGBId()); + // 添加注册任务 + dynamicTask.startCron(registerTaskKey, + // 注册失败(注册成功时由程序直接调用了online方法) + ()->logger.info("[国标级联] {},平台离线后持续发起注册,失败", parentPlatform.getServerGBId()), + 60*1000); + }, null); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index e00eb557..dc62b07f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -575,7 +575,7 @@ public class PlayServiceImpl implements IPlayService { logger.warn("查询录像信息时发现节点已离线"); return null; } - if (mediaServerItem.getRecordAssistPort() != 0) { + if (mediaServerItem.getRecordAssistPort() > 0) { JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null); if (jsonObject != null && jsonObject.getInteger("code") == 0) { long duration = jsonObject.getLong("data"); @@ -691,7 +691,7 @@ public class PlayServiceImpl implements IPlayService { // for (SendRtpItem sendRtpItem : sendRtpItems) { // if (sendRtpItem.getMediaServerId().equals(mediaServerId)) { // if (mediaListMap.get(sendRtpItem.getStreamId()) == null) { -// ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); +// ParentPlatform platform = storager.queryPlatformByServerGBId(sendRtpItem.getPlatformId()); // sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId()); // } // } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index b3bb89cc..1f467e4e 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; +import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import java.util.List; import java.util.Map; @@ -61,17 +62,13 @@ public interface IRedisCatchStorage { void delPlatformCatchInfo(String platformGbId); - void updatePlatformKeepalive(ParentPlatform parentPlatform); - void delPlatformKeepalive(String platformGbId); - void updatePlatformRegister(ParentPlatform parentPlatform); - void delPlatformRegister(String platformGbId); - void updatePlatformRegisterInfo(String callId, String platformGbId); + void updatePlatformRegisterInfo(String callId, PlatformRegisterInfo platformRegisterInfo); - String queryPlatformRegisterInfo(String callId); + PlatformRegisterInfo queryPlatformRegisterInfo(String callId); void delPlatformRegisterInfo(String callId); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index b2c1e1b9..b65cc681 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -170,15 +170,6 @@ public interface IVideoManagerStorage { */ boolean deleteParentPlatform(ParentPlatform parentPlatform); - - /** - * 分页获取上级平台 - * @param page - * @param count - * @return - */ - PageInfo queryParentPlatformList(int page, int count); - /** * 获取所有已启用的平台 * @return diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/dto/PlatformRegisterInfo.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/dto/PlatformRegisterInfo.java new file mode 100644 index 00000000..16f66363 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/dto/PlatformRegisterInfo.java @@ -0,0 +1,41 @@ +package com.genersoft.iot.vmp.storager.dao.dto; + +/** + * 平台发送注册/注销消息时缓存此消息 + * @author lin + */ +public class PlatformRegisterInfo { + + /** + * 平台Id + */ + private String platformId; + + /** + * 是否时注册,false为注销 + */ + private boolean register; + + public static PlatformRegisterInfo getInstance(String platformId, boolean register) { + PlatformRegisterInfo platformRegisterInfo = new PlatformRegisterInfo(); + platformRegisterInfo.setPlatformId(platformId); + platformRegisterInfo.setRegister(register); + return platformRegisterInfo; + } + + public String getPlatformId() { + return platformId; + } + + public void setPlatformId(String platformId) { + this.platformId = platformId; + } + + public boolean isRegister() { + return register; + } + + public void setRegister(boolean register) { + this.register = register; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 14a369cc..ecefe737 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -16,6 +16,7 @@ import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.ThirdPartyGB; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper; +import com.genersoft.iot.vmp.storager.dao.dto.PlatformRegisterInfo; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import org.slf4j.Logger; @@ -290,18 +291,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { RedisUtil.set(key, parentPlatformCatch); } - @Override - public void updatePlatformKeepalive(ParentPlatform parentPlatform) { - String key = VideoManagerConstants.PLATFORM_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + parentPlatform.getServerGBId(); - RedisUtil.set(key, "", Integer.parseInt(parentPlatform.getKeepTimeout())); - } - - @Override - public void updatePlatformRegister(ParentPlatform parentPlatform) { - String key = VideoManagerConstants.PLATFORM_REGISTER_PREFIX + userSetting.getServerId() + "_" + parentPlatform.getServerGBId(); - RedisUtil.set(key, "", Integer.parseInt(parentPlatform.getExpires())); - } - @Override public ParentPlatformCatch queryPlatformCatchInfo(String platformGbId) { return (ParentPlatformCatch)RedisUtil.get(VideoManagerConstants.PLATFORM_CATCH_PREFIX + userSetting.getServerId() + "_" + platformGbId); @@ -324,15 +313,15 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override - public void updatePlatformRegisterInfo(String callId, String platformGbId) { + public void updatePlatformRegisterInfo(String callId, PlatformRegisterInfo platformRegisterInfo) { String key = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId; - RedisUtil.set(key, platformGbId, 30); + RedisUtil.set(key, platformRegisterInfo, 30); } @Override - public String queryPlatformRegisterInfo(String callId) { - return (String)RedisUtil.get(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId); + public PlatformRegisterInfo queryPlatformRegisterInfo(String callId) { + return (PlatformRegisterInfo)RedisUtil.get(VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_" + callId); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 0ec9dbc9..e8e0e02f 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -457,13 +457,6 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { return result > 0; } - @Override - public PageInfo queryParentPlatformList(int page, int count) { - PageHelper.startPage(page, count); - List all = platformMapper.getParentPlatformList(); - return new PageInfo<>(all); - } - @Override public ParentPlatform queryParentPlatByServerGBId(String platformGbId) { return platformMapper.getParentPlatByServerGBId(platformGbId); diff --git a/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java index 0034c398..749dddd2 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java @@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.utils.SpringBeanFactory; +import gov.nist.javax.sip.stack.UDPMessageChannel; import org.springframework.data.redis.core.*; import org.springframework.util.CollectionUtils; diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java index 2a403301..bf421894 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java @@ -9,14 +9,13 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; -import com.genersoft.iot.vmp.gb28181.bean.TreeType; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.service.IPlatformChannelService; +import com.genersoft.iot.vmp.service.IPlatformService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; -import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce; import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.UpdateChannelParam; import com.github.pagehelper.PageInfo; @@ -26,10 +25,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; import org.springframework.util.ObjectUtils; -import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.*; import com.genersoft.iot.vmp.conf.SipConfig; @@ -70,6 +66,9 @@ public class PlatformController { @Autowired private DynamicTask dynamicTask; + @Autowired + private IPlatformService platformService; + /** * 获取国标服务的配置 * @@ -95,8 +94,7 @@ public class PlatformController { @Parameter(name = "id", description = "平台国标编号", required = true) @GetMapping("/info/{id}") public ParentPlatform getPlatform(@PathVariable String id) { - ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(id); - WVPResult wvpResult = new WVPResult<>(); + ParentPlatform parentPlatform = platformService.queryPlatformByServerGBId(id); if (parentPlatform != null) { return parentPlatform; } else { @@ -117,7 +115,7 @@ public class PlatformController { @Parameter(name = "count", description = "每页条数", required = true) public PageInfo platforms(@PathVariable int page, @PathVariable int count) { - PageInfo parentPlatformPageInfo = storager.queryParentPlatformList(page, count); + PageInfo parentPlatformPageInfo = platformService.queryParentPlatformList(page, count); if (parentPlatformPageInfo.getList().size() > 0) { for (ParentPlatform platform : parentPlatformPageInfo.getList()) { platform.setMobilePositionSubscribe(subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()) != null); @@ -136,7 +134,7 @@ public class PlatformController { @Operation(summary = "添加上级平台信息") @PostMapping("/add") @ResponseBody - public String addPlatform(@RequestBody ParentPlatform parentPlatform) { + public void addPlatform(@RequestBody ParentPlatform parentPlatform) { if (logger.isDebugEnabled()) { logger.debug("保存上级平台信息API调用"); @@ -158,33 +156,16 @@ public class PlatformController { throw new ControllerException(ErrorCode.ERROR400.getCode(), "error severPort"); } + ParentPlatform parentPlatformOld = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId()); if (parentPlatformOld != null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "平台 " + parentPlatform.getServerGBId() + " 已存在"); } parentPlatform.setCreateTime(DateUtil.getNow()); parentPlatform.setUpdateTime(DateUtil.getNow()); - boolean updateResult = storager.updateParentPlatform(parentPlatform); + boolean updateResult = platformService.add(parentPlatform); - if (updateResult) { - // 保存时启用就发送注册 - if (parentPlatform.isEnable()) { - if (parentPlatformOld != null && parentPlatformOld.isStatus()) { - commanderForPlatform.unregister(parentPlatformOld, null, eventResult -> { - // 只要保存就发送注册 - commanderForPlatform.register(parentPlatform, null, null); - }); - } else { - // 只要保存就发送注册 - commanderForPlatform.register(parentPlatform, null, null); - } - - } else if (parentPlatformOld != null && parentPlatformOld.isEnable()) { - // 关闭启用时注销 - commanderForPlatform.unregister(parentPlatform, null, null); - } - return null; - } else { + if (!updateResult) { throw new ControllerException(ErrorCode.ERROR100.getCode(),"写入数据库失败"); } } diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index 6679c0d8..222c0765 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -179,8 +179,6 @@ user-settings: platform-play-timeout: 60000 # 是否开启接口鉴权 interface-authentication: true - # 自动配置redis 可以过期事件 - redis-config: true # 接口鉴权例外的接口, 即不进行接口鉴权的接口,尽量详细书写,尽量不用/**,至少两级目录 interface-authentication-excludes: - /api/v1/** diff --git a/web_src/src/components/dialog/MediaServerEdit.vue b/web_src/src/components/dialog/MediaServerEdit.vue index 7206bace..1754461d 100644 --- a/web_src/src/components/dialog/MediaServerEdit.vue +++ b/web_src/src/components/dialog/MediaServerEdit.vue @@ -357,7 +357,7 @@ export default { var result = false; var that = this; await that.$axios({ - method: 'post', + method: 'get', url:`/api/platform/exit/${deviceGbId}` }).then(function (res) { result = res.data; diff --git a/web_src/src/components/dialog/StreamProxyEdit.vue b/web_src/src/components/dialog/StreamProxyEdit.vue index ac209db6..936bc53d 100644 --- a/web_src/src/components/dialog/StreamProxyEdit.vue +++ b/web_src/src/components/dialog/StreamProxyEdit.vue @@ -263,7 +263,7 @@ export default { var result = false; var that = this; await that.$axios({ - method: 'post', + method: 'get', url:`/api/platform/exit/${deviceGbId}` }).then(function (res) { result = res.data; diff --git a/web_src/src/components/dialog/platformEdit.vue b/web_src/src/components/dialog/platformEdit.vue index 633160b3..ef28b4c7 100644 --- a/web_src/src/components/dialog/platformEdit.vue +++ b/web_src/src/components/dialog/platformEdit.vue @@ -327,7 +327,7 @@ export default { var result = false; var that = this; await that.$axios({ - method: 'post', + method: 'get', url:`/api/platform/exit/${deviceGbId}`}) .then(function (res) { if (res.data.code === 0) { diff --git a/web_src/src/components/dialog/pushStreamEdit.vue b/web_src/src/components/dialog/pushStreamEdit.vue index 8c827a3d..de4e7bcc 100644 --- a/web_src/src/components/dialog/pushStreamEdit.vue +++ b/web_src/src/components/dialog/pushStreamEdit.vue @@ -158,7 +158,7 @@ export default { var result = false; var that = this; await that.$axios({ - method:"post", + method:"get", url:`/api/platform/exit/${deviceGbId}` }).then(function (res) { result = res.data;