From a77628e8759901abc3219412fc2c4aced940db28 Mon Sep 17 00:00:00 2001 From: 648540858 <648540858@qq.com> Date: Wed, 28 Jun 2023 14:50:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=AB=AF=E5=8F=A3=E9=A2=84?= =?UTF-8?q?=E5=8D=A0=E7=94=A8=EF=BC=8C=E9=98=B2=E6=AD=A2=E5=8D=A0=E7=94=A8?= =?UTF-8?q?=E6=97=A0=E6=B3=95=E9=87=8A=E6=94=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/bean/Gb28181Sdp.java | 46 ++++++++++ .../request/impl/InviteRequestProcessor.java | 76 ++++++++++------- .../iot/vmp/gb28181/utils/SipUtils.java | 84 ++++++++++++++++++- .../vmp/media/zlm/ZLMRTPServerFactory.java | 34 ++++++-- .../redisMsg/RedisGbPlayMsgListener.java | 33 +++++++- 5 files changed, 230 insertions(+), 43 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java new file mode 100644 index 00000000..3f2094bb --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Gb28181Sdp.java @@ -0,0 +1,46 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import javax.sdp.SessionDescription; + +/** + * 28181 的SDP解析器 + */ +public class Gb28181Sdp { + private SessionDescription baseSdb; + private String ssrc; + + private String mediaDescription; + + public static Gb28181Sdp getInstance(SessionDescription baseSdb, String ssrc, String mediaDescription) { + Gb28181Sdp gb28181Sdp = new Gb28181Sdp(); + gb28181Sdp.setBaseSdb(baseSdb); + gb28181Sdp.setSsrc(ssrc); + gb28181Sdp.setMediaDescription(mediaDescription); + return gb28181Sdp; + } + + + public SessionDescription getBaseSdb() { + return baseSdb; + } + + public void setBaseSdb(SessionDescription baseSdb) { + this.baseSdb = baseSdb; + } + + public String getSsrc() { + return ssrc; + } + + public void setSsrc(String ssrc) { + this.ssrc = ssrc; + } + + public String getMediaDescription() { + return mediaDescription; + } + + public void setMediaDescription(String mediaDescription) { + this.mediaDescription = mediaDescription; + } +} \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 3d83d226..b7f600f7 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -241,21 +241,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 解析sdp消息, 使用jainsip 自带的sdp解析方式 String contentString = new String(request.getRawContent()); - // jainSip不支持y=字段, 移除以解析。 - int ssrcIndex = contentString.indexOf("y="); - // 检查是否有y字段 - String ssrcDefault = "0000000000"; - String ssrc; - SessionDescription sdp; - if (ssrcIndex >= 0) { - //ssrc规定长度为10个字节,不取余下长度以避免后续还有“f=”字段 - ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12); - String substring = contentString.substring(0, contentString.indexOf("y=")); - sdp = SdpFactory.getInstance().createSessionDescription(substring); - } else { - ssrc = ssrcDefault; - sdp = SdpFactory.getInstance().createSessionDescription(contentString); - } + Gb28181Sdp gb28181Sdp = SipUtils.parseSDP(contentString); + SessionDescription sdp = gb28181Sdp.getBaseSdb(); String sessionName = sdp.getSessionName().getValue(); Long startTime = null; @@ -317,7 +304,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements String username = sdp.getOrigin().getUsername(); String addressStr = sdp.getConnection().getAddress(); - logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc); + Device device = null; // 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标 if (channel != null) { @@ -341,8 +328,30 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } return; } + + String ssrc; + if (gb28181Sdp.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + logger.warn("[上级Invite] {} 平台:{}, 通道:{}, 缺少 ssrc,补充为: {}", sessionName, username, channelId, ssrc); + }else { + ssrc = gb28181Sdp.getSsrc(); + } + String streamTypeStr = null; + if (mediaTransmissionTCP) { + if (tcpActive) { + streamTypeStr = "TCP-ACTIVE"; + }else { + streamTypeStr = "TCP-PASSIVE"; + } + }else { + streamTypeStr = "UDP"; + } + logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc); SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp()); + device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { + return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; + }); if (tcpActive != null) { sendRtpItem.setTcpActive(tcpActive); @@ -469,7 +478,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false, 0, false, device.getStreamModeForParam()); logger.info(JSONObject.toJSONString(ssrcInfo)); sendRtpItem.setStreamId(ssrcInfo.getStream()); - sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc); + sendRtpItem.setSsrc(ssrc); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -480,12 +489,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements }); } else { // 当前系统作为下级平台使用,当上级平台点播时不携带ssrc时,并且设备在当前系统中已经点播了。这个时候需要重新给生成一个ssrc,不使用默认的"0000000000"。 - if (ssrc.equals(ssrcDefault)) { - ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); - ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); - sendRtpItem.setSsrc(ssrc); - } - + sendRtpItem.setSsrc(ssrc); sendRtpItem.setStreamId(playTransaction.getStream()); // 写入redis, 超时时回复 redisCatchStorage.updateSendRTPSever(sendRtpItem); @@ -496,11 +500,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } } else if (gbStream != null) { - if(ssrc.equals(ssrcDefault)) - { - ssrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); - ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); + + String ssrc; + if (gb28181Sdp.getSsrc() == null) { + // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 + ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); + }else { + ssrc = gb28181Sdp.getSsrc(); } + if("push".equals(gbStream.getStreamType())) { if (streamPushItem != null && streamPushItem.isPushIng()) { // 推流状态 @@ -545,7 +553,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ + return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; + }); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); @@ -584,7 +594,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements if (streamReady) { // 自平台内容 SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, - gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp()); + gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback ->{ + return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; + }); if (sendRtpItem == null) { logger.warn("服务器端口资源不足"); @@ -701,7 +713,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements dynamicTask.stop(callIdHeader.getCallId()); if (serverId.equals(userSetting.getServerId())) { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId, - app, stream, channelId, mediaTransmissionTCP, platform.isRtcp()); + app, stream, channelId, mediaTransmissionTCP, platform.isRtcp(), ssrcFromCallback -> { + return redisCatchStorage.querySendRTPServer(platform.getServerGBId(), channelId, null, callIdHeader.getCallId()) != null; + }); if (sendRtpItem == null) { logger.warn("上级点时创建sendRTPItem失败,可能是服务器端口资源不足"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index d6037a11..58fc6012 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -1,14 +1,22 @@ package com.genersoft.iot.vmp.gb28181.utils; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.Gb28181Sdp; import com.genersoft.iot.vmp.gb28181.bean.RemoteAddressInfo; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.GitUtil; import gov.nist.javax.sip.address.AddressImpl; import gov.nist.javax.sip.address.SipUri; import gov.nist.javax.sip.header.Subject; import gov.nist.javax.sip.message.SIPRequest; +import org.apache.commons.lang3.RandomStringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.util.ObjectUtils; +import javax.sdp.SdpFactory; +import javax.sdp.SdpParseException; +import javax.sdp.SessionDescription; import javax.sip.PeerUnavailableException; import javax.sip.SipFactory; import javax.sip.header.FromHeader; @@ -16,6 +24,8 @@ import javax.sip.header.Header; import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; import java.text.ParseException; +import java.time.LocalDateTime; +import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -28,6 +38,8 @@ import java.util.UUID; */ public class SipUtils { + private final static Logger logger = LoggerFactory.getLogger(SipUtils.class); + public static String getUserIdFromFromHeader(Request request) { FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME); return getUserIdFromFromHeader(fromHeader); @@ -51,7 +63,7 @@ public class SipUtils { } public static String getNewViaTag() { - return "z9hG4bK" + System.currentTimeMillis(); + return "z9hG4bK" + RandomStringUtils.randomNumeric(10); } public static UserAgentHeader createUserAgentHeader(GitUtil gitUtil) throws PeerUnavailableException, ParseException { @@ -113,6 +125,12 @@ public class SipUtils { strTmp = String.format("%02X", moveSpeed); builder.append(strTmp, 0, 2); builder.append(strTmp, 0, 2); + + //优化zoom低倍速下的变倍速率 + if ((zoomSpeed > 0) && (zoomSpeed <16)) + { + zoomSpeed = 16; + } strTmp = String.format("%X", zoomSpeed); builder.append(strTmp, 0, 1).append("0"); //计算校验码 @@ -183,4 +201,66 @@ public class SipUtils { } return deviceChannel; } -} + + public static Gb28181Sdp parseSDP(String sdpStr) throws SdpParseException { + + // jainSip不支持y= f=字段, 移除以解析。 + int ssrcIndex = sdpStr.indexOf("y="); + int mediaDescriptionIndex = sdpStr.indexOf("f="); + // 检查是否有y字段 + SessionDescription sdp; + String ssrc = null; + String mediaDescription = null; + if (mediaDescriptionIndex == 0 && ssrcIndex == 0) { + sdp = SdpFactory.getInstance().createSessionDescription(sdpStr); + }else { + String lines[] = sdpStr.split("\\r?\\n"); + StringBuilder sdpBuffer = new StringBuilder(); + for (String line : lines) { + if (line.trim().startsWith("y=")) { + ssrc = line.substring(2); + }else if (line.trim().startsWith("f=")) { + mediaDescription = line.substring(2); + }else { + sdpBuffer.append(line.trim()).append("\r\n"); + } + } + sdp = SdpFactory.getInstance().createSessionDescription(sdpBuffer.toString()); + } + return Gb28181Sdp.getInstance(sdp, ssrc, mediaDescription); + } + + public static String getSsrcFromSdp(String sdpStr) { + + // jainSip不支持y= f=字段, 移除以解析。 + int ssrcIndex = sdpStr.indexOf("y="); + if (ssrcIndex == 0) { + return null; + } + String lines[] = sdpStr.split("\\r?\\n"); + for (String line : lines) { + if (line.trim().startsWith("y=")) { + return line.substring(2); + } + } + return null; + } + + public static String parseTime(String timeStr) { + if (ObjectUtils.isEmpty(timeStr)){ + return null; + } + LocalDateTime localDateTime; + try { + localDateTime = LocalDateTime.parse(timeStr); + }catch (DateTimeParseException e) { + try { + localDateTime = LocalDateTime.parse(timeStr, DateUtil.formatterISO8601); + }catch (DateTimeParseException e2) { + logger.error("[格式化时间] 无法格式化时间: {}", timeStr); + return null; + } + } + return localDateTime.format(DateUtil.formatterISO8601); + } +} \ No newline at end of file diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java index 68f1bee4..9c5a4728 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java @@ -219,13 +219,14 @@ public class ZLMRTPServerFactory { * @param tcp 是否为tcp * @return SendRtpItem */ - public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String deviceId, String channelId, boolean tcp, boolean rtcp){ + public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, + String deviceId, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ // 默认为随机端口 int localPort = 0; if (userSetting.getGbSendStreamStrict()) { if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc); + localPort = keepPort(serverItem, ssrc, localPort, callback); if (localPort == 0) { return null; } @@ -257,11 +258,12 @@ public class ZLMRTPServerFactory { * @param tcp 是否为tcp * @return SendRtpItem */ - public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, String app, String stream, String channelId, boolean tcp, boolean rtcp){ + public SendRtpItem createSendRtpItem(MediaServerItem serverItem, String ip, int port, String ssrc, String platformId, + String app, String stream, String channelId, boolean tcp, boolean rtcp, KeepPortCallback callback){ // 默认为随机端口 int localPort = 0; if (userSetting.getGbSendStreamStrict()) { - localPort = keepPort(serverItem, ssrc); + localPort = keepPort(serverItem, ssrc, localPort, callback); if (localPort == 0) { return null; } @@ -282,13 +284,16 @@ public class ZLMRTPServerFactory { return sendRtpItem; } + public interface KeepPortCallback{ + Boolean keep(String ssrc); + } + /** * 保持端口,直到需要需要发流时再释放 */ - public int keepPort(MediaServerItem serverItem, String ssrc) { - int localPort = 0; + public int keepPort(MediaServerItem serverItem, String ssrc, int localPort, KeepPortCallback keepPortCallback) { Map param = new HashMap<>(3); - param.put("port", 0); + param.put("port", localPort); param.put("enable_tcp", 1); param.put("stream_id", ssrc); JSONObject jsonObject = zlmresTfulUtils.openRtpServer(serverItem, param); @@ -296,10 +301,21 @@ public class ZLMRTPServerFactory { localPort = jsonObject.getInteger("port"); HookSubscribeForRtpServerTimeout hookSubscribeForRtpServerTimeout = HookSubscribeFactory.on_rtp_server_timeout(ssrc, null, serverItem.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 + Integer finalLocalPort = localPort; hookSubscribe.addSubscribe(hookSubscribeForRtpServerTimeout, (MediaServerItem mediaServerItem, JSONObject response)->{ - logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc); - keepPort(serverItem, ssrc); + System.out.println("监听端口到期继续保持监听"); + System.out.println(response); + if (ssrc.equals(response.getString("stream_id"))) { + if (keepPortCallback.keep(ssrc)) { + logger.info("[上级点播] {}->监听端口到期继续保持监听", ssrc); + keepPort(serverItem, ssrc, finalLocalPort, keepPortCallback); + }else { + logger.info("[上级点播] {}->发送取消,无需继续监听", ssrc); + releasePort(serverItem, ssrc); + } + } + }); logger.info("[上级点播] {}->监听端口: {}", ssrc, localPort); }else { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java index 868e861d..3e2385e5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGbPlayMsgListener.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.bean.*; +import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +27,7 @@ import org.springframework.stereotype.Component; import java.text.ParseException; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -314,7 +316,9 @@ public class RedisGbPlayMsgListener implements MessageListener { SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, content.getIp(), content.getPort(), content.getSsrc(), content.getPlatformId(), content.getApp(), content.getStream(), content.getChannelId(), - content.getTcp(), content.getRtcp()); + content.getTcp(), content.getRtcp(), ssrcFromCallback -> { + return querySendRTPServer(content.getPlatformId(), content.getChannelId(), content.getStream(), null) != null; + }); WVPResult result = new WVPResult<>(); result.setCode(0); @@ -391,4 +395,31 @@ public class RedisGbPlayMsgListener implements MessageListener { }); redisTemplate.convertAndSend(WVP_PUSH_STREAM_KEY, jsonObject); } + + private SendRtpItem querySendRTPServer(String platformGbId, String channelId, String streamId, String callId) { + if (platformGbId == null) { + platformGbId = "*"; + } + if (channelId == null) { + channelId = "*"; + } + if (streamId == null) { + streamId = "*"; + } + if (callId == null) { + callId = "*"; + } + String key = VideoManagerConstants.PLATFORM_SEND_RTP_INFO_PREFIX + + userSetting.getServerId() + "_*_" + + platformGbId + "_" + + channelId + "_" + + streamId + "_" + + callId; + List scan = RedisUtil.scan(redisTemplate, key); + if (scan.size() > 0) { + return (SendRtpItem)redisTemplate.opsForValue().get(scan.get(0)); + }else { + return null; + } + } }