From b2c953fc76de2a9686ee81d5311bd9b06e453912 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Mon, 7 Mar 2022 01:17:45 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96ssrc=E9=87=8A=E6=94=BE?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BC=98=E5=8C=96=E7=BA=A7=E8=81=94?= =?UTF-8?q?=E7=82=B9=E6=92=AD=E9=80=9F=E5=BA=A6=EF=BC=8C=E5=8E=BB=E9=99=A4?= =?UTF-8?q?=E7=AD=89=E5=BE=85=E6=B5=81=E6=A0=BC=E5=BC=8F=E7=9A=84=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/DynamicTask.java | 28 +++- .../iot/vmp/conf/SipPlatformRunner.java | 7 +- .../com/genersoft/iot/vmp/conf/UserSetup.java | 10 -- .../gb28181/auth/RegisterLogicHandler.java | 16 +- .../iot/vmp/gb28181/bean/SendRtpItem.java | 20 +++ .../KeepaliveTimeoutListenerForPlatform.java | 23 ++- .../event/offline/OfflineEventListener.java | 26 ++++ .../PlatformNotRegisterEventLister.java | 4 +- .../SubscribeListenerForPlatform.java | 4 +- .../session/VideoStreamSessionManager.java | 24 +++ .../vmp/gb28181/task/GPSSubscribeTask.java | 2 - .../cmd/ISIPCommanderForPlatform.java | 7 + .../transmit/cmd/impl/SIPCommander.java | 22 ++- .../cmd/impl/SIPCommanderFroPlatform.java | 80 +++++++++- .../request/impl/AckRequestProcessor.java | 66 ++++---- .../request/impl/ByeRequestProcessor.java | 54 ++++--- .../request/impl/InviteRequestProcessor.java | 29 +++- .../impl/RegisterRequestProcessor.java | 5 +- .../impl/SubscribeRequestProcessor.java | 8 +- .../impl/RegisterResponseProcessor.java | 21 ++- .../vmp/media/zlm/ZLMHttpHookListener.java | 18 +-- .../vmp/media/zlm/ZLMRTPServerFactory.java | 4 +- .../zlm/event/ZLMStatusEventListener.java | 6 +- .../iot/vmp/service/IMediaServerService.java | 2 +- .../iot/vmp/service/IPlayService.java | 4 +- .../vmp/service/impl/DeviceServiceImpl.java | 4 +- .../service/impl/MediaServerServiceImpl.java | 5 +- .../iot/vmp/service/impl/PlayServiceImpl.java | 146 +++++++++++++----- .../iot/vmp/storager/IRedisCatchStorage.java | 4 +- .../vmp/storager/dao/DeviceChannelMapper.java | 37 +++++ .../storager/impl/RedisCatchStorageImpl.java | 42 +++-- .../impl/VideoManagerStoragerImpl.java | 3 +- .../vmanager/gb28181/device/DeviceQuery.java | 7 +- .../gb28181/platform/PlatformController.java | 18 ++- .../vmanager/gb28181/play/PlayController.java | 2 +- .../vmp/web/gb28181/ApiStreamController.java | 2 +- src/main/resources/all-application.yml | 2 - 37 files changed, 555 insertions(+), 207 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java index c9572ae5..80e39f5f 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java @@ -5,6 +5,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.stereotype.Component; +import java.util.Date; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; @@ -25,15 +26,38 @@ public class DynamicTask { return new ThreadPoolTaskScheduler(); } + /** + * 循环执行的任务 + * @param key 任务ID + * @param task 任务 + * @param cycleForCatalog 间隔 + * @return + */ public String startCron(String key, Runnable task, int cycleForCatalog) { - stopCron(key); + stop(key); // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L); futureMap.put(key, future); return "startCron"; } - public void stopCron(String key) { + /** + * 延时任务 + * @param key 任务ID + * @param task 任务 + * @param delay 延时 /秒 + * @return + */ + public String startDelay(String key, Runnable task, int delay) { + stop(key); + Date starTime = new Date(System.currentTimeMillis() + delay * 1000); + // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔 + ScheduledFuture future = threadPoolTaskScheduler.schedule(task, starTime); + futureMap.put(key, future); + return "startCron"; + } + + public void stop(String key) { if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) { futureMap.get(key).cancel(true); } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java index b69bf68e..4ebaf0bf 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/SipPlatformRunner.java @@ -59,8 +59,11 @@ public class SipPlatformRunner implements CommandLineRunner { redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); // 取消订阅 - sipCommanderForPlatform.unregister(parentPlatform, null, null); - Thread.sleep(500); + sipCommanderForPlatform.unregister(parentPlatform, null, (eventResult)->{ + ParentPlatform platform = storager.queryParentPlatByServerGBId(parentPlatform.getServerGBId()); + sipCommanderForPlatform.register(platform, null, null); + }); + // 发送平台未注册消息 publisher.platformNotRegisterEventPublish(parentPlatform.getServerGBId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java index 4decd2bb..d1d0e20b 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java @@ -19,8 +19,6 @@ public class UserSetup { private Long playTimeout = 18000L; - private Boolean waitTrack = Boolean.FALSE; - private Boolean interfaceAuthentication = Boolean.TRUE; private Boolean recordPushLive = Boolean.TRUE; @@ -57,10 +55,6 @@ public class UserSetup { return playTimeout; } - public Boolean isWaitTrack() { - return waitTrack; - } - public Boolean isInterfaceAuthentication() { return interfaceAuthentication; } @@ -89,10 +83,6 @@ public class UserSetup { this.playTimeout = playTimeout; } - public void setWaitTrack(Boolean waitTrack) { - this.waitTrack = waitTrack; - } - public void setInterfaceAuthentication(boolean interfaceAuthentication) { this.interfaceAuthentication = interfaceAuthentication; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java index 62d4bec0..c6fba3db 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/RegisterLogicHandler.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.auth; +import com.genersoft.iot.vmp.storager.impl.VideoManagerStoragerImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -20,13 +21,24 @@ public class RegisterLogicHandler { @Autowired private SIPCommander cmder; + + @Autowired + private VideoManagerStoragerImpl storager; public void onRegister(Device device) { // 只有第一次注册时调用查询设备信息,如需更新调用更新API接口 + // TODO 此处错误无法获取到通道 + Device device1 = storager.queryVideoDevice(device.getDeviceId()); if (device.isFirsRegister()) { logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId()); - cmder.deviceInfoQuery(device); - cmder.catalogQuery(device, null); + try { + Thread.sleep(100); + cmder.deviceInfoQuery(device); + Thread.sleep(100); + cmder.catalogQuery(device, null); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index 3e5d222a..3914fa1f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -81,6 +81,10 @@ public class SendRtpItem { */ private boolean isPlay; + private byte[] transaction; + + private byte[] dialog; + public String getIp() { return ip; } @@ -200,4 +204,20 @@ public class SendRtpItem { public void setPlay(boolean play) { isPlay = play; } + + public byte[] getTransaction() { + return transaction; + } + + public void setTransaction(byte[] transaction) { + this.transaction = transaction; + } + + public byte[] getDialog() { + return dialog; + } + + public void setDialog(byte[] dialog) { + this.dialog = dialog; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java index 9ba0c055..3b611b5b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java @@ -2,7 +2,10 @@ package com.genersoft.iot.vmp.gb28181.event.offline; import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; import com.genersoft.iot.vmp.conf.UserSetup; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -39,6 +42,9 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent @Autowired private SipSubscribe sipSubscribe; + @Autowired + private IVideoManagerStorager storager; + public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetup userSetup) { super(listenerContainer, userSetup); } @@ -61,15 +67,22 @@ public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEvent String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetup.getServerId() + "_"; if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { String platformGBId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); - - publisher.platformKeepaliveExpireEventPublish(platformGBId); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId); + if (platform != null) { + publisher.platformKeepaliveExpireEventPublish(platformGBId); + } }else if (expiredKey.startsWith(PLATFORM_REGISTER_PREFIX)) { String platformGBId = expiredKey.substring(PLATFORM_REGISTER_PREFIX.length(),expiredKey.length()); - - publisher.platformRegisterCycleEventPublish(platformGBId); + ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGBId); + if (platform != null) { + publisher.platformRegisterCycleEventPublish(platformGBId); + } }else if (expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){ String deviceId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length()); - publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX); + Device device = storager.queryVideoDevice(deviceId); + if (device != null) { + publisher.outlineEventPublish(deviceId, KEEPLIVEKEY_PREFIX); + } }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) { String callid = expiredKey.substring(REGISTER_INFO_PREFIX.length()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java index aa87728d..97e480cb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/OfflineEventListener.java @@ -2,8 +2,13 @@ package com.genersoft.iot.vmp.gb28181.event.offline; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -32,6 +37,9 @@ public class OfflineEventListener implements ApplicationListener { @Autowired private IVideoManagerStorager storager; + + @Autowired + private VideoStreamSessionManager streamSession; @Autowired private RedisUtil redis; @@ -42,6 +50,14 @@ public class OfflineEventListener implements ApplicationListener { @Autowired private EventPublisher eventPublisher; + + @Autowired + private IMediaServerService mediaServerService; + + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + @Override public void onApplicationEvent(OfflineEvent event) { @@ -73,5 +89,15 @@ public class OfflineEventListener implements ApplicationListener { // TODO 离线取消订阅 + // 离线释放所有ssrc + List ssrcTransactions = streamSession.getSsrcTransactionForAll(event.getDeviceId(), null, null, null); + if (ssrcTransactions.size() > 0) { + for (SsrcTransaction ssrcTransaction : ssrcTransactions) { + mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); + mediaServerService.closeRTPServer(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + streamSession.remove(event.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java index 2ab2b239..1b8e7aed 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/platformNotRegister/PlatformNotRegisterEventLister.java @@ -75,7 +75,7 @@ public class PlatformNotRegisterEventLister implements ApplicationListener param = new HashMap<>(); param.put("vhost", "__defaultVhost__"); @@ -84,9 +84,7 @@ public class PlatformNotRegisterEventLister implements ApplicationListener{ timer.cancel(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java index bac8e3dd..3b2bd230 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java @@ -4,8 +4,6 @@ import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener; import com.genersoft.iot.vmp.conf.UserSetup; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -46,7 +44,7 @@ public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessage String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_"; if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) { // 取消定时任务 - dynamicTask.stopCron(expiredKey); + dynamicTask.stop(expiredKey); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java index 3e9f28aa..ba8f24c1 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java @@ -86,6 +86,15 @@ public class VideoStreamSessionManager { return dialog; } + public SIPDialog getDialogByCallId(String deviceId, String channelId, String callID){ + SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callID, null); + if (ssrcTransaction == null) return null; + byte[] dialogByteArray = ssrcTransaction.getDialog(); + if (dialogByteArray == null) return null; + SIPDialog dialog = (SIPDialog)SerializeUtils.deSerialize(dialogByteArray); + return dialog; + } + public SsrcTransaction getSsrcTransaction(String deviceId, String channelId, String callId, String stream){ if (StringUtils.isEmpty(callId)) callId ="*"; if (StringUtils.isEmpty(stream)) stream ="*"; @@ -95,6 +104,21 @@ public class VideoStreamSessionManager { return (SsrcTransaction)redisUtil.get((String) scanResult.get(0)); } + public List getSsrcTransactionForAll(String deviceId, String channelId, String callId, String stream){ + if (StringUtils.isEmpty(deviceId)) deviceId ="*"; + if (StringUtils.isEmpty(channelId)) channelId ="*"; + if (StringUtils.isEmpty(callId)) callId ="*"; + if (StringUtils.isEmpty(stream)) stream ="*"; + String key = VideoManagerConstants.MEDIA_TRANSACTION_USED_PREFIX + userSetup.getServerId() + "_" + deviceId + "_" + channelId + "_" + callId+ "_" + stream; + List scanResult = redisUtil.scan(key); + if (scanResult.size() == 0) return null; + List result = new ArrayList<>(); + for (Object keyObj : scanResult) { + result.add((SsrcTransaction)redisUtil.get((String) keyObj)); + } + return result; + } + public String getMediaServerId(String deviceId, String channelId, String stream){ SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); if (ssrcTransaction == null) return null; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java index f0d90336..25dc75b4 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/GPSSubscribeTask.java @@ -63,7 +63,5 @@ public class GPSSubscribeTask implements Runnable{ } } } - - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index cd2d6274..1f58a151 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -96,4 +96,11 @@ public interface ISIPCommanderForPlatform { * @param recordInfo 录像信息 */ boolean recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo); + + /** + * 向发起点播的上级回复bye + * @param platform 平台信息 + * @param callId callId + */ + void streamByeCmd(ParentPlatform platform, String callId); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 437c69d9..a7b67ad6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -85,6 +86,9 @@ public class SIPCommander implements ISIPCommander { @Autowired private IMediaServerService mediaServerService; + @Autowired + private DynamicTask dynamicTask; + /** * 云台方向放控制,使用配置文件中的默认镜头移动速度 @@ -330,7 +334,8 @@ public class SIPCommander implements ISIPCommander { * @param errorEvent sip错误订阅 */ @Override - public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) { + public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId, + ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent) { String streamId = ssrcInfo.getStream(); try { if (device == null) return; @@ -342,15 +347,13 @@ public class SIPCommander implements ISIPCommander { subscribeKey.put("app", "rtp"); subscribeKey.put("stream", streamId); subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); subscribeKey.put("mediaServerId", mediaServerItem.getId()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; if (event != null) { event.response(mediaServerItemInUse, json); } - -// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); }); // StringBuffer content = new StringBuffer(200); @@ -419,7 +422,7 @@ public class SIPCommander implements ISIPCommander { transmitRequest(device, request, (e -> { streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); - mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); errorEvent.response(e); }), e ->{ // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 @@ -458,8 +461,6 @@ public class SIPCommander implements ISIPCommander { logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - System.out.println(344444); - if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; if (event != null) { event.response(mediaServerItemInUse, json); } @@ -565,7 +566,6 @@ public class SIPCommander implements ISIPCommander { logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString()); subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ - if (userSetup.isWaitTrack() && json.getJSONArray("tracks") == null) return; event.response(mediaServerItemInUse, json); subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); }); @@ -662,6 +662,7 @@ public class SIPCommander implements ISIPCommander { @Override public void streamByeCmd(String deviceId, String channelId, String stream, SipSubscribe.Event okEvent) { try { + SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream); ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream); if (transaction == null) { logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId); @@ -715,10 +716,9 @@ public class SIPCommander implements ISIPCommander { dialog.sendRequest(clientTransaction); - SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, callIdHeader.getCallId(), null); if (ssrcTransaction != null) { MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); - mediaServerService.releaseSsrc(mediaServerItem, ssrcTransaction.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc()); mediaServerService.closeRTPServer(deviceId, channelId, ssrcTransaction.getStream()); streamSession.remove(deviceId, channelId, ssrcTransaction.getStream()); } @@ -1169,8 +1169,6 @@ public class SIPCommander implements ISIPCommander { */ @Override public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) { - // 清空通道 -// storager.cleanChannelsForDevice(device.getDeviceId()); try { StringBuffer catalogXml = new StringBuffer(200); catalogXml.append("\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java index b57a5e44..b55b0448 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java @@ -5,8 +5,16 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderPlarformProvider; import com.genersoft.iot.vmp.gb28181.utils.DateUtil; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.SerializeUtils; +import gov.nist.javax.sip.SipProviderImpl; +import gov.nist.javax.sip.SipStackImpl; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -18,10 +26,14 @@ import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.sip.*; +import javax.sip.address.SipURI; import javax.sip.header.CallIdHeader; +import javax.sip.header.ViaHeader; import javax.sip.header.WWWAuthenticateHeader; import javax.sip.message.Request; +import java.lang.reflect.Field; import java.text.ParseException; +import java.util.HashSet; import java.util.List; import java.util.UUID; @@ -37,18 +49,24 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private IMediaServerService mediaServerService; + @Autowired private SipSubscribe sipSubscribe; + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + @Lazy @Autowired @Qualifier(value="tcpSipProvider") - private SipProvider tcpSipProvider; + private SipProviderImpl tcpSipProvider; @Lazy @Autowired @Qualifier(value="udpSipProvider") - private SipProvider udpSipProvider; + private SipProviderImpl udpSipProvider; @Override public boolean register(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { @@ -57,13 +75,12 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { @Override public boolean unregister(ParentPlatform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) { - parentPlatform.setExpires("0"); ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(parentPlatform.getServerGBId()); if (parentPlatformCatch != null) { parentPlatformCatch.setParentPlatform(parentPlatform); redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); } - + parentPlatform.setExpires("0"); return register(parentPlatform, null, null, errorEvent, okEvent, false); } @@ -543,4 +560,59 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { } return true; } + + @Override + public void streamByeCmd(ParentPlatform platform, String callId) { + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platform.getServerGBId(), null, null, callId); + if (sendRtpItem != null) { + String mediaServerId = sendRtpItem.getMediaServerId(); + MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + zlmrtpServerFactory.closeRTPServer(mediaServerItem, sendRtpItem.getStreamId()); + } + byte[] dialogByteArray = sendRtpItem.getDialog(); + if (dialogByteArray != null) { + SIPDialog dialog = (SIPDialog) SerializeUtils.deSerialize(dialogByteArray); + SipStack sipStack = udpSipProvider.getSipStack(); + SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog); + if (dialog != sipDialog) { + dialog = sipDialog; + } else { + try { + dialog.setSipProvider(udpSipProvider); + Field sipStackField = SIPDialog.class.getDeclaredField("sipStack"); + sipStackField.setAccessible(true); + sipStackField.set(dialog, sipStack); + Field eventListenersField = SIPDialog.class.getDeclaredField("eventListeners"); + eventListenersField.setAccessible(true); + eventListenersField.set(dialog, new HashSet<>()); + + byte[] transactionByteArray = sendRtpItem.getTransaction(); + ClientTransaction clientTransaction = (ClientTransaction) SerializeUtils.deSerialize(transactionByteArray); + Request byeRequest = dialog.createRequest(Request.BYE); + SipURI byeURI = (SipURI) byeRequest.getRequestURI(); + SIPRequest request = (SIPRequest) clientTransaction.getRequest(); + byeURI.setHost(request.getRemoteAddress().getHostName()); + byeURI.setPort(request.getRemotePort()); + if ("TCP".equals(platform.getTransport())) { + clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest); + } else if ("UDP".equals(platform.getTransport())) { + clientTransaction = udpSipProvider.getNewClientTransaction(byeRequest); + } + dialog.sendRequest(clientTransaction); + } catch (SipException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } + + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index d5bc99b7..8556730c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; @@ -22,6 +23,7 @@ import javax.sip.Dialog; import javax.sip.DialogState; import javax.sip.RequestEvent; import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; @@ -60,6 +62,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private ZLMHttpHookSubscribe subscribe; + @Autowired + private DynamicTask dynamicTask; + /** * 处理 ACK请求 @@ -68,13 +73,16 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In */ @Override public void process(RequestEvent evt) { - logger.info("ACK请求: {}", ((System.currentTimeMillis()))); Dialog dialog = evt.getDialog(); + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); if (dialog == null) return; if (dialog.getState()== DialogState.CONFIRMED) { String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); + logger.info("ACK请求: platformGbId->{}", platformGbId); + // 取消设置的超时任务 + dynamicTask.stop(callIdHeader.getCallId()); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); String is_Udp = sendRtpItem.isTcp() ? "0" : "1"; String deviceId = sendRtpItem.getDeviceId(); StreamInfo streamInfo = null; @@ -83,15 +91,12 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In }else { streamInfo = redisCatchStorage.queryPlaybackByDevice(deviceId, channelId); } - System.out.println(JSON.toJSON(streamInfo)); if (streamInfo == null) { streamInfo = new StreamInfo(); streamInfo.setApp(sendRtpItem.getApp()); streamInfo.setStream(sendRtpItem.getStreamId()); } redisCatchStorage.updateSendRTPSever(sendRtpItem); - logger.info(platformGbId); - logger.info(channelId); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",streamInfo.getApp()); @@ -100,42 +105,23 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In param.put("dst_url",sendRtpItem.getIp()); param.put("dst_port", sendRtpItem.getPort()); param.put("is_udp", is_Udp); - // 设备推流查询,成功后才能转推 MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); - zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) { -// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", -// streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// } else { -// // 对hook进行订阅 -// logger.info("等待设备推流[{}/{}].......", -// streamInfo.getApp(), streamInfo.getStreamId()); -// Timer timer = new Timer(); -// timer.schedule(new TimerTask() { -// @Override -// public void run() { -// logger.info("设备推流[{}/{}]超时,终止向上级推流", -// finalStreamInfo.getApp() , finalStreamInfo.getStreamId()); -// -// } -// }, 30*1000L); -// // 添加订阅 -// JSONObject subscribeKey = new JSONObject(); -// subscribeKey.put("app", "rtp"); -// subscribeKey.put("stream", streamInfo.getStreamId()); -// subscribeKey.put("mediaServerId", streamInfo.getMediaServerId()); -// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, subscribeKey, -// (MediaServerItem mediaServerItemInUse, JSONObject json) -> { -// logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]", -// finalStreamInfo.getApp(), finalStreamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort()); -// timer.cancel(); -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); -// }); -// } - - + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + if (jsonObject.getInteger("code") != 0) { + logger.info("监听流以等待流上线{}/{}", streamInfo.getApp(), streamInfo.getStream()); + // 监听流上线 + // 添加订阅 + JSONObject subscribeKey = new JSONObject(); + subscribeKey.put("app", "rtp"); + subscribeKey.put("stream", streamInfo.getStream()); + subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); + subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + }); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index deda7832..2811c4f5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -4,6 +4,8 @@ import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; +import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor; @@ -13,6 +15,8 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.SerializeUtils; +import gov.nist.javax.sip.stack.SIPDialog; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; @@ -21,6 +25,7 @@ import org.springframework.stereotype.Component; import javax.sip.*; import javax.sip.address.SipURI; +import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.header.HeaderAddress; import javax.sip.header.ToHeader; @@ -56,6 +61,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private SIPProcessorObserver sipProcessorObserver; + @Autowired + private VideoStreamSessionManager streamSession; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -71,11 +79,12 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In try { responseAck(evt, Response.OK); Dialog dialog = evt.getDialog(); + CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); if (dialog == null) return; if (dialog.getState().equals(DialogState.TERMINATED)) { String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser(); String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser(); - SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId); + SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId, null, callIdHeader.getCallId()); logger.info("收到bye, [{}/{}]", platformGbId, channelId); if (sendRtpItem != null){ String streamId = sendRtpItem.getStreamId(); @@ -87,35 +96,44 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In logger.info("停止向上级推流:" + streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); - redisCatchStorage.deleteSendRTPServer(platformGbId, channelId); + redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); - if (totalReaderCount == 0) { + if (totalReaderCount <= 0) { logger.info(streamId + "无其它观看者,通知设备停止推流"); cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId); - }else if (totalReaderCount == -1){ - logger.warn(streamId + " 查找其它观看者失败"); } } // 可能是设备主动停止 Device device = storager.queryVideoDeviceByChannelId(platformGbId); if (device != null) { + storager.stopPlay(device.getDeviceId(), channelId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); - if (sendRtpItem != null) { - if (sendRtpItem.isPlay()) { - if (streamInfo != null) { - redisCatchStorage.stopPlay(streamInfo); - } - }else { - if (streamInfo != null) { - redisCatchStorage.stopPlayback(streamInfo); - } - } - - storager.stopPlay(device.getDeviceId(), channelId); + if (streamInfo != null) { + redisCatchStorage.stopPlay(streamInfo); mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream()); } + SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); + if (ssrcTransactionForPlay != null){ + SIPDialog dialogForPlay = (SIPDialog) SerializeUtils.deSerialize(ssrcTransactionForPlay.getDialog()); + if (dialogForPlay.getCallId().equals(callIdHeader.getCallId())){ + // 释放ssrc + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId()); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlay.getSsrc()); + } + streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlay.getStream()); + } + } + SsrcTransaction ssrcTransactionForPlayBack = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, callIdHeader.getCallId(), null); + if (ssrcTransactionForPlayBack != null) { + // 释放ssrc + MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlayBack.getMediaServerId()); + if (mediaServerItem != null) { + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransactionForPlayBack.getSsrc()); + } + streamSession.remove(device.getDeviceId(), channelId, ssrcTransactionForPlayBack.getStream()); + } } - } } catch (SipException e) { e.printStackTrace(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f861da41..f82f7810 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; @@ -21,6 +22,7 @@ import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorager; +import com.genersoft.iot.vmp.utils.SerializeUtils; import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult; import gov.nist.javax.sdp.TimeDescriptionImpl; import gov.nist.javax.sdp.fields.TimeField; @@ -68,6 +70,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private DynamicTask dynamicTask; + @Autowired private SIPCommander cmder; @@ -257,11 +262,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } sendRtpItem.setCallId(callIdHeader.getCallId()); sendRtpItem.setPlay("Play".equals(sessionName)); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); - Device finalDevice = device; - MediaServerItem finalMediaServerItem = mediaServerItem; Long finalStartTime = startTime; Long finalStopTime = stopTime; ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ @@ -289,7 +296,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements content.append("f=\r\n"); try { + // 超时未收到Ack应该回复bye,当前等待时间为10秒 + dynamicTask.startDelay(callIdHeader.getCallId(), ()->{ + logger.info("Ack 等待超时"); + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc); + // 回复bye + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + }, 60); responseSdpAck(evt, content.toString(), platform); + } catch (SipException e) { e.printStackTrace(); } catch (InvalidArgumentException e) { @@ -320,6 +335,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (result.getEvent() != null) { errorEvent.response(result.getEvent()); } + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); try { responseAck(evt, Response.REQUEST_TIMEOUT); } catch (SipException e) { @@ -343,7 +359,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements sendRtpItem.setStreamId(String.format("%s_%s", device.getDeviceId(), channelId)); } sendRtpItem.setPlay(false); - playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent,errorEvent); + playService.play(mediaServerItem,device.getDeviceId(), channelId, hookEvent, errorEvent, ()->{ + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + }); }else { sendRtpItem.setStreamId(streamInfo.getStream()); hookEvent.response(mediaServerItem, null); @@ -365,6 +383,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 写入redis, 超时时回复 sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); redisCatchStorage.updateSendRTPSever(sendRtpItem); StringBuffer content = new StringBuffer(200); content.append("v=0\r\n"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index d9bfb560..737f7528 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -158,6 +158,10 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen device.setCharset("gb2312"); device.setDeviceId(deviceId); device.setFirsRegister(true); + }else { + if (device.getOnline() == 0) { + device.setFirsRegister(true); + } } device.setIp(received); device.setPort(rPort); @@ -187,7 +191,6 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete(); // 注册成功 // 保存到redis - // 下发catelog查询目录 if (registerFlag == 1 ) { logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress); publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java index a2c6cbfb..a765b3ab 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java @@ -27,9 +27,7 @@ import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.ServerTransaction; import javax.sip.SipException; -import javax.sip.header.CallIdHeader; import javax.sip.header.ExpiresHeader; -import javax.sip.header.Header; import javax.sip.header.ToHeader; import javax.sip.message.Request; import javax.sip.message.Response; @@ -139,19 +137,17 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme if (subscribeInfo.getExpires() > 0) { if (redisCatchStorage.getSubscribe(key) != null) { - dynamicTask.stopCron(key); + dynamicTask.stop(key); } String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔 dynamicTask.startCron(key, new GPSSubscribeTask(redisCatchStorage, sipCommanderForPlatform, storager, platformId, sn, key), Integer.parseInt(interval)); redisCatchStorage.updateSubscribe(key, subscribeInfo); }else if (subscribeInfo.getExpires() == 0) { - dynamicTask.stopCron(key); + dynamicTask.stop(key); redisCatchStorage.delSubscribe(key); } - - try { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId); Response response = responseXmlAck(evt, resultXml.toString(), parentPlatform); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index b6040aad..ffac1d00 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -85,19 +85,18 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { redisCatchStorage.delPlatformRegisterInfo(callId); parentPlatform.setStatus("注册".equals(action)); // 取回Expires设置,避免注销过程中被置为0 - ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); - String expires = parentPlatformTmp.getExpires(); - parentPlatform.setExpires(expires); - parentPlatform.setId(parentPlatformTmp.getId()); + if (!parentPlatformCatch.getParentPlatform().getExpires().equals("0")) { + ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId); + String expires = parentPlatformTmp.getExpires(); + parentPlatform.setExpires(expires); + parentPlatform.setId(parentPlatformTmp.getId()); + redisCatchStorage.updatePlatformRegister(parentPlatform); + redisCatchStorage.updatePlatformKeepalive(parentPlatform); + parentPlatformCatch.setParentPlatform(parentPlatform); + redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); + } storager.updateParentPlatformStatus(platformGBId, "注册".equals(action)); - redisCatchStorage.updatePlatformRegister(parentPlatform); - - redisCatchStorage.updatePlatformKeepalive(parentPlatform); - - parentPlatformCatch.setParentPlatform(parentPlatform); - - redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch); } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 14705bc0..62723ac5 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -489,7 +489,7 @@ public class ZLMHttpHookListener { } String mediaServerId = json.getString("mediaServerId"); MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId); - if (userSetup.isAutoApplyPlay() && mediaInfo != null) { + if (userSetup.isAutoApplyPlay() && mediaInfo != null && mediaInfo.isRtpEnable()) { String app = json.getString("app"); String streamId = json.getString("stream"); if ("rtp".equals(app)) { @@ -499,28 +499,16 @@ public class ZLMHttpHookListener { String channelId = s[1]; Device device = redisCatchStorage.getDevice(deviceId); if (device != null) { - UUID uuid = UUID.randomUUID(); - SSRCInfo ssrcInfo; - String streamId2 = null; - if (mediaInfo.isRtpEnable()) { - streamId2 = String.format("%s_%s", device.getDeviceId(), channelId); - } - ssrcInfo = mediaServerService.openRTPServer(mediaInfo, streamId2); - cmder.playStreamCmd(mediaInfo, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { - logger.info("收到订阅消息: " + response.toJSONString()); - playService.onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid.toString()); - }, null); + playService.play(mediaInfo,deviceId, channelId, null, null, null); } - } } - } JSONObject ret = new JSONObject(); ret.put("code", 0); ret.put("msg", "success"); - return new ResponseEntity(ret.toString(),HttpStatus.OK); + return new ResponseEntity<>(ret.toString(),HttpStatus.OK); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 76bab9c0..a0b7e75b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -205,7 +205,7 @@ public class ZLMRTPServerFactory { /** * 调用zlm RESTful API —— startSendRtp */ - public Boolean startSendRtpStream(MediaServerItem mediaServerItem, Mapparam) { + public JSONObject startSendRtpStream(MediaServerItem mediaServerItem, Mapparam) { Boolean result = false; JSONObject jsonObject = zlmresTfulUtils.startSendRtp(mediaServerItem, param); if (jsonObject == null) { @@ -216,7 +216,7 @@ public class ZLMRTPServerFactory { } else { logger.error("RTP推流失败: " + jsonObject.getString("msg")); } - return result; + return jsonObject; } /** diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java index 5b0741b6..ca6fdfe4 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMStatusEventListener.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.media.zlm.event; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import org.slf4j.Logger; @@ -34,6 +35,9 @@ public class ZLMStatusEventListener { @Autowired private IMediaServerService mediaServerService; + @Autowired + private IPlayService playService; + private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Async @@ -55,6 +59,6 @@ public class ZLMStatusEventListener { mediaServerService.zlmServerOffline(event.getMediaServerId()); streamProxyService.zlmServerOffline(event.getMediaServerId()); streamPushService.zlmServerOffline(event.getMediaServerId()); - // TODO 处理对国标的影响 + playService.zlmServerOffline(event.getMediaServerId()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 8c12c787..00ec0dd5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -58,7 +58,7 @@ public interface IMediaServerService { void removeCount(String mediaServerId); - void releaseSsrc(MediaServerItem mediaServerItem, String ssrc); + void releaseSsrc(String mediaServerItemId, String ssrc); void clearMediaServerForOnline(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java index 12bb8fa9..80ededa6 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IPlayService.java @@ -17,11 +17,13 @@ public interface IPlayService { void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId, String uuid); - PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent); + PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event event, SipSubscribe.Event errorEvent, Runnable timeoutCallback); MediaServerItem getNewMediaServerItem(Device device); void onPublishHandlerForDownload(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String toString); DeferredResult> playBack(String deviceId, String channelId, String startTime, String endTime, PlayBackCallback errorCallBack); + + void zlmServerOffline(String mediaServerId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 66407a1c..675ed4eb 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -52,11 +52,9 @@ public class DeviceServiceImpl implements IDeviceService { return false; } logger.info("移除目录订阅: {}", device.getDeviceId()); - dynamicTask.stopCron(device.getDeviceId()); + dynamicTask.stop(device.getDeviceId()); device.setSubscribeCycleForCatalog(0); sipCommander.catalogSubscribe(device, null, null); - // 清空cseq计数 - return true; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index f226a37f..e5781657 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -167,13 +167,14 @@ public class MediaServerServiceImpl implements IMediaServerService { if (mediaServerItem != null) { String streamId = String.format("%s_%s", deviceId, channelId); zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); - releaseSsrc(mediaServerItem, ssrc); + releaseSsrc(mediaServerItem.getId(), ssrc); } streamSession.remove(deviceId, channelId, stream); } @Override - public void releaseSsrc(MediaServerItem mediaServerItem, String ssrc) { + public void releaseSsrc(String mediaServerItemId, String ssrc) { + MediaServerItem mediaServerItem = getOne(mediaServerItemId); if (mediaServerItem == null || ssrc == null) { return; } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index 778bd67e..286f8237 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -5,13 +5,13 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.conf.UserSetup; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -37,8 +37,7 @@ import org.springframework.util.ResourceUtils; import org.springframework.web.context.request.async.DeferredResult; import java.io.FileNotFoundException; -import java.util.Objects; -import java.util.UUID; +import java.util.*; @SuppressWarnings(value = {"rawtypes", "unchecked"}) @Service @@ -52,6 +51,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private SIPCommander cmder; + @Autowired + private SIPCommanderFroPlatform sipCommanderFroPlatform; + @Autowired private IRedisCatchStorage redisCatchStorage; @@ -78,7 +80,9 @@ public class PlayServiceImpl implements IPlayService { @Override - public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) { + public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId, + ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, + Runnable timeoutCallback) { PlayResult playResult = new PlayResult(); RequestMessage msg = new RequestMessage(); String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId; @@ -101,29 +105,10 @@ public class PlayServiceImpl implements IPlayService { Device device = redisCatchStorage.getDevice(deviceId); StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId); playResult.setDevice(device); - // 超时处理 - result.onTimeout(()->{ - logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(-1); - SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, streamInfo.getStream()); - if (dialog != null) { - wvpResult.setMsg("收流超时,请稍候重试"); - }else { - wvpResult.setMsg("点播超时,请稍候重试"); - } - msg.setData(wvpResult); - // 点播超时回复BYE - cmder.streamByeCmd(device.getDeviceId(), channelId, streamInfo.getStream()); - // 释放rtpserver - mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, streamInfo.getStream()); - // 回复之前所有的点播请求 - resultHolder.invokeAllResult(msg); - // TODO 释放ssrc - }); result.onCompletion(()->{ // 点播结束时调用截图接口 + // TODO 应该在上流时调用更好,结束也可能是错误结束 try { String classPath = ResourceUtils.getURL("classpath:").getPath(); // 兼容打包为jar的class路径 @@ -161,31 +146,60 @@ public class PlayServiceImpl implements IPlayService { if (mediaServerItem.isRtpEnable()) { streamId = String.format("%s_%s", device.getDeviceId(), channelId); } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId); + // 超时处理 + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); + if (timeoutCallback != null) { + timeoutCallback.run(); + } + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(-1); + SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); + if (dialog != null) { + wvpResult.setMsg("收流超时,请稍候重试"); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + }else { + wvpResult.setMsg("点播超时,请稍候重试"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); + streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); + } + + msg.setData(wvpResult); + + // 回复之前所有的点播请求 + resultHolder.invokeAllResult(msg); + } + }, userSetup.getPlayTimeout()); // 发送点播消息 cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInUse, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); + timer.cancel(); onPublishHandlerForPlay(mediaServerItemInUse, response, deviceId, channelId, uuid); if (hookEvent != null) { hookEvent.response(mediaServerItem, response); } }, (event) -> { + timer.cancel(); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); // 点播返回sip错误 mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); + wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg)); msg.setData(wvpResult); resultHolder.invokeAllResult(msg); if (errorEvent != null) { errorEvent.response(event); } - - }); } else { String streamId = streamInfo.getStream(); @@ -222,13 +236,41 @@ public class PlayServiceImpl implements IPlayService { streamId2 = String.format("%s_%s", device.getDeviceId(), channelId); } SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId2); + // 超时处理 + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId)); + if (timeoutCallback != null) { + timeoutCallback.run(); + } + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(-1); + SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream()); + if (dialog != null) { + wvpResult.setMsg("收流超时,请稍候重试"); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + }else { + wvpResult.setMsg("点播超时,请稍候重试"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); + streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); + } + + msg.setData(wvpResult); + // 回复之前所有的点播请求 + resultHolder.invokeAllResult(msg); + } + }, userSetup.getPlayTimeout()); cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); onPublishHandlerForPlay(mediaServerItemInuse, response, deviceId, channelId, uuid); }, (event) -> { mediaServerService.closeRTPServer(playResult.getDevice().getDeviceId(), channelId, ssrcInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem, ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); WVPResult wvpResult = new WVPResult(); wvpResult.setCode(-1); @@ -306,14 +348,23 @@ public class PlayServiceImpl implements IPlayService { msg.setId(uuid); msg.setKey(key); PlayBackResult playBackResult = new PlayBackResult<>(); - result.onTimeout(()->{ - msg.setData("回放超时"); - playBackResult.setCode(-1); - playBackResult.setData(msg); - callback.call(playBackResult); - }); + Timer timer = new Timer(); + timer.schedule(new TimerTask() { + @Override + public void run() { + logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId)); + playBackResult.setCode(-1); + playBackResult.setData(msg); + callback.call(playBackResult); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream()); + // 回复之前所有的点播请求 + callback.call(playBackResult); + } + }, userSetup.getPlayTimeout()); cmder.playbackStreamCmd(newMediaServerItem, ssrcInfo, device, channelId, startTime, endTime, (MediaServerItem mediaServerItem, JSONObject response) -> { logger.info("收到订阅消息: " + response.toJSONString()); + timer.cancel(); StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId); if (streamInfo == null) { logger.warn("设备回放API调用失败!"); @@ -331,6 +382,7 @@ public class PlayServiceImpl implements IPlayService { playBackResult.setResponse(response); callback.call(playBackResult); }, event -> { + timer.cancel(); msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg)); playBackResult.setCode(-1); playBackResult.setData(msg); @@ -370,4 +422,26 @@ public class PlayServiceImpl implements IPlayService { return streamInfo; } + @Override + public void zlmServerOffline(String mediaServerId) { + // 处理正在向上推流的上级平台 + List sendRtpItems = redisCatchStorage.querySendRTPServer(null); + if (sendRtpItems.size() > 0) { + for (SendRtpItem sendRtpItem : sendRtpItems) { + if (sendRtpItem.getMediaServerId().equals(mediaServerId)) { + ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId()); + } + } + } + // 处理正在观看的国标设备 + List allSsrc = streamSession.getAllSsrc(); + if (allSsrc.size() > 0) { + for (SsrcTransaction ssrcTransaction : allSsrc) { + if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) { + cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + } + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index 1a939020..b0edc063 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -89,7 +89,7 @@ public interface IRedisCatchStorage { * @param channelId * @return sendRtpItem */ - SendRtpItem querySendRTPServer(String platformGbId, String channelId); + SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId); List querySendRTPServer(String platformGbId); @@ -98,7 +98,7 @@ public interface IRedisCatchStorage { * @param platformGbId * @param channelId */ - void deleteSendRTPServer(String platformGbId, String channelId); + void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId); /** * 查询某个通道是否存在上级点播(RTP推送) diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index d4cace44..2431699a 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -135,6 +135,32 @@ public interface DeviceChannelMapper { "'${item.ipAddress}', ${item.port}, '${item.password}', ${item.PTZType}, ${item.status}, " + "'${item.streamId}', ${item.longitude}, ${item.latitude},'${item.createTime}', '${item.updateTime}')" + " " + + "ON DUPLICATE KEY UPDATE " + + "updateTime=VALUES(updateTime), " + + "name=VALUES(name), " + + "manufacture=VALUES(manufacture), " + + "model=VALUES(model), " + + "owner=VALUES(owner), " + + "civilCode=VALUES(civilCode), " + + "block=VALUES(block), " + + "subCount=VALUES(subCount), " + + "address=VALUES(address), " + + "parental=VALUES(parental), " + + "parentId=VALUES(parentId), " + + "safetyWay=VALUES(safetyWay), " + + "registerWay=VALUES(registerWay), " + + "certNum=VALUES(certNum), " + + "certifiable=VALUES(certifiable), " + + "errCode=VALUES(errCode), " + + "secrecy=VALUES(secrecy), " + + "ipAddress=VALUES(ipAddress), " + + "port=VALUES(port), " + + "password=VALUES(password), " + + "PTZType=VALUES(PTZType), " + + "status=VALUES(status), " + + "streamId=VALUES(streamId), " + + "longitude=VALUES(longitude), " + + "latitude=VALUES(latitude)" + "") int batchAdd(List addChannels); @@ -211,4 +237,15 @@ public interface DeviceChannelMapper { " from device_channel\n" + " where deviceId = #{deviceId}") List tree(String deviceId); + + @Delete(value = {" "}) + int cleanChannelsNotInList(String deviceId, List channels); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index b5a3aba0..0641348a 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -18,6 +18,7 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.security.core.parameters.P; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; @@ -276,19 +277,32 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void updateSendRTPSever(SendRtpItem sendRtpItem) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId(); + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + + sendRtpItem.getPlatformId() + "_" + sendRtpItem.getChannelId() + "_" + + sendRtpItem.getStreamId() + "_" + sendRtpItem.getCallId(); redis.set(key, sendRtpItem); } @Override - public SendRtpItem querySendRTPServer(String platformGbId, String channelId) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId; - return (SendRtpItem)redis.get(key); + public SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { + if (platformGbId == null) platformGbId = "*"; + if (channelId == null) channelId = "*"; + if (streamId == null) streamId = "*"; + if (callId == null) callId = "*"; + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + + "_" + channelId + "_" + streamId + "_" + callId; + List scan = redis.scan(key); + if (scan.size() > 0) { + return (SendRtpItem)redis.get((String)scan.get(0)); + }else { + return null; + } } @Override public List querySendRTPServer(String platformGbId) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*"; + if (platformGbId == null) platformGbId = "*"; + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_*" + "_*" + "_*"; List queryResult = redis.scan(key); List result= new ArrayList<>(); @@ -306,18 +320,28 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { * @param channelId */ @Override - public void deleteSendRTPServer(String platformGbId, String channelId) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + "_" + channelId; - redis.del(key); + public void deleteSendRTPServer(String platformGbId, String channelId, String callId, String streamId) { + if (streamId == null) streamId = "*"; + if (callId == null) callId = "*"; + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + platformGbId + + "_" + channelId + "_" + streamId + "_" + callId; + List scan = redis.scan(key); + if (scan.size() > 0) { + for (Object keyStr : scan) { + redis.del((String)keyStr); + } + } } + + /** * 查询某个通道是否存在上级点播(RTP推送) * @param channelId */ @Override public boolean isChannelSendingRTP(String channelId) { - String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId; + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + userSetup.getServerId() + "_" + "*_" + channelId + "*_" + "*_"; List RtpStreams = redis.scan(key); if (RtpStreams.size() > 0) { return true; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java index 5e6410b3..ce450884 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java @@ -284,7 +284,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager { logger.debug("[目录查询]收到的数据存在重复: {}" , stringBuilder); } try { - int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId); +// int cleanChannelsResult = deviceChannelMapper.cleanChannelsByDeviceId(deviceId); + int cleanChannelsResult = deviceChannelMapper.cleanChannelsNotInList(deviceId, channels); int limitCount = 300; boolean result = cleanChannelsResult < 0; if (!result && channels.size() > 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java index 1d639098..178ad9bf 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.vmanager.gb28181.device; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector; @@ -13,7 +14,6 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.vmanager.bean.DeviceChannelTree; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.github.pagehelper.PageInfo; -import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; @@ -57,6 +57,9 @@ public class DeviceQuery { @Autowired private IDeviceService deviceService; + @Autowired + private DynamicTask dynamicTask; + /** * 使用ID查询国标设备 * @param deviceId 国标ID @@ -209,6 +212,8 @@ public class DeviceQuery { boolean isSuccess = storager.delete(deviceId); if (isSuccess) { redisCatchStorage.clearCatchByDeviceId(deviceId); + // 停止此设备的订阅更新 + dynamicTask.stop(deviceId); JSONObject json = new JSONObject(); json.put("deviceId", deviceId); return new ResponseEntity<>(json.toString(),HttpStatus.OK); diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java index caca64e3..4f4d8001 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java @@ -2,8 +2,9 @@ package com.genersoft.iot.vmp.vmanager.gb28181.platform; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; -import com.genersoft.iot.vmp.gb28181.bean.CatalogData; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetup; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -39,6 +40,9 @@ public class PlatformController { private final static Logger logger = LoggerFactory.getLogger(PlatformController.class); + @Autowired + private UserSetup userSetup; + @Autowired private IVideoManagerStorager storager; @@ -51,6 +55,9 @@ public class PlatformController { @Autowired private SipConfig sipConfig; + @Autowired + private DynamicTask dynamicTask; + /** * 获取国标服务的配置 * @@ -222,7 +229,7 @@ public class PlatformController { if (updateResult) { // 保存时启用就发送注册 if (parentPlatform.isEnable()) { - if (parentPlatformOld.isStatus()) { + if (parentPlatformOld != null && parentPlatformOld.isStatus()) { commanderForPlatform.unregister(parentPlatformOld, null, null); try { Thread.sleep(500); @@ -287,8 +294,9 @@ public class PlatformController { boolean deleteResult = storager.deleteParentPlatform(parentPlatform); storager.delCatalogByPlatformId(parentPlatform.getServerGBId()); storager.delRelationByPlatformId(parentPlatform.getServerGBId()); - - + // 停止发送位置订阅定时任务 + String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetup.getServerId() + "_MobilePosition_" + parentPlatform.getServerGBId(); + dynamicTask.stop(key); if (deleteResult) { return new ResponseEntity<>("success", HttpStatus.OK); } else { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java index fd70690e..4a22546f 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/play/PlayController.java @@ -88,7 +88,7 @@ public class PlayController { // 获取可用的zlm Device device = storager.queryVideoDevice(deviceId); MediaServerItem newMediaServerItem = playService.getNewMediaServerItem(device); - PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null); + PlayResult playResult = playService.play(newMediaServerItem, deviceId, channelId, null, null, null); return playResult.getResult(); } diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java index 853ec562..955b68bb 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java @@ -150,7 +150,7 @@ public class ApiStreamController { JSONObject result = new JSONObject(); result.put("error", "channel[ " + code + " ] " + eventResult.msg); resultDeferredResult.setResult(result); - }); + }, null); return resultDeferredResult; } diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index f3f1fb33..e90f5a1e 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -170,8 +170,6 @@ user-settings: save-position-history: false # 点播等待超时时间,单位:毫秒 play-timeout: 3000 - # 等待音视频编码信息再返回, true: 可以根据编码选择合适的播放器,false: 可以更快点播 - wait-track: false # 是否开启接口鉴权 interface-authentication: true # 自动配置redis 可以过期事件