Merge pull request #567 from mrjackwang/wvp-28181-2.0

更新上级级联查看直播视频及代理拉流视频流bug
This commit is contained in:
648540858 2022-08-08 09:32:38 +08:00 committed by GitHub
commit 762fff4f01
4 changed files with 139 additions and 16 deletions

View File

@ -17,9 +17,11 @@ import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@ -65,6 +67,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
@Autowired
private IStreamPushService streamPushService;
@Autowired
private IStreamProxyService streamProxyService;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@ -142,6 +146,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
MediaServerItem mediaServerItem = null;
StreamPushItem streamPushItem = null;
StreamProxyItem proxyByAppAndStream =null;
// 不是通道可能是直播流
if (channel != null && gbStream == null) {
if (channel.getStatus() == 0) {
@ -175,6 +180,13 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
responseAck(evt, Response.GONE);
return;
}
}else if("proxy".equals(gbStream.getStreamType())){
proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
if (proxyByAppAndStream == null) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(evt, Response.GONE);
return;
}
}
}
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
@ -416,14 +428,33 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
} else if (gbStream != null) {
if (streamPushItem != null && streamPushItem.isPushIng()) {
// 推流状态
pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} else {
// 未推流 拉起
notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
if("push".equals(gbStream.getStreamType())) {
if (streamPushItem != null && streamPushItem.isPushIng()) {
// 推流状态
pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} else {
// 未推流 拉起
notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
}else if ("proxy".equals(gbStream.getStreamType())){
if(null != proxyByAppAndStream &&proxyByAppAndStream.isStatus()){
pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}else{
//开启代理拉流
boolean start1 = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
if(start1) {
pushProxyStream(evt, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}else{
//失败后通知
notifyStreamOnline(evt, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
}
}
}
}
@ -442,7 +473,39 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
/**
* 安排推流
*/
private void pushProxyStream(RequestEvent evt, GbStream gbStream, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady) {
// 自平台内容
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId,
mediaTransmissionTCP);
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(evt, Response.BUSY_HERE);
return;
}
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
}
sendRtpItem.setPlayType(InviteStreamType.PUSH);
// 写入redis 超时时回复
sendRtpItem.setStatus(1);
sendRtpItem.setCallId(callIdHeader.getCallId());
byte[] dialogByteArray = SerializeUtils.serialize(evt.getDialog());
sendRtpItem.setDialog(dialogByteArray);
byte[] transactionByteArray = SerializeUtils.serialize(evt.getServerTransaction());
sendRtpItem.setTransaction(transactionByteArray);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
sendStreamAck(mediaServerItem, sendRtpItem, platform, evt);
}
}
private void pushStream(RequestEvent evt, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
@ -487,7 +550,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
/**
* 通知流上线
*/

View File

@ -8,6 +8,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.genersoft.iot.vmp.media.zlm.ZLMRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -54,6 +55,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private SipConfig sipConfig;
@Autowired
private ZLMRunner zlmRunner;
@Value("${server.ssl.enabled:false}")
private boolean sslEnabled;
@ -277,7 +281,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
return null;
}
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
return (MediaServerItem)redisUtil.get(key);
MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key);
if(null==serverItem){
//zlm服务不在线启动重连
reloadZlm();
serverItem=(MediaServerItem)redisUtil.get(key);
}
return serverItem;
}
@Override
@ -470,8 +480,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
logger.info("获取负载最低的节点时无在线节点");
return null;
logger.info("获取负载最低的节点时无在线节点,启动重连机制");
//启动重连
reloadZlm();
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
logger.info("获取负载最低的节点时无在线节点");
return null;
}
}
// 获取分数最低的及并发最低的
@ -633,8 +648,14 @@ public class MediaServerServiceImpl implements IMediaServerService {
MediaServerItem mediaServerItem = getOne(mediaServerId);
if (mediaServerItem == null) {
// zlm连接重试
logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
return;
logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm");
reloadZlm();
mediaServerItem = getOne(mediaServerId);
if (mediaServerItem == null) {
// zlm连接重试
logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
return;
}
}
String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
@ -657,4 +678,12 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
}
public void reloadZlm(){
try {
zlmRunner.run();
Thread.sleep(500);//延迟0.5秒缓冲时间
} catch (Exception e) {
logger.warn("尝试重连zlm失败",e);
}
}
}

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
@ -78,6 +79,10 @@ public class StreamPushServiceImpl implements IStreamPushService {
@Autowired
TransactionDefinition transactionDefinition;
@Autowired
private MediaConfig mediaConfig;
@Override
public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
if (jsonData == null) {
@ -142,6 +147,8 @@ public class StreamPushServiceImpl implements IStreamPushService {
stream.setStreamType("push");
stream.setStatus(true);
stream.setCreateTime(DateUtil.getNow());
stream.setStreamType("push");
stream.setMediaServerId(mediaConfig.getId());
int add = gbStreamMapper.add(stream);
return add > 0;
}

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
import com.genersoft.iot.vmp.media.zlm.dto.OnPublishHookParam;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IStreamProxyService;
import com.genersoft.iot.vmp.service.IStreamPushService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -37,6 +38,8 @@ public class MediaController {
@Autowired
private IMediaService mediaService;
@Autowired
private IStreamProxyService streamProxyService;
/**
@ -95,8 +98,30 @@ public class MediaController {
result.setMsg("scccess");
result.setData(streamInfo);
}else {
result.setCode(-1);
result.setMsg("fail");
//获取流失败重启拉流后重试一次
streamProxyService.stop(app,stream);
boolean start = streamProxyService.start(app, stream);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
String host = request.getHeader("Host");
String localAddr = host.split(":")[0];
logger.info("使用{}作为返回流的ip", localAddr);
streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority);
}else {
streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
}
if (streamInfo != null){
result.setCode(0);
result.setMsg("scccess");
result.setData(streamInfo);
}else {
result.setCode(-1);
result.setMsg("fail");
}
}
return result;
}