diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 4d1e568b..4c40f54d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -356,7 +356,7 @@ public class SIPCommander implements ISIPCommander { // String streamMode = device.getStreamMode().toUpperCase(); logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtsp", mediaServerItem.getId()); subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ if (event != null) { event.response(mediaServerItemInUse, json); @@ -524,7 +524,7 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId() : udpSipProvider.getNewCallId(); - HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId()); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId()); // 添加订阅 subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{ if (hookEvent != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index a6956dab..82b3ba40 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -16,9 +16,7 @@ import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; -import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; -import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; +import com.genersoft.iot.vmp.media.zlm.dto.*; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IStreamProxyService; @@ -90,6 +88,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IMediaServerService mediaServerService; + @Autowired + private ZlmHttpHookSubscribe zlmHttpHookSubscribe; + @Autowired private SIPProcessorObserver sipProcessorObserver; @@ -400,7 +401,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (playTransaction != null) { Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream()); if (!streamReady) { - playTransaction = null; + boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream()); + if (hasRtpServer) { + logger.info("[上级点播]已经开启rtpServer但是尚未收到流,开启监听流的到来"); + HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId()); + zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent); + }else { + playTransaction = null; + } } } if (playTransaction == null) { @@ -564,7 +572,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } else if ("push".equals(gbStream.getStreamType())) { if (!platform.isStartOfflinePush()) { // 平台设置中关闭了拉起离线的推流则直接回复 - responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel unavailable"); + responseAck(evt, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing"); return; } // 发送redis消息以使设备上线 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java index 733f78a2..2d568a1c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/XmlUtil.java @@ -203,6 +203,12 @@ public class XmlUtil { return null; } deviceChannel.setChannelId(channelId); + int channelTypeCode = Integer.parseInt(channelId.substring(10, 13)); + if (channelTypeCode == 136 || channelTypeCode == 137 || channelTypeCode == 138) { + deviceChannel.setHasAudio(true); + }else { + deviceChannel.setHasAudio(false); + } if (event != null && !event.equals(CatalogEvent.ADD) && !event.equals(CatalogEvent.UPDATE)) { // 除了ADD和update情况下需要识别全部内容, return deviceChannel; @@ -396,7 +402,6 @@ public class XmlUtil { } else { deviceChannel.setPTZType(Integer.parseInt(XmlUtil.getText(itemDevice, "PTZType"))); } - deviceChannel.setHasAudio(true); // 默认含有音频,播放时再检查是否有音频及是否AAC return deviceChannel; } } \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 6c70096e..6caff71c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -96,6 +96,10 @@ public class ZLMRTPServerFactory { if(rtpInfo.getInteger("code") == 0){ if (rtpInfo.getBoolean("exist")) { result = rtpInfo.getInteger("local_port"); + if (result == 0) { + // 此时说明rtpServer已经创建但是流还没有推上来 + + } return result; } }else if(rtpInfo.getInteger("code") == -2){ diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java index f1163cae..55a4005f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java @@ -83,4 +83,6 @@ public interface IMediaServerService { MediaServerItem getDefaultMediaServer(); void updateMediaServerKeepalive(String mediaServerId, JSONObject data); + + boolean checkRtpServer(MediaServerItem mediaServerItem, String rtp, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 64a411aa..702967d6 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -147,9 +147,11 @@ public class MediaServerServiceImpl implements IMediaServerService { if (streamId == null) { streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase(); } - int rtpServerPort = mediaServerItem.getRtpProxyPort(); + int rtpServerPort; if (mediaServerItem.isRtpEnable()) { rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port); + } else { + rtpServerPort = mediaServerItem.getRtpProxyPort(); } RedisUtil.set(key, mediaServerItem); return new SSRCInfo(rtpServerPort, ssrc, streamId); @@ -681,4 +683,13 @@ public class MediaServerServiceImpl implements IMediaServerService { } } } + + @Override + public boolean checkRtpServer(MediaServerItem mediaServerItem, String app, String stream) { + JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, stream); + if(rtpInfo.getInteger("code") == 0){ + return rtpInfo.getBoolean("exist"); + } + return false; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java index aa019229..5abb3425 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java @@ -164,17 +164,30 @@ public class PlayServiceImpl implements IPlayService { JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId); if(rtpInfo.getInteger("code") == 0){ if (rtpInfo.getBoolean("exist")) { + int localPort = rtpInfo.getInteger("local_port"); + if (localPort == 0) { + logger.warn("[点播],点播时发现rtpServerC存在,但是尚未开始推流"); + // 此时说明rtpServer已经创建但是流还没有推上来 + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(ErrorCode.ERROR100.getCode()); + wvpResult.setMsg("点播已经在进行中,请稍候重试"); + msg.setData(wvpResult); - WVPResult wvpResult = new WVPResult(); - wvpResult.setCode(ErrorCode.SUCCESS.getCode()); - wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); - wvpResult.setData(streamInfo); - msg.setData(wvpResult); + resultHolder.invokeAllResult(msg); + return playResult; + }else { + WVPResult wvpResult = new WVPResult(); + wvpResult.setCode(ErrorCode.SUCCESS.getCode()); + wvpResult.setMsg(ErrorCode.SUCCESS.getMsg()); + wvpResult.setData(streamInfo); + msg.setData(wvpResult); - resultHolder.invokeAllResult(msg); - if (hookEvent != null) { - hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo))); + resultHolder.invokeAllResult(msg); + if (hookEvent != null) { + hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo))); + } } + }else { redisCatchStorage.stopPlay(streamInfo); storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId()); @@ -187,7 +200,6 @@ public class PlayServiceImpl implements IPlayService { streamInfo = null; } - } if (streamInfo == null) { String streamId = null; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java index bcebb943..25745c4c 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java @@ -143,15 +143,12 @@ public interface DeviceChannelMapper { @Update(value = {"UPDATE device_channel SET status=0 WHERE deviceId=#{deviceId}"}) void offlineByDeviceId(String deviceId); - @Update(value = {"UPDATE device_channel SET status=1 WHERE deviceId=#{deviceId} AND channelId=#{channelId}"}) - void online(String deviceId, String channelId); - @Insert("") int batchAdd(List addChannels); + @Update(value = {"UPDATE device_channel SET status=1 WHERE deviceId=#{deviceId} AND channelId=#{channelId}"}) + void online(String deviceId, String channelId); + @Update({"