diff --git a/README.md b/README.md
index f9ef44a5..7ba443fe 100644
--- a/README.md
+++ b/README.md
@@ -106,7 +106,6 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git
- [X] 添加RTMP视频
- [X] 云端录像(需要部署单独服务配合使用)
- [X] 多流媒体节点,自动选择负载最低的节点使用。
-- [X] 支持使用mysql作为数据库,默认sqlite3,开箱即用。
- [X] WEB端支持播放H264与H265,音频支持G.711A/G.711U/AAC,覆盖国标常用编码格式。
[//]: # (# docker快速体验)
diff --git a/pom.xml b/pom.xml
index eade1f08..1546e39c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,13 +101,6 @@
8.0.22
-
-
- org.xerial
- sqlite-jdbc
- 3.32.3.2
-
-
com.github.pagehelper
diff --git a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
index 2812f92c..3b021de4 100644
--- a/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
+++ b/src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
@@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.conf;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
-import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -9,25 +8,27 @@ import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;
-import java.util.Date;
+import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
/**
* 动态定时任务
+ * @author lin
*/
@Component
public class DynamicTask {
- private Logger logger = LoggerFactory.getLogger(DynamicTask.class);
+ private final Logger logger = LoggerFactory.getLogger(DynamicTask.class);
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
- private Map> futureMap = new ConcurrentHashMap<>();
- private Map runnableMap = new ConcurrentHashMap<>();
+ private final Map> futureMap = new ConcurrentHashMap<>();
+ private final Map runnableMap = new ConcurrentHashMap<>();
@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
@@ -47,7 +48,7 @@ public class DynamicTask {
* @return
*/
public void startCron(String key, Runnable task, int cycleForCatalog) {
- ScheduledFuture future = futureMap.get(key);
+ ScheduledFuture> future = futureMap.get(key);
if (future != null) {
if (future.isCancelled()) {
logger.debug("任务【{}】已存在但是关闭状态!!!", key);
@@ -76,7 +77,9 @@ public class DynamicTask {
*/
public void startDelay(String key, Runnable task, int delay) {
stop(key);
- Date starTime = new Date(System.currentTimeMillis() + delay);
+
+ // 获取执行的时刻
+ Instant startInstant = Instant.now().plusMillis(TimeUnit.MILLISECONDS.toMillis(delay));
ScheduledFuture future = futureMap.get(key);
if (future != null) {
@@ -88,7 +91,7 @@ public class DynamicTask {
}
}
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
- future = threadPoolTaskScheduler.schedule(task, starTime);
+ future = threadPoolTaskScheduler.schedule(task, startInstant);
if (future != null){
futureMap.put(key, future);
runnableMap.put(key, task);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
index 2b93d702..a3428b10 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
@@ -81,11 +81,11 @@ public class SipLayer{
tcpSipProvider.setDialogErrorsAutomaticallyHandled();
tcpSipProvider.addSipListener(sipProcessorObserver);
// tcpSipProvider.setAutomaticDialogSupportEnabled(false);
- logger.info("Sip Server TCP 启动成功 port {" + sipConfig.getMonitorIp() + ":" + sipConfig.getPort() + "}");
+ logger.info("[Sip Server] TCP 启动成功 {}:{}", sipConfig.getMonitorIp(), sipConfig.getPort());
} catch (TransportNotSupportedException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
- logger.error("无法使用 [ {}:{} ]作为SIP[ TCP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用"
+ logger.error("[Sip Server] 无法使用 [ {}:{} ]作为SIP[ TCP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用"
, sipConfig.getMonitorIp(), sipConfig.getPort());
} catch (TooManyListenersException e) {
e.printStackTrace();
@@ -108,14 +108,14 @@ public class SipLayer{
} catch (TransportNotSupportedException e) {
e.printStackTrace();
} catch (InvalidArgumentException e) {
- logger.error("无法使用 [ {}:{} ]作为SIP[ UDP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用"
+ logger.error("[Sip Server] 无法使用 [ {}:{} ]作为SIP[ UDP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用"
, sipConfig.getMonitorIp(), sipConfig.getPort());
} catch (TooManyListenersException e) {
e.printStackTrace();
} catch (ObjectInUseException e) {
e.printStackTrace();
}
- logger.info("Sip Server UDP 启动成功 port [" + sipConfig.getMonitorIp() + ":" + sipConfig.getPort() + "]");
+ logger.info("[Sip Server] UDP 启动成功 {}:{}", sipConfig.getMonitorIp(), sipConfig.getPort());
return udpSipProvider;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java
index f6284f5a..a0e16bff 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/auth/DigestServerAuthenticationHelper.java
@@ -27,8 +27,7 @@ package com.genersoft.iot.vmp.gb28181.auth;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.text.DecimalFormat;
-import java.util.Date;
+import java.time.Instant;
import java.util.Random;
import javax.sip.address.URI;
@@ -90,17 +89,12 @@ public class DigestServerAuthenticationHelper {
* @return a generated nonce.
*/
private String generateNonce() {
- // Get the time of day and run MD5 over it.
- Date date = new Date();
- long time = date.getTime();
+ long time = Instant.now().toEpochMilli();
Random rand = new Random();
long pad = rand.nextLong();
- // String nonceString = (new Long(time)).toString()
- // + (new Long(pad)).toString();
String nonceString = Long.valueOf(time).toString()
+ Long.valueOf(pad).toString();
byte mdbytes[] = messageDigest.digest(nonceString.getBytes());
- // Convert the mdbytes array into a hex string.
return toHexString(mdbytes);
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java
index 338f8ad5..8a96d356 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java
@@ -1,13 +1,13 @@
package com.genersoft.iot.vmp.gb28181.bean;
-import java.util.Date;
+import java.time.Instant;
import java.util.List;
public class CatalogData {
private int sn; // 命令序列号
private int total;
private List channelList;
- private Date lastTime;
+ private Instant lastTime;
private Device device;
private String errorMsg;
@@ -41,11 +41,11 @@ public class CatalogData {
this.channelList = channelList;
}
- public Date getLastTime() {
+ public Instant getLastTime() {
return lastTime;
}
- public void setLastTime(Date lastTime) {
+ public void setLastTime(Instant lastTime) {
this.lastTime = lastTime;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java
index 24fc2212..2121db7a 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordInfo.java
@@ -1,8 +1,6 @@
package com.genersoft.iot.vmp.gb28181.bean;
-
-//import gov.nist.javax.sip.header.SIPDate;
-
+import java.time.Instant;
import java.util.List;
/**
@@ -21,6 +19,8 @@ public class RecordInfo {
private String name;
private int sumNum;
+
+ private Instant lastTime;
private List recordList;
@@ -71,4 +71,12 @@ public class RecordInfo {
public void setSn(String sn) {
this.sn = sn;
}
+
+ public Instant getLastTime() {
+ return lastTime;
+ }
+
+ public void setLastTime(Instant lastTime) {
+ this.lastTime = lastTime;
+ }
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java
index 3349cdc5..a47147a1 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/RecordItem.java
@@ -5,7 +5,8 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import org.jetbrains.annotations.NotNull;
import java.text.ParseException;
-import java.util.Date;
+import java.time.Instant;
+import java.time.temporal.TemporalAccessor;
/**
* @description:设备录像bean
@@ -116,17 +117,17 @@ public class RecordItem implements Comparable{
@Override
public int compareTo(@NotNull RecordItem recordItem) {
- try {
- Date startTime_now = DateUtil.format.parse(startTime);
- Date startTime_param = DateUtil.format.parse(recordItem.getStartTime());
- if (startTime_param.compareTo(startTime_now) > 0) {
- return -1;
- }else {
- return 1;
- }
- } catch (ParseException e) {
- e.printStackTrace();
+ TemporalAccessor startTimeNow = DateUtil.formatter.parse(startTime);
+ TemporalAccessor startTimeParam = DateUtil.formatter.parse(recordItem.getStartTime());
+ Instant startTimeParamInstant = Instant.from(startTimeParam);
+ Instant startTimeNowInstant = Instant.from(startTimeNow);
+ if (startTimeNowInstant.equals(startTimeParamInstant)) {
+ return 0;
+ }else if (Instant.from(startTimeParam).isAfter(Instant.from(startTimeNow)) ) {
+ return -1;
+ }else {
+ return 1;
}
- return 0;
+
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
index 2736be2d..f191c005 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
@@ -66,7 +66,6 @@ public class SubscribeHolder {
dynamicTask.stop(taskOverdueKey);
// 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> {
- System.out.println("订阅过期");
removeMobilePositionSubscribe(subscribeInfo.getId());
},
subscribeInfo.getExpires() * 1000);
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
index bc775e46..3d817c34 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/SipSubscribe.java
@@ -9,11 +9,14 @@ import org.springframework.stereotype.Component;
import javax.sip.*;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
-import java.util.Calendar;
-import java.util.Date;
+import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+/**
+ * @author lin
+ */
@Component
public class SipSubscribe {
@@ -23,28 +26,25 @@ public class SipSubscribe {
private Map okSubscribes = new ConcurrentHashMap<>();
- private Map okTimeSubscribes = new ConcurrentHashMap<>();
- private Map errorTimeSubscribes = new ConcurrentHashMap<>();
+ private Map okTimeSubscribes = new ConcurrentHashMap<>();
+ private Map errorTimeSubscribes = new ConcurrentHashMap<>();
// @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次
-// @Scheduled(fixedRate= 100 * 60 * 60 )
+ // @Scheduled(fixedRate= 100 * 60 * 60 )
@Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
public void execute(){
logger.info("[定时任务] 清理过期的SIP订阅信息");
- Calendar calendar = Calendar.getInstance();
- calendar.setTime(new Date());
- calendar.set(Calendar.MINUTE, calendar.get(Calendar.MINUTE) - 5);
+
+ Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
for (String key : okTimeSubscribes.keySet()) {
- if (okTimeSubscribes.get(key).before(calendar.getTime())){
-// logger.info("[定时任务] 清理过期的订阅信息: {}", key);
+ if (okTimeSubscribes.get(key).isBefore(instant)){
okSubscribes.remove(key);
okTimeSubscribes.remove(key);
}
}
for (String key : errorTimeSubscribes.keySet()) {
- if (errorTimeSubscribes.get(key).before(calendar.getTime())){
-// logger.info("[定时任务] 清理过期的订阅信息: {}", key);
+ if (errorTimeSubscribes.get(key).isBefore(instant)){
errorSubscribes.remove(key);
errorTimeSubscribes.remove(key);
}
@@ -117,12 +117,12 @@ public class SipSubscribe {
public void addErrorSubscribe(String key, SipSubscribe.Event event) {
errorSubscribes.put(key, event);
- errorTimeSubscribes.put(key, new Date());
+ errorTimeSubscribes.put(key, Instant.now());
}
public void addOkSubscribe(String key, SipSubscribe.Event event) {
okSubscribes.put(key, event);
- okTimeSubscribes.put(key, new Date());
+ okTimeSubscribes.put(key, Instant.now());
}
public SipSubscribe.Event getErrorSubscribe(String key) {
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
index 75b9f59a..7ed3c116 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
@@ -4,25 +4,21 @@ import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
-import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
-import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
@Component
public class CatalogDataCatch {
public static Map data = new ConcurrentHashMap<>();
- @Autowired
- private DeferredResultHolder deferredResultHolder;
-
@Autowired
private IVideoManagerStorage storager;
@@ -34,7 +30,7 @@ public class CatalogDataCatch {
catalogData.setDevice(device);
catalogData.setSn(sn);
catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
- catalogData.setLastTime(new Date(System.currentTimeMillis()));
+ catalogData.setLastTime(Instant.now());
data.put(device.getDeviceId(), catalogData);
}
}
@@ -48,7 +44,7 @@ public class CatalogDataCatch {
catalogData.setDevice(device);
catalogData.setChannelList(Collections.synchronizedList(new ArrayList<>()));
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
- catalogData.setLastTime(new Date(System.currentTimeMillis()));
+ catalogData.setLastTime(Instant.now());
data.put(deviceId, catalogData);
}else {
// 同一个设备的通道同步请求只考虑一个,其他的直接忽略
@@ -59,7 +55,7 @@ public class CatalogDataCatch {
catalogData.setDevice(device);
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
catalogData.getChannelList().addAll(deviceChannelList);
- catalogData.setLastTime(new Date(System.currentTimeMillis()));
+ catalogData.setLastTime(Instant.now());
}
}
@@ -102,16 +98,13 @@ public class CatalogDataCatch {
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
private void timerTask(){
Set keys = data.keySet();
- Calendar calendarBefore5S = Calendar.getInstance();
- calendarBefore5S.setTime(new Date());
- calendarBefore5S.set(Calendar.SECOND, calendarBefore5S.get(Calendar.SECOND) - 5);
- Calendar calendarBefore30S = Calendar.getInstance();
- calendarBefore30S.setTime(new Date());
- calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30);
+ Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5));
+ Instant instantBefore30S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(30));
+
for (String deviceId : keys) {
CatalogData catalogData = data.get(deviceId);
- if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
+ if ( catalogData.getLastTime().isBefore(instantBefore5S)) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
if (catalogData.getTotal() != catalogData.getChannelList().size()) {
@@ -124,7 +117,7 @@ public class CatalogDataCatch {
}
catalogData.setStatus(CatalogData.CatalogDataStatus.end);
}
- if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除
+ if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().isBefore(instantBefore30S)) { // 超过三十秒,如果标记为end则删除
data.remove(deviceId);
}
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java
new file mode 100644
index 00000000..dd5b8dfe
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/RecordDataCatch.java
@@ -0,0 +1,91 @@
+package com.genersoft.iot.vmp.gb28181.session;
+
+import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
+import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author lin
+ */
+@Component
+public class RecordDataCatch {
+
+ public static Map data = new ConcurrentHashMap<>();
+
+ @Autowired
+ private DeferredResultHolder deferredResultHolder;
+
+
+ public int put(String deviceId, String sn, int sumNum, List recordItems) {
+ String key = deviceId + sn;
+ RecordInfo recordInfo = data.get(key);
+ if (recordInfo == null) {
+ recordInfo = new RecordInfo();
+ recordInfo.setDeviceId(deviceId);
+ recordInfo.setSn(sn.trim());
+ recordInfo.setSumNum(sumNum);
+ recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>()));
+ recordInfo.setLastTime(Instant.now());
+ recordInfo.getRecordList().addAll(recordItems);
+ data.put(key, recordInfo);
+ }else {
+ // 同一个设备的通道同步请求只考虑一个,其他的直接忽略
+ if (!Objects.equals(sn.trim(), recordInfo.getSn())) {
+ return 0;
+ }
+ recordInfo.getRecordList().addAll(recordItems);
+ recordInfo.setLastTime(Instant.now());
+ }
+ return recordInfo.getRecordList().size();
+ }
+
+ @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
+ private void timerTask(){
+ Set keys = data.keySet();
+ // 获取五秒前的时刻
+ Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5));
+ for (String key : keys) {
+ RecordInfo recordInfo = data.get(key);
+ // 超过五秒收不到消息任务超时, 只更新这一部分数据
+ if ( recordInfo.getLastTime().isBefore(instantBefore5S)) {
+ // 处理录像数据, 返回给前端
+ String msgKey = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn();
+
+ WVPResult wvpResult = new WVPResult<>();
+ wvpResult.setCode(0);
+ wvpResult.setMsg("success");
+ // 对数据进行排序
+ Collections.sort(recordInfo.getRecordList());
+ wvpResult.setData(recordInfo);
+
+ RequestMessage msg = new RequestMessage();
+ msg.setKey(msgKey);
+ msg.setData(wvpResult);
+ deferredResultHolder.invokeAllResult(msg);
+ data.remove(key);
+ }
+ }
+ }
+
+ public boolean isComplete(String deviceId, String sn) {
+ RecordInfo recordInfo = data.get(deviceId + sn);
+ return recordInfo != null && recordInfo.getRecordList().size() == recordInfo.getSumNum();
+ }
+
+ public RecordInfo getRecordInfo(String deviceId, String sn) {
+ return data.get(deviceId + sn);
+ }
+
+ public void remove(String deviceId, String sn) {
+ data.remove(deviceId + sn);
+ }
+}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
index 8d72a282..85bc39dc 100644
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
+++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/VideoStreamSessionManager.java
@@ -76,8 +76,8 @@ public class VideoStreamSessionManager {
}
- public ClientTransaction getTransactionByStream(String deviceId, String channelId, String stream){
- SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream);
+ public ClientTransaction getTransaction(String deviceId, String channelId, String stream, String callId){
+ SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callId, stream);
if (ssrcTransaction == null) {
return null;
}
diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java
deleted file mode 100644
index 8a2e9009..00000000
--- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/CheckForAllRecordsThread.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.transmit.callback;
-
-import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
-import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
-import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.RecordInfoResponseMessageHandler;
-import com.genersoft.iot.vmp.utils.redis.RedisUtil;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-@SuppressWarnings("unchecked")
-public class CheckForAllRecordsThread extends Thread {
-
- private String key;
-
- private RecordInfo recordInfo;
-
- private RedisUtil redis;
-
- private Logger logger;
-
- private DeferredResultHolder deferredResultHolder;
-
- public CheckForAllRecordsThread(String key, RecordInfo recordInfo) {
- this.key = key;
- this.recordInfo = recordInfo;
- }
-
- @Override
- public void run() {
-
- String cacheKey = this.key;
-
- for (long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); stop > System.nanoTime();) {
- List