修复录像回放中的信令错误

This commit is contained in:
648540858 2022-08-15 15:08:51 +08:00
parent 8f629a15cf
commit d7865d55e1
10 changed files with 179 additions and 101 deletions

View File

@ -77,38 +77,54 @@ public class VideoManagerConstants {
//************************** redis 消息*********************************
// 流变化的通知
/**
* 流变化的通知
*/
public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
// 接收推流设备的GPS变化通知
/**
* 接收推流设备的GPS变化通知
*/
public static final String VM_MSG_GPS = "VM_MSG_GPS";
// 接收推流设备的GPS变化通知
/**
* 接收推流设备的GPS变化通知
*/
public static final String VM_MSG_PUSH_STREAM_STATUS_CHANGE = "VM_MSG_PUSH_STREAM_STATUS_CHANGE";
// redis 消息通知设备推流到平台
/**
* redis 消息通知设备推流到平台
*/
public static final String VM_MSG_STREAM_PUSH_REQUESTED = "VM_MSG_STREAM_PUSH_REQUESTED";
// redis 消息请求所有的在线通道
/**
* redis 消息请求所有的在线通道
*/
public static final String VM_MSG_GET_ALL_ONLINE_REQUESTED = "VM_MSG_GET_ALL_ONLINE_REQUESTED";
// 移动位置订阅通知
/**
* 移动位置订阅通知
*/
public static final String VM_MSG_SUBSCRIBE_MOBILE_POSITION = "mobileposition";
// 报警订阅的通知收到报警向redis发出通知
/**
* 报警订阅的通知收到报警向redis发出通知
*/
public static final String VM_MSG_SUBSCRIBE_ALARM = "alarm";
// 报警通知的发送 收到redis发出的通知转发给其他平台
/**
* 报警通知的发送 收到redis发出的通知转发给其他平台
*/
public static final String VM_MSG_SUBSCRIBE_ALARM_RECEIVE= "alarm_receive";
// 设备状态订阅的通知
/**
* 设备状态订阅的通知
*/
public static final String VM_MSG_SUBSCRIBE_DEVICE_STATUS = "device";
//************************** 第三方 ****************************************
public static final String WVP_STREAM_GB_ID_PREFIX = "memberNo_";
public static final String WVP_STREAM_GPS_MSG_PREFIX = "WVP_STREAM_GPS_MSG_";

View File

@ -62,7 +62,7 @@ public class SIPRequestHeaderPlarformProvider {
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.MESSAGE);
request = sipFactory.createMessageFactory().createRequest(requestURI, Request.MESSAGE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);
@ -120,7 +120,7 @@ public class SIPRequestHeaderPlarformProvider {
String callId, WWWAuthenticateHeader www , CallIdHeader callIdHeader) throws ParseException, PeerUnavailableException, InvalidArgumentException {
Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(Request.REGISTER), fromTag, viaTag, callIdHeader);
Request registerRequest = createRegisterRequest(parentPlatform, redisCatchStorage.getCSEQ(), fromTag, viaTag, callIdHeader);
SipURI requestURI = sipFactory.createAddressFactory().createSipURI(parentPlatform.getServerGBId(), parentPlatform.getServerIP() + ":" + parentPlatform.getServerPort());
if (www == null) {
AuthorizationHeader authorizationHeader = sipFactory.createHeaderFactory().createAuthorizationHeader("Digest");
@ -213,7 +213,7 @@ public class SIPRequestHeaderPlarformProvider {
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.MESSAGE), Request.MESSAGE);
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.MESSAGE);
MessageFactoryImpl messageFactory = (MessageFactoryImpl) sipFactory.createMessageFactory();
// 设置编码 防止中文乱码
messageFactory.setDefaultContentEncodingCharset(parentPlatform.getCharacterSet());

View File

@ -2,11 +2,9 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import javax.sip.Dialog;
import javax.sip.InvalidArgumentException;
import javax.sip.PeerUnavailableException;
import javax.sip.SipFactory;
import javax.sip.*;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.*;
@ -15,7 +13,11 @@ import javax.sip.message.Request;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.stack.SIPDialog;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.conf.SipConfig;
@ -40,6 +42,14 @@ public class SIPRequestHeaderProvider {
@Autowired
private VideoStreamSessionManager streamSession;
@Autowired
@Qualifier(value="tcpSipProvider")
private SipProviderImpl tcpSipProvider;
@Autowired
@Qualifier(value="udpSipProvider")
private SipProviderImpl udpSipProvider;
public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
Request request = null;
@ -95,7 +105,7 @@ public class SIPRequestHeaderProvider {
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
//ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.INVITE), Request.INVITE);
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE);
request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards);
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
@ -131,7 +141,7 @@ public class SIPRequestHeaderProvider {
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
//ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.INVITE), Request.INVITE);
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.INVITE);
request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INVITE, callIdHeader, cSeqHeader,fromHeader, toHeader, viaHeaders, maxForwards);
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
@ -200,7 +210,7 @@ public class SIPRequestHeaderProvider {
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(Request.SUBSCRIBE), Request.SUBSCRIBE);
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(redisCatchStorage.getCSEQ(), Request.SUBSCRIBE);
request = sipFactory.createMessageFactory().createRequest(requestURI, Request.SUBSCRIBE, callIdHeader, cSeqHeader, fromHeader,
toHeader, viaHeaders, maxForwards);
@ -226,55 +236,55 @@ public class SIPRequestHeaderProvider {
}
public Request createInfoRequest(Device device, StreamInfo streamInfo, String content)
throws PeerUnavailableException, ParseException, InvalidArgumentException {
Request request = null;
throws SipException, ParseException, InvalidArgumentException {
if (streamInfo == null) {
return null;
}
Dialog dialog = streamSession.getDialogByStream(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream());
Request request = null;
SIPDialog dialog = streamSession.getDialogByStream(streamInfo.getDeviceID(), streamInfo.getChannelId(), streamInfo.getStream());
if (dialog == null) {
return null;
}
SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(),
device.getHostAddress());
// via
ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getIp(), device.getPort(),
device.getTransport(), null);
SipStack sipStack = udpSipProvider.getSipStack();
SIPDialog sipDialog = ((SipStackImpl) sipStack).putDialog(dialog);
if (dialog != sipDialog) {
dialog = sipDialog;
}else {
dialog.setSipProvider(udpSipProvider);
}
streamSession.put(streamInfo.getDeviceID(), streamInfo.getChannelId(), dialog.getCallId().getCallId(), dialog);
Request infoRequest = dialog.createRequest(Request.INFO);
SipURI sipURI = (SipURI) infoRequest.getRequestURI();
sipURI.setHost(device.getIp());
sipURI.setPort(device.getPort());
sipURI.setUser(streamInfo.getChannelId());
ViaHeader viaHeader = (ViaHeader) infoRequest.getHeader(ViaHeader.NAME);
viaHeader.setRPort();
viaHeaders.add(viaHeader);
// from
SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(sipConfig.getId(),
sipConfig.getDomain());
Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, dialog.getLocalTag());
// to
SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(streamInfo.getChannelId(),
sipConfig.getDomain());
Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, dialog.getRemoteTag());
// callid
CallIdHeader callIdHeader = dialog.getCallId();
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
Long cseq = redisCatchStorage.getCSEQ(Request.INVITE);
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory()
.createCSeqHeader(cseq, Request.INFO);
request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader,
fromHeader, toHeader, viaHeaders, maxForwards);
// 增加Contact header
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
.createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort()));
request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
infoRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
} catch (ParseException e) {
throw new RuntimeException(e);
}
infoRequest.addHeader(userAgentHeader);
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application",
"MANSRTSP");
request.setContent(content, contentTypeHeader);
return request;
infoRequest.setContent(content, contentTypeHeader);
CSeqHeader cSeqHeader = (CSeqHeader)infoRequest.getHeader(CSeqHeader.NAME);
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ());
// ceq
infoRequest.addHeader(cSeqHeader);
return infoRequest;
}
}

View File

@ -732,8 +732,23 @@ public class SIPCommander implements ISIPCommander {
SIPRequest request = (SIPRequest)transaction.getRequest();
byeURI.setHost(request.getRemoteAddress().getHostAddress());
byeURI.setPort(request.getRemotePort());
byeURI.setUser(channelId);
ViaHeader viaHeader = (ViaHeader) byeRequest.getHeader(ViaHeader.NAME);
String protocol = viaHeader.getTransport().toUpperCase();
viaHeader.setRPort();
// 增加Contact header
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
byeRequest.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
} catch (ParseException e) {
throw new RuntimeException(e);
}
byeRequest.addHeader(userAgentHeader);
ClientTransaction clientTransaction = null;
if("TCP".equals(protocol)) {
clientTransaction = tcpSipProvider.getNewClientTransaction(byeRequest);
@ -745,11 +760,14 @@ public class SIPCommander implements ISIPCommander {
if (okEvent != null) {
sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), okEvent);
}
CSeqHeader cSeqHeader = (CSeqHeader)byeRequest.getHeader(CSeqHeader.NAME);
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ());
dialog.sendRequest(clientTransaction);
} catch (SipException | ParseException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
throw new RuntimeException(e);
}
}
@ -1483,7 +1501,7 @@ public class SIPCommander implements ISIPCommander {
request.setContent(subscribePostitionXml.toString(), contentTypeHeader);
CSeqHeader cSeqHeader = (CSeqHeader)request.getHeader(CSeqHeader.NAME);
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ(Request.SUBSCRIBE));
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ());
request.removeHeader(CSeqHeader.NAME);
request.addHeader(cSeqHeader);
}else {
@ -1587,7 +1605,7 @@ public class SIPCommander implements ISIPCommander {
request.setContent(cmdXml.toString(), contentTypeHeader);
CSeqHeader cSeqHeader = (CSeqHeader)request.getHeader(CSeqHeader.NAME);
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ(Request.SUBSCRIBE));
cSeqHeader.setSeqNumber(redisCatchStorage.getCSEQ());
request.removeHeader(CSeqHeader.NAME);
request.addHeader(cSeqHeader);
@ -1697,10 +1715,9 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playPauseCmd(Device device, StreamInfo streamInfo) {
try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200);
content.append("PAUSE RTSP/1.0\r\n");
content.append("CSeq: " + cseq + "\r\n");
content.append("CSeq: " + getInfoCseq() + "\r\n");
content.append("PauseTime: now\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
if (request == null) {
@ -1728,10 +1745,9 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playResumeCmd(Device device, StreamInfo streamInfo) {
try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200);
content.append("PLAY RTSP/1.0\r\n");
content.append("CSeq: " + cseq + "\r\n");
content.append("CSeq: " + getInfoCseq() + "\r\n");
content.append("Range: npt=now-\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
if (request == null) {
@ -1758,10 +1774,9 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) {
try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200);
content.append("PLAY RTSP/1.0\r\n");
content.append("CSeq: " + cseq + "\r\n");
content.append("CSeq: " + getInfoCseq() + "\r\n");
content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
@ -1789,11 +1804,11 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) {
try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200);
content.append("PLAY RTSP/1.0\r\n");
content.append("CSeq: " + cseq + "\r\n");
content.append("Scale: " + String.format("%.1f",speed) + "\r\n");
content.append("CSeq: " + getInfoCseq() + "\r\n");
content.append("Scale: " + String.format("%.6f",speed) + "\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
if (request == null) {
return;
@ -1812,6 +1827,10 @@ public class SIPCommander implements ISIPCommander {
e.printStackTrace();
}
}
private int getInfoCseq() {
return (int) ((Math.random() * 9 + 1) * Math.pow(10, 8));
}
@Override
public void playbackControlCmd(Device device, StreamInfo streamInfo, String content,SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) {
@ -1820,7 +1839,6 @@ public class SIPCommander implements ISIPCommander {
if (request == null) {
return;
}
logger.info(request.toString());
ClientTransaction clientTransaction = null;
if ("TCP".equals(device.getTransport())) {
clientTransaction = tcpSipProvider.getNewClientTransaction(request);

View File

@ -105,7 +105,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}
request = headerProviderPlarformProvider.createRegisterRequest(parentPlatform,
redisCatchStorage.getCSEQ(Request.REGISTER), "FromRegister" + tm,
redisCatchStorage.getCSEQ(), "FromRegister" + tm,
"z9hG4bK-" + UUID.randomUUID().toString().replace("-", ""), callIdHeader);
// callid 写入缓存 等注册成功可以更新状态
String callIdFromHeader = callIdHeader.getCallId();

View File

@ -2,24 +2,32 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.message.SIPResponse;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sdp.SdpFactory;
import javax.sdp.SdpParseException;
import javax.sdp.SessionDescription;
import javax.sip.*;
import javax.sip.address.Address;
import javax.sip.address.SipURI;
import javax.sip.header.CSeqHeader;
import javax.sip.header.CallIdHeader;
import javax.sip.header.UserAgentHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
/**
@ -34,23 +42,24 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
private final String method = "INVITE";
@Autowired
private SipLayer sipLayer;
@Autowired
private SipConfig config;
private VideoStreamSessionManager streamSession;
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Autowired
private SipConfig sipConfig;
@Autowired
private SipFactory sipFactory;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addResponseProcessor(method, this);
}
@Autowired
private VideoStreamSessionManager streamSession;
/**
* 处理invite响应
@ -74,6 +83,19 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
Request reqAck = dialog.createAck(cseq.getSeqNumber());
SipURI requestURI = (SipURI) reqAck.getRequestURI();
String contentString = new String(response.getRawContent());
// jainSip不支持y=字段 移除以解析
int ssrcIndex = contentString.indexOf("y=");
// 检查是否有y字段
SessionDescription sdp;
if (ssrcIndex >= 0) {
//ssrc规定长度为10字节不取余下长度以避免后续还有f=字段
String substring = contentString.substring(0, contentString.indexOf("y="));
sdp = SdpFactory.getInstance().createSessionDescription(substring);
} else {
sdp = SdpFactory.getInstance().createSessionDescription(contentString);
}
requestURI.setUser(sdp.getOrigin().getUsername());
try {
requestURI.setHost(event.getRemoteIpAddress());
} catch (ParseException e) {
@ -81,6 +103,18 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
}
requestURI.setPort(event.getRemotePort());
reqAck.setRequestURI(requestURI);
List<String> agentParam = new ArrayList<>();
agentParam.add("wvp-pro");
// TODO 添加版本信息以及日期
UserAgentHeader userAgentHeader = null;
try {
userAgentHeader = sipFactory.createHeaderFactory().createUserAgentHeader(agentParam);
} catch (ParseException e) {
throw new RuntimeException(e);
}
reqAck.addHeader(userAgentHeader);
Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory().createSipURI(sipConfig.getId(), sipConfig.getIp()+":"+sipConfig.getPort()));
reqAck.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
logger.info("[回复ack] {}-> {}:{} ",requestURI, event.getRemoteIpAddress(), event.getRemotePort());
dialog.sendAck(reqAck);
@ -88,6 +122,10 @@ public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
}
} catch (InvalidArgumentException | SipException e) {
e.printStackTrace();
} catch (ParseException e) {
throw new RuntimeException(e);
} catch (SdpParseException e) {
throw new RuntimeException(e);
}
}

View File

@ -98,9 +98,7 @@ public class ZLMHttpHookListener {
@PostMapping(value = "/on_server_keepalive", produces = "application/json;charset=UTF-8")
public ResponseEntity<String> onServerKeepalive(@RequestBody JSONObject json){
if (logger.isDebugEnabled()) {
logger.debug("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString());
}
logger.info("[ ZLM HOOK ] on_server_keepalive API调用参数" + json.toString());
String mediaServerId = json.getString("mediaServerId");
List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
if (subscribes != null && subscribes.size() > 0) {

View File

@ -277,13 +277,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
return null;
}
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
MediaServerItem serverItem=(MediaServerItem)redisUtil.get(key);
if(null==serverItem){
//zlm服务不在线启动重连
reloadZlm();
serverItem=(MediaServerItem)redisUtil.get(key);
}
return serverItem;
return (MediaServerItem)redisUtil.get(key);
}
@Override
@ -412,7 +406,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
redisUtil.set(key, serverItem);
resetOnlineServerItem(serverItem);
updateMediaServerKeepalive(serverItem.getId(), null);
if (serverItem.isAutoConfig()) {
setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
}
@ -476,9 +469,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
String key = VideoManagerConstants.MEDIA_SERVERS_ONLINE_PREFIX + userSetting.getServerId();
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
logger.info("获取负载最低的节点时无在线节点,启动重连机制");
//启动重连
reloadZlm();
if (redisUtil.zSize(key) == null || redisUtil.zSize(key) == 0) {
logger.info("获取负载最低的节点时无在线节点");
return null;
@ -643,6 +633,11 @@ public class MediaServerServiceImpl implements IMediaServerService {
public void updateMediaServerKeepalive(String mediaServerId, JSONObject data) {
MediaServerItem mediaServerItem = getOne(mediaServerId);
if (mediaServerItem == null) {
// 缓存不存在从数据库查询如果数据库不存在则是错误的
MediaServerItem mediaServerItemFromDatabase = getOneFromDatabase(mediaServerId);
if (mediaServerItemFromDatabase == null) {
return;
}
// zlm连接重试
logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息,尝试重连zlm");
reloadZlm();
@ -658,6 +653,10 @@ public class MediaServerServiceImpl implements IMediaServerService {
redisUtil.set(key, data, hookAliveInterval);
}
private MediaServerItem getOneFromDatabase(String mediaServerId) {
return mediaServerMapper.queryOne(mediaServerId);
}
@Override
public void syncCatchFromDatabase() {
List<MediaServerItem> allInCatch = getAll();

View File

@ -17,10 +17,9 @@ public interface IRedisCatchStorage {
/**
* 计数器为cseq进行计数
*
* @param method sip 方法
* @return
*/
Long getCSEQ(String method);
Long getCSEQ();
/**
* 开始播放时将流存入

View File

@ -42,8 +42,8 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
private UserSetting userSetting;
@Override
public Long getCSEQ(String method) {
String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId() + "_" + method;
public Long getCSEQ() {
String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetting.getServerId();
long result = redis.incr(key, 1L);
if (result > Integer.MAX_VALUE) {