From b39993cf3df68d2b922f675d673d906f78ead388 Mon Sep 17 00:00:00 2001 From: gushouzheng <643466026@qq.com> Date: Sat, 6 Aug 2022 08:58:17 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E7=BA=A7=E8=81=94?= =?UTF-8?q?=E6=9F=A5=E7=9C=8B=E7=9B=B4=E6=92=AD=E8=A7=86=E9=A2=91=E5=8F=8A?= =?UTF-8?q?=E4=BB=A3=E7=90=86=E6=8B=89=E6=B5=81=E8=A7=86=E9=A2=91bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../request/impl/InviteRequestProcessor.java | 80 ++++++++++++++++--- .../service/impl/MediaServerServiceImpl.java | 6 +- .../service/impl/StreamPushServiceImpl.java | 7 ++ 3 files changed, 83 insertions(+), 10 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index fda3bff5..7daa89c0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -17,9 +17,11 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe; import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager; import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory; import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem; +import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem; import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem; import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IPlayService; +import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import com.genersoft.iot.vmp.service.bean.SSRCInfo; @@ -65,6 +67,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements @Autowired private IStreamPushService streamPushService; + @Autowired + private IStreamProxyService streamProxyService; @Autowired private IRedisCatchStorage redisCatchStorage; @@ -145,6 +149,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements MediaServerItem mediaServerItem = null; StreamPushItem streamPushItem = null; + StreamProxyItem proxyByAppAndStream =null; // 不是通道可能是直播流 if (channel != null && gbStream == null) { if (channel.getStatus() == 0) { @@ -178,6 +183,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements responseAck(evt, Response.GONE); return; } + }else if("proxy".equals(gbStream.getStreamType())){ + proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream()); + if (proxyByAppAndStream == null) { + logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId); + responseAck(evt, Response.GONE); + return; + } } } responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在,发181,呼叫转接中 @@ -419,14 +431,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } } else if (gbStream != null) { - if (streamPushItem != null && streamPushItem.isPushIng()) { - // 推流状态 - pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); - } else { - // 未推流 拉起 - notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, - mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + if("push".equals(gbStream.getStreamType())) { + if (streamPushItem != null && streamPushItem.isPushIng()) { + // 推流状态 + pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } else { + // 未推流 拉起 + notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + }else if ("proxy".equals(gbStream.getStreamType())){ + if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){ + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }else{ + //开启代理拉流 + boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream()); + if(start1) { + pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + }else{ + //失败后通知 + notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive, + mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId); + } + } + } } } @@ -445,7 +476,39 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements /** * 安排推流 */ + private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform, + CallIdHeader callIdHeader, MediaServerItem mediaServerItem, + int port, Boolean tcpActive, boolean mediaTransmissionTCP, + String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException { + Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream()); + if (streamReady) { + // 自平台内容 + SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId, + gbStream.getApp(), gbStream.getStream(), channelId, + mediaTransmissionTCP); + if (sendRtpItem == null) { + logger.warn("服务器端口资源不足"); + responseAck(evt, Response.BUSY_HERE); + return; + } + if (tcpActive != null) { + sendRtpItem.setTcpActive(tcpActive); + } + sendRtpItem.setPlayType(InviteStreamType.PUSH); + // 写入redis, 超时时回复 + sendRtpItem.setStatus(1); + sendRtpItem.setCallId(callIdHeader.getCallId()); + byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog()); + sendRtpItem.setDialog(dialogByteArray); + byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction()); + sendRtpItem.setTransaction(transactionByteArray); + redisCatchStorage.updateSendRTPSever(sendRtpItem); + sendStreamAck(mediaServerItem, sendRtpItem, platform, evt); + + } + + } private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform, CallIdHeader callIdHeader, MediaServerItem mediaServerItem, int port, Boolean tcpActive, boolean mediaTransmissionTCP, @@ -490,7 +553,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - /** * 通知流上线 */ diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index c23cfcdf..8da6df8b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -277,7 +277,11 @@ public class MediaServerServiceImpl implements IMediaServerService { return null; } String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId; - return (MediaServerItem)redisUtil.get(key); + MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key); + if(null==serverItem){ + serverItem=mediaServerMapper.queryOne(mediaServerId); + } + return serverItem; } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java index 8fa04094..061f807f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; +import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; @@ -78,6 +79,10 @@ public class StreamPushServiceImpl implements IStreamPushService { @Autowired TransactionDefinition transactionDefinition; + @Autowired + private MediaConfig mediaConfig; + + @Override public List handleJSON(String jsonData, MediaServerItem mediaServerItem) { if (jsonData == null) { @@ -142,6 +147,8 @@ public class StreamPushServiceImpl implements IStreamPushService { stream.setStreamType("push"); stream.setStatus(true); stream.setCreateTime(DateUtil.getNow()); + stream.setStreamType("push"); + stream.setMediaServerId(mediaConfig.getId()); int add = gbStreamMapper.add(stream); return add > 0; } From 9c4c1159a7e7eba99d2d863f88c45e4603ddd2d5 Mon Sep 17 00:00:00 2001 From: wym <421132955@qq.com> Date: Mon, 8 Aug 2022 08:31:12 +0800 Subject: [PATCH 2/2] =?UTF-8?q?zlm=E5=85=B3=E9=97=AD=E4=B8=80=E6=AE=B5?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E5=90=8E=EF=BC=8C=E9=87=8D=E5=90=AF=E5=90=8E?= =?UTF-8?q?=E4=BF=A1=E4=BB=A4=E6=9C=8D=E5=8A=A1=E4=B8=8D=E9=87=8D=E8=BF=9E?= =?UTF-8?q?zlm?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/MediaServerServiceImpl.java | 35 ++++++++++++++++--- .../gb28181/media/MediaController.java | 29 +++++++++++++-- 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java index 8da6df8b..59749189 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import com.genersoft.iot.vmp.media.zlm.ZLMRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -54,6 +55,9 @@ public class MediaServerServiceImpl implements IMediaServerService { @Autowired private SipConfig sipConfig; + @Autowired + private ZLMRunner zlmRunner; + @Value("${server.ssl.enabled:false}") private boolean sslEnabled; @@ -279,7 +283,9 @@ public class MediaServerServiceImpl implements IMediaServerService { String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId; MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key); if(null==serverItem){ - serverItem=mediaServerMapper.queryOne(mediaServerId); + //zlm服务不在线,启动重连 + reloadZlm(); + serverItem=(MediaServerItem)redisUtil.get(key); } return serverItem; } @@ -474,8 +480,13 @@ public class MediaServerServiceImpl implements IMediaServerService { String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId(); if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) { - logger.info("获取负载最低的节点时无在线节点"); - return null; + logger.info("获取负载最低的节点时无在线节点,启动重连机制"); + //启动重连 + reloadZlm(); + if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) { + logger.info("获取负载最低的节点时无在线节点"); + return null; + } } // 获取分数最低的,及并发最低的 @@ -637,8 +648,14 @@ public class MediaServerServiceImpl implements IMediaServerService { MediaServerItem mediaServerItem = getOne(mediaServerId); if (mediaServerItem == null) { // zlm连接重试 - logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息"); - return; + logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm"); + reloadZlm(); + mediaServerItem = getOne(mediaServerId); + if (mediaServerItem == null) { + // zlm连接重试 + logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息"); + return; + } } String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId; int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2; @@ -661,4 +678,12 @@ public class MediaServerServiceImpl implements IMediaServerService { } } + public void reloadZlm(){ + try { + zlmRunner.run(); + Thread.sleep(500);//延迟0.5秒缓冲时间 + } catch (Exception e) { + logger.warn("尝试重连zlm失败!",e); + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java index 94fe8df2..48973f9c 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/media/MediaController.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.conf.security.dto.LoginUser; import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IStreamProxyService; import com.genersoft.iot.vmp.service.IStreamPushService; import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -37,6 +38,8 @@ public class MediaController { @Autowired private IMediaService mediaService; + @Autowired + private IStreamProxyService streamProxyService; /** @@ -95,8 +98,30 @@ public class MediaController { result.setMsg("scccess"); result.setData(streamInfo); }else { - result.setCode(-1); - result.setMsg("fail"); + //获取流失败,重启拉流后重试一次 + streamProxyService.stop(app,stream); + boolean start = streamProxyService.start(app, stream); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) { + String host = request.getHeader("Host"); + String localAddr = host.split(":")[0]; + logger.info("使用{}作为返回流的ip", localAddr); + streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority); + }else { + streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority); + } + if (streamInfo != null){ + result.setCode(0); + result.setMsg("scccess"); + result.setData(streamInfo); + }else { + result.setCode(-1); + result.setMsg("fail"); + } } return result; }