优化消息处理中存在可能异常的处理流程

This commit is contained in:
648540858 2022-10-17 12:39:58 +08:00
parent 4f2191dc9f
commit 4c66f6b29e
12 changed files with 414 additions and 306 deletions

View File

@ -132,7 +132,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (requesterId == null || channelId == null) {
logger.info("无法从FromHeader的Address中获取到平台id返回400");
// 参数不全 发400请求错误
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage());
}
return;
}
@ -141,6 +145,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
ParentPlatform platform = storager.queryParentPlatByServerGBId(requesterId);
if (platform == null) {
inviteFromDeviceHandle(serverTransaction, requesterId);
} else {
// 查询平台下是否有该通道
DeviceChannel channel = storager.queryChannelInParentPlatform(requesterId, channelId);
@ -158,7 +163,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// return;
// }
// 通道存在发100TRYING
responseAck(serverTransaction, Response.TRYING);
try {
responseAck(serverTransaction, Response.TRYING);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite TRYING: {}", e.getMessage());
}
} else if (channel == null && gbStream != null) {
String mediaServerId = gbStream.getMediaServerId();
@ -166,13 +175,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (mediaServerItem == null) {
if ("proxy".equals(gbStream.getStreamType())) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(serverTransaction, Response.GONE);
try {
responseAck(serverTransaction, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
} else {
streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
if (streamPushItem == null || streamPushItem.getServerId().equals(userSetting.getServerId())) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(serverTransaction, Response.GONE);
try {
responseAck(serverTransaction, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
}
}
@ -181,25 +198,47 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
streamPushItem = streamPushService.getPush(gbStream.getApp(), gbStream.getStream());
if (streamPushItem == null) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(serverTransaction, Response.GONE);
try {
responseAck(serverTransaction, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
}
}else if("proxy".equals(gbStream.getStreamType())){
proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
if (proxyByAppAndStream == null) {
logger.info("[ app={}, stream={} ]找不到zlm {}返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
responseAck(serverTransaction, Response.GONE);
try {
responseAck(serverTransaction, Response.GONE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite GONE: {}", e.getMessage());
}
return;
}
}
}
responseAck(serverTransaction, Response.CALL_IS_BEING_FORWARDED); // 通道存在发181呼叫转接中
try {
responseAck(serverTransaction, Response.CALL_IS_BEING_FORWARDED);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite CALL_IS_BEING_FORWARDED: {}", e.getMessage());
}
} else if (catalog != null) {
responseAck(serverTransaction, Response.BAD_REQUEST, "catalog channel can not play"); // 目录不支持点播
try {
// 目录不支持点播
responseAck(serverTransaction, Response.BAD_REQUEST, "catalog channel can not play");
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 目录不支持点播: {}", e.getMessage());
}
return;
} else {
logger.info("通道不存在返回404");
responseAck(serverTransaction, Response.NOT_FOUND); // 通道不存在发404资源不存在
try {
// 通道不存在发404资源不存在
responseAck(serverTransaction, Response.NOT_FOUND);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道不存在: {}", e.getMessage());
}
return;
}
// 解析sdp消息, 使用jainsip 自带的sdp解析方式
@ -270,7 +309,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (port == -1) {
logger.info("不支持的媒体格式返回415");
// 回复不支持的格式
responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
try {
// 不支持的格式发415
responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 不支持的格式: {}", e.getMessage());
}
return;
}
String username = sdp.getOrigin().getUsername();
@ -283,13 +327,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
device = storager.queryVideoDeviceByPlatformIdAndChannelId(requesterId, channelId);
if (device == null) {
logger.warn("点播平台{}的通道{}时未找到设备信息", requesterId, channel);
responseAck(serverTransaction, Response.SERVER_INTERNAL_ERROR);
try {
responseAck(serverTransaction, Response.SERVER_INTERNAL_ERROR);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 未找到设备信息: {}", e.getMessage());
}
return;
}
mediaServerItem = playService.getNewMediaServerItem(device);
if (mediaServerItem == null) {
logger.warn("未找到可用的zlm");
responseAck(serverTransaction, Response.BUSY_HERE);
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite BUSY_HERE: {}", e.getMessage());
}
return;
}
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
@ -301,7 +353,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(serverTransaction, Response.BUSY_HERE);
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
}
return;
}
sendRtpItem.setCallId(callIdHeader.getCallId());
@ -474,13 +530,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
}
} catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
logger.warn("sdp解析错误");
e.printStackTrace();
} catch (SdpParseException e) {
e.printStackTrace();
logger.error("sdp解析错误", e);
} catch (SdpException e) {
e.printStackTrace();
}
@ -492,7 +543,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private void pushProxyStream(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
String channelId, String addressStr, String ssrc, String requesterId) {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
if (streamReady) {
// 自平台内容
@ -502,7 +553,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(serverTransaction, Response.BUSY_HERE);
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
}
return;
}
if (tcpActive != null) {
@ -527,7 +582,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private void pushStream(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
String channelId, String addressStr, String ssrc, String requesterId) {
// 推流
if (streamPushItem.isSelf()) {
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
@ -539,7 +594,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (sendRtpItem == null) {
logger.warn("服务器端口资源不足");
responseAck(serverTransaction, Response.BUSY_HERE);
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
}
return;
}
if (tcpActive != null) {
@ -577,15 +636,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private void notifyStreamOnline(RequestEvent evt, ServerTransaction serverTransaction, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
String channelId, String addressStr, String ssrc, String requesterId) {
if ("proxy".equals(gbStream.getStreamType())) {
// TODO 控制启用以使设备上线
logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
responseAck(serverTransaction, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
try {
responseAck(serverTransaction, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
}
} else if ("push".equals(gbStream.getStreamType())) {
if (!platform.isStartOfflinePush()) {
// 平台设置中关闭了拉起离线的推流则直接回复
responseAck(serverTransaction, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
try {
responseAck(serverTransaction, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
}
return;
}
// 发送redis消息以使设备上线
@ -713,38 +780,28 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
redisCatchStorage.updateSendRTPSever(sendRtpItem);
}, (wvpResult) -> {
try {
// 错误
if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
// 离线
// 查询是否在本机上线了
StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
if (currentStreamPushItem.isPushIng()) {
// 在线状态
pushStream(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} else {
// 不在线 拉起
notifyStreamOnline(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
// 错误
if (wvpResult.getCode() == RedisGbPlayMsgListener.ERROR_CODE_OFFLINE) {
// 离线
// 查询是否在本机上线了
StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
if (currentStreamPushItem.isPushIng()) {
// 在线状态
pushStream(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
} else {
// 不在线 拉起
notifyStreamOnline(evt, serverTransaction, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
}
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
}
try {
responseAck(serverTransaction, Response.BUSY_HERE);
} catch (SipException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 国标级联 点播回复 BUSY_HERE: {}", e.getMessage());
}
return;
});
}
@ -782,14 +839,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
return null;
}
public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId) throws InvalidArgumentException, ParseException, SipException, SdpException {
public void inviteFromDeviceHandle(ServerTransaction serverTransaction, String requesterId) {
// 非上级平台请求查询是否设备请求通常为接收语音广播的设备
Device device = redisCatchStorage.getDevice(requesterId);
if (device != null) {
logger.info("收到设备" + requesterId + "的语音广播Invite请求");
responseAck(serverTransaction, Response.TRYING);
try {
responseAck(serverTransaction, Response.TRYING);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite BAD_REQUEST: {}", e.getMessage());
}
String contentString = new String(serverTransaction.getRequest().getRawContent());
// jainSip不支持y=字段 移除移除以解析
String substring = contentString;
@ -803,51 +863,65 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (ssrcIndex > 0) {
substring = contentString.substring(0, ssrcIndex);
}
SessionDescription sdp = SdpFactory.getInstance().createSessionDescription(substring);
SessionDescription sdp = null;
try {
sdp = SdpFactory.getInstance().createSessionDescription(substring);
// 获取支持的格式
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
// 查看是否支持PS 负载96
int port = -1;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false;
Boolean tcpActive = null;
for (int i = 0; i < mediaDescriptions.size(); i++) {
MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i);
Media media = mediaDescription.getMedia();
// 获取支持的格式
Vector mediaDescriptions = sdp.getMediaDescriptions(true);
// 查看是否支持PS 负载96
int port = -1;
//boolean recvonly = false;
boolean mediaTransmissionTCP = false;
Boolean tcpActive = null;
for (int i = 0; i < mediaDescriptions.size(); i++) {
MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i);
Media media = mediaDescription.getMedia();
Vector mediaFormats = media.getMediaFormats(false);
if (mediaFormats.contains("8")) {
port = media.getMediaPort();
String protocol = media.getProtocol();
// 区分TCP发流还是udp 当前默认udp
if ("TCP/RTP/AVP".equals(protocol)) {
String setup = mediaDescription.getAttribute("setup");
if (setup != null) {
mediaTransmissionTCP = true;
if ("active".equals(setup)) {
tcpActive = true;
} else if ("passive".equals(setup)) {
tcpActive = false;
Vector mediaFormats = media.getMediaFormats(false);
if (mediaFormats.contains("8")) {
port = media.getMediaPort();
String protocol = media.getProtocol();
// 区分TCP发流还是udp 当前默认udp
if ("TCP/RTP/AVP".equals(protocol)) {
String setup = mediaDescription.getAttribute("setup");
if (setup != null) {
mediaTransmissionTCP = true;
if ("active".equals(setup)) {
tcpActive = true;
} else if ("passive".equals(setup)) {
tcpActive = false;
}
}
}
break;
}
break;
}
if (port == -1) {
logger.info("不支持的媒体格式返回415");
// 回复不支持的格式
try {
responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 不支持的媒体格式返回415 {}", e.getMessage());
}
return;
}
String username = sdp.getOrigin().getUsername();
String addressStr = sdp.getOrigin().getAddress();
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}", username, addressStr, port, ssrc);
} catch (SdpException e) {
logger.error("[SDP解析异常]", e);
}
if (port == -1) {
logger.info("不支持的媒体格式返回415");
// 回复不支持的格式
responseAck(serverTransaction, Response.UNSUPPORTED_MEDIA_TYPE); // 不支持的格式发415
return;
}
String username = sdp.getOrigin().getUsername();
String addressStr = sdp.getOrigin().getAddress();
logger.info("设备{}请求语音流,地址:{}:{}ssrc{}", username, addressStr, port, ssrc);
} else {
logger.warn("来自无效设备/平台的请求");
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);; // 不支持的格式发415
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] invite 来自无效设备/平台的请求, {}", e.getMessage());
}
}
}
}

View File

@ -93,46 +93,44 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
@Override
public void process(RequestEvent evt) {
ServerTransaction serverTransaction = getServerTransaction(evt);
try {
taskQueue.offer(new HandlerCatchData(evt, null, null));
ServerTransaction serverTransaction = getServerTransaction(evt);
responseAck(serverTransaction, Response.OK);
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(()-> {
while (!taskQueue.isEmpty()) {
try {
HandlerCatchData take = taskQueue.poll();
Element rootElement = getRootElement(take.getEvt());
if (rootElement == null) {
logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
continue;
}
String cmd = XmlUtil.getText(rootElement, "CmdType");
if (CmdType.CATALOG.equals(cmd)) {
logger.info("接收到Catalog通知");
processNotifyCatalogList(take.getEvt());
} else if (CmdType.ALARM.equals(cmd)) {
logger.info("接收到Alarm通知");
processNotifyAlarm(take.getEvt());
} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition通知");
processNotifyMobilePosition(take.getEvt());
} else {
logger.info("接收到消息:" + cmd);
}
} catch (DocumentException e) {
logger.error("处理NOTIFY消息时错误", e);
}
}
taskQueueHandlerRun = false;
});
}
} catch (SipException | InvalidArgumentException | ParseException e) {
}catch (SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
} finally {
taskQueueHandlerRun = false;
}
taskQueue.offer(new HandlerCatchData(evt, null, null));
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(()-> {
while (!taskQueue.isEmpty()) {
try {
HandlerCatchData take = taskQueue.poll();
Element rootElement = getRootElement(take.getEvt());
if (rootElement == null) {
logger.error("处理NOTIFY消息时未获取到消息体,{}", take.getEvt().getRequest());
continue;
}
String cmd = XmlUtil.getText(rootElement, "CmdType");
if (CmdType.CATALOG.equals(cmd)) {
logger.info("接收到Catalog通知");
processNotifyCatalogList(take.getEvt());
} else if (CmdType.ALARM.equals(cmd)) {
logger.info("接收到Alarm通知");
processNotifyAlarm(take.getEvt());
} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
logger.info("接收到MobilePosition通知");
processNotifyMobilePosition(take.getEvt());
} else {
logger.info("接收到消息:" + cmd);
}
} catch (DocumentException e) {
logger.error("处理NOTIFY消息时错误", e);
}
}
taskQueueHandlerRun = false;
});
}
}

View File

@ -112,10 +112,10 @@ public class DeviceControlQueryMessageHandler extends SIPRequestProcessorParent
if (deviceForPlatform == null) {
try {
responseAck(serverTransaction, Response.NOT_FOUND);
return;
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 错误信息: {}", e.getMessage());
}
return;
}
try {
cmder.fronEndCmd(deviceForPlatform, channelId, cmdString, eventResult -> {

View File

@ -52,35 +52,36 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
// 未注册的设备不做处理
return;
}
// 回复200 OK
try {
// 判断RPort是否改变改变则说明路由nat信息变化修改设备信息
// 获取到通信地址等信息
ViaHeader viaHeader = (ViaHeader) evt.getRequest().getHeader(ViaHeader.NAME);
String received = viaHeader.getReceived();
int rPort = viaHeader.getRPort();
// 解析本地地址替代
if (ObjectUtils.isEmpty(received) || rPort == -1) {
received = viaHeader.getHost();
rPort = viaHeader.getPort();
}
if (device.getPort() != rPort) {
device.setPort(rPort);
device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
}
device.setKeepaliveTime(DateUtil.getNow());
// 回复200 OK
responseAck(getServerTransaction(evt), Response.OK);
if (device.getOnline() == 1) {
deviceService.updateDevice(device);
}else {
// 对于已经离线的设备判断他的注册是否已经过期
if (!deviceService.expire(device)){
deviceService.online(device);
}
}
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 心跳回复: {}", e.getMessage());
}
// 判断RPort是否改变改变则说明路由nat信息变化修改设备信息
// 获取到通信地址等信息
ViaHeader viaHeader = (ViaHeader) evt.getRequest().getHeader(ViaHeader.NAME);
String received = viaHeader.getReceived();
int rPort = viaHeader.getRPort();
// 解析本地地址替代
if (ObjectUtils.isEmpty(received) || rPort == -1) {
received = viaHeader.getHost();
rPort = viaHeader.getPort();
}
if (device.getPort() != rPort) {
device.setPort(rPort);
device.setHostAddress(received.concat(":").concat(String.valueOf(rPort)));
}
device.setKeepaliveTime(DateUtil.getNow());
if (device.getOnline() == 1) {
deviceService.updateDevice(device);
}else {
// 对于已经离线的设备判断他的注册是否已经过期
if (!deviceService.expire(device)){
deviceService.online(device);
}
}
}
@Override

View File

@ -81,8 +81,12 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
try {
Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset());
if (rootElementAfterCharset == null) {
logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
try {
logger.warn("[ 移动设备位置数据通知 ] content cannot be null, {}", sipMsgInfo.getEvt().getRequest());
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 移动设备位置数据通知 内容为空: {}", e.getMessage());
}
continue;
}
MobilePosition mobilePosition = new MobilePosition();
@ -133,7 +137,11 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
}
storager.updateChannelPosition(deviceChannel);
//回复 200 OK
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
try {
responseAck(getServerTransaction(sipMsgInfo.getEvt()), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 移动设备位置数据回复200: {}", e.getMessage());
}
// 发送redis消息 通知位置信息的变化
JSONObject jsonObject = new JSONObject();
@ -147,7 +155,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
jsonObject.put("speed", mobilePosition.getSpeed());
redisCatchStorage.sendMobilePositionMsg(jsonObject);
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
} catch (DocumentException e) {
e.printStackTrace();
}

View File

@ -67,33 +67,37 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
try {
// 回复200 OK
responseAck(getServerTransaction(evt), Response.OK);
Element snElement = rootElement.element("SN");
String sn = snElement.getText();
// 准备回复通道信息
List<DeviceChannel> deviceChannelInPlatforms = storager.queryChannelWithCatalog(parentPlatform.getServerGBId());
// 查询关联的直播通道
List<DeviceChannel> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
// 回复目录信息
List<DeviceChannel> catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId());
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 目录查询回复200OK: {}", e.getMessage());
}
Element snElement = rootElement.element("SN");
String sn = snElement.getText();
// 准备回复通道信息
List<DeviceChannel> deviceChannelInPlatforms = storager.queryChannelWithCatalog(parentPlatform.getServerGBId());
// 查询关联的直播通道
List<DeviceChannel> gbStreams = storager.queryGbStreamListInPlatform(parentPlatform.getServerGBId());
// 回复目录信息
List<DeviceChannel> catalogs = storager.queryCatalogInPlatform(parentPlatform.getServerGBId());
List<DeviceChannel> allChannels = new ArrayList<>();
List<DeviceChannel> allChannels = new ArrayList<>();
// 回复平台
// 回复平台
// DeviceChannel deviceChannel = getChannelForPlatform(parentPlatform);
// allChannels.add(deviceChannel);
// 回复目录
if (catalogs.size() > 0) {
allChannels.addAll(catalogs);
}
// 回复级联的通道
if (deviceChannelInPlatforms.size() > 0) {
allChannels.addAll(deviceChannelInPlatforms);
}
// 回复直播的通道
if (gbStreams.size() > 0) {
allChannels.addAll(gbStreams);
}
// 回复目录
if (catalogs.size() > 0) {
allChannels.addAll(catalogs);
}
// 回复级联的通道
if (deviceChannelInPlatforms.size() > 0) {
allChannels.addAll(deviceChannelInPlatforms);
}
// 回复直播的通道
if (gbStreams.size() > 0) {
allChannels.addAll(gbStreams);
}
try {
if (allChannels.size() > 0) {
cmderFroPlatform.catalogQuery(allChannels, parentPlatform, sn, fromHeader.getTag());
}else {
@ -101,9 +105,11 @@ public class CatalogQueryMessageHandler extends SIPRequestProcessorParent implem
cmderFroPlatform.catalogQuery(null, parentPlatform, sn, fromHeader.getTag(), 0);
}
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 目录查询: {}", e.getMessage());
logger.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage());
}
}
private DeviceChannel getChannelForPlatform(ParentPlatform platform) {

View File

@ -53,19 +53,20 @@ public class ConfigDownloadResponseMessageHandler extends SIPRequestProcessorPar
try {
// 回复200 OK
responseAck(getServerTransaction(evt), Response.OK);
// 此处是对本平台发出DeviceControl指令的应答
JSONObject json = new JSONObject();
XmlUtil.node2Json(element, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 设备配置查询: {}", e.getMessage());
logger.error("[命令发送失败] 设备配置查询: {}", e.getMessage());
}
// 此处是对本平台发出DeviceControl指令的应答
JSONObject json = new JSONObject();
XmlUtil.node2Json(element, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
}

View File

@ -47,20 +47,21 @@ public class DeviceControlResponseMessageHandler extends SIPRequestProcessorPare
// 此处是对本平台发出DeviceControl指令的应答
try {
responseAck(getServerTransaction(evt), Response.OK);
JSONObject json = new JSONObject();
String channelId = getText(element, "DeviceID");
XmlUtil.node2Json(element, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + device.getDeviceId() + channelId;
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 设备控制: {}", e.getMessage());
}
JSONObject json = new JSONObject();
String channelId = getText(element, "DeviceID");
XmlUtil.node2Json(element, json);
if (logger.isDebugEnabled()) {
logger.debug(json.toJSONString());
}
RequestMessage msg = new RequestMessage();
String key = DeferredResultHolder.CALLBACK_CMD_DEVICECONTROL + device.getDeviceId() + channelId;
msg.setKey(key);
msg.setData(json);
deferredResultHolder.invokeAllResult(msg);
}
@Override

View File

@ -78,9 +78,14 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent
ServerTransaction serverTransaction = getServerTransaction(evt);
try {
rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
if (rootElement == null) {
logger.warn("[ 接收到DeviceInfo应答消息 ] content cannot be null, {}", evt.getRequest());
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] DeviceInfo应答消息 BAD_REQUEST: {}", e.getMessage());
}
return;
}
Element deviceIdElement = rootElement.element("DeviceID");
@ -100,17 +105,16 @@ public class DeviceInfoResponseMessageHandler extends SIPRequestProcessorParent
msg.setKey(key);
msg.setData(device);
deferredResultHolder.invokeAllResult(msg);
} catch (DocumentException e) {
throw new RuntimeException(e);
}
try {
// 回复200 OK
responseAck(serverTransaction, Response.OK);
} catch (DocumentException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
e.printStackTrace();
} catch (ParseException e) {
e.printStackTrace();
} catch (SipException e) {
e.printStackTrace();
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] DeviceInfo应答消息 200: {}", e.getMessage());
}
}
@Override

View File

@ -71,7 +71,11 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
logger.warn("[ 移动设备位置数据查询回复 ] content cannot be null, {}", evt.getRequest());
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 移动设备位置数据查询 BAD_REQUEST: {}", e.getMessage());
}
return;
}
MobilePosition mobilePosition = new MobilePosition();
@ -133,8 +137,13 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
jsonObject.put("speed", mobilePosition.getSpeed());
redisCatchStorage.sendMobilePositionMsg(jsonObject);
//回复 200 OK
responseAck(serverTransaction, Response.OK);
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
try {
responseAck(serverTransaction, Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage());
}
} catch (DocumentException e) {
e.printStackTrace();
}
}

View File

@ -58,7 +58,11 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
if (rootElement == null) {
logger.warn("[ 设备预置位查询应答 ] content cannot be null, {}", evt.getRequest());
responseAck(serverTransaction, Response.BAD_REQUEST);
try {
responseAck(serverTransaction, Response.BAD_REQUEST);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
}
return;
}
Element presetListNumElement = rootElement.element("PresetList");
@ -67,7 +71,11 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
String deviceId = getText(rootElement, "DeviceID");
String key = DeferredResultHolder.CALLBACK_CMD_PRESETQUERY + deviceId;
if (snElement == null || presetListNumElement == null) {
responseAck(serverTransaction, Response.BAD_REQUEST, "xml error");
try {
responseAck(serverTransaction, Response.BAD_REQUEST, "xml error");
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
}
return;
}
int sumNum = Integer.parseInt(presetListNumElement.attributeValue("Num"));
@ -94,11 +102,13 @@ public class PresetQueryResponseMessageHandler extends SIPRequestProcessorParent
requestMessage.setKey(key);
requestMessage.setData(presetQuerySipReqList);
deferredResultHolder.invokeAllResult(requestMessage);
responseAck(serverTransaction, Response.OK);
try {
responseAck(serverTransaction, Response.OK);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
}
} catch (DocumentException e) {
logger.error("[解析xml]失败: ", e);
} catch (InvalidArgumentException | ParseException | SipException e) {
logger.error("[命令发送失败] 设备预置位查询应答处理: {}", e.getMessage());
}
}

View File

@ -69,95 +69,91 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
// 回复200 OK
try {
// 回复200 OK
responseAck(getServerTransaction(evt), Response.OK);
taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(()->{
while (!taskQueue.isEmpty()) {
try {
HandlerCatchData take = taskQueue.poll();
Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
if (rootElement == null) {
logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest());
continue;
}
String sn = getText(rootElementForCharset, "SN");
String channelId = getText(rootElementForCharset, "DeviceID");
RecordInfo recordInfo = new RecordInfo();
recordInfo.setChannelId(channelId);
recordInfo.setDeviceId(take.getDevice().getDeviceId());
recordInfo.setSn(sn);
recordInfo.setName(getText(rootElementForCharset, "Name"));
String sumNumStr = getText(rootElementForCharset, "SumNum");
int sumNum = 0;
if (!ObjectUtils.isEmpty(sumNumStr)) {
sumNum = Integer.parseInt(sumNumStr);
}
recordInfo.setSumNum(sumNum);
Element recordListElement = rootElementForCharset.element("RecordList");
if (recordListElement == null || sumNum == 0) {
logger.info("无录像数据");
eventPublisher.recordEndEventPush(recordInfo);
recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>());
releaseRequest(take.getDevice().getDeviceId(), sn);
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
if (recordListIterator != null) {
List<RecordItem> recordList = new ArrayList<>();
// 遍历DeviceList
while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next();
Element recordElement = itemRecord.element("DeviceID");
if (recordElement == null) {
logger.info("记录为空,下一个...");
continue;
}
RecordItem record = new RecordItem();
record.setDeviceId(getText(itemRecord, "DeviceID"));
record.setName(getText(itemRecord, "Name"));
record.setFilePath(getText(itemRecord, "FilePath"));
record.setFileSize(getText(itemRecord, "FileSize"));
record.setAddress(getText(itemRecord, "Address"));
String startTimeStr = getText(itemRecord, "StartTime");
record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
String endTimeStr = getText(itemRecord, "EndTime");
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
: Integer.parseInt(getText(itemRecord, "Secrecy")));
record.setType(getText(itemRecord, "Type"));
record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record);
}
recordInfo.setRecordList(recordList);
// 发送消息如果是上级查询此录像则会通过这里通知给上级
eventPublisher.recordEndEventPush(recordInfo);
int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList);
logger.info("[国标录像] {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum);
}
if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){
releaseRequest(take.getDevice().getDeviceId(), sn);
}
}
} catch (DocumentException e) {
logger.error("xml解析异常 ", e);
}
}
taskQueueHandlerRun = false;
});
}
} catch (SipException | InvalidArgumentException | ParseException e) {
}catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 国标录像: {}", e.getMessage());
} finally {
taskQueueHandlerRun = false;
}
taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
if (!taskQueueHandlerRun) {
taskQueueHandlerRun = true;
taskExecutor.execute(()->{
while (!taskQueue.isEmpty()) {
try {
HandlerCatchData take = taskQueue.poll();
Element rootElementForCharset = getRootElement(take.getEvt(), take.getDevice().getCharset());
if (rootElement == null) {
logger.warn("[ 国标录像 ] content cannot be null, {}", evt.getRequest());
continue;
}
String sn = getText(rootElementForCharset, "SN");
String channelId = getText(rootElementForCharset, "DeviceID");
RecordInfo recordInfo = new RecordInfo();
recordInfo.setChannelId(channelId);
recordInfo.setDeviceId(take.getDevice().getDeviceId());
recordInfo.setSn(sn);
recordInfo.setName(getText(rootElementForCharset, "Name"));
String sumNumStr = getText(rootElementForCharset, "SumNum");
int sumNum = 0;
if (!ObjectUtils.isEmpty(sumNumStr)) {
sumNum = Integer.parseInt(sumNumStr);
}
recordInfo.setSumNum(sumNum);
Element recordListElement = rootElementForCharset.element("RecordList");
if (recordListElement == null || sumNum == 0) {
logger.info("无录像数据");
eventPublisher.recordEndEventPush(recordInfo);
recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, new ArrayList<>());
releaseRequest(take.getDevice().getDeviceId(), sn);
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
if (recordListIterator != null) {
List<RecordItem> recordList = new ArrayList<>();
// 遍历DeviceList
while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next();
Element recordElement = itemRecord.element("DeviceID");
if (recordElement == null) {
logger.info("记录为空,下一个...");
continue;
}
RecordItem record = new RecordItem();
record.setDeviceId(getText(itemRecord, "DeviceID"));
record.setName(getText(itemRecord, "Name"));
record.setFilePath(getText(itemRecord, "FilePath"));
record.setFileSize(getText(itemRecord, "FileSize"));
record.setAddress(getText(itemRecord, "Address"));
String startTimeStr = getText(itemRecord, "StartTime");
record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
String endTimeStr = getText(itemRecord, "EndTime");
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
: Integer.parseInt(getText(itemRecord, "Secrecy")));
record.setType(getText(itemRecord, "Type"));
record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record);
}
recordInfo.setRecordList(recordList);
// 发送消息如果是上级查询此录像则会通过这里通知给上级
eventPublisher.recordEndEventPush(recordInfo);
int count = recordDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, recordList);
logger.info("[国标录像] {}->{}: {}/{}", take.getDevice().getDeviceId(), sn, count, sumNum);
}
if (recordDataCatch.isComplete(take.getDevice().getDeviceId(), sn)){
releaseRequest(take.getDevice().getDeviceId(), sn);
}
}
} catch (DocumentException e) {
logger.error("xml解析异常 ", e);
}
}
taskQueueHandlerRun = false;
});
}
}