优化通道同步添加对SN的判断,精简代码
This commit is contained in:
parent
35b0f46bb4
commit
c0149f7bb4
@ -4,6 +4,7 @@ import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
public class CatalogData {
|
||||
private int sn; // 命令序列号
|
||||
private int total;
|
||||
private List<DeviceChannel> channelList;
|
||||
private Date lastTime;
|
||||
@ -15,6 +16,15 @@ public class CatalogData {
|
||||
}
|
||||
private CatalogDataStatus status;
|
||||
|
||||
|
||||
public int getSn() {
|
||||
return sn;
|
||||
}
|
||||
|
||||
public void setSn(int sn) {
|
||||
this.sn = sn;
|
||||
}
|
||||
|
||||
public int getTotal() {
|
||||
return total;
|
||||
}
|
||||
|
@ -54,6 +54,7 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
|
||||
@Autowired
|
||||
private SIPCommander cmder;
|
||||
|
||||
|
||||
private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
@Override
|
||||
@ -76,7 +77,7 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
|
||||
if (deviceInStore == null) { //第一次上线
|
||||
logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
|
||||
cmder.deviceInfoQuery(device);
|
||||
cmder.catalogQuery(device, null);
|
||||
deviceService.sync(device);
|
||||
}
|
||||
break;
|
||||
// 设备主动发送心跳触发的在线事件
|
||||
|
@ -26,28 +26,35 @@ public class CatalogDataCatch {
|
||||
@Autowired
|
||||
private IVideoManagerStorage storager;
|
||||
|
||||
public void addReady(String key) {
|
||||
CatalogData catalogData = data.get(key);
|
||||
public void addReady(Device device, int sn ) {
|
||||
CatalogData catalogData = data.get(device.getDeviceId());
|
||||
if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
|
||||
catalogData = new CatalogData();
|
||||
catalogData.setChannelList(new ArrayList<>());
|
||||
catalogData.setDevice(device);
|
||||
catalogData.setSn(sn);
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
|
||||
catalogData.setLastTime(new Date(System.currentTimeMillis()));
|
||||
data.put(key, catalogData);
|
||||
data.put(device.getDeviceId(), catalogData);
|
||||
}
|
||||
}
|
||||
|
||||
public void put(String key, int total, Device device, List<DeviceChannel> deviceChannelList) {
|
||||
CatalogData catalogData = data.get(key);
|
||||
public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) {
|
||||
catalogData = new CatalogData();
|
||||
catalogData.setSn(sn);
|
||||
catalogData.setTotal(total);
|
||||
catalogData.setDevice(device);
|
||||
catalogData.setChannelList(new ArrayList<>());
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
|
||||
catalogData.setLastTime(new Date(System.currentTimeMillis()));
|
||||
data.put(key, catalogData);
|
||||
data.put(deviceId, catalogData);
|
||||
}else {
|
||||
// 同一个设备的通道同步请求只考虑一个,其他的直接忽略
|
||||
if (catalogData.getSn() != sn) {
|
||||
return;
|
||||
}
|
||||
catalogData.setTotal(total);
|
||||
catalogData.setDevice(device);
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
|
||||
@ -56,20 +63,20 @@ public class CatalogDataCatch {
|
||||
}
|
||||
}
|
||||
|
||||
public List<DeviceChannel> get(String key) {
|
||||
CatalogData catalogData = data.get(key);
|
||||
public List<DeviceChannel> get(String deviceId) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) return null;
|
||||
return catalogData.getChannelList();
|
||||
}
|
||||
|
||||
public int getTotal(String key) {
|
||||
CatalogData catalogData = data.get(key);
|
||||
public int getTotal(String deviceId) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) return 0;
|
||||
return catalogData.getTotal();
|
||||
}
|
||||
|
||||
public SyncStatus getSyncStatus(String key) {
|
||||
CatalogData catalogData = data.get(key);
|
||||
public SyncStatus getSyncStatus(String deviceId) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null) return null;
|
||||
SyncStatus syncStatus = new SyncStatus();
|
||||
syncStatus.setCurrent(catalogData.getChannelList().size());
|
||||
@ -78,10 +85,6 @@ public class CatalogDataCatch {
|
||||
return syncStatus;
|
||||
}
|
||||
|
||||
public void del(String key) {
|
||||
data.remove(key);
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
|
||||
private void timerTask(){
|
||||
Set<String> keys = data.keySet();
|
||||
@ -92,23 +95,30 @@ public class CatalogDataCatch {
|
||||
Calendar calendarBefore30S = Calendar.getInstance();
|
||||
calendarBefore30S.setTime(new Date());
|
||||
calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30);
|
||||
for (String key : keys) {
|
||||
CatalogData catalogData = data.get(key);
|
||||
if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
|
||||
storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
|
||||
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
|
||||
for (String deviceId : keys) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
|
||||
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
|
||||
storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
|
||||
if (catalogData.getTotal() != catalogData.getChannelList().size()) {
|
||||
String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
}
|
||||
}else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {
|
||||
String errorMsg = "同步失败,等待回复超时";
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
}
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
}
|
||||
if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除
|
||||
data.remove(key);
|
||||
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除
|
||||
data.remove(deviceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void setChannelSyncEnd(String key, String errorMsg) {
|
||||
CatalogData catalogData = data.get(key);
|
||||
public void setChannelSyncEnd(String deviceId, String errorMsg) {
|
||||
CatalogData catalogData = data.get(deviceId);
|
||||
if (catalogData == null)return;
|
||||
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
|
||||
catalogData.setErrorMsg(errorMsg);
|
||||
|
@ -250,7 +250,7 @@ public interface ISIPCommander {
|
||||
*
|
||||
* @param device 视频设备
|
||||
*/
|
||||
boolean catalogQuery(Device device, SipSubscribe.Event errorEvent);
|
||||
boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent);
|
||||
|
||||
/**
|
||||
* 查询录像信息
|
||||
|
@ -1208,14 +1208,14 @@ public class SIPCommander implements ISIPCommander {
|
||||
* @param device 视频设备
|
||||
*/
|
||||
@Override
|
||||
public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) {
|
||||
public boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) {
|
||||
try {
|
||||
StringBuffer catalogXml = new StringBuffer(200);
|
||||
String charset = device.getCharset();
|
||||
catalogXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
|
||||
catalogXml.append("<Query>\r\n");
|
||||
catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
|
||||
catalogXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
|
||||
catalogXml.append("<SN>" + sn + "</SN>\r\n");
|
||||
catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
|
||||
catalogXml.append("</Query>\r\n");
|
||||
|
||||
|
@ -86,23 +86,17 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
||||
rootElement = getRootElement(evt, device.getCharset());
|
||||
Element deviceListElement = rootElement.element("DeviceList");
|
||||
Element sumNumElement = rootElement.element("SumNum");
|
||||
if (sumNumElement == null || deviceListElement == null) {
|
||||
Element snElement = rootElement.element("SN");
|
||||
if (snElement == null || sumNumElement == null || deviceListElement == null) {
|
||||
responseAck(evt, Response.BAD_REQUEST, "xml error");
|
||||
return;
|
||||
}
|
||||
int sumNum = Integer.parseInt(sumNumElement.getText());
|
||||
|
||||
if (sumNum == 0) {
|
||||
// 数据已经完整接收
|
||||
storager.cleanChannelsForDevice(device.getDeviceId());
|
||||
RequestMessage msg = new RequestMessage();
|
||||
msg.setKey(key);
|
||||
WVPResult<Object> result = new WVPResult<>();
|
||||
result.setCode(0);
|
||||
result.setData(device);
|
||||
msg.setData(result);
|
||||
result.setMsg("更新成功,共0条");
|
||||
deferredResultHolder.invokeAllResult(msg);
|
||||
catalogDataCatch.del(key);
|
||||
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
|
||||
}else {
|
||||
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
|
||||
if (deviceListIterator != null) {
|
||||
@ -123,24 +117,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
||||
|
||||
channelList.add(deviceChannel);
|
||||
}
|
||||
int sn = Integer.parseInt(snElement.getText());
|
||||
logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum);
|
||||
catalogDataCatch.put(key, sumNum, device, channelList);
|
||||
if (catalogDataCatch.get(key).size() == sumNum) {
|
||||
catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList);
|
||||
if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) {
|
||||
// 数据已经完整接收
|
||||
boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
|
||||
RequestMessage msg = new RequestMessage();
|
||||
msg.setKey(key);
|
||||
WVPResult<Object> result = new WVPResult<>();
|
||||
result.setCode(0);
|
||||
result.setData(device);
|
||||
if (resetChannelsResult || sumNum ==0) {
|
||||
result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
|
||||
boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId()));
|
||||
if (!resetChannelsResult) {
|
||||
String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条";
|
||||
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg);
|
||||
}else {
|
||||
result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
|
||||
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
|
||||
}
|
||||
msg.setData(result);
|
||||
deferredResultHolder.invokeAllResult(msg);
|
||||
catalogDataCatch.del(key);
|
||||
}
|
||||
}
|
||||
// 回复200 OK
|
||||
@ -228,21 +216,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
||||
}
|
||||
|
||||
public SyncStatus getChannelSyncProgress(String deviceId) {
|
||||
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
|
||||
if (catalogDataCatch.get(key) == null) {
|
||||
if (catalogDataCatch.get(deviceId) == null) {
|
||||
return null;
|
||||
}else {
|
||||
return catalogDataCatch.getSyncStatus(key);
|
||||
return catalogDataCatch.getSyncStatus(deviceId);
|
||||
}
|
||||
}
|
||||
|
||||
public void setChannelSyncReady(String deviceId) {
|
||||
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
|
||||
catalogDataCatch.addReady(key);
|
||||
public void setChannelSyncReady(Device device, int sn) {
|
||||
catalogDataCatch.addReady(device, sn);
|
||||
}
|
||||
|
||||
public void setChannelSyncEnd(String deviceId, String errorMsg) {
|
||||
String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
|
||||
catalogDataCatch.setChannelSyncEnd(key, errorMsg);
|
||||
catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg);
|
||||
}
|
||||
}
|
||||
|
@ -44,15 +44,8 @@ public interface IDeviceService {
|
||||
SyncStatus getChannelSyncStatus(String deviceId);
|
||||
|
||||
/**
|
||||
* 设置通道同步状态
|
||||
* @param deviceId 设备ID
|
||||
* 通道同步
|
||||
* @param device
|
||||
*/
|
||||
void setChannelSyncReady(String deviceId);
|
||||
|
||||
/**
|
||||
* 设置同步结束
|
||||
* @param deviceId 设备ID
|
||||
* @param errorMsg 错误信息
|
||||
*/
|
||||
void setChannelSyncEnd(String deviceId, String errorMsg);
|
||||
void sync(Device device);
|
||||
}
|
||||
|
@ -100,12 +100,16 @@ public class DeviceServiceImpl implements IDeviceService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setChannelSyncReady(String deviceId) {
|
||||
catalogResponseMessageHandler.setChannelSyncReady(deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setChannelSyncEnd(String deviceId, String errorMsg) {
|
||||
catalogResponseMessageHandler.setChannelSyncEnd(deviceId, errorMsg);
|
||||
public void sync(Device device) {
|
||||
if (catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()) != null) {
|
||||
logger.info("开启同步时发现同步已经存在");
|
||||
return;
|
||||
}
|
||||
int sn = (int)((Math.random()*9+1)*100000);
|
||||
catalogResponseMessageHandler.setChannelSyncReady(device, sn);
|
||||
sipCommander.catalogQuery(device, sn, event -> {
|
||||
String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
|
||||
catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -238,12 +238,15 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
|
||||
|
||||
@Override
|
||||
public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
|
||||
if (deviceChannelList == null) {
|
||||
return false;
|
||||
}
|
||||
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
|
||||
// 数据去重
|
||||
List<DeviceChannel> channels = new ArrayList<>();
|
||||
StringBuilder stringBuilder = new StringBuilder();
|
||||
Map<String, Integer> subContMap = new HashMap<>();
|
||||
if (deviceChannelList.size() > 1) {
|
||||
if (deviceChannelList != null && deviceChannelList.size() > 1) {
|
||||
// 数据去重
|
||||
Set<String> gbIdSet = new HashSet<>();
|
||||
for (DeviceChannel deviceChannel : deviceChannelList) {
|
||||
@ -300,6 +303,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
|
||||
dataSourceTransactionManager.commit(transactionStatus); //手动提交
|
||||
return true;
|
||||
}catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
dataSourceTransactionManager.rollback(transactionStatus);
|
||||
return false;
|
||||
}
|
||||
@ -415,10 +419,9 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
|
||||
TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
|
||||
boolean result = false;
|
||||
try {
|
||||
if (platformChannelMapper.delChannelForDeviceId(deviceId) <0 // 删除与国标平台的关联
|
||||
|| deviceChannelMapper.cleanChannelsByDeviceId(deviceId) < 0 // 删除他的通道
|
||||
|| deviceMapper.del(deviceId) < 0 // 移除设备信息
|
||||
) {
|
||||
platformChannelMapper.delChannelForDeviceId(deviceId);
|
||||
deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
|
||||
if ( deviceMapper.del(deviceId) < 0 ) {
|
||||
//事务回滚
|
||||
dataSourceTransactionManager.rollback(transactionStatus);
|
||||
}
|
||||
|
@ -172,12 +172,8 @@ public class DeviceQuery {
|
||||
wvpResult.setData(syncStatus);
|
||||
return wvpResult;
|
||||
}
|
||||
SyncStatus syncStatusReady = new SyncStatus();
|
||||
deviceService.setChannelSyncReady(deviceId);
|
||||
cmder.catalogQuery(device, event -> {
|
||||
String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
|
||||
deviceService.setChannelSyncEnd(deviceId, errorMsg);
|
||||
});
|
||||
deviceService.sync(device);
|
||||
|
||||
WVPResult<SyncStatus> wvpResult = new WVPResult<>();
|
||||
wvpResult.setCode(0);
|
||||
wvpResult.setMsg("开始同步");
|
||||
|
@ -61,23 +61,36 @@ export default {
|
||||
if (!this.syncFlag) {
|
||||
this.syncFlag = true;
|
||||
}
|
||||
if (res.data.data == null) {
|
||||
this.syncStatus = "success"
|
||||
this.percentage = 100;
|
||||
this.msg = '同步成功';
|
||||
}else if (res.data.data.total == 0){
|
||||
this.msg = `等待同步中`;
|
||||
this.timmer = setTimeout(this.getProgress, 300)
|
||||
}else if (res.data.data.errorMsg !== null ){
|
||||
this.msg = res.data.data.errorMsg;
|
||||
this.syncStatus = "exception"
|
||||
}else {
|
||||
this.total = res.data.data.total;
|
||||
this.current = res.data.data.current;
|
||||
this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
|
||||
this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
|
||||
this.timmer = setTimeout(this.getProgress, 300)
|
||||
|
||||
if (res.data.data != null) {
|
||||
if (res.data.data.total == 0) {
|
||||
if (res.data.data.errorMsg !== null ){
|
||||
this.msg = res.data.data.errorMsg;
|
||||
this.syncStatus = "exception"
|
||||
}else {
|
||||
this.msg = `等待同步中`;
|
||||
this.timmer = setTimeout(this.getProgress, 300)
|
||||
}
|
||||
}else {
|
||||
if (res.data.data.total == res.data.data.current) {
|
||||
this.syncStatus = "success"
|
||||
this.percentage = 100;
|
||||
this.msg = '同步成功';
|
||||
}else {
|
||||
if (res.data.data.errorMsg !== null ){
|
||||
this.msg = res.data.data.errorMsg;
|
||||
this.syncStatus = "exception"
|
||||
}else {
|
||||
this.total = res.data.data.total;
|
||||
this.current = res.data.data.current;
|
||||
this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
|
||||
this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
|
||||
this.timmer = setTimeout(this.getProgress, 300)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}else {
|
||||
if (this.syncFlag) {
|
||||
this.syncStatus = "success"
|
||||
|
Loading…
Reference in New Issue
Block a user