优化info消息的cseq计数

This commit is contained in:
648540858 2021-12-14 18:41:50 +08:00
parent 886645ff56
commit c4de9d674e
12 changed files with 85 additions and 58 deletions

View File

@ -56,6 +56,8 @@ public class VideoManagerConstants {
public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_";
public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_";
//************************** redis 消息*********************************
public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";

View File

@ -1,5 +1,7 @@
package com.genersoft.iot.vmp.conf.runner;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.beans.factory.annotation.Autowired;
@ -23,6 +25,9 @@ public class SipDeviceRunner implements CommandLineRunner {
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private UserSetup userSetup;
@Override
public void run(String... args) throws Exception {
// 读取redis没有心跳信息的则设置为离线等收到下次心跳设置为在线
@ -32,7 +37,8 @@ public class SipDeviceRunner implements CommandLineRunner {
for (String deviceId : onlineForAll) {
storager.online(deviceId);
}
// 重置cseq计数
redisCatchStorage.resetAllCSEQ();
// TODO 查询在线设备那些开启了订阅为设备开启定时的目录订阅
}
}

View File

@ -14,7 +14,7 @@ import javax.sip.message.Request;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@ -35,6 +35,9 @@ public class SIPRequestHeaderProvider {
@Autowired
private SipFactory sipFactory;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private VideoStreamSessionManager streamSession;
@ -195,6 +198,7 @@ public class SIPRequestHeaderProvider {
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.SUBSCRIBE);
@ -218,7 +222,7 @@ public class SIPRequestHeaderProvider {
return request;
}
public Request createInfoRequest(Device device, StreamInfo streamInfo, String content)
public Request createInfoRequest(Device device, StreamInfo streamInfo, String content, Long cseq)
throws PeerUnavailableException, ParseException, InvalidArgumentException {
Request request = null;
Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId());
@ -247,10 +251,12 @@ public class SIPRequestHeaderProvider {
// Forwards
MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
if (cseq == null) {
cseq = redisCatchStorage.getCSEQ(Request.INFO);
}
// ceq
CSeqHeader cSeqHeader = sipFactory.createHeaderFactory()
.createCSeqHeader(InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()), Request.INFO);
.createCSeqHeader(cseq, Request.INFO);
request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader,
fromHeader, toHeader, viaHeaders, maxForwards);

View File

@ -18,7 +18,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.SIPRequest;
@ -1553,12 +1552,12 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playPauseCmd(Device device, StreamInfo streamInfo) {
try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200);
content.append("PAUSE RTSP/1.0\r\n");
content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
content.append("CSeq: " + cseq + "\r\n");
content.append("PauseTime: now\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
logger.info(request.toString());
ClientTransaction clientTransaction = null;
if ("TCP".equals(device.getTransport())) {
@ -1581,11 +1580,12 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playResumeCmd(Device device, StreamInfo streamInfo) {
try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200);
content.append("PLAY RTSP/1.0\r\n");
content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
content.append("CSeq: " + cseq + "\r\n");
content.append("Range: npt=now-\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
logger.info(request.toString());
ClientTransaction clientTransaction = null;
if ("TCP".equals(device.getTransport())) {
@ -1607,12 +1607,13 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) {
try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200);
content.append("PLAY RTSP/1.0\r\n");
content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
content.append("CSeq: " + cseq + "\r\n");
content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
logger.info(request.toString());
ClientTransaction clientTransaction = null;
if ("TCP".equals(device.getTransport())) {
@ -1634,11 +1635,12 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) {
try {
Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
StringBuffer content = new StringBuffer(200);
content.append("PLAY RTSP/1.0\r\n");
content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
content.append("CSeq: " + cseq + "\r\n");
content.append("Scale: " + String.format("%.1f",speed) + "\r\n");
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
logger.info(request.toString());
ClientTransaction clientTransaction = null;
if ("TCP".equals(device.getTransport())) {

View File

@ -89,7 +89,7 @@ public class ZLMRunner implements CommandLineRunner {
});
// 获取zlm信息
logger.info("等待默认zlm接入...");
logger.info("[zlm接入]等待默认zlm中...");
// 获取所有的zlm 并开启主动连接
List<MediaServerItem> all = mediaServerService.getAllFromDatabase();

View File

@ -25,24 +25,28 @@ public class CatalogSubscribeTask implements Runnable{
sipCommander.catalogSubscribe(device, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
Element rootElement = null;
try {
rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
} catch (DocumentException e) {
e.printStackTrace();
}
Element resultElement = rootElement.element("Result");
String result = resultElement.getText();
if (result.toUpperCase().equals("OK")){
// 成功
logger.info("目录订阅成功: {}", device.getDeviceId());
if (event.getResponse().getRawContent() != null) {
try {
rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
} catch (DocumentException e) {
e.printStackTrace();
}
Element resultElement = rootElement.element("Result");
String result = resultElement.getText();
if (result.toUpperCase().equals("OK")){
// 成功
logger.info("[目录订阅]成功: {}", device.getDeviceId());
}else {
// 失败
logger.info("[目录订阅]失败: {}-{}", device.getDeviceId(), result);
}
}else {
// 失败
logger.info("目录订阅失败: {}-{}", device.getDeviceId(), result);
// 成功
logger.info("[目录订阅]成功: {}", device.getDeviceId());
}
},eventResult -> {
// 失败
logger.warn("目录订阅失败: {}-信令发送失败", device.getDeviceId());
logger.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
});
}
}

View File

@ -51,6 +51,8 @@ public class DeviceServiceImpl implements IDeviceService {
dynamicTask.stopCron(device.getDeviceId());
device.setSubscribeCycleForCatalog(0);
sipCommander.catalogSubscribe(device, null, null);
// 清空cseq计数
return true;
}
}

View File

@ -83,7 +83,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
*/
@Override
public void run(String... args) throws Exception {
logger.info("Media Server 缓存初始化");
logger.info("[缓存初始化] Media Server ");
List<MediaServerItem> mediaServerItemList = mediaServerMapper.queryAll();
for (MediaServerItem mediaServerItem : mediaServerItemList) {
if (StringUtils.isEmpty(mediaServerItem.getId())) {

View File

@ -14,6 +14,14 @@ import java.util.Map;
public interface IRedisCatchStorage {
/**
* 计数器为cseq进行计数
*
* @param method sip 方法
* @return
*/
Long getCSEQ(String method);
/**
* 开始播放时将流存入
*
@ -181,4 +189,6 @@ public interface IRedisCatchStorage {
* 获取Device
*/
Device getDevice(String deviceId);
void resetAllCSEQ();
}

View File

@ -36,6 +36,28 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public Long getCSEQ(String method) {
String key = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetup.getServerId() + "_" + method;
long result = redis.incr(key, 1L);
if (result > Integer.MAX_VALUE) {
redis.set(key, 1);
result = 1;
}
return result;
}
@Override
public void resetAllCSEQ() {
String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX + userSetup.getServerId() + "_*";
List<Object> keys = redis.scan(scanKey);
for (int i = 0; i < keys.size(); i++) {
String key = (String) keys.get(i);
redis.set(key, 1);
}
}
/**
* 开始播放时将流存入redis
*

View File

@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
@ -31,7 +30,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.springframework.web.context.request.async.DeferredResult;
import javax.sip.message.Response;
import java.util.UUID;
@Api(tags = "视频回放")
@ -168,7 +166,6 @@ public class PlaybackController {
logger.warn("streamId不存在!");
return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
}
setCseq(streamId);
Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
cmder.playPauseCmd(device, streamInfo);
json.put("msg", "ok");
@ -189,7 +186,6 @@ public class PlaybackController {
logger.warn("streamId不存在!");
return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
}
setCseq(streamId);
Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
cmder.playResumeCmd(device, streamInfo);
json.put("msg", "ok");
@ -211,7 +207,6 @@ public class PlaybackController {
logger.warn("streamId不存在!");
return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
}
setCseq(streamId);
Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
cmder.playSeekCmd(device, streamInfo, seekTime);
json.put("msg", "ok");
@ -238,18 +233,10 @@ public class PlaybackController {
logger.warn("不支持的speed " + speed);
return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
}
setCseq(streamId);
Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
cmder.playSpeedCmd(device, streamInfo, speed);
json.put("msg", "ok");
return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
}
public void setCseq(String streamId) {
if (InfoCseqCache.CSEQCACHE.containsKey(streamId)) {
InfoCseqCache.CSEQCACHE.put(streamId, InfoCseqCache.CSEQCACHE.get(streamId) + 1);
} else {
InfoCseqCache.CSEQCACHE.put(streamId, 2L);
}
}
}

View File

@ -1,14 +0,0 @@
package com.genersoft.iot.vmp.vmanager.gb28181.session;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @ClassName: InfoCseqCache
* @Description: INFO类型的Sip中cseq的缓存
*/
public class InfoCseqCache {
public static Map<String, Long> CSEQCACHE = new ConcurrentHashMap<>();
}