diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index 7a122c77..6d523084 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -99,6 +99,12 @@ public class VideoManagerConstants { */ public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED"; + + /** + * redis 消息通知平台通知设备推流结果 + */ + public static final String VM_MSG_STREAM_PUSH_RESPONSE = "VM_MSG_STREAM_PUSH_RESPONSE"; + /** * redis 消息请求所有的在线通道 */ diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java index 7bdeab46..0b653cf5 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisConfig.java @@ -12,7 +12,6 @@ import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import com.genersoft.iot.vmp.utils.redis.FastJsonRedisSerializer; @@ -43,7 +42,10 @@ public class RedisConfig extends CachingConfigurerSupport { private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener; @Autowired - private RedisPushStreamListMsgListener redisPushStreamListMsgListener; + private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener; + + @Autowired + private RedisPushStreamResponseListener redisPushStreamResponseListener; @Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { @@ -81,7 +83,7 @@ public class RedisConfig extends CachingConfigurerSupport { container.addMessageListener(redisGbPlayMsgListener, new PatternTopic(RedisGbPlayMsgListener.WVP_PUSH_STREAM_KEY)); container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE)); container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE)); + container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE)); return container; } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 163467dc..135afd20 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -694,7 +694,7 @@ public class SIPCommander implements ISIPCommander { dialog = streamSession.getDialogByStream(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); } mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); - mediaServerService.closeRTPServer(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); streamSession.remove(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); if (dialog == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 706d4229..a1f0aae5 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -121,7 +121,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId); if (streamInfo != null) { redisCatchStorage.stopPlay(streamInfo); - mediaServerService.closeRTPServer(device.getDeviceId(), channelId, streamInfo.getStream()); + mediaServerService.closeRTPServer(streamInfo.getMediaServerId(), streamInfo.getStream()); } SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null); if (ssrcTransactionForPlay != null){ diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 82b3ba40..abcffe2c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -24,6 +24,7 @@ import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.impl.RedisGbPlayMsgListener; +import com.genersoft.iot.vmp.service.impl.RedisPushStreamResponseListener; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.utils.DateUtil; @@ -74,7 +75,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private DynamicTask dynamicTask; @Autowired - private SIPCommander cmder; + private RedisPushStreamResponseListener redisPushStreamResponseListener; @Autowired private IPlayService playService; @@ -556,7 +557,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements otherWvpPushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } - } /** * 通知流上线 @@ -639,6 +639,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); } }); + + // 添加回复的拒绝或者错误的通知 + redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> { + if (response.getCode() != 0) { + dynamicTask.stop(callIdHeader.getCallId()); + mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream()); + try { + responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, response.getMsg()); + } catch (SipException e) { + throw new RuntimeException(e); + } catch (InvalidArgumentException e) { + throw new RuntimeException(e); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + }); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java index 1aafbf6e..d2db8f06 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/CatalogQueryMessageHandler.java @@ -79,8 +79,8 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem List allChannels = new ArrayList<>(); // 回复平台 - DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform); - allChannels.add(deviceChannel); +// DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform); +// allChannels.add(deviceChannel); // 回复目录 if (catalogs.size() > 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 6caff71c..a5b5f19c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -139,6 +139,7 @@ public class ZLMRTPServerFactory { param.put("stream_id", streamId); JSONObject jsonObject = zlmresTfulUtils.closeRtpServer(serverItem, param); if (jsonObject != null ) { + System.out.println(jsonObject); if (jsonObject.getInteger("code") == 0) { result = jsonObject.getInteger("hit") == 1; }else { diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 55a4005f..0ecc717e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -50,7 +50,9 @@ public interface IMediaServerService { SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port); - void closeRTPServer(String deviceId, String channelId, String ssrc); + void closeRTPServer(MediaServerItem mediaServerItem, String streamId); + + void closeRTPServer(String mediaServerId, String streamId); void clearRTPServer(MediaServerItem mediaServerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java index 6dcc515f..8491fc5e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannel.java @@ -48,6 +48,8 @@ public class MessageForPushChannel { */ private String mediaServerId; + + public static MessageForPushChannel getInstance(int type, String app, String stream, String gbId, String platFormId, String platFormName, String serverId, String mediaServerId){ diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java new file mode 100644 index 00000000..10d1b43c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/MessageForPushChannelResponse.java @@ -0,0 +1,71 @@ +package com.genersoft.iot.vmp.service.bean; + +/** + * 当redis回复推流结果上级平台 + * @author lin + */ +public class MessageForPushChannelResponse { + /** + * 错误玛 + * 0 成功 1 失败 + */ + private int code; + /** + * 错误内容 + */ + private String msg; + + /** + * 流应用名 + */ + private String app; + + /** + * 流Id + */ + private String stream; + + + + public static MessageForPushChannelResponse getInstance(int code, String msg, String app, String stream){ + MessageForPushChannelResponse messageForPushChannel = new MessageForPushChannelResponse(); + messageForPushChannel.setCode(code); + messageForPushChannel.setMsg(msg); + messageForPushChannel.setApp(app); + messageForPushChannel.setStream(stream); + return messageForPushChannel; + } + + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + public String getApp() { + return app; + } + + public void setApp(String app) { + this.app = app; + } + + public String getStream() { + return stream; + } + + public void setStream(String stream) { + this.stream = stream; + } + + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java index 00e67831..b1f8f2a4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java @@ -145,7 +145,7 @@ public class DeviceServiceImpl implements IDeviceService { if (ssrcTransactions != null && ssrcTransactions.size() > 0) { for (SsrcTransaction ssrcTransaction : ssrcTransactions) { mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); - mediaServerService.closeRTPServer(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); + mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); streamSession.remove(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 702967d6..d00cb421 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -164,16 +164,18 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public void closeRTPServer(String deviceId, String channelId, String stream) { - String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream); - String ssrc = streamSession.getSSRC(deviceId, channelId, stream); - MediaServerItem mediaServerItem = this.getOne(mediaServerId); - if (mediaServerItem != null) { - String streamId = String.format("%s_%s", deviceId, channelId); - zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); - releaseSsrc(mediaServerItem.getId(), ssrc); + public void closeRTPServer(MediaServerItem mediaServerItem, String streamId) { + if (mediaServerItem == null) { + return; } - streamSession.remove(deviceId, channelId, stream); + zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId); + releaseSsrc(mediaServerItem.getId(), streamId); + } + + @Override + public void closeRTPServer(String mediaServerId, String streamId) { + MediaServerItem mediaServerItem = this.getOne(mediaServerId); + closeRTPServer(mediaServerItem, streamId); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index f3e01c97..320b1683 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -270,7 +270,7 @@ public class PlayServiceImpl implements IPlayService { logger.info("[点播超时] 消息未响应 deviceId: {}, channelId: {}", device.getDeviceId(), channelId); timeoutCallback.run(0, "点播超时"); mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); - mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); } }, userSetting.getPlayTimeout()); @@ -333,7 +333,7 @@ public class PlayServiceImpl implements IPlayService { }); } // 关闭rtp server - mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); // 重新开启ssrc server mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort()); @@ -341,7 +341,7 @@ public class PlayServiceImpl implements IPlayService { } }, (event) -> { dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, finalSsrcInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc()); @@ -445,7 +445,7 @@ public class PlayServiceImpl implements IPlayService { cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null); }else { mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); } cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null); @@ -533,7 +533,7 @@ public class PlayServiceImpl implements IPlayService { }); } // 关闭rtp server - mediaServerService.closeRTPServer(device.getDeviceId(), channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); // 重新开启ssrc server mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); } @@ -593,7 +593,7 @@ public class PlayServiceImpl implements IPlayService { cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null); }else { mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); streamSession.remove(deviceId, channelId, ssrcInfo.getStream()); } cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java new file mode 100644 index 00000000..56c9ff31 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamResponseListener.java @@ -0,0 +1,62 @@ +package com.genersoft.iot.vmp.service.impl; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent; +import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.service.IGbStreamService; +import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamPushService; +import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; +import com.genersoft.iot.vmp.utils.DateUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 接收redis返回的推流结果 + * @author lin + */ +@Component +public class RedisPushStreamResponseListener implements MessageListener { + + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class); + + private Map responseEvents = new ConcurrentHashMap<>(); + + public interface PushStreamResponseEvent{ + void run(MessageForPushChannelResponse response); + } + + @Override + public void onMessage(Message message, byte[] bytes) { + // + logger.warn("[REDIS消息-请求推流结果]: {}", new String(message.getBody())); + MessageForPushChannelResponse response = JSON.parseObject(new String(message.getBody()), MessageForPushChannelResponse.class); + if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){ + logger.info("[REDIS消息-请求推流结果]:参数不全"); + return; + } + // 查看正在等待的invite消息 + if (responseEvents.get(response.getApp() + response.getStream()) != null) { + responseEvents.get(response.getApp() + response.getStream()).run(response); + } + } + + public void addEvent(String app, String stream, PushStreamResponseEvent callback) { + responseEvents.put(app + stream, callback); + } + + public void removeEvent(String app, String stream) { + responseEvents.remove(app + stream); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java similarity index 94% rename from src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java rename to src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java index 23745eb8..bedbf447 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamListMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusListMsgListener.java @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; @@ -23,9 +22,9 @@ import java.util.*; * @Description: 接收redis发送的推流设备列表更新通知 */ @Component -public class RedisPushStreamListMsgListener implements MessageListener { +public class RedisPushStreamStatusListMsgListener implements MessageListener { - private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamListMsgListener.class); + private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusListMsgListener.class); @Resource private IMediaServerService mediaServerService;