添加目录订阅消息与接口

This commit is contained in:
648540858 2021-11-05 18:33:53 +08:00
parent 34f5ac913e
commit 16664d7ac9
19 changed files with 3103 additions and 0 deletions

View File

@ -0,0 +1,42 @@
package com.genersoft.iot.vmp.conf;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
/**
* 动态定时任务
*/
@Component
public class DynamicTask {
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
private Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
return new ThreadPoolTaskScheduler();
}
public String startCron(String key, Runnable task, String corn) {
stopCron(key);
ScheduledFuture future = threadPoolTaskScheduler.schedule(task, new CronTrigger(corn));
futureMap.put(key, future);
return "startCron";
}
public void stopCron(String key) {
if (futureMap.get(key) != null && !futureMap.get(key).isCancelled()) {
futureMap.get(key).cancel(true);
}
}
}

View File

@ -0,0 +1,14 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request;
import javax.sip.RequestEvent;
/**
* @description: 对SIP事件进行处理包括request response timeout ioException, transactionTerminated,dialogTerminated
* @author: panlinlin
* @date: 2021年11月5日 1547
*/
public interface ISIPRequestProcessor {
void process(RequestEvent event);
}

View File

@ -0,0 +1,179 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request;
import gov.nist.javax.sip.SipProviderImpl;
import gov.nist.javax.sip.SipStackImpl;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.stack.SIPServerTransaction;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import javax.sip.*;
import javax.sip.address.Address;
import javax.sip.address.AddressFactory;
import javax.sip.address.SipURI;
import javax.sip.header.ContentTypeHeader;
import javax.sip.header.HeaderFactory;
import javax.sip.header.ViaHeader;
import javax.sip.message.MessageFactory;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.io.ByteArrayInputStream;
import java.text.ParseException;
/**
* @description:处理接收IPCamera发来的SIP协议请求消息
* @author: songww
* @date: 2020年5月3日 下午4:42:22
*/
public abstract class SIPRequestProcessorAbstract implements InitializingBean, ISIPRequestProcessor {
private final static Logger logger = LoggerFactory.getLogger(SIPRequestProcessorAbstract.class);
@Autowired
@Qualifier(value="tcpSipProvider")
private SipProviderImpl tcpSipProvider;
@Autowired
@Qualifier(value="udpSipProvider")
private SipProviderImpl udpSipProvider;
/**
* 根据 RequestEvent 获取 ServerTransaction
* @param evt
* @return
*/
public ServerTransaction getServerTransaction(RequestEvent evt) {
Request request = evt.getRequest();
ServerTransaction serverTransaction = evt.getServerTransaction();
// 判断TCP还是UDP
boolean isTcp = false;
ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
String transport = reqViaHeader.getTransport();
if (transport.equals("TCP")) {
isTcp = true;
}
if (serverTransaction == null) {
try {
if (isTcp) {
SipStackImpl stack = (SipStackImpl)tcpSipProvider.getSipStack();
serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest)request, true);
if (serverTransaction == null) {
serverTransaction = tcpSipProvider.getNewServerTransaction(request);
}
} else {
SipStackImpl stack = (SipStackImpl)udpSipProvider.getSipStack();
serverTransaction = (SIPServerTransaction) stack.findTransaction((SIPRequest)request, true);
if (serverTransaction == null) {
serverTransaction = udpSipProvider.getNewServerTransaction(request);
}
}
} catch (TransactionAlreadyExistsException e) {
logger.error(e.getMessage());
} catch (TransactionUnavailableException e) {
logger.error(e.getMessage());
}
}
return serverTransaction;
}
public AddressFactory getAddressFactory() {
try {
return SipFactory.getInstance().createAddressFactory();
} catch (PeerUnavailableException e) {
e.printStackTrace();
}
return null;
}
public HeaderFactory getHeaderFactory() {
try {
return SipFactory.getInstance().createHeaderFactory();
} catch (PeerUnavailableException e) {
e.printStackTrace();
}
return null;
}
public MessageFactory getMessageFactory() {
try {
return SipFactory.getInstance().createMessageFactory();
} catch (PeerUnavailableException e) {
e.printStackTrace();
}
return null;
}
/***
* 回复状态码
* 100 trying
* 200 OK
* 400
* 404
* @param evt
* @throws SipException
* @throws InvalidArgumentException
* @throws ParseException
*/
public void responseAck(RequestEvent evt, int statusCode) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (statusCode >= 200) {
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
}
}
public void responseAck(RequestEvent evt, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
response.setReasonPhrase(msg);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (statusCode >= 200) {
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
}
}
/**
* 回复带sdp的200
* @param evt
* @param sdp
* @throws SipException
* @throws InvalidArgumentException
* @throws ParseException
*/
public void responseAck(RequestEvent evt, String sdp) throws SipException, InvalidArgumentException, ParseException {
Response response = getMessageFactory().createResponse(Response.OK, evt.getRequest());
SipFactory sipFactory = SipFactory.getInstance();
ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("APPLICATION", "SDP");
response.setContent(sdp, contentTypeHeader);
SipURI sipURI = (SipURI)evt.getRequest().getRequestURI();
Address concatAddress = sipFactory.createAddressFactory().createAddress(
sipFactory.createAddressFactory().createSipURI(sipURI.getUser(), sipURI.getHost()+":"+sipURI.getPort()
));
response.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
getServerTransaction(evt).sendResponse(response);
}
public Element getRootElement(RequestEvent evt) throws DocumentException {
return getRootElement(evt, "gb2312");
}
public Element getRootElement(RequestEvent evt, String charset) throws DocumentException {
if (charset == null) charset = "gb2312";
Request request = evt.getRequest();
SAXReader reader = new SAXReader();
reader.setEncoding(charset);
Document xml = reader.read(new ByteArrayInputStream(request.getRawContent()));
return xml.getRootElement();
}
}

View File

@ -0,0 +1,123 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.Dialog;
import javax.sip.DialogState;
import javax.sip.RequestEvent;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import java.util.HashMap;
import java.util.Map;
/**
* @description:ACK请求处理器
* @author: swwheihei
* @date: 2020年5月3日 下午5:31:45
*/
@Component
public class AckRequestProcessor extends SIPRequestProcessorAbstract {
private Logger logger = LoggerFactory.getLogger(AckRequestProcessor.class);
private String method = "ACK";
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addRequestProcessor(method, this);
}
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
@Autowired
private IMediaServerService mediaServerService;
/**
* 处理 ACK请求
*
* @param evt
*/
@Override
public void process(RequestEvent evt) {
Dialog dialog = evt.getDialog();
if (dialog == null) return;
if (dialog.getState()== DialogState.CONFIRMED) {
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
String is_Udp = sendRtpItem.isTcp() ? "0" : "1";
String deviceId = sendRtpItem.getDeviceId();
StreamInfo streamInfo = null;
if (deviceId == null) {
streamInfo = new StreamInfo();
streamInfo.setApp(sendRtpItem.getApp());
streamInfo.setStreamId(sendRtpItem.getStreamId());
}else {
streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
sendRtpItem.setStreamId(streamInfo.getStreamId());
streamInfo.setApp("rtp");
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
logger.info(platformGbId);
logger.info(channelId);
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",streamInfo.getApp());
param.put("stream",streamInfo.getStreamId());
param.put("ssrc", sendRtpItem.getSsrc());
param.put("dst_url",sendRtpItem.getIp());
param.put("dst_port", sendRtpItem.getPort());
param.put("is_udp", is_Udp);
//param.put ("src_port", sendRtpItem.getLocalPort());
// 设备推流查询成功后才能转推
boolean rtpPushed = false;
long startTime = System.currentTimeMillis();
while (!rtpPushed) {
try {
if (System.currentTimeMillis() - startTime < 30 * 1000) {
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
if (zlmrtpServerFactory.isStreamReady(mediaInfo, streamInfo.getApp(), streamInfo.getStreamId())) {
rtpPushed = true;
logger.info("已获取设备推流[{}/{}],开始向上级推流[{}:{}]",
streamInfo.getApp() ,streamInfo.getStreamId(), sendRtpItem.getIp(), sendRtpItem.getPort());
zlmrtpServerFactory.startSendRtpStream(mediaInfo, param);
} else {
logger.info("等待设备推流[{}/{}].......",
streamInfo.getApp() ,streamInfo.getStreamId());
Thread.sleep(1000);
continue;
}
} else {
rtpPushed = true;
logger.info("设备推流[{}/{}]超时,终止向上级推流",
streamInfo.getApp() ,streamInfo.getStreamId());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

View File

@ -0,0 +1,115 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.header.HeaderAddress;
import javax.sip.header.ToHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.HashMap;
import java.util.Map;
/**
* @description: BYE请求处理器
* @author: lawrencehj
* @date: 2021年3月9日
*/
@Component
public class ByeRequestProcessor extends SIPRequestProcessorAbstract {
private Logger logger = LoggerFactory.getLogger(ByeRequestProcessor.class);
@Autowired
private ISIPCommander cmder;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
@Autowired
private IMediaServerService mediaServerService;
private String method = "BYE";
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addRequestProcessor(method, this);
}
/**
* 处理BYE请求
* @param evt
*/
@Override
public void process(RequestEvent evt) {
try {
responseAck(evt, Response.OK);
Dialog dialog = evt.getDialog();
if (dialog == null) return;
if (dialog.getState().equals(DialogState.TERMINATED)) {
String platformGbId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(FromHeader.NAME)).getAddress().getURI()).getUser();
String channelId = ((SipURI) ((HeaderAddress) evt.getRequest().getHeader(ToHeader.NAME)).getAddress().getURI()).getUser();
SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(platformGbId, channelId);
logger.info("收到bye, [{}/{}]", platformGbId, channelId);
if (sendRtpItem != null){
String streamId = sendRtpItem.getStreamId();
Map<String, Object> param = new HashMap<>();
param.put("vhost","__defaultVhost__");
param.put("app",sendRtpItem.getApp());
param.put("stream",streamId);
param.put("ssrc",sendRtpItem.getSsrc());
logger.info("停止向上级推流:" + streamId);
MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
zlmrtpServerFactory.stopSendRtpStream(mediaInfo, param);
redisCatchStorage.deleteSendRTPServer(platformGbId, channelId);
if (zlmrtpServerFactory.totalReaderCount(mediaInfo, sendRtpItem.getApp(), streamId) == 0) {
logger.info(streamId + "无其它观看者,通知设备停止推流");
cmder.streamByeCmd(sendRtpItem.getDeviceId(), channelId);
}
}
// 可能是设备主动停止
Device device = storager.queryVideoDeviceByChannelId(platformGbId);
if (device != null) {
StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
if (streamInfo != null) {
redisCatchStorage.stopPlay(streamInfo);
}
storager.stopPlay(device.getDeviceId(), channelId);
mediaServerService.closeRTPServer(device, channelId);
}
}
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,40 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.RequestEvent;
/**
* @description:CANCEL请求处理器
* @author: swwheihei
* @date: 2020年5月3日 下午5:32:23
*/
@Component
public class CancelRequestProcessor extends SIPRequestProcessorAbstract {
private String method = "CANCEL";
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addRequestProcessor(method, this);
}
/**
* 处理CANCEL请求
*
* @param evt 事件
*/
@Override
public void process(RequestEvent evt) {
// TODO 优先级99 Cancel Request消息实现此消息一般为级联消息上级给下级发送请求取消指令
}
}

View File

@ -0,0 +1,386 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sdp.*;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.FromHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Vector;
/**
* @description:处理INVITE请求
* @author: panll
* @date: 2021年1月14日
*/
@SuppressWarnings("rawtypes")
@Component
public class InviteRequestProcessor extends SIPRequestProcessorAbstract {
private final static Logger logger = LoggerFactory.getLogger(InviteRequestProcessor.class);
private String method = "INVITE";
@Autowired
private SIPCommanderFroPlatform cmderFroPlatform;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private SIPCommander cmder;
@Autowired
private IPlayService playService;
@Autowired
private ZLMRTPServerFactory zlmrtpServerFactory;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addRequestProcessor(method, this);
}
/**
* 处理invite请求
*
* @param evt
* 请求消息
*/
@Override
public void process(RequestEvent evt) {
// Invite Request消息实现此消息一般为级联消息上级给下级发送请求视频指令
try {
Request request = evt.getRequest();
SipURI sipURI = (SipURI) request.getRequestURI();
String channelId = sipURI.getUser();
String requesterId = null;
FromHeader fromHeader = (FromHeader)request.getHeader(FromHeader.NAME);
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
requesterId = uri.getUser();
if (requesterId == null || channelId == null) {
logger.info("无法从FromHeader的Address中获取到平台id返回400");
responseAck(evt, Response.BAD_REQUEST); // 参数不全 发400请求错误
return;
}
// 查询请求方是否上级平台
ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
if (platform != null) {
// 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
GbStream gbStream = storager.queryStreamInParentPlatform(requesterId, channelId);
MediaServerItem mediaServerItem = null;
// 不是通道可能是直播流
if (channel != null && gbStream == null ) {
if (channel.getStatus() == 0) {
logger.info("通道离线返回400");
responseAck(evt, Response.BAD_REQUEST, "channel [" + channel.getChannelId() + "] offline");
return;
}
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
}else if(channel == null && gbStream != null){
String mediaServerId = gbStream.getMediaServerId();
mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem == null) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410",gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(evt, Response.GONE, "media server not found");
return;
}
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (!streamReady ) {
logger.info("[ app={}, stream={} ]通道离线返回400",gbStream.getApp(), gbStream.getStream());
responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
return;
}
responseAck(evt, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
}else {
logger.info("通道不存在返回404");
responseAck(evt, Response.NOT_FOUND); // 通道不存在发404资源不存在
return;
}
// 解析sdp消息, 使用jainsip 自带的sdp解析方式
String contentString = new String(request.getRawContent());
// jainSip不支持y=字段 移除移除以解析
int ssrcIndex = contentString.indexOf("y=");
//ssrc规定长度为10字节不取余下长度以避免后续还有f=字段
String ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
String substring = contentString.substring(0, contentString.indexOf("y="));
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
// 获取支持的格式
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
// 查看是否支持PS 负载96
//String ip = null;
int port = -1;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false;
Boolean tcpActive = null;
for (Object description : mediaDescriptions) {
MediaDescription mediaDescription = (MediaDescription) description;
Media media = mediaDescription.getMedia();
Vector mediaFormats = media.getMediaFormats(false);
if (mediaFormats.contains("96")) {
port = media.getMediaPort();
//String mediaType = media.getMediaType();
String protocol = media.getProtocol();
// 区分TCP发流还是udp 当前默认udp
if ("TCP/RTP/AVP".equals(protocol)) {
String setup = mediaDescription.getAttribute("setup");
if (setup != null) {
mediaTransmissionTCP = true;
if ("active".equals(setup)) {
tcpActive = true;
} else if ("passive".equals(setup)) {
tcpActive = false;
}
}
}
break;
}
}
if (port == -1) {
logger.info("不支持的媒体格式返回415");
// 回复不支持的格式
responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
return;
}
String username = sdp.getOrigin().getUsername();
String addressStr = sdp.getOrigin().getAddress();
//String sessionName = sdp.getSessionName().getValue();
logger.info("[上级点播]用户:{} 地址:{}:{} ssrc{}", username, addressStr, port, ssrc);
Device device = null;
// 通过 channel gbStream 是否为null 值判断来源是直播流合适国标
if (channel != null) {
device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
if (device == null) {
logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
responseAck(evt, Response.SERVER_INTERNAL_ERROR);
return;
}
mediaServerItem = playService.getNewMediaServerItem(device);
if (mediaServerItem == null) {
logger.warn("未找到可用的zlm");
responseAck(evt, Response.BUSY_HERE);
return;
}
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
device.getDeviceId(), channelId,
mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
}
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(evt, Response.BUSY_HERE);
return;
}
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// 通知下级推流
PlayResult playResult = playService.play(mediaServerItem,device.getDeviceId(), channelId, (mediaServerItemInUSe, responseJSON)->{
// 收到推流 回复200OK, 等待ack
// if (sendRtpItem == null) return;
sendRtpItem.setStatus(1);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// TODO 添加对tcp的支持
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaServerItemInUSe.getSdpIp()+"\r\n");
content.append("t=0 0\r\n");
content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
content.append("a=sendonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n");
content.append("y="+ ssrc + "\r\n");
content.append("f=\r\n");
try {
responseAck(evt, content.toString());
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
} ,((event) -> {
// 未知错误直接转发设备点播的错误
Response response = null;
try {
response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
} catch (ParseException | SipException | InvalidArgumentException e) {
e.printStackTrace();
}
}));
if (logger.isDebugEnabled()) {
logger.debug(playResult.getResult().toString());
}
}else if (gbStream != null) {
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
gbStream.getApp(), gbStream.getStream(), channelId,
mediaTransmissionTCP);
if (tcpActive != null) {
sendRtpItem.setTcpActive(tcpActive);
}
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(evt, Response.BUSY_HERE);
return;
}
// 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem);
sendRtpItem.setStatus(1);
redisCatchStorage.updateSendRTPSever(sendRtpItem);
// TODO 添加对tcp的支持
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o="+ channelId +" 0 0 IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
content.append("s=Play\r\n");
content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
content.append("t=0 0\r\n");
content.append("m=video "+ sendRtpItem.getLocalPort()+" RTP/AVP 96\r\n");
content.append("a=sendonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n");
content.append("y="+ ssrc + "\r\n");
content.append("f=\r\n");
try {
responseAck(evt, content.toString());
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
}
}
} else {
// 非上级平台请求查询是否设备请求通常为接收语音广播的设备
Device device = storager.queryVideoDevice(requesterId);
if (device != null) {
logger.info("收到设备" + requesterId + "的语音广播Invite请求");
responseAck(evt, Response.TRYING);
String contentString = new String(request.getRawContent());
// jainSip不支持y=字段 移除移除以解析
String substring = contentString;
String ssrc = "0000000404";
int ssrcIndex = contentString.indexOf("y=");
if (ssrcIndex > 0) {
substring = contentString.substring(0, ssrcIndex);
ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
}
ssrcIndex = substring.indexOf("f=");
if (ssrcIndex > 0) {
substring = contentString.substring(0, ssrcIndex);
}
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
// 获取支持的格式
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
// 查看是否支持PS 负载96
int port = -1;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false;
Boolean tcpActive = null;
for (int i = 0; i < mediaDescriptions.size(); i++) {
MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i);
Media media = mediaDescription.getMedia();
Vector mediaFormats = media.getMediaFormats(false);
if (mediaFormats.contains("8")) {
port = media.getMediaPort();
String protocol = media.getProtocol();
// 区分TCP发流还是udp 当前默认udp
if ("TCP/RTP/AVP".equals(protocol)) {
String setup = mediaDescription.getAttribute("setup");
if (setup != null) {
mediaTransmissionTCP = true;
if ("active".equals(setup)) {
tcpActive = true;
} else if ("passive".equals(setup)) {
tcpActive = false;
}
}
}
break;
}
}
if (port == -1) {
logger.info("不支持的媒体格式返回415");
// 回复不支持的格式
responseAck(evt, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
return;
}
String username = sdp.getOrigin().getUsername();
String addressStr = sdp.getOrigin().getAddress();
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}", username, addressStr, port, ssrc);
} else {
logger.warn("来自无效设备/平台的请求");
responseAck(evt, Response.BAD_REQUEST);
}
}
} catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
logger.warn("sdp解析错误");
e.printStackTrace();
} catch (SdpParseException e) {
e.printStackTrace();
} catch (SdpException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,384 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetup;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import com.genersoft.iot.vmp.utils.GpsUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.Iterator;
/**
* @description: Notify请求处理器
* @author: lawrencehj
* @date: 2021年1月27日
*/
@Component
public class NotifyRequestProcessor extends SIPRequestProcessorAbstract {
private final static Logger logger = LoggerFactory.getLogger(NotifyRequestProcessor.class);
@Autowired
private UserSetup userSetup;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private EventPublisher publisher;
@Autowired
private DeviceOffLineDetector offLineDetector;
private static final String NOTIFY_CATALOG = "Catalog";
private static final String NOTIFY_ALARM = "Alarm";
private static final String NOTIFY_MOBILE_POSITION = "MobilePosition";
private String method = "NOTIFY";
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addRequestProcessor(method, this);
}
@Override
public void process(RequestEvent evt) {
try {
Element rootElement = getRootElement(evt);
String cmd = XmlUtil.getText(rootElement, "CmdType");
if (NOTIFY_CATALOG.equals(cmd)) {
logger.info("接收到Catalog通知");
processNotifyCatalogList(evt);
} else if (NOTIFY_ALARM.equals(cmd)) {
logger.info("接收到Alarm通知");
processNotifyAlarm(evt);
} else if (NOTIFY_MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition通知");
processNotifyMobilePosition(evt);
} else {
logger.info("接收到消息:" + cmd);
responseAck(evt, Response.OK);
}
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
}
}
/**
* 处理MobilePosition移动位置Notify
*
* @param evt
*/
private void processNotifyMobilePosition(RequestEvent evt) {
try {
// 回复 200 OK
Element rootElement = getRootElement(evt);
MobilePosition mobilePosition = new MobilePosition();
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getTextTrim().toString();
Device device = storager.queryVideoDevice(deviceId);
if (device != null) {
if (!StringUtils.isEmpty(device.getName())) {
mobilePosition.setDeviceName(device.getName());
}
}
mobilePosition.setDeviceId(XmlUtil.getText(rootElement, "DeviceID"));
mobilePosition.setTime(XmlUtil.getText(rootElement, "Time"));
mobilePosition.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
mobilePosition.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Speed"))) {
mobilePosition.setSpeed(Double.parseDouble(XmlUtil.getText(rootElement, "Speed")));
} else {
mobilePosition.setSpeed(0.0);
}
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Direction"))) {
mobilePosition.setDirection(Double.parseDouble(XmlUtil.getText(rootElement, "Direction")));
} else {
mobilePosition.setDirection(0.0);
}
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Altitude"))) {
mobilePosition.setAltitude(Double.parseDouble(XmlUtil.getText(rootElement, "Altitude")));
} else {
mobilePosition.setAltitude(0.0);
}
mobilePosition.setReportSource("Mobile Position");
BaiduPoint bp = new BaiduPoint();
bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
mobilePosition.setGeodeticSystem("BD-09");
mobilePosition.setCnLng(bp.getBdLng());
mobilePosition.setCnLat(bp.getBdLat());
if (!userSetup.getSavePositionHistory()) {
storager.clearMobilePositionsByDeviceId(deviceId);
}
storager.insertMobilePosition(mobilePosition);
responseAck(evt, Response.OK);
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
}
}
/***
* 处理alarm设备报警Notify
*
* @param evt
*/
private void processNotifyAlarm(RequestEvent evt) {
try {
Element rootElement = getRootElement(evt);
Element deviceIdElement = rootElement.element("DeviceID");
String deviceId = deviceIdElement.getText().toString();
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return;
}
rootElement = getRootElement(evt, device.getCharset());
DeviceAlarm deviceAlarm = new DeviceAlarm();
deviceAlarm.setDeviceId(deviceId);
deviceAlarm.setAlarmPriority(XmlUtil.getText(rootElement, "AlarmPriority"));
deviceAlarm.setAlarmMethod(XmlUtil.getText(rootElement, "AlarmMethod"));
deviceAlarm.setAlarmTime(XmlUtil.getText(rootElement, "AlarmTime"));
if (XmlUtil.getText(rootElement, "AlarmDescription") == null) {
deviceAlarm.setAlarmDescription("");
} else {
deviceAlarm.setAlarmDescription(XmlUtil.getText(rootElement, "AlarmDescription"));
}
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Longitude"))) {
deviceAlarm.setLongitude(Double.parseDouble(XmlUtil.getText(rootElement, "Longitude")));
} else {
deviceAlarm.setLongitude(0.00);
}
if (NumericUtil.isDouble(XmlUtil.getText(rootElement, "Latitude"))) {
deviceAlarm.setLatitude(Double.parseDouble(XmlUtil.getText(rootElement, "Latitude")));
} else {
deviceAlarm.setLatitude(0.00);
}
if (deviceAlarm.getAlarmMethod().equals("4")) {
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setDeviceId(deviceAlarm.getDeviceId());
mobilePosition.setTime(deviceAlarm.getAlarmTime());
mobilePosition.setLongitude(deviceAlarm.getLongitude());
mobilePosition.setLatitude(deviceAlarm.getLatitude());
mobilePosition.setReportSource("GPS Alarm");
BaiduPoint bp = new BaiduPoint();
bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
logger.info("百度坐标:" + bp.getBdLng() + ", " + bp.getBdLat());
mobilePosition.setGeodeticSystem("BD-09");
mobilePosition.setCnLng(bp.getBdLng());
mobilePosition.setCnLat(bp.getBdLat());
if (!userSetup.getSavePositionHistory()) {
storager.clearMobilePositionsByDeviceId(deviceId);
}
storager.insertMobilePosition(mobilePosition);
}
// TODO: 需要实现存储报警信息报警分类
// 回复200 OK
responseAck(evt, Response.OK);
if (offLineDetector.isOnline(deviceId)) {
publisher.deviceAlarmEventPublish(deviceAlarm);
}
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
}
}
/***
* 处理catalog设备目录列表Notify
*
* @param evt
*/
private void processNotifyCatalogList(RequestEvent evt) {
try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
Element rootElement = getRootElement(evt);
Element deviceIdElement = rootElement.element("DeviceID");
String channelId = deviceIdElement.getText();
Device device = storager.queryVideoDevice(deviceId);
if (device == null) {
return;
}
if (device != null ) {
rootElement = getRootElement(evt, device.getCharset());
}
Element deviceListElement = rootElement.element("DeviceList");
if (deviceListElement == null) {
return;
}
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
// 遍历DeviceList
while (deviceListIterator.hasNext()) {
Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
continue;
}
String channelDeviceId = channelDeviceElement.getTextTrim();
Element channdelNameElement = itemDevice.element("Name");
String channelName = channdelNameElement != null ? channdelNameElement.getTextTrim().toString() : "";
Element statusElement = itemDevice.element("Status");
String status = statusElement != null ? statusElement.getTextTrim().toString() : "ON";
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setName(channelName);
deviceChannel.setChannelId(channelDeviceId);
// ONLINE OFFLINE HIKVISION DS-7716N-E4 NVR的兼容性处理
if (status.equals("ON") || status.equals("On") || status.equals("ONLINE")) {
deviceChannel.setStatus(1);
}
if (status.equals("OFF") || status.equals("Off") || status.equals("OFFLINE")) {
deviceChannel.setStatus(0);
}
deviceChannel.setManufacture(XmlUtil.getText(itemDevice, "Manufacturer"));
deviceChannel.setModel(XmlUtil.getText(itemDevice, "Model"));
deviceChannel.setOwner(XmlUtil.getText(itemDevice, "Owner"));
deviceChannel.setCivilCode(XmlUtil.getText(itemDevice, "CivilCode"));
deviceChannel.setBlock(XmlUtil.getText(itemDevice, "Block"));
deviceChannel.setAddress(XmlUtil.getText(itemDevice, "Address"));
if (XmlUtil.getText(itemDevice, "Parental") == null
|| XmlUtil.getText(itemDevice, "Parental") == "") {
deviceChannel.setParental(0);
} else {
deviceChannel.setParental(Integer.parseInt(XmlUtil.getText(itemDevice, "Parental")));
}
deviceChannel.setParentId(XmlUtil.getText(itemDevice, "ParentID"));
if (XmlUtil.getText(itemDevice, "SafetyWay") == null
|| XmlUtil.getText(itemDevice, "SafetyWay") == "") {
deviceChannel.setSafetyWay(0);
} else {
deviceChannel.setSafetyWay(Integer.parseInt(XmlUtil.getText(itemDevice, "SafetyWay")));
}
if (XmlUtil.getText(itemDevice, "RegisterWay") == null
|| XmlUtil.getText(itemDevice, "RegisterWay") == "") {
deviceChannel.setRegisterWay(1);
} else {
deviceChannel.setRegisterWay(Integer.parseInt(XmlUtil.getText(itemDevice, "RegisterWay")));
}
deviceChannel.setCertNum(XmlUtil.getText(itemDevice, "CertNum"));
if (XmlUtil.getText(itemDevice, "Certifiable") == null
|| XmlUtil.getText(itemDevice, "Certifiable") == "") {
deviceChannel.setCertifiable(0);
} else {
deviceChannel.setCertifiable(Integer.parseInt(XmlUtil.getText(itemDevice, "Certifiable")));
}
if (XmlUtil.getText(itemDevice, "ErrCode") == null
|| XmlUtil.getText(itemDevice, "ErrCode") == "") {
deviceChannel.setErrCode(0);
} else {
deviceChannel.setErrCode(Integer.parseInt(XmlUtil.getText(itemDevice, "ErrCode")));
}
deviceChannel.setEndTime(XmlUtil.getText(itemDevice, "EndTime"));
deviceChannel.setSecrecy(XmlUtil.getText(itemDevice, "Secrecy"));
deviceChannel.setIpAddress(XmlUtil.getText(itemDevice, "IPAddress"));
if (XmlUtil.getText(itemDevice, "Port") == null || XmlUtil.getText(itemDevice, "Port") == "") {
deviceChannel.setPort(0);
} else {
deviceChannel.setPort(Integer.parseInt(XmlUtil.getText(itemDevice, "Port")));
}
deviceChannel.setPassword(XmlUtil.getText(itemDevice, "Password"));
if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Longitude"))) {
deviceChannel.setLongitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Longitude")));
} else {
deviceChannel.setLongitude(0.00);
}
if (NumericUtil.isDouble(XmlUtil.getText(itemDevice, "Latitude"))) {
deviceChannel.setLatitude(Double.parseDouble(XmlUtil.getText(itemDevice, "Latitude")));
} else {
deviceChannel.setLatitude(0.00);
}
if (XmlUtil.getText(itemDevice, "PTZType") == null
|| XmlUtil.getText(itemDevice, "PTZType") == "") {
deviceChannel.setPTZType(0);
} else {
deviceChannel.setPTZType(Integer.parseInt(XmlUtil.getText(itemDevice, "PTZType")));
}
deviceChannel.setHasAudio(true); // 默认含有音频播放时再检查是否有音频及是否AAC
storager.updateChannel(device.getDeviceId(), deviceChannel);
}
// RequestMessage msg = new RequestMessage();
// msg.setDeviceId(deviceId);
// msg.setType(DeferredResultHolder.CALLBACK_CMD_CATALOG);
// msg.setData(device);
// deferredResultHolder.invokeResult(msg);
// 回复200 OK
responseAck(evt, Response.OK);
if (offLineDetector.isOnline(deviceId)) {
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
}
}
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
}
}
public void setCmder(SIPCommander cmder) {
}
public void setStorager(IVideoManagerStorager storager) {
this.storager = storager;
}
public void setPublisher(EventPublisher publisher) {
this.publisher = publisher;
}
public void setRedis(RedisUtil redis) {
}
public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
}
public void setOffLineDetector(DeviceOffLineDetector offLineDetector) {
this.offLineDetector = offLineDetector;
}
public IRedisCatchStorage getRedisCatchStorage() {
return redisCatchStorage;
}
public void setRedisCatchStorage(IRedisCatchStorage redisCatchStorage) {
this.redisCatchStorage = redisCatchStorage;
}
}

View File

@ -0,0 +1,197 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.auth.DigestServerAuthenticationHelper;
import com.genersoft.iot.vmp.gb28181.auth.RegisterLogicHandler;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.WvpSipDate;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import gov.nist.javax.sip.RequestEventExt;
import gov.nist.javax.sip.address.AddressImpl;
import gov.nist.javax.sip.address.SipUri;
import gov.nist.javax.sip.header.Expires;
import gov.nist.javax.sip.header.SIPDateHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.*;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.security.NoSuchAlgorithmException;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Locale;
/**
* @description:收到注册请求 处理
* @author: swwheihei
* @date: 2020年5月3日 下午4:47:25
*/
@Component
public class RegisterRequestProcessor extends SIPRequestProcessorAbstract {
private Logger logger = LoggerFactory.getLogger(RegisterRequestProcessor.class);
public String method = "REGISTER";
@Autowired
private SipConfig sipConfig;
@Autowired
private RegisterLogicHandler handler;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private EventPublisher publisher;
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addRequestProcessor(method, this);
}
/**
* 收到注册请求 处理
* @param evt
*/
@Override
public void process(RequestEvent evt) {
try {
RequestEventExt evtExt = (RequestEventExt)evt;
String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
logger.info("[{}] 收到注册请求,开始处理", requestAddress);
Request request = evt.getRequest();
Response response = null;
boolean passwordCorrect = false;
// 注册标志 0未携带授权头或者密码错误 1注册成功 2注销成功
int registerFlag = 0;
FromHeader fromHeader = (FromHeader) request.getHeader(FromHeader.NAME);
AddressImpl address = (AddressImpl) fromHeader.getAddress();
SipUri uri = (SipUri) address.getURI();
String deviceId = uri.getUser();
Device device = storager.queryVideoDevice(deviceId);
AuthorizationHeader authorhead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
// 校验密码是否正确
if (authorhead != null) {
passwordCorrect = new DigestServerAuthenticationHelper().doAuthenticatePlainTextPassword(request,
sipConfig.getPassword());
}
if (StringUtils.isEmpty(sipConfig.getPassword())){
passwordCorrect = true;
}
// 未携带授权头或者密码错误 回复401
if (authorhead == null ) {
logger.info("[{}] 未携带授权头 回复401", requestAddress);
response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain());
}else {
if (!passwordCorrect){
// 注册失败
response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
response.setReasonPhrase("wrong password");
logger.info("[{}] 密码/SIP服务器ID错误, 回复403", requestAddress);
}else {
// 携带授权头并且密码正确
response = getMessageFactory().createResponse(Response.OK, request);
// 添加date头
SIPDateHeader dateHeader = new SIPDateHeader();
// 使用自己修改的
WvpSipDate wvpSipDate = new WvpSipDate(Calendar.getInstance(Locale.ENGLISH).getTimeInMillis());
dateHeader.setDate(wvpSipDate);
response.addHeader(dateHeader);
ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
if (expiresHeader == null) {
response = getMessageFactory().createResponse(Response.BAD_REQUEST, request);
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
return;
}
// 添加Contact头
response.addHeader(request.getHeader(ContactHeader.NAME));
// 添加Expires头
response.addHeader(request.getExpires());
// 获取到通信地址等信息
ViaHeader viaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
String received = viaHeader.getReceived();
int rPort = viaHeader.getRPort();
// 解析本地地址替代
if (StringUtils.isEmpty(received) || rPort == -1) {
received = viaHeader.getHost();
rPort = viaHeader.getPort();
}
//
if (device == null) {
device = new Device();
device.setStreamMode("UDP");
device.setCharset("gb2312");
device.setDeviceId(deviceId);
device.setFirsRegister(true);
}
device.setIp(received);
device.setPort(rPort);
device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
// 注销成功
if (expiresHeader.getExpires() == 0) {
registerFlag = 2;
}
// 注册成功
else {
device.setExpires(expiresHeader.getExpires());
registerFlag = 1;
// 判断TCP还是UDP
boolean isTcp = false;
ViaHeader reqViaHeader = (ViaHeader) request.getHeader(ViaHeader.NAME);
String transport = reqViaHeader.getTransport();
if (transport.equals("TCP")) {
isTcp = true;
}
device.setTransport(isTcp ? "TCP" : "UDP");
}
}
}
ServerTransaction serverTransaction = getServerTransaction(evt);
serverTransaction.sendResponse(response);
if (serverTransaction.getDialog() != null) serverTransaction.getDialog().delete();
// 注册成功
// 保存到redis
// 下发catelog查询目录
if (registerFlag == 1 ) {
logger.info("[{}] 注册成功! deviceId:" + device.getDeviceId(), requestAddress);
publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_REGISTER);
// 重新注册更新设备和通道以免设备替换或更新后信息无法更新
handler.onRegister(device);
} else if (registerFlag == 2) {
logger.info("[{}] 注销成功! deviceId:" + device.getDeviceId(), requestAddress);
publisher.outlineEventPublish(device.getDeviceId(), VideoManagerConstants.EVENT_OUTLINE_UNREGISTER);
}
} catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,75 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorAbstract;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.ServerTransaction;
import javax.sip.SipException;
import javax.sip.header.ExpiresHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
/**
* @description:SUBSCRIBE请求处理器
* @author: swwheihei
* @date: 2020年5月3日 下午5:31:20
*/
@Component
public class SubscribeRequestProcessor extends SIPRequestProcessorAbstract {
private Logger logger = LoggerFactory.getLogger(SubscribeRequestProcessor.class);
private String method = "SUBSCRIBE";
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addRequestProcessor(method, this);
}
/**
* 处理SUBSCRIBE请求
*
* @param evt
*/
@Override
public void process(RequestEvent evt) {
Request request = evt.getRequest();
try {
Response response = null;
response = getMessageFactory().createResponse(200, request);
if (response != null) {
ExpiresHeader expireHeader = getHeaderFactory().createExpiresHeader(30);
response.setExpires(expireHeader);
}
logger.info("response : " + response.toString());
ServerTransaction transaction = getServerTransaction(evt);
if (transaction != null) {
transaction.sendResponse(response);
transaction.getDialog().delete();
transaction.terminate();
} else {
logger.info("processRequest serverTransactionId is null.");
}
} catch (ParseException e) {
e.printStackTrace();
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,15 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response;
import javax.sip.ResponseEvent;
/**
* @description:处理接收IPCamera发来的SIP协议响应消息
* @author: swwheihei
* @date: 2020年5月3日 下午4:42:22
*/
public interface ISIPResponseProcessor {
void process(ResponseEvent evt);
}

View File

@ -0,0 +1,49 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.ResponseEvent;
/**
* @description: BYE请求响应器
* @author: swwheihei
* @date: 2020年5月3日 下午5:32:05
*/
@Component
public class ByeResponseProcessor extends SIPResponseProcessorAbstract {
private String method = "BYE";
@Autowired
private SipLayer sipLayer;
@Autowired
private SipConfig config;
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addResponseProcessor(method, this);
}
/**
* 处理BYE响应
*
* @param evt
*/
@Override
public void process(ResponseEvent evt) {
// TODO Auto-generated method stub
System.out.println("收到bye");
}
}

View File

@ -0,0 +1,47 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.ResponseEvent;
/**
* @description: CANCEL响应处理器
* @author: panlinlin
* @date: 2021年11月5日 16:35
*/
@Component
public class CancelResponseProcessor extends SIPResponseProcessorAbstract {
private String method = "CANCEL";
@Autowired
private SipLayer sipLayer;
@Autowired
private SipConfig config;
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addResponseProcessor(method, this);
}
/**
* 处理CANCEL响应
*
* @param evt
*/
@Override
public void process(ResponseEvent evt) {
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,97 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.gb28181.SipLayer;
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import gov.nist.javax.sip.ResponseEventExt;
import gov.nist.javax.sip.stack.SIPDialog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.ResponseEvent;
import javax.sip.SipException;
import javax.sip.address.SipURI;
import javax.sip.header.CSeqHeader;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.text.ParseException;
/**
* @description: 处理INVITE响应
* @author: panlinlin
* @date: 2021年11月5日 1640
*/
@Component
public class InviteResponseProcessor extends SIPResponseProcessorAbstract {
private final static Logger logger = LoggerFactory.getLogger(InviteResponseProcessor.class);
private String method = "INVITE";
@Autowired
private SipLayer sipLayer;
@Autowired
private SipConfig config;
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addResponseProcessor(method, this);
}
@Autowired
private VideoStreamSessionManager streamSession;
/**
* 处理invite响应
*
* @param evt 响应消息
* @throws ParseException
*/
@Override
public void process(ResponseEvent evt ){
try {
Response response = evt.getResponse();
int statusCode = response.getStatusCode();
// trying不会回复
if (statusCode == Response.TRYING) {
}
// 成功响应
// 下发ack
if (statusCode == Response.OK) {
ResponseEventExt event = (ResponseEventExt)evt;
SIPDialog dialog = (SIPDialog)evt.getDialog();
CSeqHeader cseq = (CSeqHeader) response.getHeader(CSeqHeader.NAME);
Request reqAck = dialog.createAck(cseq.getSeqNumber());
SipURI requestURI = (SipURI) reqAck.getRequestURI();
try {
requestURI.setHost(event.getRemoteIpAddress());
} catch (ParseException e) {
e.printStackTrace();
}
requestURI.setPort(event.getRemotePort());
reqAck.setRequestURI(requestURI);
logger.info("" + event.getRemoteIpAddress() + ":" + event.getRemotePort() + "回复ack");
SipURI sipURI = (SipURI)dialog.getRemoteParty().getURI();
String deviceId = requestURI.getUser();
String channelId = sipURI.getUser();
dialog.sendAck(reqAck);
}
} catch (InvalidArgumentException | SipException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,104 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.response.impl;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.gb28181.transmit.event.response.SIPResponseProcessorAbstract;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.ResponseEvent;
import javax.sip.header.CallIdHeader;
import javax.sip.header.WWWAuthenticateHeader;
import javax.sip.message.Response;
/**
* @description:Register响应处理器
* @author: swwheihei
* @date: 2020年5月3日 下午5:32:23
*/
@Component
public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class);
private String method = "REGISTER";
@Autowired
private ISIPCommanderForPlatform sipCommanderForPlatform;
@Autowired
private IVideoManagerStorager storager;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private SIPProcessorObserver sipProcessorObserver;
@Override
public void afterPropertiesSet() throws Exception {
// 添加消息处理的订阅
sipProcessorObserver.addResponseProcessor(method, this);
}
/**
* 处理Register响应
*
* @param evt 事件
*/
@Override
public void process(ResponseEvent evt) {
Response response = evt.getResponse();
CallIdHeader callIdHeader = (CallIdHeader) response.getHeader(CallIdHeader.NAME);
String callId = callIdHeader.getCallId();
String platformGBId = redisCatchStorage.queryPlatformRegisterInfo(callId);
if (platformGBId == null) {
logger.info(String.format("未找到callId %s 的注册/注销平台id", callId ));
return;
}
ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId);
if (parentPlatformCatch == null) {
logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台缓存信息未查询到!!!", platformGBId, response.getStatusCode()));
return;
}
String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册";
logger.info(String.format("收到 %s %s的%S响应", platformGBId, action, response.getStatusCode() ));
ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform();
if (parentPlatform == null) {
logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode()));
return;
}
if (response.getStatusCode() == 401) {
WWWAuthenticateHeader www = (WWWAuthenticateHeader)response.getHeader(WWWAuthenticateHeader.NAME);
sipCommanderForPlatform.register(parentPlatform, callId, www, null, null);
}else if (response.getStatusCode() == 200){
// 注册/注销成功
logger.info(String.format("%s %s成功", platformGBId, action));
redisCatchStorage.delPlatformRegisterInfo(callId);
parentPlatform.setStatus("注册".equals(action));
// 取回Expires设置避免注销过程中被置为0
ParentPlatform parentPlatformTmp = storager.queryParentPlatByServerGBId(platformGBId);
String expires = parentPlatformTmp.getExpires();
parentPlatform.setExpires(expires);
parentPlatform.setId(parentPlatformTmp.getId());
storager.updateParentPlatformStatus(platformGBId, "注册".equals(action));
redisCatchStorage.updatePlatformRegister(parentPlatform);
redisCatchStorage.updatePlatformKeepalive(parentPlatform);
parentPlatformCatch.setParentPlatform(parentPlatform);
redisCatchStorage.updatePlatformCatchInfo(parentPlatformCatch);
}
}
}

View File

@ -0,0 +1,24 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.Device;
/**
* 设备相关业务处理
*/
public interface IDeviceService {
/**
* 添加目录订阅
* @param device 设备信息
* @return
*/
boolean addCatalogSubscribe(Device device);
/**
* 移除目录订阅
* @param device 设备信息
* @return
*/
boolean removeCatalogSubscribe(Device device);
}

View File

@ -0,0 +1,48 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sip.ResponseEvent;
public class CatalogSubscribeTask implements Runnable{
private final Logger logger = LoggerFactory.getLogger(CatalogSubscribeTask.class);
private Device device;
private ISIPCommander sipCommander;
public CatalogSubscribeTask(Device device, ISIPCommander sipCommander) {
this.device = device;
this.sipCommander = sipCommander;
}
@Override
public void run() {
sipCommander.catalogSubscribe(device, eventResult -> {
ResponseEvent event = (ResponseEvent) eventResult.event;
Element rootElement = null;
try {
rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
} catch (DocumentException e) {
e.printStackTrace();
}
Element resultElement = rootElement.element("Result");
String result = resultElement.getText();
if (result.toUpperCase().equals("OK")){
// 成功
logger.info("目录订阅成功: {}", device.getDeviceId());
}else {
// 失败
logger.info("目录订阅失败: {}-{}", device.getDeviceId(), result);
}
},eventResult -> {
// 失败
logger.warn("目录订阅失败: {}-信令发送失败", device.getDeviceId());
});
}
}

View File

@ -0,0 +1,61 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.service.IDeviceService;
import com.genersoft.iot.vmp.service.bean.CatalogSubscribeTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class DeviceServiceImpl implements IDeviceService {
private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
@Autowired
private DynamicTask dynamicTask;
;
@Autowired
private ISIPCommander sipCommander;
@Override
public boolean addCatalogSubscribe(Device device) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
// 添加目录订阅
CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
catalogSubscribeTask.run();
// 提前开始刷新订阅
String cron = getCron(device.getSubscribeCycleForCatalog() - 60);
dynamicTask.startCron(device.getDeviceId(), catalogSubscribeTask, cron);
return true;
}
@Override
public boolean removeCatalogSubscribe(Device device) {
if (device == null || device.getSubscribeCycleForCatalog() < 0) {
return false;
}
dynamicTask.stopCron(device.getDeviceId());
return true;
}
public String getCron(int time) {
if (time <= 59) {
return "0/" + time +" * * * * ?";
}else if (time <= 60* 59) {
int minute = time/(60);
return "0 0/" + minute +" * * * ?";
}else if (time <= 60* 60* 59) {
int hour = time/(60*60);
return "0 0 0/" + hour +" * * ?";
}else {
return "0 0/10 * * * ?";
}
}
}