wvp_device_channel stream_id 长度过短导致国标级联上级点播历史录像出错

其他奇奇怪怪的bug 调试/修复
This commit is contained in:
zxb 2023-08-08 11:26:51 +08:00
parent f0cba184fe
commit df46dbd653
7 changed files with 64 additions and 38 deletions

13
pom.xml
View File

@ -300,11 +300,11 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- 开发工具 --> <!--&lt;!&ndash; 开发工具 &ndash;&gt;-->
<dependency> <!--<dependency>-->
<groupId>org.springframework.boot</groupId> <!-- <groupId>org.springframework.boot</groupId>-->
<artifactId>spring-boot-devtools</artifactId> <!-- <artifactId>spring-boot-devtools</artifactId>-->
</dependency> <!--</dependency>-->
<!-- lombok --> <!-- lombok -->
<dependency> <dependency>
@ -328,7 +328,8 @@
<build> <build>
<finalName>${project.artifactId}-${project.version}-${maven.build.timestamp}</finalName> <!--<finalName>${project.artifactId}-${project.version}-${maven.build.timestamp}</finalName>-->
<finalName>${project.artifactId}-${project.version}</finalName>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@ -120,7 +120,7 @@ alter table device_channel
change status status bool default false; change status status bool default false;
alter table device_channel alter table device_channel
change streamId stream_id varchar(50) null; change streamId stream_id varchar(80) null;
alter table device_channel alter table device_channel
change deviceId device_id varchar(50) not null; change deviceId device_id varchar(50) not null;

View File

@ -35,10 +35,12 @@ public class RecordEndEventListener implements ApplicationListener<RecordEndEven
String channelId = event.getRecordInfo().getChannelId(); String channelId = event.getRecordInfo().getChannelId();
int count = event.getRecordInfo().getCount(); int count = event.getRecordInfo().getCount();
int sumNum = event.getRecordInfo().getSumNum(); int sumNum = event.getRecordInfo().getSumNum();
logger.info("录像查询完成事件触发deviceId{}, channelId: {}, 录像数量{}/{}条", event.getRecordInfo().getDeviceId(), logger.info("录像查询完成事件触发deviceId{}, channelId: {}, 录像数量{}/{}条",
event.getRecordInfo().getDeviceId(),
event.getRecordInfo().getChannelId(), count,sumNum); event.getRecordInfo().getChannelId(), count,sumNum);
logger.debug("handlerMap.size => {}", handlerMap.size()); logger.debug("handlerMap.size => {}", handlerMap.size());
if (handlerMap.size() > 0) { if (handlerMap.size() > 0) {
logger.debug("handlerMap.keys => {}", handlerMap.keySet());
RecordEndEventHandler handler = handlerMap.get(deviceId + channelId); RecordEndEventHandler handler = handlerMap.get(deviceId + channelId);
logger.debug("handler => {}", handler); logger.debug("handler => {}", handler);
if (handler != null){ if (handler != null){

View File

@ -19,8 +19,7 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.GitUtil; import com.genersoft.iot.vmp.utils.GitUtil;
import gov.nist.javax.sip.message.MessageFactoryImpl; import gov.nist.javax.sip.message.MessageFactoryImpl;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import org.slf4j.Logger; import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
@ -37,12 +36,10 @@ import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@Slf4j
@Component @Component
@DependsOn("sipLayer") @DependsOn("sipLayer")
public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform { public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
private final Logger logger = LoggerFactory.getLogger(SIPCommanderFroPlatform.class);
@Autowired @Autowired
private SIPRequestHeaderPlarformProvider headerProviderPlatformProvider; private SIPRequestHeaderPlarformProvider headerProviderPlatformProvider;
@ -113,13 +110,13 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
// callid 写入缓存 等注册成功可以更新状态 // callid 写入缓存 等注册成功可以更新状态
String callIdFromHeader = callIdHeader.getCallId(); String callIdFromHeader = callIdHeader.getCallId();
PlatformRegisterInfo instance = PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister); PlatformRegisterInfo instance = PlatformRegisterInfo.getInstance(parentPlatform.getServerGBId(), isRegister);
logger.info("callIdFromHeader {}",callIdFromHeader); log.info("callIdFromHeader {}",callIdFromHeader);
logger.info("PlatformRegisterInfo {}",instance); log.info("PlatformRegisterInfo {}",instance);
redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, instance); redisCatchStorage.updatePlatformRegisterInfo(callIdFromHeader, instance);
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{ sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (event)->{
if (event != null) { if (event != null) {
logger.info("向上级平台 [ {} ] 注册发生错误: {} ", log.info("向上级平台 [ {} ] 注册发生错误: {} ",
parentPlatform.getServerGBId(), parentPlatform.getServerGBId(),
event.msg); event.msg);
} }
@ -218,7 +215,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
}else { }else {
if (channel.getChannelId().length() != 20) { if (channel.getChannelId().length() != 20) {
catalogXml.append("</Item>\r\n"); catalogXml.append("</Item>\r\n");
logger.warn("[编号长度异常] {} 长度错误请使用20位长度的国标编号,当前长度:{}", channel.getChannelId(), channel.getChannelId().length()); log.warn("[编号长度异常] {} 长度错误请使用20位长度的国标编号,当前长度:{}", channel.getChannelId(), channel.getChannelId().length());
catalogXml.append("</Item>\r\n"); catalogXml.append("</Item>\r\n");
continue; continue;
} }
@ -386,9 +383,9 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
String callId = request.getCallIdHeader().getCallId(); String callId = request.getCallIdHeader().getCallId();
logger.info("[命令发送] 国标级联{} 目录查询回复: 共{}条,已发送{}条", parentPlatform.getServerGBId(), log.info("[命令发送] 国标级联{} 目录查询回复: 共{}条,已发送{}条", parentPlatform.getServerGBId(),
channels.size(), Math.min(index + parentPlatform.getCatalogGroup(), channels.size())); channels.size(), Math.min(index + parentPlatform.getCatalogGroup(), channels.size()));
logger.debug(catalogXml); log.debug(catalogXml);
if (sendAfterResponse) { if (sendAfterResponse) {
// 默认按照收到200回复后发送下一条 如果超时收不到回复就以30毫秒的间隔直接发送 // 默认按照收到200回复后发送下一条 如果超时收不到回复就以30毫秒的间隔直接发送
dynamicTask.startDelay(timeoutTaskKey, ()->{ dynamicTask.startDelay(timeoutTaskKey, ()->{
@ -397,11 +394,11 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
try { try {
sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, false); sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, false);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage()); log.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage());
} }
}, 3000); }, 3000);
sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, eventResult -> { sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, eventResult -> {
logger.error("[目录推送失败] 国标级联 platform : {}, code: {}, msg: {}, 停止发送", parentPlatform.getServerGBId(), eventResult.statusCode, eventResult.msg); log.error("[目录推送失败] 国标级联 platform : {}, code: {}, msg: {}, 停止发送", parentPlatform.getServerGBId(), eventResult.statusCode, eventResult.msg);
dynamicTask.stop(timeoutTaskKey); dynamicTask.stop(timeoutTaskKey);
}, eventResult -> { }, eventResult -> {
dynamicTask.stop(timeoutTaskKey); dynamicTask.stop(timeoutTaskKey);
@ -409,12 +406,12 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
try { try {
sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, true); sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, true);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage()); log.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage());
} }
}); });
}else { }else {
sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, eventResult -> { sipSender.transmitRequest(parentPlatform.getDeviceIp(), request, eventResult -> {
logger.error("[目录推送失败] 国标级联 platform : {}, code: {}, msg: {}, 停止发送", parentPlatform.getServerGBId(), eventResult.statusCode, eventResult.msg); log.error("[目录推送失败] 国标级联 platform : {}, code: {}, msg: {}, 停止发送", parentPlatform.getServerGBId(), eventResult.statusCode, eventResult.msg);
dynamicTask.stop(timeoutTaskKey); dynamicTask.stop(timeoutTaskKey);
}, null); }, null);
dynamicTask.startDelay(timeoutTaskKey, ()->{ dynamicTask.startDelay(timeoutTaskKey, ()->{
@ -422,7 +419,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
try { try {
sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, false); sendCatalogResponse(channels, parentPlatform, sn, fromTag, indexNext, false);
} catch (SipException | InvalidArgumentException | ParseException e) { } catch (SipException | InvalidArgumentException | ParseException e) {
logger.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage()); log.error("[命令发送失败] 国标级联 目录查询回复: {}", e.getMessage());
} }
}, 30); }, 30);
} }
@ -501,8 +498,8 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (parentPlatform == null) { if (parentPlatform == null) {
return; return;
} }
if (logger.isDebugEnabled()) { if (log.isDebugEnabled()) {
logger.debug("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat()); log.debug("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
} }
String characterSet = parentPlatform.getCharacterSet(); String characterSet = parentPlatform.getCharacterSet();
@ -521,7 +518,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
.append("</Notify>\r\n"); .append("</Notify>\r\n");
sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> { sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
logger.error("发送NOTIFY通知消息失败。错误{} {}", eventResult.statusCode, eventResult.msg); log.error("发送NOTIFY通知消息失败。错误{} {}", eventResult.statusCode, eventResult.msg);
}, null); }, null);
} }
@ -531,7 +528,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
if (parentPlatform == null) { if (parentPlatform == null) {
return; return;
} }
logger.info("[发送报警通知]平台: {}/{}->{},{}: {}", parentPlatform.getServerGBId(), deviceAlarm.getChannelId(), log.info("[发送报警通知]平台: {}/{}->{},{}: {}", parentPlatform.getServerGBId(), deviceAlarm.getChannelId(),
deviceAlarm.getLongitude(), deviceAlarm.getLatitude(), JSON.toJSONString(deviceAlarm)); deviceAlarm.getLongitude(), deviceAlarm.getLatitude(), JSON.toJSONString(deviceAlarm));
String characterSet = parentPlatform.getCharacterSet(); String characterSet = parentPlatform.getCharacterSet();
StringBuffer deviceStatusXml = new StringBuffer(600); StringBuffer deviceStatusXml = new StringBuffer(600);
@ -579,14 +576,14 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels, String catalogXmlContent = getCatalogXmlContentForCatalogAddOrUpdate(parentPlatform, channels,
deviceChannels.size(), type, subscribeInfo); deviceChannels.size(), type, subscribeInfo);
sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
logger.error("发送NOTIFY通知消息失败。错误{} {}", eventResult.statusCode, eventResult.msg); log.error("发送NOTIFY通知消息失败。错误{} {}", eventResult.statusCode, eventResult.msg);
}, (eventResult -> { }, (eventResult -> {
try { try {
sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo, sendNotifyForCatalogAddOrUpdate(type, parentPlatform, deviceChannels, subscribeInfo,
finalIndex + parentPlatform.getCatalogGroup()); finalIndex + parentPlatform.getCatalogGroup());
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) { IllegalAccessException e) {
logger.error("[命令发送失败] 国标级联 NOTIFY通知: {}", e.getMessage()); log.error("[命令发送失败] 国标级联 NOTIFY通知: {}", e.getMessage());
} }
})); }));
} }
@ -666,7 +663,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
|| deviceChannels == null || deviceChannels == null
|| deviceChannels.size() == 0 || deviceChannels.size() == 0
|| subscribeInfo == null) { || subscribeInfo == null) {
logger.warn("[缺少必要参数]"); log.warn("[缺少必要参数]");
return; return;
} }
@ -685,14 +682,14 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
Integer finalIndex = index; Integer finalIndex = index;
String catalogXmlContent = getCatalogXmlContentForCatalogOther(parentPlatform, channels, type); String catalogXmlContent = getCatalogXmlContentForCatalogOther(parentPlatform, channels, type);
sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> { sendNotify(parentPlatform, catalogXmlContent, subscribeInfo, eventResult -> {
logger.error("发送NOTIFY通知消息失败。错误{} {}", eventResult.statusCode, eventResult.msg); log.error("发送NOTIFY通知消息失败。错误{} {}", eventResult.statusCode, eventResult.msg);
}, eventResult -> { }, eventResult -> {
try { try {
sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo, sendNotifyForCatalogOther(type, parentPlatform, deviceChannels, subscribeInfo,
finalIndex + parentPlatform.getCatalogGroup()); finalIndex + parentPlatform.getCatalogGroup());
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) { IllegalAccessException e) {
logger.error("[命令发送失败] 国标级联 NOTIFY通知: {}", e.getMessage()); log.error("[命令发送失败] 国标级联 NOTIFY通知: {}", e.getMessage());
} }
}); });
} }
@ -725,6 +722,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
} }
@Override @Override
public void recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo) throws SipException, InvalidArgumentException, ParseException { public void recordInfo(DeviceChannel deviceChannel, ParentPlatform parentPlatform, String fromTag, RecordInfo recordInfo) throws SipException, InvalidArgumentException, ParseException {
log.debug("deviceChannel => {}, parentPlatform => {}, fromTag => {}, recordInfo => {}", deviceChannel, parentPlatform, fromTag, recordInfo);
if ( parentPlatform ==null) { if ( parentPlatform ==null) {
return ; return ;
} }
@ -811,14 +809,14 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
@Override @Override
public void streamByeCmd(ParentPlatform parentPlatform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException { public void streamByeCmd(ParentPlatform parentPlatform, SendRtpItem sendRtpItem) throws SipException, InvalidArgumentException, ParseException {
if (sendRtpItem == null ) { if (sendRtpItem == null ) {
logger.info("[向上级发送BYE] sendRtpItem 为NULL"); log.info("[向上级发送BYE] sendRtpItem 为NULL");
return; return;
} }
if (parentPlatform == null) { if (parentPlatform == null) {
logger.info("[向上级发送BYE] platform 为NULL"); log.info("[向上级发送BYE] platform 为NULL");
return; return;
} }
logger.info("[向上级发送BYE] {}/{}", parentPlatform.getServerGBId(), sendRtpItem.getChannelId()); log.info("[向上级发送BYE] {}/{}", parentPlatform.getServerGBId(), sendRtpItem.getChannelId());
String mediaServerId = sendRtpItem.getMediaServerId(); String mediaServerId = sendRtpItem.getMediaServerId();
MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId); MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) { if (mediaServerItem != null) {
@ -827,7 +825,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
} }
SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(parentPlatform, sendRtpItem); SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(parentPlatform, sendRtpItem);
if (byeRequest == null) { if (byeRequest == null) {
logger.warn("[向上级发送bye]:无法创建 byeRequest"); log.warn("[向上级发送bye]:无法创建 byeRequest");
} }
sipSender.transmitRequest(parentPlatform.getDeviceIp(),byeRequest); sipSender.transmitRequest(parentPlatform.getDeviceIp(),byeRequest);
} }

View File

@ -8,9 +8,12 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.XmlUtil;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.UJson; import com.genersoft.iot.vmp.utils.UJson;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import org.dom4j.Document;
import org.dom4j.DocumentHelper;
import org.dom4j.Element; import org.dom4j.Element;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -78,7 +81,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
} }
taskExecutor.execute(()->{ taskExecutor.execute(()->{
try { try {
logger.debug("rootElement => {}",XmlUtil.getElementMap(rootElement));
String sn = getText(rootElement, "SN"); String sn = getText(rootElement, "SN");
String channelId = getText(rootElement, "DeviceID"); String channelId = getText(rootElement, "DeviceID");
RecordInfo recordInfo = new RecordInfo(); RecordInfo recordInfo = new RecordInfo();

View File

@ -644,4 +644,23 @@ public class XmlUtil {
} }
return val; return val;
} }
public static Map<String, Object> getElementMap(Element element) {
List<Element> elements = element.elements();
//没有子元素
Map<String, Object> map = new HashMap<>(16);
if (elements.size() == 0) {
String name = element.getName();
String value = element.getText();
map.put(name, value);
} else {
//有子元素
for (Element el : elements) {
String name = el.getName();
map.put(name,getElementMap(el));
}
}
return map;
}
} }

View File

@ -109,6 +109,9 @@ public class DateUtil {
} }
public static long getDifferenceForNow(String keepaliveTime) { public static long getDifferenceForNow(String keepaliveTime) {
if(keepaliveTime == null){
return 0;
}
Instant beforeInstant = Instant.from(formatter.parse(keepaliveTime)); Instant beforeInstant = Instant.from(formatter.parse(keepaliveTime));
return ChronoUnit.MILLIS.between(beforeInstant, Instant.now()); return ChronoUnit.MILLIS.between(beforeInstant, Instant.now());
} }