尝试修复catalog获取失败。服务重启后设备未注册仍上报keeplive处理

This commit is contained in:
songww 2020-05-13 14:55:06 +08:00
parent d072017bdc
commit ca5139929b
11 changed files with 145 additions and 48 deletions

View File

@ -0,0 +1,24 @@
package com.genersoft.iot.vmp.gb28181.event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
/**
* @Description:设备离在线状态检测器用于检测设备状态
* @author: songww
* @date: 2020年5月13日 下午2:40:29
*/
@Component
public class DeviceOffLineDetector {
@Autowired
private RedisUtil redis;
public boolean isOnline(String deviceId) {
String key = VideoManagerConstants.KEEPLIVEKEY_PREFIX + deviceId;
return redis.hasKey(key);
}
}

View File

@ -4,8 +4,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.gb28181.event.offline.OfflineEvent;
import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent; import com.genersoft.iot.vmp.gb28181.event.online.OnlineEvent;
import com.genersoft.iot.vmp.gb28181.event.outline.OutlineEvent;
/** /**
* @Description:Event事件通知推送器支持推送在线事件离线事件 * @Description:Event事件通知推送器支持推送在线事件离线事件
@ -26,7 +26,7 @@ public class EventPublisher {
} }
public void outlineEventPublish(String deviceId, String from){ public void outlineEventPublish(String deviceId, String from){
OutlineEvent outEvent = new OutlineEvent(this); OfflineEvent outEvent = new OfflineEvent(this);
outEvent.setDeviceId(deviceId); outEvent.setDeviceId(deviceId);
outEvent.setFrom(from); outEvent.setFrom(from);
applicationEventPublisher.publishEvent(outEvent); applicationEventPublisher.publishEvent(outEvent);

View File

@ -1,4 +1,4 @@
package com.genersoft.iot.vmp.gb28181.event.outline; package com.genersoft.iot.vmp.gb28181.event.offline;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.Message;

View File

@ -1,4 +1,4 @@
package com.genersoft.iot.vmp.gb28181.event.outline; package com.genersoft.iot.vmp.gb28181.event.offline;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
@ -7,7 +7,7 @@ import org.springframework.context.ApplicationEvent;
* @author: songww * @author: songww
* @date: 2020年5月6日 上午11:33:13 * @date: 2020年5月6日 上午11:33:13
*/ */
public class OutlineEvent extends ApplicationEvent { public class OfflineEvent extends ApplicationEvent {
/** /**
* @Title: OutlineEvent * @Title: OutlineEvent
@ -15,7 +15,7 @@ public class OutlineEvent extends ApplicationEvent {
* @param: @param source * @param: @param source
* @throws * @throws
*/ */
public OutlineEvent(Object source) { public OfflineEvent(Object source) {
super(source); super(source);
} }

View File

@ -1,4 +1,4 @@
package com.genersoft.iot.vmp.gb28181.event.outline; package com.genersoft.iot.vmp.gb28181.event.offline;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -13,14 +13,14 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil;
/** /**
* @Description: 离线事件监听器监听到离线后修改设备离在线状态 设备离线有两个来源 * @Description: 离线事件监听器监听到离线后修改设备离在线状态 设备离线有两个来源
* 1设备主动注销发送注销指令{@link com.genersoft.iot.vmp.gb28181.transmit.request.impl.RegisterRequestProcessor} * 1设备主动注销发送注销指令{@link com.genersoft.iot.vmp.gb28181.transmit.request.impl.RegisterRequestProcessor}
* 2设备未知原因离线心跳超时,{@link com.genersoft.iot.vmp.gb28181.event.outline.OutlineEventListener} * 2设备未知原因离线心跳超时,{@link com.genersoft.iot.vmp.gb28181.event.offline.OfflineEventListener}
* @author: songww * @author: songww
* @date: 2020年5月6日 下午1:51:23 * @date: 2020年5月6日 下午1:51:23
*/ */
@Component @Component
public class OutlineEventListener implements ApplicationListener<OutlineEvent> { public class OfflineEventListener implements ApplicationListener<OfflineEvent> {
private final static Logger logger = LoggerFactory.getLogger(OutlineEventListener.class); private final static Logger logger = LoggerFactory.getLogger(OfflineEventListener.class);
@Autowired @Autowired
private IVideoManagerStorager storager; private IVideoManagerStorager storager;
@ -29,7 +29,7 @@ public class OutlineEventListener implements ApplicationListener<OutlineEvent> {
private RedisUtil redis; private RedisUtil redis;
@Override @Override
public void onApplicationEvent(OutlineEvent event) { public void onApplicationEvent(OfflineEvent event) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("设备离线事件触发deviceId" + event.getDeviceId() + ",from:" + event.getFrom()); logger.debug("设备离线事件触发deviceId" + event.getDeviceId() + ",from:" + event.getFrom());

View File

@ -79,7 +79,7 @@ public interface ISIPCommander {
* @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss * @param startTime 开始时间,格式要求yyyy-MM-dd HH:mm:ss
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/ */
public String playbackStreamCmd(Device device,String channelId, String recordId, String startTime, String endTime); public String playbackStreamCmd(Device device,String channelId, String startTime, String endTime);
/** /**
* 语音广播 * 语音广播

View File

@ -1,14 +1,13 @@
package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import java.text.ParseException; import java.text.ParseException;
import java.util.Random;
import javax.sip.ClientTransaction;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.message.Request; import javax.sip.message.Request;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
@ -19,8 +18,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
import com.genersoft.iot.vmp.gb28181.utils.DateUtil; import com.genersoft.iot.vmp.gb28181.utils.DateUtil;
import com.genersoft.iot.vmp.gb28181.utils.SsrcUtil; import com.genersoft.iot.vmp.gb28181.utils.SsrcUtil;
import tk.mybatis.mapper.util.StringUtil;
/** /**
* @Description:设备能力接口用于定义设备的控制查询能力 * @Description:设备能力接口用于定义设备的控制查询能力
* @author: songww * @author: songww
@ -181,16 +178,16 @@ public class SIPCommander implements ISIPCommander {
* @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss * @param endTime 结束时间,格式要求yyyy-MM-dd HH:mm:ss
*/ */
@Override @Override
public String playbackStreamCmd(Device device, String channelId, String recordId, String startTime, String endTime) { public String playbackStreamCmd(Device device, String channelId, String startTime, String endTime) {
try { try {
String ssrc = SsrcUtil.getPlayBackSsrc(); String ssrc = SsrcUtil.getPlayBackSsrc();
// //
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n"); content.append("v=0\r\n");
content.append("o="+channelId+" 0 0 IN IP4 "+sipConfig.getSipIp()+"\r\n"); content.append("o="+device.getDeviceId()+" 0 0 IN IP4 "+sipConfig.getSipIp()+"\r\n");
content.append("s=Playback\r\n"); content.append("s=Playback\r\n");
content.append("u="+recordId+":3\r\n"); content.append("u="+channelId+":3\r\n");
content.append("c=IN IP4 "+sipConfig.getMediaIp()+"\r\n"); content.append("c=IN IP4 "+sipConfig.getMediaIp()+"\r\n");
content.append("t="+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)+" "+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) +"\r\n"); content.append("t="+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime)+" "+DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime) +"\r\n");
if(device.getTransport().equals("TCP")) { if(device.getTransport().equals("TCP")) {
@ -439,11 +436,15 @@ public class SIPCommander implements ISIPCommander {
} }
private void transmitRequest(Device device, Request request) throws SipException { private void transmitRequest(Device device, Request request) throws SipException {
ClientTransaction clientTransaction = null;
if(device.getTransport().equals("TCP")) { if(device.getTransport().equals("TCP")) {
sipLayer.getTcpSipProvider().sendRequest(request); clientTransaction = sipLayer.getTcpSipProvider().getNewClientTransaction(request);
//sipLayer.getTcpSipProvider().sendRequest(request);
} else if(device.getTransport().equals("UDP")) { } else if(device.getTransport().equals("UDP")) {
sipLayer.getUdpSipProvider().sendRequest(request); clientTransaction = sipLayer.getUdpSipProvider().getNewClientTransaction(request);
//sipLayer.getUdpSipProvider().sendRequest(request);
} }
clientTransaction.sendRequest();
} }
} }

View File

@ -30,6 +30,7 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.gb28181.bean.RecordItem; import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
@ -69,8 +70,21 @@ public class MessageRequestProcessor implements ISIPRequestProcessor {
@Autowired @Autowired
private DeferredResultHolder deferredResultHolder; private DeferredResultHolder deferredResultHolder;
@Autowired
private DeviceOffLineDetector offLineDetector;
private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_"; private final static String CACHE_RECORDINFO_KEY = "CACHE_RECORDINFO_";
private static final String MESSAGE_CATALOG = "Catalog";
private static final String MESSAGE_DEVICE_INFO = "DeviceInfo";
private static final String MESSAGE_KEEP_ALIVE = "Keepalive";
private static final String MESSAGE_ALARM = "Alarm";
private static final String MESSAGE_RECORD_INFO = "RecordInfo";
// private static final String MESSAGE_BROADCAST = "Broadcast";
// private static final String MESSAGE_DEVICE_STATUS = "DeviceStatus";
// private static final String MESSAGE_MOBILE_POSITION = "MobilePosition";
// private static final String MESSAGE_MOBILE_POSITION_INTERVAL = "Interval";
/** /**
* 处理MESSAGE请求 * 处理MESSAGE请求
* *
@ -85,22 +99,31 @@ public class MessageRequestProcessor implements ISIPRequestProcessor {
this.transaction = transaction; this.transaction = transaction;
Request request = evt.getRequest(); Request request = evt.getRequest();
SAXReader reader = new SAXReader();
if (new String(request.getRawContent()).contains("<CmdType>Keepalive</CmdType>")) { Document xml;
logger.info("接收到KeepAlive消息"); try {
processMessageKeepAlive(evt); xml = reader.read(new ByteArrayInputStream(request.getRawContent()));
} else if (new String(request.getRawContent()).contains("<CmdType>Catalog</CmdType>")) { Element rootElement = xml.getRootElement();
logger.info("接收到Catalog消息"); String cmd = rootElement.element("CmdType").getStringValue();
processMessageCatalogList(evt);
} else if (new String(request.getRawContent()).contains("<CmdType>DeviceInfo</CmdType>")) { if (MESSAGE_KEEP_ALIVE.equals(cmd)) {
logger.info("接收到DeviceInfo消息"); logger.info("接收到KeepAlive消息");
processMessageDeviceInfo(evt); processMessageKeepAlive(evt);
} else if (new String(request.getRawContent()).contains("<CmdType>Alarm</CmdType>")) { } else if (MESSAGE_CATALOG.equals(cmd)) {
logger.info("接收到Alarm消息"); logger.info("接收到Catalog消息");
processMessageAlarm(evt); processMessageCatalogList(evt);
} else if (new String(request.getRawContent()).contains("<CmdType>RecordInfo</CmdType>")) { } else if (MESSAGE_DEVICE_INFO.equals(cmd)) {
logger.info("接收到RecordInfo消息"); logger.info("接收到DeviceInfo消息");
processMessageRecordInfo(evt); processMessageDeviceInfo(evt);
} else if (MESSAGE_ALARM.equals(cmd)) {
logger.info("接收到Alarm消息");
processMessageAlarm(evt);
} else if (MESSAGE_RECORD_INFO.equals(cmd)) {
logger.info("接收到RecordInfo消息");
processMessageRecordInfo(evt);
}
} catch (DocumentException e) {
e.printStackTrace();
} }
} }
@ -247,12 +270,17 @@ public class MessageRequestProcessor implements ISIPRequestProcessor {
*/ */
private void processMessageKeepAlive(RequestEvent evt){ private void processMessageKeepAlive(RequestEvent evt){
try { try {
Request request = evt.getRequest();
Response response = layer.getMessageFactory().createResponse(Response.OK,request);
Element rootElement = getRootElement(evt); Element rootElement = getRootElement(evt);
Element deviceIdElement = rootElement.element("DeviceID"); String deviceId = XmlUtil.getText(rootElement,"DeviceID");
Request request = evt.getRequest();
Response response = null;
if (offLineDetector.isOnline(deviceId)) {
response = layer.getMessageFactory().createResponse(Response.OK,request);
publisher.onlineEventPublish(deviceId, VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
} else {
response = layer.getMessageFactory().createResponse(Response.BAD_REQUEST,request);
}
transaction.sendResponse(response); transaction.sendResponse(response);
publisher.onlineEventPublish(deviceIdElement.getText(), VideoManagerConstants.EVENT_ONLINE_KEEPLIVE);
} catch (ParseException | SipException | InvalidArgumentException | DocumentException e) { } catch (ParseException | SipException | InvalidArgumentException | DocumentException e) {
e.printStackTrace(); e.printStackTrace();
} }

View File

@ -1,15 +1,25 @@
package com.genersoft.iot.vmp.gb28181.transmit.response.impl; package com.genersoft.iot.vmp.gb28181.transmit.response.impl;
import java.text.ParseException;
import javax.sip.ClientTransaction;
import javax.sip.Dialog; import javax.sip.Dialog;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent; import javax.sip.ResponseEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CSeqHeader;
import javax.sip.header.ViaHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer; import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorFactory;
import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor; import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor;
/** /**
@ -20,20 +30,51 @@ import com.genersoft.iot.vmp.gb28181.transmit.response.ISIPResponseProcessor;
@Component @Component
public class InviteResponseProcessor implements ISIPResponseProcessor { public class InviteResponseProcessor implements ISIPResponseProcessor {
private final static Logger logger = LoggerFactory.getLogger(SIPProcessorFactory.class);
/** /**
* 处理invite响应 * 处理invite响应
* *
* @param request * @param evt
* 响应消息 * 响应消息
*/ */
@Override @Override
public void process(ResponseEvent evt, SipLayer layer, SipConfig config) { public void process(ResponseEvent evt, SipLayer layer, SipConfig config) {
try { try {
Dialog dialog = evt.getDialog(); Response response = evt.getResponse();
Request reqAck =dialog.createAck(1L); int statusCode = response.getStatusCode();
dialog.sendAck(reqAck); //trying不会回复
if(statusCode == Response.TRYING){
}
//成功响应
//下发ack
if(statusCode == Response.OK){
ClientTransaction clientTransaction = evt.getClientTransaction();
if(clientTransaction == null){
logger.error("回复ACK时clientTransaction为null >>> {}",response);
return;
}
Dialog clientDialog = clientTransaction.getDialog();
CSeqHeader clientCSeqHeader = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
long cseqId = clientCSeqHeader.getSeqNumber();
/*
createAck函数创建的ackRequest会采用Invite响应的200OK中的contact字段中的地址作为目标地址
有的终端传上来的可能还是内网地址会造成ack发送不出去接受不到音视频流
所以在此处统一替换地址和响应消息的Via头中的地址保持一致
*/
Request ackRequest = clientDialog.createAck(cseqId);
SipURI requestURI = (SipURI) ackRequest.getRequestURI();
ViaHeader viaHeader = (ViaHeader) response.getHeader(ViaHeader.NAME);
requestURI.setHost(viaHeader.getHost());
requestURI.setPort(viaHeader.getPort());
clientDialog.sendAck(ackRequest);
}
} catch (InvalidArgumentException | SipException e) { } catch (InvalidArgumentException | SipException e) {
e.printStackTrace(); e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} }
} }

View File

@ -10,6 +10,7 @@ import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSONObject;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager; import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
@ -30,7 +31,7 @@ public class PlaybackController {
public ResponseEntity<String> play(@PathVariable String deviceId,@PathVariable String channelId, String startTime, String endTime){ public ResponseEntity<String> play(@PathVariable String deviceId,@PathVariable String channelId, String startTime, String endTime){
Device device = storager.queryVideoDevice(deviceId); Device device = storager.queryVideoDevice(deviceId);
String ssrc = cmder.playStreamCmd(device, channelId); String ssrc = cmder.playbackStreamCmd(device, channelId, startTime, endTime);
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(String.format("设备预览 API调用deviceId%s channelId%s",deviceId, channelId)); logger.debug(String.format("设备预览 API调用deviceId%s channelId%s",deviceId, channelId));
@ -38,7 +39,9 @@ public class PlaybackController {
} }
if(ssrc!=null) { if(ssrc!=null) {
return new ResponseEntity<String>(ssrc,HttpStatus.OK); JSONObject json = new JSONObject();
json.put("ssrc", ssrc);
return new ResponseEntity<String>(json.toString(),HttpStatus.OK);
} else { } else {
logger.warn("设备预览API调用失败"); logger.warn("设备预览API调用失败");
return new ResponseEntity<String>(HttpStatus.INTERNAL_SERVER_ERROR); return new ResponseEntity<String>(HttpStatus.INTERNAL_SERVER_ERROR);

View File

@ -26,7 +26,7 @@ spring:
server: server:
port: 8080 port: 8080
sip: sip:
ip: 10.200.64.63 ip: 127.0.0.1
port: 5060 port: 5060
# 根据国标6.1.2中规定domain宜采用ID统一编码的前十位编码。国标附录D中定义前8位为中心编码由省级、市级、区级、基层编号组成参照GB/T 2260-2007 # 根据国标6.1.2中规定domain宜采用ID统一编码的前十位编码。国标附录D中定义前8位为中心编码由省级、市级、区级、基层编号组成参照GB/T 2260-2007
# 后两位为行业编码定义参照附录D.3 # 后两位为行业编码定义参照附录D.3