From e0344ccf9725fe3d22a90ab11257396071e7f55f Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Tue, 14 Jun 2022 14:37:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=9B=BD=E6=A0=87=E7=BA=A7=E8=81=94=E6=8E=A8?= =?UTF-8?q?=E9=80=81=E6=8E=A8=E6=B5=81=20=E6=94=AF=E6=8C=81=E5=A4=9Awvp?= =?UTF-8?q?=E9=97=B4=E8=87=AA=E5=8A=A8=E9=80=89=E6=8B=A9=E4=B8=8E=E6=8E=A8?= =?UTF-8?q?=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sql/update.sql | 12 +- .../iot/vmp/common/VideoManagerConstants.java | 1 + .../genersoft/iot/vmp/conf/RedisConfig.java | 14 +- .../iot/vmp/gb28181/bean/SendRtpItem.java | 13 + .../MobilePositionSubscribeHandlerTask.java | 4 +- .../request/impl/AckRequestProcessor.java | 108 +- .../request/impl/ByeRequestProcessor.java | 10 +- .../request/impl/CancelRequestProcessor.java | 2 +- .../request/impl/InviteRequestProcessor.java | 1208 +++++++++-------- .../impl/RegisterRequestProcessor.java | 2 +- .../cmd/RecordInfoResponseMessageHandler.java | 2 - .../response/impl/ByeResponseProcessor.java | 2 +- .../impl/CancelResponseProcessor.java | 2 +- .../impl/InviteResponseProcessor.java | 2 +- .../impl/RegisterResponseProcessor.java | 2 +- .../vmp/media/zlm/ZLMHttpHookListener.java | 25 +- .../vmp/media/zlm/ZLMMediaListManager.java | 7 +- .../vmp/media/zlm/ZLMRTPServerFactory.java | 6 + .../vmp/media/zlm/dto/ChannelOnlineEvent.java | 5 +- .../iot/vmp/media/zlm/dto/MediaItem.java | 15 +- .../iot/vmp/media/zlm/dto/StreamPushItem.java | 13 + .../vmp/service/StreamGPSSubscribeTask.java | 1 - .../service/bean/MessageForPushChannel.java | 17 + .../service/bean/RequestPushStreamMsg.java | 170 +++ .../vmp/service/bean/RequestSendItemMsg.java | 173 +++ .../vmp/service/bean/ResponseSendItemMsg.java | 31 + .../iot/vmp/service/bean/WvpRedisMsg.java | 116 ++ .../iot/vmp/service/bean/WvpRedisMsgCmd.java | 12 + .../service/impl/RedisGbPlayMsgListener.java | 377 +++++ ...Listener.java => RedisGpsMsgListener.java} | 15 +- .../service/impl/RedisStreamMsgListener.java | 83 ++ .../service/impl/StreamPushServiceImpl.java | 1 + .../vmp/storager/IVideoManagerStorage.java | 9 + .../vmp/storager/dao/StreamPushMapper.java | 4 +- .../storager/impl/RedisCatchStorageImpl.java | 8 +- .../impl/VideoManagerStorageImpl.java | 5 + web_src/src/components/dialog/rtcPlayer.vue | 18 +- 37 files changed, 1831 insertions(+), 664 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java create mode 100644 src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java rename src/main/java/com/genersoft/iot/vmp/service/impl/{RedisGPSMsgListener.java => RedisGpsMsgListener.java} (66%) create mode 100644 src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java diff --git a/sql/update.sql b/sql/update.sql index 9c18c26e..0abc5446 100644 --- a/sql/update.sql +++ b/sql/update.sql @@ -1,12 +1,4 @@ -alter table parent_platform - add startOfflinePush int default 0 null; +alter table stream_push + add serverId varchar(50) not null; -alter table parent_platform - add administrativeDivision varchar(50) not null; - -alter table parent_platform - add catalogGroup int default 1 null; - -alter table device - add ssrcCheck int default 0 null; diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index eb98f6f2..57f764b5 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -97,4 +97,5 @@ public class VideoManagerConstants { //************************** 第三方 **************************************** public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_"; public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_"; + } diff --git a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java index 6b45eb5d..ec1f9bac 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/RedisConfig.java @@ -2,7 +2,9 @@ package com.genersoft.iot.vmp.conf; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.service.impl.RedisAlarmMsgListener; -import com.genersoft.iot.vmp.service.impl.RedisGPSMsgListener; +import com.genersoft.iot.vmp.service.impl.RedisGpsMsgListener; +import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; +import com.genersoft.iot.vmp.service.impl.RedisStreamMsgListener; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -47,11 +49,17 @@ public class RedisConfig extends CachingConfigurerSupport { private int poolMaxWait; @Autowired - private RedisGPSMsgListener redisGPSMsgListener; + private RedisGpsMsgListener redisGPSMsgListener; @Autowired private RedisAlarmMsgListener redisAlarmMsgListener; + @Autowired + private RedisStreamMsgListener redisStreamMsgListener; + + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; + @Bean public JedisPool jedisPool() { if (StringUtils.isBlank(password)) { @@ -98,6 +106,8 @@ public class RedisConfig extends CachingConfigurerSupport { container.setConnectionFactory(connectionFactory); container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS)); container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE)); + container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH")); + container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); return container; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java index c7f61820..41e1af78 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java @@ -71,6 +71,11 @@ public class SendRtpItem { */ private String mediaServerId; + /** + * 使用的服务的ID + */ + private String serverId; + /** * invite的callId */ @@ -259,4 +264,12 @@ public class SendRtpItem { public void setOnlyAudio(boolean onlyAudio) { this.onlyAudio = onlyAudio; } + + public String getServerId() { + return serverId; + } + + public void setServerId(String serverId) { + this.serverId = serverId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java index c416766e..66b57fec 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java @@ -71,7 +71,9 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask { String gbId = gbStream.getGbId(); GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId); if (gpsMsgInfo != null) { // 无最新位置不发送 - logger.info("无最新位置不发送"); + if (logger.isDebugEnabled()) { + logger.debug("无最新位置不发送"); + } // 经纬度都为0不发送 if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) { continue; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java index b3d67ded..78b8e62c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java @@ -16,6 +16,8 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; +import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.SerializeUtils; @@ -43,7 +45,7 @@ import java.util.*; public class AckRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class); - private String method = "ACK"; + private final String method = "ACK"; @Autowired private SIPProcessorObserver sipProcessorObserver; @@ -78,6 +80,9 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private ISIPCommanderForPlatform commanderForPlatform; + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; + /** * 处理 ACK请求 @@ -114,78 +119,41 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In param.put("pt", sendRtpItem.getPt()); param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0"); param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0"); - JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); - if (jsonObject == null) { - logger.error("RTP推流失败: 请检查ZLM服务"); - } else if (jsonObject.getInteger("code") == 0) { - logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); - sendRtpItem.setDialog(dialogByteArray); - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); - sendRtpItem.setTransaction(transactionByteArray); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - } else { - logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); - if (sendRtpItem.isOnlyAudio()) { - // TODO 可能是语音对讲 - }else { - // 向上级平台 - commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); - } + if (mediaInfo == null) { + RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance( + sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStreamId(), + sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(), + sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio()); + redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, jsonObject->{ + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); + }); + }else { + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + startSendRtpStreamHand(evt, sendRtpItem, parentPlatform, jsonObject, param, callIdHeader); } -// if (streamInfo == null) { // 流还没上来,对方就回复ack -// logger.info("监听流以等待流上线1 rtp/{}", sendRtpItem.getStreamId()); -// // 监听流上线 -// // 添加订阅 -// JSONObject subscribeKey = new JSONObject(); -// subscribeKey.put("app", "rtp"); -// subscribeKey.put("stream", sendRtpItem.getStreamId()); -// subscribeKey.put("regist", true); -// subscribeKey.put("schema", "rtmp"); -// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); -// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, -// (MediaServerItem mediaServerItemInUse, JSONObject json)->{ -// Map param = new HashMap<>(); -// param.put("vhost","__defaultVhost__"); -// param.put("app",json.getString("app")); -// param.put("stream",json.getString("stream")); -// param.put("ssrc", sendRtpItem.getSsrc()); -// param.put("dst_url",sendRtpItem.getIp()); -// param.put("dst_port", sendRtpItem.getPort()); -// param.put("is_udp", is_Udp); -// param.put("src_port", sendRtpItem.getLocalPort()); -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// }); -// }else { -// Map param = new HashMap<>(); -// param.put("vhost","__defaultVhost__"); -// param.put("app",streamInfo.getApp()); -// param.put("stream",streamInfo.getStream()); -// param.put("ssrc", sendRtpItem.getSsrc()); -// param.put("dst_url",sendRtpItem.getIp()); -// param.put("dst_port", sendRtpItem.getPort()); -// param.put("is_udp", is_Udp); -// param.put("src_port", sendRtpItem.getLocalPort()); -// -// JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// if (jsonObject.getInteger("code") != 0) { -// logger.info("监听流以等待流上线2 {}/{}", streamInfo.getApp(), streamInfo.getStream()); -// // 监听流上线 -// // 添加订阅 -// JSONObject subscribeKey = new JSONObject(); -// subscribeKey.put("app", "rtp"); -// subscribeKey.put("stream", streamInfo.getStream()); -// subscribeKey.put("regist", true); -// subscribeKey.put("schema", "rtmp"); -// subscribeKey.put("mediaServerId", sendRtpItem.getMediaServerId()); -// subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, -// (MediaServerItem mediaServerItemInUse, JSONObject json)->{ -// zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); -// }); -// } -// } + } + } + private void startSendRtpStreamHand(RequestEvent evt, SendRtpItem sendRtpItem, ParentPlatform parentPlatform, + JSONObject jsonObject, Map param, CallIdHeader callIdHeader) { + if (jsonObject == null) { + logger.error("RTP推流失败: 请检查ZLM服务"); + } else if (jsonObject.getInteger("code") == 0) { + logger.info("RTP推流成功[ {}/{} ],{}->{}:{}, " ,param.get("app"), param.get("stream"), jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port")); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + } else { + logger.error("RTP推流失败: {}, 参数:{}",jsonObject.getString("msg"),JSONObject.toJSON(param)); + if (sendRtpItem.isOnlyAudio()) { + // TODO 可能是语音对讲 + }else { + // 向上级平台 + commanderForPlatform.streamByeCmd(parentPlatform, callIdHeader.getCallId()); + } } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 75318093..32688801 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -107,13 +107,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId, streamId, null); } if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) { - MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); - messageForPushChannel.setType(0); - messageForPushChannel.setGbId(sendRtpItem.getChannelId()); - messageForPushChannel.setApp(sendRtpItem.getApp()); - messageForPushChannel.setStream(sendRtpItem.getStreamId()); - messageForPushChannel.setMediaServerId(sendRtpItem.getMediaServerId()); - messageForPushChannel.setPlatFormId(sendRtpItem.getPlatformId()); + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, + sendRtpItem.getApp(), sendRtpItem.getStreamId(), sendRtpItem.getChannelId(), + sendRtpItem.getPlatformId(), null, null, sendRtpItem.getMediaServerId()); redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java index 0a818ee6..b04352a6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/CancelRequestProcessor.java @@ -15,7 +15,7 @@ import javax.sip.RequestEvent; @Component public class CancelRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { - private String method = "CANCEL"; + private final String method = "CANCEL"; @Autowired private SIPProcessorObserver sipProcessorObserver; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 75b4114d..19908e49 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -17,10 +17,13 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; +import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -50,562 +53,709 @@ import java.util.Vector; @Component public class InviteRequestProcessor extends SIPRequestProcessorParent implements InitializingBean, ISIPRequestProcessor { - private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class); + private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class); - private String method = "INVITE"; + private final String method = "INVITE"; - @Autowired - private SIPCommanderFroPlatform cmderFroPlatform; + @Autowired + private SIPCommanderFroPlatform cmderFroPlatform; - @Autowired - private IVideoManagerStorage storager; + @Autowired + private IVideoManagerStorage storager; - @Autowired - private IRedisCatchStorage redisCatchStorage; + @Autowired + private IStreamPushService streamPushService; - @Autowired - private DynamicTask dynamicTask; + @Autowired + private IRedisCatchStorage redisCatchStorage; - @Autowired - private SIPCommander cmder; + @Autowired + private DynamicTask dynamicTask; - @Autowired - private IPlayService playService; + @Autowired + private SIPCommander cmder; - @Autowired - private ISIPCommander commander; + @Autowired + private IPlayService playService; - @Autowired - private ZLMRTPServerFactory zlmrtpServerFactory; + @Autowired + private ISIPCommander commander; - @Autowired - private IMediaServerService mediaServerService; + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; - @Autowired - private SIPProcessorObserver sipProcessorObserver; + @Autowired + private IMediaServerService mediaServerService; - @Autowired - private VideoStreamSessionManager sessionManager; + @Autowired + private SIPProcessorObserver sipProcessorObserver; - @Autowired - private UserSetting userSetting; + @Autowired + private VideoStreamSessionManager sessionManager; - @Autowired - private ZLMMediaListManager mediaListManager; + @Autowired + private UserSetting userSetting; + + @Autowired + private ZLMMediaListManager mediaListManager; - @Override - public void afterPropertiesSet() throws Exception { - // 添加消息处理的订阅 - sipProcessorObserver.addRequestProcessor(method, this); - } - - /** - * 处理invite请求 - * - * @param evt - * 请求消息 - */ - @Override - public void process(RequestEvent evt) { - // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 - try { - Request request = evt.getRequest(); - SipURI sipURI = (SipURI) request.getRequestURI(); - //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。 - //String channelId = sipURI.getUser(); - String channelId = SipUtils.getChannelIdFromHeader(request); - String requesterId = SipUtils.getUserIdFromFromHeader(request); - CallIdHeader callIdHeader = (CallIdHeader)request.getHeader(CallIdHeader.NAME); - if (requesterId == null || channelId == null) { - logger.info("无法从FromHeader的Address中获取到平台id,返回400"); - responseAck(evt, Response.BAD_REQUEST); // 参数不全, 发400,请求错误 - return; - } - - // 查询请求是否来自上级平台\设备 - ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); - if (platform == null) { - inviteFromDeviceHandle(evt, requesterId); - }else { - // 查询平台下是否有该通道 - DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); - GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); - PlatformCatalog catalog = storager.getCatalog(channelId); - MediaServerItem mediaServerItem = null; - // 不是通道可能是直播流 - if (channel != null && gbStream == null ) { - if (channel.getStatus() == 0) { - logger.info("通道离线,返回400"); - responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); - return; - } - responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 - }else if(channel == null && gbStream != null){ - String mediaServerId = gbStream.getMediaServerId(); - mediaServerItem = mediaServerService.getOne(mediaServerId); - if (mediaServerItem == null) { - logger.info("[ app={}, stream={} ]找不到zlm {},返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId); - responseAck(evt, Response.GONE); - return; - } - responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 - }else if (catalog != null) { - responseAck(evt, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播 - return; - } else { - logger.info("通道不存在,返回404"); - responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在 - return; - } - // 解析sdp消息, 使用jainsip 自带的sdp解析方式 - String contentString = new String(request.getRawContent()); - - // jainSip不支持y=字段, 移除以解析。 - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - String ssrcDefault = "0000000000"; - String ssrc; - SessionDescription sdp; - if (ssrcIndex >= 0) { - //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - String substring = contentString.substring(0, contentString.indexOf("y=")); - sdp = SdpFactory.getInstance().createSessionDescription(substring); - }else { - ssrc = ssrcDefault; - sdp = SdpFactory.getInstance().createSessionDescription(contentString); - } - String sessionName = sdp.getSessionName().getValue(); - - Long startTime = null; - Long stopTime = null; - Instant start = null; - Instant end = null; - if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { - TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0)); - TimeField startTimeFiled = (TimeField)timeDescription.getTime(); - startTime = startTimeFiled.getStartTime(); - stopTime = startTimeFiled.getStopTime(); - - start = Instant.ofEpochSecond(startTime); - end = Instant.ofEpochSecond(stopTime); - } - // 获取支持的格式 - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 查看是否支持PS 负载96 - //String ip = null; - int port = -1; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (Object description : mediaDescriptions) { - MediaDescription mediaDescription = (MediaDescription) description; - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("96")) { - port = media.getMediaPort(); - //String mediaType = media.getMediaType(); - String protocol = media.getProtocol(); - - // 区分TCP发流还是udp, 当前默认udp - if ("TCP/RTP/AVP".equals(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equals(setup)) { - tcpActive = true; - // 不支持tcp主动 - responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 目录不支持点播 - return; - } else if ("passive".equals(setup)) { - tcpActive = false; - } - } - } - break; - } - } - if (port == -1) { - logger.info("不支持的媒体格式,返回415"); - // 回复不支持的格式 - responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 - return; - } - String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getOrigin().getAddress(); - - logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc); - Device device = null; - // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 - if (channel != null) { - device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); - if (device == null) { - logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); - responseAck(evt, Response.SERVER_INTERNAL_ERROR); - return; - } - mediaServerItem = playService.getNewMediaServerItem(device); - if (mediaServerItem == null) { - logger.warn("未找到可用的zlm"); - responseAck(evt, Response.BUSY_HERE); - return; - } - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, - mediaTransmissionTCP); - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - if (sendRtpItem == null) { - logger.warn("服务器端口资源不足"); - responseAck(evt, Response.BUSY_HERE); - return; - } - sendRtpItem.setCallId(callIdHeader.getCallId()); - sendRtpItem.setPlayType("Play".equals(sessionName)?InviteStreamType.PLAY:InviteStreamType.PLAYBACK); - - Long finalStartTime = startTime; - Long finalStopTime = stopTime; - ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON)->{ - String app = responseJSON.getString("app"); - String stream = responseJSON.getString("stream"); - logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream); - // * 0 等待设备推流上来 - // * 1 下级已经推流,等待上级平台回复ack - // * 2 推流中 - sendRtpItem.setStatus(1); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - content.append("s=" + sessionName+"\r\n"); - content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n"); - if ("Playback".equals(sessionName)) { - content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); - }else { - content.append("t=0 0\r\n"); - } - content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); - content.append("a=sendonly\r\n"); - content.append("a=rtpmap:96 PS/90000\r\n"); - content.append("y="+ ssrc + "\r\n"); - content.append("f=\r\n"); - - try { - // 超时未收到Ack应该回复bye,当前等待时间为10秒 - dynamicTask.startDelay(callIdHeader.getCallId(), ()->{ - logger.info("Ack 等待超时"); - mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc); - // 回复bye - cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); - }, 60*1000); - responseSdpAck(evt, content.toString(), platform); - - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - }; - SipSubscribe.Event errorEvent = ((event) -> { - // 未知错误。直接转发设备点播的错误 - Response response = null; - try { - response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); - ServerTransaction serverTransaction = getServerTransaction(evt); - serverTransaction.sendResponse(response); - if (serverTransaction.getDialog() != null) { - serverTransaction.getDialog().delete(); - } - } catch (ParseException | SipException | InvalidArgumentException e) { - e.printStackTrace(); - } - }); - sendRtpItem.setApp("rtp"); - if ("Playback".equals(sessionName)) { - sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), - DateUtil.formatter.format(end), null, result -> { - if (result.getCode() != 0){ - logger.warn("录像回放失败"); - if (result.getEvent() != null) { - errorEvent.response(result.getEvent()); - } - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - try { - responseAck(evt, Response.REQUEST_TIMEOUT); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - }else { - if (result.getMediaServerItem() != null) { - hookEvent.response(result.getMediaServerItem(), result.getResponse()); - } - } - }); - }else { - sendRtpItem.setPlayType(InviteStreamType.PLAY); - SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); - if (playTransaction != null) { - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); - if (!streamReady) { - playTransaction = null; - } - } - if (playTransaction == null) { - String streamId = null; - if (mediaServerItem.isRtpEnable()) { - streamId = String.format("%s_%s", device.getDeviceId(), channelId); - } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); - sendRtpItem.setStreamId(ssrcInfo.getStream()); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg)->{ - logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); - redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); - }, null); - }else { - sendRtpItem.setStreamId(playTransaction.getStream()); - // 写入redis, 超时时回复 - redisCatchStorage.updateSendRTPSever(sendRtpItem); - JSONObject jsonObject = new JSONObject(); - jsonObject.put("app", sendRtpItem.getApp()); - jsonObject.put("stream", sendRtpItem.getStreamId()); - hookEvent.response(mediaServerItem, jsonObject); - } - } - }else if (gbStream != null) { - - Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); - if (!streamReady ) { - if ("proxy".equals(gbStream.getStreamType())) { - // TODO 控制启用以使设备上线 - logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流",gbStream.getApp(), gbStream.getStream()); - responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); - }else if ("push".equals(gbStream.getStreamType())) { - if (!platform.isStartOfflinePush()) { - responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable"); - return; - } - // 发送redis消息以使设备上线 - logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",gbStream.getApp(), gbStream.getStream()); - MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); - messageForPushChannel.setType(1); - messageForPushChannel.setGbId(gbStream.getGbId()); - messageForPushChannel.setApp(gbStream.getApp()); - messageForPushChannel.setStream(gbStream.getStream()); - // TODO 获取低负载的节点 - messageForPushChannel.setMediaServerId(gbStream.getMediaServerId()); - messageForPushChannel.setPlatFormId(platform.getServerGBId()); - messageForPushChannel.setPlatFormName(platform.getName()); - redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); - // 设置超时 - dynamicTask.startDelay(callIdHeader.getCallId(), ()->{ - logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); - try { - mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId()); - responseAck(evt, Response.REQUEST_TIMEOUT); // 超时 - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - }, userSetting.getPlatformPlayTimeout()); - // 添加监听 - MediaServerItem finalMediaServerItem = mediaServerItem; - int finalPort = port; - boolean finalMediaTransmissionTCP = mediaTransmissionTCP; - Boolean finalTcpActive = tcpActive; - mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream)->{ - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(finalMediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, finalMediaTransmissionTCP); - - if (sendRtpItem == null) { - logger.warn("服务器端口资源不足"); - try { - responseAck(evt, Response.BUSY_HERE); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - return; - } - if (finalTcpActive != null) { - sendRtpItem.setTcpActive(finalTcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); - sendRtpItem.setDialog(dialogByteArray); - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); - sendRtpItem.setTransaction(transactionByteArray); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - sendStreamAck(finalMediaServerItem, sendRtpItem, platform, evt); - - }); - } - }else { - SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, - mediaTransmissionTCP); + @Autowired + private RedisGbPlayMsgListener redisGbPlayMsgListener; - if (sendRtpItem == null) { - logger.warn("服务器端口资源不足"); - responseAck(evt, Response.BUSY_HERE); - return; - } - if (tcpActive != null) { - sendRtpItem.setTcpActive(tcpActive); - } - sendRtpItem.setPlayType(InviteStreamType.PUSH); - // 写入redis, 超时时回复 - sendRtpItem.setStatus(1); - sendRtpItem.setCallId(callIdHeader.getCallId()); - byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); - sendRtpItem.setDialog(dialogByteArray); - byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); - sendRtpItem.setTransaction(transactionByteArray); - redisCatchStorage.updateSendRTPSever(sendRtpItem); - sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); - } + @Override + public void afterPropertiesSet() throws Exception { + // 添加消息处理的订阅 + sipProcessorObserver.addRequestProcessor(method, this); + } + + /** + * 处理invite请求 + * + * @param evt 请求消息 + */ + @Override + public void process(RequestEvent evt) { + // Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令 + try { + Request request = evt.getRequest(); + SipURI sipUri = (SipURI) request.getRequestURI(); + //从subject读取channelId,不再从request-line读取。 有些平台request-line是平台国标编码,不是设备国标编码。 + //String channelId = sipURI.getUser(); + String channelId = SipUtils.getChannelIdFromHeader(request); + String requesterId = SipUtils.getUserIdFromFromHeader(request); + CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME); + if (requesterId == null || channelId == null) { + logger.info("无法从FromHeader的Address中获取到平台id,返回400"); + // 参数不全, 发400,请求错误 + responseAck(evt, Response.BAD_REQUEST); + return; + } + + // 查询请求是否来自上级平台\设备 + ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId); + if (platform == null) { + inviteFromDeviceHandle(evt, requesterId); + } else { + // 查询平台下是否有该通道 + DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId); + GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId); + PlatformCatalog catalog = storager.getCatalog(channelId); + + MediaServerItem mediaServerItem = null; + StreamPushItem streamPushItem = null; + // 不是通道可能是直播流 + if (channel != null && gbStream == null) { + if (channel.getStatus() == 0) { + logger.info("通道离线,返回400"); + responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline"); + return; + } + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 + } else if (channel == null && gbStream != null) { + + String mediaServerId = gbStream.getMediaServerId(); + mediaServerItem = mediaServerService.getOne(mediaServerId); + if (mediaServerItem == null) { + if ("proxy".equals(gbStream.getStreamType())) { + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE); + return; + } else { + streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); + if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) { + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE); + return; + } + } + } else { + if ("push".equals(gbStream.getStreamType())) { + streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream()); + if (streamPushItem == null) { + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE); + return; + } + } + } + responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 + } else if (catalog != null) { + responseAck(evt, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播 + return; + } else { + logger.info("通道不存在,返回404"); + responseAck(evt, Response.NOT_FOUND); // 通道不存在,发404,资源不存在 + return; + } + // 解析sdp消息, 使用jainsip 自带的sdp解析方式 + String contentString = new String(request.getRawContent()); + + // jainSip不支持y=字段, 移除以解析。 + int ssrcIndex = contentString.indexOf("y="); + // 检查是否有y字段 + String ssrcDefault = "0000000000"; + String ssrc; + SessionDescription sdp; + if (ssrcIndex >= 0) { + //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + String substring = contentString.substring(0, contentString.indexOf("y=")); + sdp = SdpFactory.getInstance().createSessionDescription(substring); + } else { + ssrc = ssrcDefault; + sdp = SdpFactory.getInstance().createSessionDescription(contentString); + } + String sessionName = sdp.getSessionName().getValue(); + + Long startTime = null; + Long stopTime = null; + Instant start = null; + Instant end = null; + if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { + TimeDescriptionImpl timeDescription = (TimeDescriptionImpl) (sdp.getTimeDescriptions(false).get(0)); + TimeField startTimeFiled = (TimeField) timeDescription.getTime(); + startTime = startTimeFiled.getStartTime(); + stopTime = startTimeFiled.getStopTime(); + + start = Instant.ofEpochSecond(startTime); + end = Instant.ofEpochSecond(stopTime); + } + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 + //String ip = null; + int port = -1; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (Object description : mediaDescriptions) { + MediaDescription mediaDescription = (MediaDescription) description; + Media media = mediaDescription.getMedia(); + + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("96")) { + port = media.getMediaPort(); + //String mediaType = media.getMediaType(); + String protocol = media.getProtocol(); + + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + // 不支持tcp主动 + responseAck(evt, Response.NOT_IMPLEMENTED, "tcp active not support"); // 目录不支持点播 + return; + } else if ("passive".equals(setup)) { + tcpActive = false; + } + } + } + break; + } + } + if (port == -1) { + logger.info("不支持的媒体格式,返回415"); + // 回复不支持的格式 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getOrigin().getAddress(); + + logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc); + Device device = null; + // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 + if (channel != null) { + device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId); + if (device == null) { + logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel); + responseAck(evt, Response.SERVER_INTERNAL_ERROR); + return; + } + mediaServerItem = playService.getNewMediaServerItem(device); + if (mediaServerItem == null) { + logger.warn("未找到可用的zlm"); + responseAck(evt, Response.BUSY_HERE); + return; + } + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + device.getDeviceId(), channelId, + mediaTransmissionTCP); + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + responseAck(evt, Response.BUSY_HERE); + return; + } + sendRtpItem.setCallId(callIdHeader.getCallId()); + sendRtpItem.setPlayType("Play".equals(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); + + Long finalStartTime = startTime; + Long finalStopTime = stopTime; + ZLMHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> { + String app = responseJSON.getString("app"); + String stream = responseJSON.getString("stream"); + logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream); + // * 0 等待设备推流上来 + // * 1 下级已经推流,等待上级平台回复ack + // * 2 推流中 + sendRtpItem.setStatus(1); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); + content.append("s=" + sessionName + "\r\n"); + content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n"); + if ("Playback".equals(sessionName)) { + content.append("t=" + finalStartTime + " " + finalStopTime + "\r\n"); + } else { + content.append("t=0 0\r\n"); + } + content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + content.append("y=" + ssrc + "\r\n"); + content.append("f=\r\n"); + + try { + // 超时未收到Ack应该回复bye,当前等待时间为10秒 + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("Ack 等待超时"); + mediaServerService.releaseSsrc(mediaServerItemInUSe.getId(), ssrc); + // 回复bye + cmderFroPlatform.streamByeCmd(platform, callIdHeader.getCallId()); + }, 60 * 1000); + responseSdpAck(evt, content.toString(), platform); + + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }; + SipSubscribe.Event errorEvent = ((event) -> { + // 未知错误。直接转发设备点播的错误 + Response response = null; + try { + response = getMessageFactory().createResponse(event.statusCode, evt.getRequest()); + ServerTransaction serverTransaction = getServerTransaction(evt); + serverTransaction.sendResponse(response); + if (serverTransaction.getDialog() != null) { + serverTransaction.getDialog().delete(); + } + } catch (ParseException | SipException | InvalidArgumentException e) { + e.printStackTrace(); + } + }); + sendRtpItem.setApp("rtp"); + if ("Playback".equals(sessionName)) { + sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start), + DateUtil.formatter.format(end), null, result -> { + if (result.getCode() != 0) { + logger.warn("录像回放失败"); + if (result.getEvent() != null) { + errorEvent.response(result.getEvent()); + } + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + try { + responseAck(evt, Response.REQUEST_TIMEOUT); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } else { + if (result.getMediaServerItem() != null) { + hookEvent.response(result.getMediaServerItem(), result.getResponse()); + } + } + }); + } else { + sendRtpItem.setPlayType(InviteStreamType.PLAY); + SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); + if (playTransaction != null) { + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); + if (!streamReady) { + playTransaction = null; + } + } + if (playTransaction == null) { + String streamId = null; + if (mediaServerItem.isRtpEnable()) { + streamId = String.format("%s_%s", device.getDeviceId(), channelId); + } + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false); + sendRtpItem.setStreamId(ssrcInfo.getStream()); + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { + logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); + redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); + }, null); + } else { + sendRtpItem.setStreamId(playTransaction.getStream()); + // 写入redis, 超时时回复 + redisCatchStorage.updateSendRTPSever(sendRtpItem); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("app", sendRtpItem.getApp()); + jsonObject.put("stream", sendRtpItem.getStreamId()); + hookEvent.response(mediaServerItem, jsonObject); + } + } + } else if (gbStream != null) { + if (streamPushItem.isStatus()) { + // 在线状态 + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } else { + // 不在线 拉起 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + + } + + } + + } catch (SipException | InvalidArgumentException | ParseException e) { + e.printStackTrace(); + logger.warn("sdp解析错误"); + e.printStackTrace(); + } catch (SdpParseException e) { + e.printStackTrace(); + } catch (SdpException e) { + e.printStackTrace(); + } + } + + /** + * 安排推流 + */ + + private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, + CallIdHeader callIdHeader, MediaServerItem mediaServerItem, + int port, Boolean tcpActive, boolean mediaTransmissionTCP, + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + // 推流 + if (streamPushItem.getServerId().equals(userSetting.getServerId())) { + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (streamReady) { + // 自平台内容 + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + gbStream.getApp(), gbStream.getStream(), channelId, + mediaTransmissionTCP); + + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + responseAck(evt, Response.BUSY_HERE); + return; + } + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setPlayType(InviteStreamType.PUSH); + // 写入redis, 超时时回复 + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); + } else { + // 不在线 拉起 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + + } else { + // 其他平台内容 + otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + + } + + /** + * 通知流上线 + */ + private void notifyStreamOnline(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, + CallIdHeader callIdHeader, MediaServerItem mediaServerItem, + int port, Boolean tcpActive, boolean mediaTransmissionTCP, + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + if ("proxy".equals(gbStream.getStreamType())) { + // TODO 控制启用以使设备上线 + logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流", gbStream.getApp(), gbStream.getStream()); + responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline"); + } else if ("push".equals(gbStream.getStreamType())) { + if (!platform.isStartOfflinePush()) { + responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable"); + return; + } + // 发送redis消息以使设备上线 + logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream()); + + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, + gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(), + platform.getName(), null, gbStream.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + // 设置超时 + dynamicTask.startDelay(callIdHeader.getCallId(), () -> { + logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream()); + try { + mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId()); + responseAck(evt, Response.REQUEST_TIMEOUT); // 超时 + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + }, userSetting.getPlatformPlayTimeout()); + // 添加监听 + int finalPort = port; + Boolean finalTcpActive = tcpActive; + + // 添加在本机上线的通知 + mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> { + dynamicTask.stop(callIdHeader.getCallId()); + if (serverId.equals(userSetting.getServerId())) { + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, + app, stream, channelId, mediaTransmissionTCP); + + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + try { + responseAck(evt, Response.BUSY_HERE); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + return; + } + if (finalTcpActive != null) { + sendRtpItem.setTcpActive(finalTcpActive); + } + sendRtpItem.setPlayType(InviteStreamType.PUSH); + // 写入redis, 超时时回复 + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); + } else { + // 其他平台内容 + otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + }); + } + } + + /** + * 来自其他wvp的推流 + */ + private void otherWvpPushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, + CallIdHeader callIdHeader, MediaServerItem mediaServerItem, + int port, Boolean tcpActive, boolean mediaTransmissionTCP, + String channelId, String addressStr, String ssrc, String requesterId) { + logger.info("[级联点播]直播流来自其他平台,发送redis消息"); + // 发送redis消息 + redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(), + streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId, + channelId, mediaTransmissionTCP, null, responseSendItemMsg -> { + SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem(); + if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) { + logger.warn("服务器端口资源不足"); + try { + responseAck(evt, Response.BUSY_HERE); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + return; + } + // 收到sendItem + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setPlayType(InviteStreamType.PUSH); + // 写入redis, 超时时回复 + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendStreamAck(responseSendItemMsg.getMediaServerItem(), sendRtpItem, platform, evt); + }, (wvpResult) -> { + try { + // 错误 + if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) { + // 离线 + // 查询是否在本机上线了 + StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream()); + if (currentStreamPushItem.isStatus()) { + // 在线状态 + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + + } else { + // 不在线 拉起 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + } + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } catch (SipException e) { + throw new RuntimeException(e); + } - } + try { + responseAck(evt, Response.BUSY_HERE); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + return; + }); + } - } + public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt) { - } catch (SipException | InvalidArgumentException | ParseException e) { - e.printStackTrace(); - logger.warn("sdp解析错误"); - e.printStackTrace(); - } catch (SdpParseException e) { - e.printStackTrace(); - } catch (SdpException e) { - e.printStackTrace(); - } - } + StringBuffer content = new StringBuffer(200); + content.append("v=0\r\n"); + content.append("o=" + sendRtpItem.getChannelId() + " 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("s=Play\r\n"); + content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); + content.append("t=0 0\r\n"); + content.append("m=video " + sendRtpItem.getLocalPort() + " RTP/AVP 96\r\n"); + content.append("a=sendonly\r\n"); + content.append("a=rtpmap:96 PS/90000\r\n"); + if (sendRtpItem.isTcp()) { + content.append("a=connection:new\r\n"); + if (!sendRtpItem.isTcpActive()) { + content.append("a=setup:active\r\n"); + } else { + content.append("a=setup:passive\r\n"); + } + } + content.append("y=" + sendRtpItem.getSsrc() + "\r\n"); + content.append("f=\r\n"); - public void sendStreamAck(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem, ParentPlatform platform, RequestEvent evt){ + try { + responseSdpAck(evt, content.toString(), platform); + } catch (SipException e) { + e.printStackTrace(); + } catch (InvalidArgumentException e) { + e.printStackTrace(); + } catch (ParseException e) { + e.printStackTrace(); + } + } - StringBuffer content = new StringBuffer(200); - content.append("v=0\r\n"); - content.append("o="+ sendRtpItem.getChannelId() +" 0 0 IN IP4 "+ mediaServerItem.getSdpIp()+"\r\n"); - content.append("s=Play\r\n"); - content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); - content.append("t=0 0\r\n"); - content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n"); - content.append("a=sendonly\r\n"); - content.append("a=rtpmap:96 PS/90000\r\n"); - if (sendRtpItem.isTcp()) { - content.append("a=connection:new\r\n"); - if (!sendRtpItem.isTcpActive()) { - content.append("a=setup:active\r\n"); - }else { - content.append("a=setup:passive\r\n"); - } - } - content.append("y="+ sendRtpItem.getSsrc() + "\r\n"); - content.append("f=\r\n"); + public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { - try { - responseSdpAck(evt, content.toString(), platform); - } catch (SipException e) { - e.printStackTrace(); - } catch (InvalidArgumentException e) { - e.printStackTrace(); - } catch (ParseException e) { - e.printStackTrace(); - } - } + // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) + Device device = redisCatchStorage.getDevice(requesterId); + Request request = evt.getRequest(); + if (device != null) { + logger.info("收到设备" + requesterId + "的语音广播Invite请求"); + responseAck(evt, Response.TRYING); - public void inviteFromDeviceHandle(RequestEvent evt, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException { + String contentString = new String(request.getRawContent()); + // jainSip不支持y=字段, 移除移除以解析。 + String substring = contentString; + String ssrc = "0000000404"; + int ssrcIndex = contentString.indexOf("y="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + } + ssrcIndex = substring.indexOf("f="); + if (ssrcIndex > 0) { + substring = contentString.substring(0, ssrcIndex); + } + SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); - // 非上级平台请求,查询是否设备请求(通常为接收语音广播的设备) - Device device = redisCatchStorage.getDevice(requesterId); - Request request = evt.getRequest(); - if (device != null) { - logger.info("收到设备" + requesterId + "的语音广播Invite请求"); - responseAck(evt, Response.TRYING); + // 获取支持的格式 + Vector mediaDescriptions = sdp.getMediaDescriptions(true); + // 查看是否支持PS 负载96 + int port = -1; + //boolean recvonly = false; + boolean mediaTransmissionTCP = false; + Boolean tcpActive = null; + for (int i = 0; i < mediaDescriptions.size(); i++) { + MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i); + Media media = mediaDescription.getMedia(); - String contentString = new String(request.getRawContent()); - // jainSip不支持y=字段, 移除移除以解析。 - String substring = contentString; - String ssrc = "0000000404"; - int ssrcIndex = contentString.indexOf("y="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - } - ssrcIndex = substring.indexOf("f="); - if (ssrcIndex > 0) { - substring = contentString.substring(0, ssrcIndex); - } - SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring); + Vector mediaFormats = media.getMediaFormats(false); + if (mediaFormats.contains("8")) { + port = media.getMediaPort(); + String protocol = media.getProtocol(); + // 区分TCP发流还是udp, 当前默认udp + if ("TCP/RTP/AVP".equals(protocol)) { + String setup = mediaDescription.getAttribute("setup"); + if (setup != null) { + mediaTransmissionTCP = true; + if ("active".equals(setup)) { + tcpActive = true; + } else if ("passive".equals(setup)) { + tcpActive = false; + } + } + } + break; + } + } + if (port == -1) { + logger.info("不支持的媒体格式,返回415"); + // 回复不支持的格式 + responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 + return; + } + String username = sdp.getOrigin().getUsername(); + String addressStr = sdp.getOrigin().getAddress(); + logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); - // 获取支持的格式 - Vector mediaDescriptions = sdp.getMediaDescriptions(true); - // 查看是否支持PS 负载96 - int port = -1; - //boolean recvonly = false; - boolean mediaTransmissionTCP = false; - Boolean tcpActive = null; - for (int i = 0; i < mediaDescriptions.size(); i++) { - MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i); - Media media = mediaDescription.getMedia(); - - Vector mediaFormats = media.getMediaFormats(false); - if (mediaFormats.contains("8")) { - port = media.getMediaPort(); - String protocol = media.getProtocol(); - // 区分TCP发流还是udp, 当前默认udp - if ("TCP/RTP/AVP".equals(protocol)) { - String setup = mediaDescription.getAttribute("setup"); - if (setup != null) { - mediaTransmissionTCP = true; - if ("active".equals(setup)) { - tcpActive = true; - } else if ("passive".equals(setup)) { - tcpActive = false; - } - } - } - break; - } - } - if (port == -1) { - logger.info("不支持的媒体格式,返回415"); - // 回复不支持的格式 - responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式,发415 - return; - } - String username = sdp.getOrigin().getUsername(); - String addressStr = sdp.getOrigin().getAddress(); - logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}", username, addressStr, port, ssrc); - - } else { - logger.warn("来自无效设备/平台的请求"); - responseAck(evt, Response.BAD_REQUEST); - } - } + } else { + logger.warn("来自无效设备/平台的请求"); + responseAck(evt, Response.BAD_REQUEST); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index cebc2eda..e9143392 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -41,7 +41,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen private final Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class); - public String method = "REGISTER"; + public final String method = "REGISTER"; @Autowired private SipConfig sipConfig; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 87adc3ef..57e80454 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -40,9 +40,7 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { private Logger logger = LoggerFactory.getLogger(RecordInfoResponseMessageHandler.class); - public static volatile List threadNameList = new ArrayList(); private final String cmdType = "RecordInfo"; - private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java index 64933b80..ff63fad0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/ByeResponseProcessor.java @@ -17,7 +17,7 @@ import javax.sip.ResponseEvent; @Component public class ByeResponseProcessor extends SIPResponseProcessorAbstract { - private String method = "BYE"; + private final String method = "BYE"; @Autowired private SipLayer sipLayer; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java index 80d7e2b2..775beeb6 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/CancelResponseProcessor.java @@ -17,7 +17,7 @@ import javax.sip.ResponseEvent; @Component public class CancelResponseProcessor extends SIPResponseProcessorAbstract { - private String method = "CANCEL"; + private final String method = "CANCEL"; @Autowired private SipLayer sipLayer; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java index c81aabb6..89958e92 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/InviteResponseProcessor.java @@ -31,7 +31,7 @@ import java.text.ParseException; public class InviteResponseProcessor extends SIPResponseProcessorAbstract { private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class); - private String method = "INVITE"; + private final String method = "INVITE"; @Autowired private SipLayer sipLayer; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java index 02f5e1d0..f3a9f65f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java @@ -27,7 +27,7 @@ import javax.sip.message.Response; public class RegisterResponseProcessor extends SIPResponseProcessorAbstract { private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class); - private String method = "REGISTER"; + private final String method = "REGISTER"; @Autowired private ISIPCommanderForPlatform sipCommanderForPlatform; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 528609d7..4ea9cf14 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -397,21 +397,22 @@ public class ZLMHttpHookListener { if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal() || item.getOriginType() == OriginType.RTMP_PUSH.ordinal() || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) { - streamPushItem = zlmMediaListManager.addPush(item); + item.setSeverId(userSetting.getServerId()); + zlmMediaListManager.addPush(item); } - List gbStreams = new ArrayList<>(); - if (streamPushItem == null || streamPushItem.getGbId() == null) { - GbStream gbStream = storager.getGbStream(app, streamId); - gbStreams.add(gbStream); - }else { - if (streamPushItem.getGbId() != null) { - gbStreams.add(streamPushItem); - } - } - if (gbStreams.size() > 0) { +// List gbStreams = new ArrayList<>(); +// if (streamPushItem == null || streamPushItem.getGbId() == null) { +// GbStream gbStream = storager.getGbStream(app, streamId); +// gbStreams.add(gbStream); +// }else { +// if (streamPushItem.getGbId() != null) { +// gbStreams.add(streamPushItem); +// } +// } +// if (gbStreams.size() > 0) { // eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON); - } +// } }else { // 兼容流注销时类型从redis记录获取 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java index 9beac16f..959c06ec 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java @@ -24,6 +24,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; +/** + * @author lin + */ @Component public class ZLMMediaListManager { @@ -147,7 +150,6 @@ public class ZLMMediaListManager { } } } - // StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(transform.getApp(), transform.getStream()); List gbStreamList = gbStreamMapper.selectByGBId(transform.getGbId()); if (gbStreamList != null && gbStreamList.size() == 1) { transform.setGbStreamId(gbStreamList.get(0).getGbStreamId()); @@ -162,13 +164,12 @@ public class ZLMMediaListManager { } if (transform != null) { if (channelOnlineEvents.get(transform.getGbId()) != null) { - channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream()); + channelOnlineEvents.get(transform.getGbId()).run(transform.getApp(), transform.getStream(), transform.getServerId()); channelOnlineEvents.remove(transform.getGbId()); } } } - storager.updateMedia(transform); return transform; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index bd3dcb14..210aa72e 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import org.slf4j.Logger; @@ -20,6 +21,9 @@ public class ZLMRTPServerFactory { @Autowired private ZLMRESTfulUtils zlmresTfulUtils; + @Autowired + private UserSetting userSetting; + private int[] portRangeArray = new int[2]; public int getFreePort(MediaServerItem mediaServerItem, int startPort, int endPort, List usedFreelist) { @@ -197,6 +201,7 @@ public class ZLMRTPServerFactory { sendRtpItem.setTcp(tcp); sendRtpItem.setApp("rtp"); sendRtpItem.setLocalPort(localPort); + sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setMediaServerId(serverItem.getId()); return sendRtpItem; } @@ -238,6 +243,7 @@ public class ZLMRTPServerFactory { sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(tcp); sendRtpItem.setLocalPort(localPort); + sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setMediaServerId(serverItem.getId()); return sendRtpItem; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java index ba7daecd..21e6ca03 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/ChannelOnlineEvent.java @@ -1,6 +1,9 @@ package com.genersoft.iot.vmp.media.zlm.dto; +/** + * @author lin + */ public interface ChannelOnlineEvent { - void run(String app, String stream); + void run(String app, String stream, String serverId); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java index ad158ec7..8abac5b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java @@ -61,10 +61,15 @@ public class MediaItem { private String originUrl; /** - * 服务器id + * 流媒体服务器id */ private String mediaServerId; + /** + * 服务器id + */ + private String severId; + /** * GMT unix系统时间戳,单位秒 */ @@ -414,4 +419,12 @@ public class MediaItem { public void setStreamInfo(StreamInfo streamInfo) { this.streamInfo = streamInfo; } + + public String getSeverId() { + return severId; + } + + public void setSeverId(String severId) { + this.severId = severId; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java index 81c9c768..ceb48b3f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java @@ -81,6 +81,11 @@ public class StreamPushItem extends GbStream implements Comparable gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java index 3ab1b80d..6dcc515f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java @@ -1,7 +1,10 @@ package com.genersoft.iot.vmp.service.bean; +import java.util.stream.Stream; + /** * 当上级平台 + * @author lin */ public class MessageForPushChannel { /** @@ -45,6 +48,20 @@ public class MessageForPushChannel { */ private String mediaServerId; + public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId, + String platFormId, String platFormName, String serverId, + String mediaServerId){ + MessageForPushChannel messageForPushChannel = new MessageForPushChannel(); + messageForPushChannel.setType(type); + messageForPushChannel.setGbId(gbId); + messageForPushChannel.setApp(app); + messageForPushChannel.setStream(stream); + messageForPushChannel.setMediaServerId(mediaServerId); + messageForPushChannel.setPlatFormId(platFormId); + messageForPushChannel.setPlatFormName(platFormName); + return messageForPushChannel; + } + public int getType() { return type; diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java new file mode 100644 index 00000000..5827d013 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestPushStreamMsg.java @@ -0,0 +1,170 @@ +package com.genersoft.iot.vmp.service.bean; + +/** + * redis消息:请求下级推送流信息 + * @author lin + */ +public class RequestPushStreamMsg { + + + /** + * 下级服务ID + */ + private String mediaServerId; + + /** + * 流ID + */ + private String app; + + /** + * 应用名 + */ + private String stream; + + /** + * 目标IP + */ + private String ip; + + /** + * 目标端口 + */ + private int port; + + /** + * ssrc + */ + private String ssrc; + + /** + * 是否使用TCP方式 + */ + private boolean tcp; + + /** + * 本地使用的端口 + */ + private int srcPort; + + /** + * 发送时,rtp的pt(uint8_t),不传时默认为96 + */ + private int pt; + + /** + * 发送时,rtp的负载类型。为true时,负载为ps;为false时,为es; + */ + private boolean ps; + + /** + * 是否只有音频 + */ + private boolean onlyAudio; + + + public static RequestPushStreamMsg getInstance(String mediaServerId, String app, String stream, String ip, int port, String ssrc, + boolean tcp, int srcPort, int pt, boolean ps, boolean onlyAudio) { + RequestPushStreamMsg requestPushStreamMsg = new RequestPushStreamMsg(); + requestPushStreamMsg.setMediaServerId(mediaServerId); + requestPushStreamMsg.setApp(app); + requestPushStreamMsg.setStream(stream); + requestPushStreamMsg.setIp(ip); + requestPushStreamMsg.setPort(port); + requestPushStreamMsg.setSsrc(ssrc); + requestPushStreamMsg.setTcp(tcp); + requestPushStreamMsg.setSrcPort(srcPort); + requestPushStreamMsg.setPt(pt); + requestPushStreamMsg.setPs(ps); + requestPushStreamMsg.setOnlyAudio(onlyAudio); + return requestPushStreamMsg; + } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getSsrc() { + return ssrc; + } + + public void setSsrc(String ssrc) { + this.ssrc = ssrc; + } + + public boolean isTcp() { + return tcp; + } + + public void setTcp(boolean tcp) { + this.tcp = tcp; + } + + public int getSrcPort() { + return srcPort; + } + + public void setSrcPort(int srcPort) { + this.srcPort = srcPort; + } + + public int getPt() { + return pt; + } + + public void setPt(int pt) { + this.pt = pt; + } + + public boolean isPs() { + return ps; + } + + public void setPs(boolean ps) { + this.ps = ps; + } + + public boolean isOnlyAudio() { + return onlyAudio; + } + + public void setOnlyAudio(boolean onlyAudio) { + this.onlyAudio = onlyAudio; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java new file mode 100644 index 00000000..66689fa6 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RequestSendItemMsg.java @@ -0,0 +1,173 @@ +package com.genersoft.iot.vmp.service.bean; + +/** + * redis消息:请求下级回复推送信息 + * @author lin + */ +public class RequestSendItemMsg { + + /** + * 下级服务ID + */ + private String serverId; + + /** + * 下级服务ID + */ + private String mediaServerId; + + /** + * 流ID + */ + private String app; + + /** + * 应用名 + */ + private String stream; + + /** + * 目标IP + */ + private String ip; + + /** + * 目标端口 + */ + private int port; + + /** + * ssrc + */ + private String ssrc; + + /** + * 平台国标编号 + */ + private String platformId; + + /** + * 平台名称 + */ + private String platformName; + + /** + * 通道ID + */ + private String channelId; + + + /** + * 是否使用TCP + */ + private Boolean isTcp; + + + + + public static RequestSendItemMsg getInstance(String serverId, String mediaServerId, String app, String stream, String ip, int port, + String ssrc, String platformId, String channelId, Boolean isTcp, String platformName) { + RequestSendItemMsg requestSendItemMsg = new RequestSendItemMsg(); + requestSendItemMsg.setServerId(serverId); + requestSendItemMsg.setMediaServerId(mediaServerId); + requestSendItemMsg.setApp(app); + requestSendItemMsg.setStream(stream); + requestSendItemMsg.setIp(ip); + requestSendItemMsg.setPort(port); + requestSendItemMsg.setSsrc(ssrc); + requestSendItemMsg.setPlatformId(platformId); + requestSendItemMsg.setPlatformName(platformName); + requestSendItemMsg.setChannelId(channelId); + requestSendItemMsg.setTcp(isTcp); + + return requestSendItemMsg; + } + + public String getServerId() { + return serverId; + } + + public void setServerId(String serverId) { + this.serverId = serverId; + } + + public String getMediaServerId() { + return mediaServerId; + } + + public void setMediaServerId(String mediaServerId) { + this.mediaServerId = mediaServerId; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getSsrc() { + return ssrc; + } + + public void setSsrc(String ssrc) { + this.ssrc = ssrc; + } + + public String getPlatformId() { + return platformId; + } + + public void setPlatformId(String platformId) { + this.platformId = platformId; + } + + public String getPlatformName() { + return platformName; + } + + public void setPlatformName(String platformName) { + this.platformName = platformName; + } + + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + public Boolean getTcp() { + return isTcp; + } + + public void setTcp(Boolean tcp) { + isTcp = tcp; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java new file mode 100644 index 00000000..501621bb --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/ResponseSendItemMsg.java @@ -0,0 +1,31 @@ +package com.genersoft.iot.vmp.service.bean; + +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; + +/** + * redis消息:下级回复推送信息 + * @author lin + */ +public class ResponseSendItemMsg { + + private SendRtpItem sendRtpItem; + + private MediaServerItem mediaServerItem; + + public SendRtpItem getSendRtpItem() { + return sendRtpItem; + } + + public void setSendRtpItem(SendRtpItem sendRtpItem) { + this.sendRtpItem = sendRtpItem; + } + + public MediaServerItem getMediaServerItem() { + return mediaServerItem; + } + + public void setMediaServerItem(MediaServerItem mediaServerItem) { + this.mediaServerItem = mediaServerItem; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java new file mode 100644 index 00000000..12d79cb2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsg.java @@ -0,0 +1,116 @@ +package com.genersoft.iot.vmp.service.bean; + +/** + * @author lin + */ +public class WvpRedisMsg { + + public static WvpRedisMsg getInstance(String fromId, String toId, String type, String cmd, String serial, String content){ + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); + wvpRedisMsg.setFromId(fromId); + wvpRedisMsg.setToId(toId); + wvpRedisMsg.setType(type); + wvpRedisMsg.setCmd(cmd); + wvpRedisMsg.setSerial(serial); + wvpRedisMsg.setContent(content); + return wvpRedisMsg; + } + + private String fromId; + + private String toId; + /** + * req 请求, res 回复 + */ + private String type; + private String cmd; + + /** + * 消息的ID + */ + private String serial; + private Object content; + + private final static String requestTag = "req"; + private final static String responseTag = "res"; + + public static WvpRedisMsg getRequestInstance(String fromId, String toId, String cmd, String serial, Object content) { + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); + wvpRedisMsg.setType(requestTag); + wvpRedisMsg.setFromId(fromId); + wvpRedisMsg.setToId(toId); + wvpRedisMsg.setCmd(cmd); + wvpRedisMsg.setSerial(serial); + wvpRedisMsg.setContent(content); + return wvpRedisMsg; + } + + public static WvpRedisMsg getResponseInstance() { + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); + wvpRedisMsg.setType(responseTag); + return wvpRedisMsg; + } + + public static WvpRedisMsg getResponseInstance(String fromId, String toId, String cmd, String serial, Object content) { + WvpRedisMsg wvpRedisMsg = new WvpRedisMsg(); + wvpRedisMsg.setType(responseTag); + wvpRedisMsg.setFromId(fromId); + wvpRedisMsg.setToId(toId); + wvpRedisMsg.setCmd(cmd); + wvpRedisMsg.setSerial(serial); + wvpRedisMsg.setContent(content); + return wvpRedisMsg; + } + + public static boolean isRequest(WvpRedisMsg wvpRedisMsg) { + return requestTag.equals(wvpRedisMsg.getType()); + } + + public String getSerial() { + return serial; + } + + public void setSerial(String serial) { + this.serial = serial; + } + + public String getFromId() { + return fromId; + } + + public void setFromId(String fromId) { + this.fromId = fromId; + } + + public String getToId() { + return toId; + } + + public void setToId(String toId) { + this.toId = toId; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getCmd() { + return cmd; + } + + public void setCmd(String cmd) { + this.cmd = cmd; + } + + public Object getContent() { + return content; + } + + public void setContent(Object content) { + this.content = content; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java new file mode 100644 index 00000000..cb118865 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/WvpRedisMsgCmd.java @@ -0,0 +1,12 @@ +package com.genersoft.iot.vmp.service.bean; + +/** + * @author lin + */ + +public class WvpRedisMsgCmd { + + public static final String GET_SEND_ITEM = "GetSendItem"; + public static final String REQUEST_PUSH_STREAM = "RequestPushStream"; + +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java new file mode 100644 index 00000000..638ea41d --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java @@ -0,0 +1,377 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.DynamicTask; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; +import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.bean.*; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import com.genersoft.iot.vmp.vmanager.bean.WVPResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import javax.sip.InvalidArgumentException; +import javax.sip.SipException; +import java.text.ParseException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * 监听下级发送推送信息,并发送国标推流消息上级 + * @author lin + */ +@Component +public class RedisGbPlayMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisGbPlayMsgListener.class); + + public static final String WVP_PUSH_STREAM_KEY = "WVP_PUSH_STREAM"; + + /** + * 流媒体不存在的错误玛 + */ + public static final int ERROR_CODE_MEDIA_SERVER_NOT_FOUND = -1; + + /** + * 离线的错误玛 + */ + public static final int ERROR_CODE_OFFLINE = -2; + + /** + * 超时的错误玛 + */ + public static final int ERROR_CODE_TIMEOUT = -3; + + private Map callbacks = new ConcurrentHashMap<>(); + private Map callbacksForStartSendRtpStream = new ConcurrentHashMap<>(); + private Map callbacksForError = new ConcurrentHashMap<>(); + + @Autowired + private UserSetting userSetting; + + @Autowired + private RedisUtil redis; + + @Autowired + private ZLMMediaListManager zlmMediaListManager; + + @Autowired + private ZLMRTPServerFactory zlmrtpServerFactory; + + @Autowired + private IMediaServerService mediaServerService; + + @Autowired + private IRedisCatchStorage redisCatchStorage; + + @Autowired + private DynamicTask dynamicTask; + + @Autowired + private ZLMMediaListManager mediaListManager; + + @Autowired + private ZLMHttpHookSubscribe subscribe; + + + public interface PlayMsgCallback{ + void handler(ResponseSendItemMsg responseSendItemMsg); + } + + public interface PlayMsgCallbackForStartSendRtpStream{ + void handler(JSONObject jsonObject); + } + + public interface PlayMsgErrorCallback{ + void handler(WVPResult wvpResult); + } + + @Override + public void onMessage(Message message, byte[] bytes) { + JSONObject msgJSON = JSON.parseObject(message.getBody(), JSONObject.class); + WvpRedisMsg wvpRedisMsg = JSON.toJavaObject(msgJSON, WvpRedisMsg.class); + if (!userSetting.getServerId().equals(wvpRedisMsg.getToId())) { + return; + } + if (WvpRedisMsg.isRequest(wvpRedisMsg)) { + logger.info("[收到REDIS通知] 请求: {}", new String(message.getBody())); + + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: + RequestSendItemMsg content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestSendItemMsg.class); + requestSendItemMsgHand(content, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + RequestPushStreamMsg param = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), RequestPushStreamMsg.class);; + requestPushStreamMsgHand(param, wvpRedisMsg.getFromId(), wvpRedisMsg.getSerial()); + break; + default: + break; + } + + }else { + logger.info("[收到REDIS通知] 回复: {}", new String(message.getBody())); + switch (wvpRedisMsg.getCmd()){ + case WvpRedisMsgCmd.GET_SEND_ITEM: + + WVPResult content = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + + String key = wvpRedisMsg.getSerial(); + switch (content.getCode()) { + case 0: + ResponseSendItemMsg responseSendItemMsg =JSON.toJavaObject((JSONObject)content.getData(), ResponseSendItemMsg.class); + PlayMsgCallback playMsgCallback = callbacks.get(key); + if (playMsgCallback != null) { + callbacksForError.remove(key); + playMsgCallback.handler(responseSendItemMsg); + } + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(key); + if (errorCallback != null) { + callbacks.remove(key); + errorCallback.handler(content); + } + break; + default: + break; + } + break; + case WvpRedisMsgCmd.REQUEST_PUSH_STREAM: + WVPResult wvpResult = JSON.toJavaObject((JSONObject)wvpRedisMsg.getContent(), WVPResult.class); + String serial = wvpRedisMsg.getSerial(); + switch (wvpResult.getCode()) { + case 0: + JSONObject jsonObject = (JSONObject)wvpResult.getData(); + PlayMsgCallbackForStartSendRtpStream playMsgCallback = callbacksForStartSendRtpStream.get(serial); + if (playMsgCallback != null) { + callbacksForError.remove(serial); + playMsgCallback.handler(jsonObject); + } + break; + case ERROR_CODE_MEDIA_SERVER_NOT_FOUND: + case ERROR_CODE_OFFLINE: + case ERROR_CODE_TIMEOUT: + PlayMsgErrorCallback errorCallback = callbacksForError.get(serial); + if (errorCallback != null) { + callbacks.remove(serial); + errorCallback.handler(wvpResult); + } + break; + default: + break; + } + break; + default: + break; + } + } + + + + + } + + /** + * 处理收到的请求推流的请求 + */ + private void requestPushStreamMsgHand(RequestPushStreamMsg requestPushStreamMsg, String fromId, String serial) { + MediaServerItem mediaInfo = mediaServerService.getOne(requestPushStreamMsg.getMediaServerId()); + if (mediaInfo == null) { + // TODO 回复错误 + return; + } + String is_Udp = requestPushStreamMsg.isTcp() ? "0" : "1"; + Map param = new HashMap<>(); + param.put("vhost","__defaultVhost__"); + param.put("app",requestPushStreamMsg.getApp()); + param.put("stream",requestPushStreamMsg.getStream()); + param.put("ssrc", requestPushStreamMsg.getSsrc()); + param.put("dst_url",requestPushStreamMsg.getIp()); + param.put("dst_port", requestPushStreamMsg.getPort()); + param.put("is_udp", is_Udp); + param.put("src_port", requestPushStreamMsg.getSrcPort()); + param.put("pt", requestPushStreamMsg.getPt()); + param.put("use_ps", requestPushStreamMsg.isPs() ? "1" : "0"); + param.put("only_audio", requestPushStreamMsg.isOnlyAudio() ? "1" : "0"); + JSONObject jsonObject = zlmrtpServerFactory.startSendRtpStream(mediaInfo, param); + // 回复消息 + responsePushStream(jsonObject, fromId, serial); + } + + private void responsePushStream(JSONObject content, String toId, String serial) { + + WVPResult result = new WVPResult<>(); + result.setCode(0); + result.setData(content); + + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, serial, result); + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 处理收到的请求sendItem的请求 + */ + private void requestSendItemMsgHand(RequestSendItemMsg content, String toId, String serial) { + MediaServerItem mediaServerItem = mediaServerService.getOne(content.getMediaServerId()); + if (mediaServerItem == null) { + logger.info("[回复推流信息] 流媒体{}不存在 ", content.getMediaServerId()); + + WVPResult result = new WVPResult<>(); + result.setCode(ERROR_CODE_MEDIA_SERVER_NOT_FOUND); + result.setMsg("流媒体不存在"); + + WvpRedisMsg response = WvpRedisMsg.getResponseInstance(userSetting.getServerId(), toId, + WvpRedisMsgCmd.GET_SEND_ITEM, serial, result); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + return; + } + // 确定流是否在线 + boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, content.getApp(), content.getStream()); + if (streamReady) { + logger.info("[回复推流信息] {}/{}", content.getApp(), content.getStream()); + responseSendItem(mediaServerItem, content, toId, serial); + }else { + // 流已经离线 + // 发送redis消息以使设备上线 + logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流",content.getApp(), content.getStream()); + + String taskKey = UUID.randomUUID().toString(); + // 设置超时 + dynamicTask.startDelay(taskKey, ()->{ + logger.info("[ app={}, stream={} ] 等待设备开始推流超时", content.getApp(), content.getStream()); + WVPResult result = new WVPResult<>(); + result.setCode(ERROR_CODE_TIMEOUT); + WvpRedisMsg response = WvpRedisMsg.getResponseInstance( + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + ); + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + }, userSetting.getPlatformPlayTimeout()); + + // 添加订阅 + JSONObject subscribeKey = new JSONObject(); + subscribeKey.put("app", content.getApp()); + subscribeKey.put("stream", content.getStream()); + subscribeKey.put("regist", true); + subscribeKey.put("schema", "rtmp"); + subscribeKey.put("mediaServerId", mediaServerItem.getId()); + subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey, + (MediaServerItem mediaServerItemInUse, JSONObject json)->{ + dynamicTask.stop(taskKey); + responseSendItem(mediaServerItem, content, toId, serial); + }); + + MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1, content.getApp(), content.getStream(), + content.getChannelId(), content.getPlatformId(), content.getPlatformName(), content.getServerId(), + content.getMediaServerId()); + redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel); + + } + } + + /** + * 将获取到的sendItem发送出去 + */ + private void responseSendItem(MediaServerItem mediaServerItem, RequestSendItemMsg content, String toId, String serial) { + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), + content.getPort(), content.getSsrc(), content.getPlatformId(), + content.getApp(), content.getStream(), content.getChannelId(), + content.getTcp()); + + WVPResult result = new WVPResult<>(); + result.setCode(0); + ResponseSendItemMsg responseSendItemMsg = new ResponseSendItemMsg(); + responseSendItemMsg.setSendRtpItem(sendRtpItem); + responseSendItemMsg.setMediaServerItem(mediaServerItem); + result.setData(responseSendItemMsg); + + WvpRedisMsg response = WvpRedisMsg.getResponseInstance( + userSetting.getServerId(), toId, WvpRedisMsgCmd.GET_SEND_ITEM, serial, result + ); + JSONObject jsonObject = (JSONObject)JSON.toJSON(response); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 发送消息要求下级生成推流信息 + * @param serverId 下级服务ID + * @param app 应用名 + * @param stream 流ID + * @param ip 目标IP + * @param port 目标端口 + * @param ssrc ssrc + * @param platformId 平台国标编号 + * @param channelId 通道ID + * @param isTcp 是否使用TCP + * @param callback 得到信息的回调 + */ + public void sendMsg(String serverId, String mediaServerId, String app, String stream, String ip, int port, String ssrc, + String platformId, String channelId, boolean isTcp, String platformName, PlayMsgCallback callback, PlayMsgErrorCallback errorCallback) { + RequestSendItemMsg requestSendItemMsg = RequestSendItemMsg.getInstance( + serverId, mediaServerId, app, stream, ip, port, ssrc, platformId, channelId, isTcp, platformName); + requestSendItemMsg.setServerId(serverId); + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, WvpRedisMsgCmd.GET_SEND_ITEM, + key, requestSendItemMsg); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[请求推流SendItem] {}: {}", serverId, jsonObject); + callbacks.put(key, callback); + callbacksForError.put(key, errorCallback); + dynamicTask.startDelay(key, ()->{ + callbacks.remove(key); + callbacksForError.remove(key); + WVPResult wvpResult = new WVPResult<>(); + wvpResult.setCode(ERROR_CODE_TIMEOUT); + wvpResult.setMsg("timeout"); + errorCallback.handler(wvpResult); + }, userSetting.getPlatformPlayTimeout()); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } + + /** + * 发送请求推流的消息 + * @param param 推流参数 + * @param callback 回调 + */ + public void sendMsgForStartSendRtpStream(String serverId, RequestPushStreamMsg param, PlayMsgCallbackForStartSendRtpStream callback) { + String key = UUID.randomUUID().toString(); + WvpRedisMsg redisMsg = WvpRedisMsg.getRequestInstance(userSetting.getServerId(), serverId, + WvpRedisMsgCmd.REQUEST_PUSH_STREAM, key, param); + + JSONObject jsonObject = (JSONObject)JSON.toJSON(redisMsg); + logger.info("[REDIS 请求其他平台推流] {}: {}", serverId, jsonObject); + dynamicTask.startDelay(key, ()->{ + callbacksForStartSendRtpStream.remove(key); + callbacksForError.remove(key); + }, userSetting.getPlatformPlayTimeout()); + callbacksForStartSendRtpStream.put(key, callback); + callbacksForError.put(key, (wvpResult)->{ + logger.info("[REDIS 请求其他平台推流] 失败: {}", wvpResult.getMsg()); + callbacksForStartSendRtpStream.remove(key); + callbacksForError.remove(key); + }); + redis.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java similarity index 66% rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java rename to src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java index c688d13f..238aafdb 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGPSMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson.JSON; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; +import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -10,17 +11,23 @@ import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; +/** + * 接收来自redis的GPS更新通知 + * @author lin + */ @Component -public class RedisGPSMsgListener implements MessageListener { +public class RedisGpsMsgListener implements MessageListener { - private final static Logger logger = LoggerFactory.getLogger(RedisGPSMsgListener.class); + private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class); @Autowired private IRedisCatchStorage redisCatchStorage; @Override - public void onMessage(Message message, byte[] bytes) { - logger.info("收到来自REDIS的GPS通知: {}", new String(message.getBody())); + public void onMessage(@NotNull Message message, byte[] bytes) { + if (logger.isDebugEnabled()) { + logger.debug("收到来自REDIS的GPS通知: {}", new String(message.getBody())); + } GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class); redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java new file mode 100644 index 00000000..07fffdcc --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java @@ -0,0 +1,83 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.AlarmChannelMessage; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; +import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; +import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; +import com.genersoft.iot.vmp.media.zlm.dto.MediaItem; +import com.genersoft.iot.vmp.storager.IVideoManagerStorage; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + + +/** + * @author lin + */ +@Component +public class RedisStreamMsgListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class); + + @Autowired + private ISIPCommander commander; + + @Autowired + private ISIPCommanderForPlatform commanderForPlatform; + + @Autowired + private IVideoManagerStorage storage; + + @Autowired + private UserSetting userSetting; + + @Autowired + private ZLMMediaListManager zlmMediaListManager; + + @Override + public void onMessage(Message message, byte[] bytes) { + + JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class); + if (steamMsgJson == null) { + logger.warn("[REDIS的ALARM通知]消息解析失败"); + return; + } + String serverId = steamMsgJson.getString("serverId"); + + if (userSetting.getServerId().equals(serverId)) { + // 自己发送的消息忽略即可 + return; + } + logger.info("[REDIS通知] 流变化: {}", new String(message.getBody())); + String app = steamMsgJson.getString("app"); + String stream = steamMsgJson.getString("stream"); + boolean register = steamMsgJson.getBoolean("register"); + String mediaServerId = steamMsgJson.getString("mediaServerId"); + MediaItem mediaItem = new MediaItem(); + mediaItem.setSeverId(serverId); + mediaItem.setApp(app); + mediaItem.setStream(stream); + mediaItem.setRegist(register); + mediaItem.setMediaServerId(mediaServerId); + mediaItem.setCreateStamp(System.currentTimeMillis()/1000); + mediaItem.setAliveSecond(0L); + mediaItem.setTotalReaderCount("0"); + mediaItem.setOriginType(0); + mediaItem.setOriginTypeStr("0"); + mediaItem.setOriginTypeStr("unknown"); + + zlmMediaListManager.addPush(mediaItem); + + + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index d710dad7..1e00faa1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -107,6 +107,7 @@ public class StreamPushServiceImpl implements IStreamPushService { streamPushItem.setStatus(true); streamPushItem.setStreamType("push"); streamPushItem.setVhost(item.getVhost()); + streamPushItem.setServerId(item.getSeverId()); return streamPushItem; } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java index d94669b8..6b862802 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java @@ -356,6 +356,15 @@ public interface IVideoManagerStorage { int removeMedia(String app, String stream); + /** + * 获取但个推流 + * @param app + * @param stream + * @return + */ + StreamPushItem getMedia(String app, String stream); + + /** * 清空推流列表 */ diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java index 909d3a8b..ebd34782 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java @@ -14,9 +14,9 @@ import java.util.List; public interface StreamPushMapper { @Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " + - "createStamp, aliveSecond, mediaServerId) VALUES" + + "createStamp, aliveSecond, mediaServerId, serverId) VALUES" + "('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " + - "'${createStamp}', '${aliveSecond}', '${mediaServerId}' )") + "'${createStamp}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' )") int add(StreamPushItem streamPushItem); @Update("UPDATE stream_push " + diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 39daeda9..5377e23f 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -587,11 +587,11 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String scanKey = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId() + "_*"; List result = new ArrayList<>(); List keys = redis.scan(scanKey); - for (int i = 0; i < keys.size(); i++) { - String key = (String) keys.get(i); + for (Object o : keys) { + String key = (String) o; GPSMsgInfo gpsMsgInfo = (GPSMsgInfo) redis.get(key); if (!gpsMsgInfo.isStored()) { // 只取没有存过得 - result.add((GPSMsgInfo)redis.get(key)); + result.add((GPSMsgInfo) redis.get(key)); } } @@ -667,7 +667,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void sendStreamPushRequestedMsg(MessageForPushChannel msg) { String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED; - logger.info("[redis 推流被请求通知] {}: {}-{}", key, msg.getApp(), msg.getStream()); + logger.info("[redis 推流被请求通知] {}: {}/{}", key, msg.getApp(), msg.getStream()); redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg)); } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java index 338bb779..ac870f7d 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java @@ -884,6 +884,11 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage { return streamPushMapper.del(app, stream); } + @Override + public StreamPushItem getMedia(String app, String stream) { + return streamPushMapper.selectOne(app, stream); + } + @Override public void clearMediaList() { streamPushMapper.clear(); diff --git a/web_src/src/components/dialog/rtcPlayer.vue b/web_src/src/components/dialog/rtcPlayer.vue index 75c18f33..4737849b 100644 --- a/web_src/src/components/dialog/rtcPlayer.vue +++ b/web_src/src/components/dialog/rtcPlayer.vue @@ -7,11 +7,11 @@