From c9757ca6451ad3cf3a992fc1b1195ca3b752ef53 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Thu, 4 May 2023 14:21:58 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=9B=BD=E6=A0=87=E8=A7=84?= =?UTF-8?q?=E8=8C=83=EF=BC=8C=E5=8F=82=E8=80=83=E5=9B=BD=E6=A0=87=E6=96=87?= =?UTF-8?q?=E6=A1=A3=E4=B8=AD-=E7=82=B9=E6=92=AD=E5=A4=96=E5=9F=9F?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E5=AA=92=E4=BD=93=E6=B5=81SSRC=E5=A4=84?= =?UTF-8?q?=E7=90=86=E6=96=B9=E5=BC=8F=EF=BC=8C=E4=B8=8A=E7=BA=A7=E7=82=B9?= =?UTF-8?q?=E6=92=AD=E6=97=B6=E8=87=AA=E5=AE=9A=E4=B9=89ssrc=EF=BC=8C?= =?UTF-8?q?=E4=B8=8D=E9=80=82=E7=94=A8=E4=B8=8A=E7=BA=A7=E6=90=BA=E5=B8=A6?= =?UTF-8?q?=E7=9A=84ssrc=EF=BC=8C=E4=B9=9F=E9=81=BF=E5=85=8D=E4=B8=8A?= =?UTF-8?q?=E7=BA=A7=E5=85=BC=E5=AE=B9=E6=80=A7=E5=B7=AE=EF=BC=8C=E4=B8=8D?= =?UTF-8?q?=E6=90=BA=E5=B8=A6ssrc=E7=9A=84=E9=97=AE=E9=A2=98=EF=BC=8C?= =?UTF-8?q?=E5=8F=AF=E9=80=9A=E8=BF=87=E9=85=8D=E7=BD=AE=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E6=AD=A4=E7=89=B9=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/UserSetting.java | 9 ++++ .../iot/vmp/gb28181/task/SipRunner.java | 5 +++ .../request/impl/ByeRequestProcessor.java | 5 +++ .../request/impl/InviteRequestProcessor.java | 44 ++++++++++--------- .../vmp/media/zlm/ZLMHttpHookListener.java | 5 +++ .../vmp/service/impl/PlatformServiceImpl.java | 5 +++ .../storager/impl/RedisCatchStorageImpl.java | 7 ++- src/main/resources/all-application.yml | 2 + 8 files changed, 61 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index 539198f2..5ae9236b 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -53,6 +53,7 @@ public class UserSetting { private Boolean refuseChannelStatusChannelFormNotify = Boolean.FALSE; private Boolean deviceStatusNotify = Boolean.FALSE; + private Boolean useCustomSsrcForParentInvite = Boolean.TRUE; private String serverId = "000000"; @@ -277,4 +278,12 @@ public class UserSetting { public void setDeviceStatusNotify(Boolean deviceStatusNotify) { this.deviceStatusNotify = deviceStatusNotify; } + + public Boolean getUseCustomSsrcForParentInvite() { + return useCustomSsrcForParentInvite; + } + + public void setUseCustomSsrcForParentInvite(Boolean useCustomSsrcForParentInvite) { + this.useCustomSsrcForParentInvite = useCustomSsrcForParentInvite; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java index 3352838f..17c23a00 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/SipRunner.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -37,6 +38,9 @@ public class SipRunner implements CommandLineRunner { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private SSRCFactory ssrcFactory; + @Autowired private UserSetting userSetting; @@ -96,6 +100,7 @@ public class SipRunner implements CommandLineRunner { MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(),sendRtpItem.getChannelId(), sendRtpItem.getCallId(),sendRtpItem.getStreamId()); if (mediaServerItem != null) { + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); Map param = new HashMap<>(); param.put("vhost","__defaultVhost__"); param.put("app",sendRtpItem.getApp()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index addd6336..cc3051fb 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.InviteStreamType; import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem; import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; @@ -60,6 +61,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private ZLMRTPServerFactory zlmrtpServerFactory; + @Autowired + private SSRCFactory ssrcFactory; + @Autowired private IMediaServerService mediaServerService; @@ -102,6 +106,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In logger.info("[收到bye] 停止向上级推流:{}", streamId); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); redisCatchStorage.deleteSendRTPServer(platformGbId, channelId, callIdHeader.getCallId(), null); + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param); int totalReaderCount = zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId); if (totalReaderCount <= 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index a4367b4a..c2353206 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -245,18 +245,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String contentString = new String(request.getRawContent()); // jainSip不支持y=字段, 移除以解析。 - int ssrcIndex = contentString.indexOf("y="); // 检查是否有y字段 - String ssrcDefault = "0000000000"; - String ssrc; + int ssrcIndex = contentString.indexOf("y="); + SessionDescription sdp; if (ssrcIndex >= 0) { //ssrc规定长度为10个字节,不取余下长度以避免后续还有“f=”字段 - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - String substring = contentString.substring(0, contentString.indexOf("y=")); + String substring = contentString.substring(0, ssrcIndex); sdp = SdpFactory.getInstance().createSessionDescription(substring); } else { - ssrc = ssrcDefault; sdp = SdpFactory.getInstance().createSessionDescription(contentString); } String sessionName = sdp.getSessionName().getValue(); @@ -320,7 +317,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getConnection().getAddress(); - logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc); + Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 if (channel != null) { @@ -344,6 +341,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } + + String ssrc; + if (userSetting.getUseCustomSsrcForParentInvite() || ssrcIndex < 0) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + }else { + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); + } + logger.info("[上级点播] 用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc); SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); @@ -465,29 +471,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } if (playTransaction == null) { + // 被点播的通道目前未被点播,则开始点播 String streamId = null; if (mediaServerItem.isRtpEnable()) { streamId = String.format("%s_%s", device.getDeviceId(), channelId); } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam()); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, ssrc, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam()); logger.info(JSONObject.toJSONString(ssrcInfo)); sendRtpItem.setStreamId(ssrcInfo.getStream()); - sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc); +// sendRtpItem.setSsrc(ssrcInfo.getSsrc()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); - MediaServerItem finalMediaServerItem = mediaServerItem; playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> { logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId); redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null); }); } else { - // 当前系统作为下级平台使用,当上级平台点播时不携带ssrc时,并且设备在当前系统中已经点播了。这个时候需要重新给生成一个ssrc,不使用默认的"0000000000"。 - if (ssrc.equals(ssrcDefault)) { - ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); - ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); - sendRtpItem.setSsrc(ssrc); - } sendRtpItem.setStreamId(playTransaction.getStream()); // 写入redis, 超时时回复 @@ -499,11 +499,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } } else if (gbStream != null) { - if(ssrc.equals(ssrcDefault)) - { - ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); - ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); + + String ssrc; + if (userSetting.getUseCustomSsrcForParentInvite() || ssrcIndex < 0) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + }else { + ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); } + if("push".equals(gbStream.getStreamType())) { if (streamPushItem != null && streamPushItem.isPushIng()) { // 推流状态 diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index 405fdd00..4e9b57d2 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; @@ -105,6 +106,9 @@ public class ZLMHttpHookListener { @Autowired private AssistRESTfulUtils assistRESTfulUtils; + @Autowired + private SSRCFactory ssrcFactory; + @Qualifier("taskExecutor") @Autowired private ThreadPoolTaskExecutor taskExecutor; @@ -666,6 +670,7 @@ public class ZLMHttpHookListener { if (sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId()); + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); try { commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId()); } catch (SipException | InvalidArgumentException | ParseException e) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java index 9233ca9d..63999b7c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/PlatformServiceImpl.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; +import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; @@ -53,6 +54,9 @@ public class PlatformServiceImpl implements IPlatformService { @Autowired private IRedisCatchStorage redisCatchStorage; + @Autowired + private SSRCFactory ssrcFactory; + @Autowired private IMediaServerService mediaServerService; @@ -328,6 +332,7 @@ public class PlatformServiceImpl implements IPlatformService { List sendRtpItems = redisCatchStorage.querySendRTPServer(platformId); if (sendRtpItems != null && sendRtpItems.size() > 0) { for (SendRtpItem sendRtpItem : sendRtpItems) { + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); redisCatchStorage.deleteSendRTPServer(platformId, sendRtpItem.getChannelId(), null, null); MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); Map param = new HashMap<>(3); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index d4abcb45..aed98119 100644 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -915,7 +915,12 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { @Override public void sendDeviceOrChannelStatus(String deviceId, String channelId, boolean online) { String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_DEVICE_STATUS; - logger.info("[redis通知] 推送设备/通道状态, {}/{}-{}", deviceId, channelId, online); + if (channelId == null) { + logger.info("[redis通知] 推送设备状态, {}-{}", deviceId, online); + }else { + logger.info("[redis通知] 推送通道状态, {}/{}-{}", deviceId, channelId, online); + } + StringBuilder msg = new StringBuilder(); msg.append(deviceId); if (channelId != null) { diff --git a/src/main/resources/all-application.yml b/src/main/resources/all-application.yml index cc2145a4..54d348dc 100644 --- a/src/main/resources/all-application.yml +++ b/src/main/resources/all-application.yml @@ -194,6 +194,8 @@ user-settings: max-notify-count-queue: 10000 # 设备/通道状态变化时发送消息 device-status-notify: false + # 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + use-custom-ssrc-for-parent-invite: true # 跨域配置,配置你访问前端页面的地址即可, 可以配置多个 allowed-origins: - http://localhost:8008