diff --git a/src/main/java/com/genersoft/iot/vmp/common/CommonCallback.java b/src/main/java/com/genersoft/iot/vmp/common/CommonCallback.java new file mode 100644 index 00000000..819fe0dd --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/common/CommonCallback.java @@ -0,0 +1,5 @@ +package com.genersoft.iot.vmp.common; + +public interface CommonCallback{ + public void run(T t); +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 00e2613b..4f0dc11a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -546,7 +546,7 @@ public class SIPCommander implements ISIPCommander { HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId()); // 添加订阅 CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); - String callId=newCallIdHeader.getCallId(); + String callId= newCallIdHeader.getCallId(); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> { logger.debug("sipc 添加订阅===callId {}",callId); hookEvent.call(new InviteStreamInfo(mediaServerItem, json,callId, "rtp", ssrcInfo.getStream())); @@ -558,7 +558,7 @@ public class SIPCommander implements ISIPCommander { (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> { logger.info("[录像]下载结束, 发送BYE"); try { - streamByeCmd(device, channelId, ssrcInfo.getStream(),callId); + streamByeCmd(device, channelId, ssrcInfo.getStream(), callId); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { logger.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage()); @@ -580,8 +580,6 @@ public class SIPCommander implements ISIPCommander { if (ssrcIndex >= 0) { ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); } - logger.debug("接收到的下载响应ssrc====>{}",ssrcInfo.getSsrc()); - logger.debug("接收到的下载响应ssrc====>{}",ssrc); streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download); okEvent.response(event); }); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index b15003cc..728ff0e3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -12,6 +12,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; +import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory; +import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import gov.nist.javax.sip.message.SIPRequest; @@ -58,6 +61,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i @Autowired private VideoStreamSessionManager sessionManager; + @Autowired + private ZlmHttpHookSubscribe subscribe; + @Override public void afterPropertiesSet() throws Exception { notifyMessageHandler.addHandler(cmdType, this); @@ -93,6 +99,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i } catch (InvalidArgumentException | ParseException | SsrcTransactionNotFoundException | SipException e) { logger.error("[录像流]推送完毕,收到关流通知, 发送BYE失败 {}", e.getMessage()); } + // 去除监听流注销自动停止下载的监听 + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcTransaction.getStream(), false, "rtsp", ssrcTransaction.getMediaServerId()); + subscribe.removeSubscribe(hookSubscribe); // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题,需要将点播CallId进行上下级绑定 SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, ssrcTransaction.getChannelId(), null, null); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index a2891973..77758a3f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -276,6 +276,10 @@ public class ZLMRESTfulUtils { return sendPost(mediaServerItem, "closeRtpServer",param, null); } + public void closeRtpServer(MediaServerItem mediaServerItem, Map param, RequestCallback callback) { + sendPost(mediaServerItem, "closeRtpServer",param, callback); + } + public JSONObject listRtpServer(MediaServerItem mediaServerItem) { return sendPost(mediaServerItem, "listRtpServer",null, null); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 9bf1a3ad..ce38b10d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.media.zlm.dto.*; @@ -164,6 +165,31 @@ public class ZLMRTPServerFactory { return result; } + public void closeRtpServer(MediaServerItem serverItem, String streamId, CommonCallback callback) { + if (serverItem == null) { + callback.run(false); + return; + } + Map param = new HashMap<>(); + param.put("stream_id", streamId); + zlmresTfulUtils.closeRtpServer(serverItem, param, jsonObject -> { + if (jsonObject != null ) { + if (jsonObject.getInteger("code") == 0) { + callback.run(jsonObject.getInteger("hit") == 1); + return; + }else { + logger.error("关闭RTP Server 失败: " + jsonObject.getString("msg")); + } + }else { + // 检查ZLM状态 + logger.error("关闭RTP Server 失败: 请检查ZLM服务"); + } + callback.run(false); + }); + + + } + /** * 创建一个国标推流 diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index 1233455f..a5b9034d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData; @@ -51,6 +52,8 @@ public interface IMediaServerService { void closeRTPServer(MediaServerItem mediaServerItem, String streamId); + void closeRTPServer(MediaServerItem mediaServerItem, String streamId, CommonCallback callback); + void closeRTPServer(String mediaServerId, String streamId); void clearRTPServer(MediaServerItem mediaServerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 856359cd..5a2db63b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.service.impl; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; @@ -172,6 +173,15 @@ public class MediaServerServiceImpl implements IMediaServerService { zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId); } + @Override + public void closeRTPServer(MediaServerItem mediaServerItem, String streamId, CommonCallback callback) { + if (mediaServerItem == null) { + callback.run(false); + return; + } + zlmrtpServerFactory.closeRtpServer(mediaServerItem, streamId, callback); + } + @Override public void closeRTPServer(String mediaServerId, String streamId) { MediaServerItem mediaServerItem = this.getOne(mediaServerId); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index f143a15c..6c2ee7ee 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -328,9 +328,30 @@ public class PlayServiceImpl implements IPlayService { }); } // 关闭rtp server - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 重新开启ssrc server - mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream(), result->{ + if (result) { + // 重新开启ssrc server + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, ssrcInfo.getPort()); + }else { + try { + logger.warn("[停止点播] {}/{}", device.getDeviceId(), channelId); + cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 停止点播, 发送BYE: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + + dynamicTask.stop(timeOutTaskKey); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + event.msg = "下级自定义了ssrc,重新设置收流信息失败"; + event.statusCode = 500; + errorEvent.response(event); + } + }); + } } @@ -472,7 +493,7 @@ public class PlayServiceImpl implements IPlayService { if (device == null) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "设备: " + deviceId + "不存在"); } - + logger.info("[回放消息] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); PlayBackResult playBackResult = new PlayBackResult<>(); String playBackTimeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(playBackTimeOutTaskKey, () -> { @@ -546,6 +567,7 @@ public class PlayServiceImpl implements IPlayService { if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { // ssrc 不可用 // 释放ssrc + dynamicTask.stop(playBackTimeOutTaskKey); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用"; @@ -568,10 +590,31 @@ public class PlayServiceImpl implements IPlayService { hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); }); } + // 关闭rtp server - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 重新开启ssrc server - mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream(), result->{ + if (result) { + // 重新开启ssrc server + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); + }else { + try { + logger.warn("[回放消息]停止 {}/{}", device.getDeviceId(), channelId); + cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 停止点播 停止, 发送BYE: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + + dynamicTask.stop(playBackTimeOutTaskKey); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + errorEvent.response(eventResult); + eventResult.msg = "下级自定义了ssrc,重新设置收流信息失败"; + eventResult.statusCode = 500; + errorEvent.response(eventResult); + } + }); } } } @@ -619,7 +662,7 @@ public class PlayServiceImpl implements IPlayService { throw new ControllerException(ErrorCode.ERROR400.getCode(), "设备:" + deviceId + "不存在"); } PlayBackResult downloadResult = new PlayBackResult<>(); - + logger.info("[录像下载] deviceId: {}, channelId: {},收流端口: {}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channelId, ssrcInfo.getPort(), device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); String downLoadTimeOutTaskKey = UUID.randomUUID().toString(); dynamicTask.startDelay(downLoadTimeOutTaskKey, () -> { logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId)); @@ -648,7 +691,7 @@ public class PlayServiceImpl implements IPlayService { streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); }; InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> { - logger.info("收到订阅消息: " + inviteStreamInfo.getCallId()); + logger.info("[录像下载]收到订阅消息: " + inviteStreamInfo.getCallId()); dynamicTask.stop(downLoadTimeOutTaskKey); StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId); streamInfo.setStartTime(startTime); @@ -678,9 +721,9 @@ public class PlayServiceImpl implements IPlayService { if (ssrcInfo.getSsrc().equals(ssrcInResponse)) { return; } - logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); + logger.info("[录像下载] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse); if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) { - logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); + logger.info("[录像下载] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); if (!ssrcFactory.checkSsrc(mediaServerItem.getId(),ssrcInResponse)) { // ssrc 不可用 @@ -707,14 +750,34 @@ public class PlayServiceImpl implements IPlayService { hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream())); }); } + // 关闭rtp server - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); - // 重新开启ssrc server - mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream(), result->{ + if (result) { + // 重新开启ssrc server + mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort()); + }else { + try { + logger.warn("[录像下载] 停止{}/{}", device.getDeviceId(), channelId); + cmder.streamByeCmd(device, channelId, ssrcInfo.getStream(), null, null); + } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { + logger.error("[命令发送失败] 录像下载停止, 发送BYE: {}", e.getMessage()); + throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); + } + + dynamicTask.stop(downLoadTimeOutTaskKey); + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + + streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream()); + eventResult.msg = "下级自定义了ssrc,重新设置收流信息失败"; + eventResult.statusCode = 500; + errorEvent.response(eventResult); + } + }); } } } - }); } catch (InvalidArgumentException | SipException | ParseException e) { logger.error("[命令发送失败] 录像下载: {}", e.getMessage()); diff --git a/web_src/src/components/dialog/recordDownload.vue b/web_src/src/components/dialog/recordDownload.vue index 3e8c4271..b3f46c87 100644 --- a/web_src/src/components/dialog/recordDownload.vue +++ b/web_src/src/components/dialog/recordDownload.vue @@ -96,7 +96,10 @@ export default { }); }, close: function (){ - this.stopDownloadRecord(); + if (this.streamInfo.progress < 1) { + this.stopDownloadRecord(); + } + if (this.timer !== null) { window.clearTimeout(this.timer); this.timer = null;