Merge branch 'wvp-28181-2.0' of https://github.com/mk1990/wvp-GB28181-pro into wvp-28181-2.0

This commit is contained in:
mk1990 2022-05-16 10:14:24 +08:00
commit 57febb3b76
34 changed files with 368 additions and 331 deletions

View File

@ -106,7 +106,6 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git
- [X] 添加RTMP视频 - [X] 添加RTMP视频
- [X] 云端录像(需要部署单独服务配合使用) - [X] 云端录像(需要部署单独服务配合使用)
- [X] 多流媒体节点,自动选择负载最低的节点使用。 - [X] 多流媒体节点,自动选择负载最低的节点使用。
- [X] 支持使用mysql作为数据库默认sqlite3,开箱即用。
- [X] WEB端支持播放H264与H265音频支持G.711A/G.711U/AAC,覆盖国标常用编码格式。 - [X] WEB端支持播放H264与H265音频支持G.711A/G.711U/AAC,覆盖国标常用编码格式。
[//]: # (# docker快速体验) [//]: # (# docker快速体验)

View File

@ -101,13 +101,6 @@
<version>8.0.22</version> <version>8.0.22</version>
</dependency> </dependency>
<!-- 添加sqlite-jdbc数据库驱动 -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.32.3.2</version>
</dependency>
<!--Mybatis分页插件 --> <!--Mybatis分页插件 -->
<dependency> <dependency>
<groupId>com.github.pagehelper</groupId> <groupId>com.github.pagehelper</groupId>

View File

@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.conf; package com.genersoft.iot.vmp.conf;
import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask; import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -9,25 +8,27 @@ import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Date; import java.time.Instant;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** /**
* 动态定时任务 * 动态定时任务
* @author lin
*/ */
@Component @Component
public class DynamicTask { public class DynamicTask {
private Logger logger = LoggerFactory.getLogger(DynamicTask.class); private final Logger logger = LoggerFactory.getLogger(DynamicTask.class);
@Autowired @Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler; private ThreadPoolTaskScheduler threadPoolTaskScheduler;
private Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>(); private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
private Map<String, Runnable> runnableMap = new ConcurrentHashMap<>(); private final Map<String, Runnable> runnableMap = new ConcurrentHashMap<>();
@Bean @Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() { public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
@ -47,7 +48,7 @@ public class DynamicTask {
* @return * @return
*/ */
public void startCron(String key, Runnable task, int cycleForCatalog) { public void startCron(String key, Runnable task, int cycleForCatalog) {
ScheduledFuture future = futureMap.get(key); ScheduledFuture<?> future = futureMap.get(key);
if (future != null) { if (future != null) {
if (future.isCancelled()) { if (future.isCancelled()) {
logger.debug("任务【{}】已存在但是关闭状态!!!", key); logger.debug("任务【{}】已存在但是关闭状态!!!", key);
@ -76,7 +77,9 @@ public class DynamicTask {
*/ */
public void startDelay(String key, Runnable task, int delay) { public void startDelay(String key, Runnable task, int delay) {
stop(key); stop(key);
Date starTime = new Date(System.currentTimeMillis() + delay);
// 获取执行的时刻
Instant startInstant = Instant.now().plusMillis(TimeUnit.MILLISECONDS.toMillis(delay));
ScheduledFuture future = futureMap.get(key); ScheduledFuture future = futureMap.get(key);
if (future != null) { if (future != null) {
@ -88,7 +91,7 @@ public class DynamicTask {
} }
} }
// scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period cycleForCatalog表示执行的间隔 // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period cycleForCatalog表示执行的间隔
future = threadPoolTaskScheduler.schedule(task, starTime); future = threadPoolTaskScheduler.schedule(task, startInstant);
if (future != null){ if (future != null){
futureMap.put(key, future); futureMap.put(key, future);
runnableMap.put(key, task); runnableMap.put(key, task);

View File

@ -81,11 +81,11 @@ public class SipLayer{
tcpSipProvider.setDialogErrorsAutomaticallyHandled(); tcpSipProvider.setDialogErrorsAutomaticallyHandled();
tcpSipProvider.addSipListener(sipProcessorObserver); tcpSipProvider.addSipListener(sipProcessorObserver);
// tcpSipProvider.setAutomaticDialogSupportEnabled(false); // tcpSipProvider.setAutomaticDialogSupportEnabled(false);
logger.info("Sip Server TCP 启动成功 port {" + sipConfig.getMonitorIp() + ":" + sipConfig.getPort() + "}"); logger.info("[Sip Server] TCP 启动成功 {}:{}", sipConfig.getMonitorIp(), sipConfig.getPort());
} catch (TransportNotSupportedException e) { } catch (TransportNotSupportedException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InvalidArgumentException e) { } catch (InvalidArgumentException e) {
logger.error("无法使用 [ {}:{} ]作为SIP[ TCP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用" logger.error("[Sip Server] 无法使用 [ {}:{} ]作为SIP[ TCP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用"
, sipConfig.getMonitorIp(), sipConfig.getPort()); , sipConfig.getMonitorIp(), sipConfig.getPort());
} catch (TooManyListenersException e) { } catch (TooManyListenersException e) {
e.printStackTrace(); e.printStackTrace();
@ -108,14 +108,14 @@ public class SipLayer{
} catch (TransportNotSupportedException e) { } catch (TransportNotSupportedException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InvalidArgumentException e) { } catch (InvalidArgumentException e) {
logger.error("无法使用 [ {}:{} ]作为SIP[ UDP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用" logger.error("[Sip Server] 无法使用 [ {}:{} ]作为SIP[ UDP ]服务,可排查: 1. sip.monitor-ip 是否为本机网卡IP; 2. sip.port 是否已被占用"
, sipConfig.getMonitorIp(), sipConfig.getPort()); , sipConfig.getMonitorIp(), sipConfig.getPort());
} catch (TooManyListenersException e) { } catch (TooManyListenersException e) {
e.printStackTrace(); e.printStackTrace();
} catch (ObjectInUseException e) { } catch (ObjectInUseException e) {
e.printStackTrace(); e.printStackTrace();
} }
logger.info("Sip Server UDP 启动成功 port [" + sipConfig.getMonitorIp() + ":" + sipConfig.getPort() + "]"); logger.info("[Sip Server] UDP 启动成功 {}:{}", sipConfig.getMonitorIp(), sipConfig.getPort());
return udpSipProvider; return udpSipProvider;
} }

View File

@ -27,8 +27,7 @@ package com.genersoft.iot.vmp.gb28181.auth;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.text.DecimalFormat; import java.time.Instant;
import java.util.Date;
import java.util.Random; import java.util.Random;
import javax.sip.address.URI; import javax.sip.address.URI;
@ -90,17 +89,12 @@ public class DigestServerAuthenticationHelper {
* @return a generated nonce. * @return a generated nonce.
*/ */
private String generateNonce() { private String generateNonce() {
// Get the time of day and run MD5 over it. long time = Instant.now().toEpochMilli();
Date date = new Date();
long time = date.getTime();
Random rand = new Random(); Random rand = new Random();
long pad = rand.nextLong(); long pad = rand.nextLong();
// String nonceString = (new Long(time)).toString()
// + (new Long(pad)).toString();
String nonceString = Long.valueOf(time).toString() String nonceString = Long.valueOf(time).toString()
+ Long.valueOf(pad).toString(); + Long.valueOf(pad).toString();
byte mdbytes[] = messageDigest.digest(nonceString.getBytes()); byte mdbytes[] = messageDigest.digest(nonceString.getBytes());
// Convert the mdbytes array into a hex string.
return toHexString(mdbytes); return toHexString(mdbytes);
} }

View File

@ -1,13 +1,13 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import java.util.Date; import java.time.Instant;
import java.util.List; import java.util.List;
public class CatalogData { public class CatalogData {
private int sn; // 命令序列号 private int sn; // 命令序列号
private int total; private int total;
private List<DeviceChannel> channelList; private List<DeviceChannel> channelList;
private Date lastTime; private Instant lastTime;
private Device device; private Device device;
private String errorMsg; private String errorMsg;
@ -41,11 +41,11 @@ public class CatalogData {
this.channelList = channelList; this.channelList = channelList;
} }
public Date getLastTime() { public Instant getLastTime() {
return lastTime; return lastTime;
} }
public void setLastTime(Date lastTime) { public void setLastTime(Instant lastTime) {
this.lastTime = lastTime; this.lastTime = lastTime;
} }

View File

@ -1,8 +1,6 @@
package com.genersoft.iot.vmp.gb28181.bean; package com.genersoft.iot.vmp.gb28181.bean;
import java.time.Instant;
//import gov.nist.javax.sip.header.SIPDate;
import java.util.List; import java.util.List;
/** /**
@ -21,6 +19,8 @@ public class RecordInfo {
private String name; private String name;
private int sumNum; private int sumNum;
private Instant lastTime;
private List<RecordItem> recordList; private List<RecordItem> recordList;
@ -71,4 +71,12 @@ public class RecordInfo {
public void setSn(String sn) { public void setSn(String sn) {
this.sn = sn; this.sn = sn;
} }
public Instant getLastTime() {
return lastTime;
}
public void setLastTime(Instant lastTime) {
this.lastTime = lastTime;
}
} }

View File

@ -5,7 +5,8 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.text.ParseException; import java.text.ParseException;
import java.util.Date; import java.time.Instant;
import java.time.temporal.TemporalAccessor;
/** /**
* @description:设备录像bean * @description:设备录像bean
@ -116,17 +117,17 @@ public class RecordItem implements Comparable<RecordItem>{
@Override @Override
public int compareTo(@NotNull RecordItem recordItem) { public int compareTo(@NotNull RecordItem recordItem) {
try { TemporalAccessor startTimeNow = DateUtil.formatter.parse(startTime);
Date startTime_now = DateUtil.format.parse(startTime); TemporalAccessor startTimeParam = DateUtil.formatter.parse(recordItem.getStartTime());
Date startTime_param = DateUtil.format.parse(recordItem.getStartTime()); Instant startTimeParamInstant = Instant.from(startTimeParam);
if (startTime_param.compareTo(startTime_now) > 0) { Instant startTimeNowInstant = Instant.from(startTimeNow);
return -1; if (startTimeNowInstant.equals(startTimeParamInstant)) {
}else { return 0;
return 1; }else if (Instant.from(startTimeParam).isAfter(Instant.from(startTimeNow)) ) {
} return -1;
} catch (ParseException e) { }else {
e.printStackTrace(); return 1;
} }
return 0;
} }
} }

View File

@ -66,7 +66,6 @@ public class SubscribeHolder {
dynamicTask.stop(taskOverdueKey); dynamicTask.stop(taskOverdueKey);
// 添加任务处理订阅过期 // 添加任务处理订阅过期
dynamicTask.startDelay(taskOverdueKey, () -> { dynamicTask.startDelay(taskOverdueKey, () -> {
System.out.println("订阅过期");
removeMobilePositionSubscribe(subscribeInfo.getId()); removeMobilePositionSubscribe(subscribeInfo.getId());
}, },
subscribeInfo.getExpires() * 1000); subscribeInfo.getExpires() * 1000);

View File

@ -9,11 +9,14 @@ import org.springframework.stereotype.Component;
import javax.sip.*; import javax.sip.*;
import javax.sip.header.CallIdHeader; import javax.sip.header.CallIdHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.util.Calendar; import java.time.Instant;
import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author lin
*/
@Component @Component
public class SipSubscribe { public class SipSubscribe {
@ -23,28 +26,25 @@ public class SipSubscribe {
private Map<String, SipSubscribe.Event> okSubscribes = new ConcurrentHashMap<>(); private Map<String, SipSubscribe.Event> okSubscribes = new ConcurrentHashMap<>();
private Map<String, Date> okTimeSubscribes = new ConcurrentHashMap<>(); private Map<String, Instant> okTimeSubscribes = new ConcurrentHashMap<>();
private Map<String, Date> errorTimeSubscribes = new ConcurrentHashMap<>(); private Map<String, Instant> errorTimeSubscribes = new ConcurrentHashMap<>();
// @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次 // @Scheduled(cron="*/5 * * * * ?") //每五秒执行一次
// @Scheduled(fixedRate= 100 * 60 * 60 ) // @Scheduled(fixedRate= 100 * 60 * 60 )
@Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次 @Scheduled(cron="0 0/5 * * * ?") //每5分钟执行一次
public void execute(){ public void execute(){
logger.info("[定时任务] 清理过期的SIP订阅信息"); logger.info("[定时任务] 清理过期的SIP订阅信息");
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date()); Instant instant = Instant.now().minusMillis(TimeUnit.MINUTES.toMillis(5));
calendar.set(Calendar.MINUTE, calendar.get(Calendar.MINUTE) - 5);
for (String key : okTimeSubscribes.keySet()) { for (String key : okTimeSubscribes.keySet()) {
if (okTimeSubscribes.get(key).before(calendar.getTime())){ if (okTimeSubscribes.get(key).isBefore(instant)){
// logger.info("[定时任务] 清理过期的订阅信息: {}", key);
okSubscribes.remove(key); okSubscribes.remove(key);
okTimeSubscribes.remove(key); okTimeSubscribes.remove(key);
} }
} }
for (String key : errorTimeSubscribes.keySet()) { for (String key : errorTimeSubscribes.keySet()) {
if (errorTimeSubscribes.get(key).before(calendar.getTime())){ if (errorTimeSubscribes.get(key).isBefore(instant)){
// logger.info("[定时任务] 清理过期的订阅信息: {}", key);
errorSubscribes.remove(key); errorSubscribes.remove(key);
errorTimeSubscribes.remove(key); errorTimeSubscribes.remove(key);
} }
@ -117,12 +117,12 @@ public class SipSubscribe {
public void addErrorSubscribe(String key, SipSubscribe.Event event) { public void addErrorSubscribe(String key, SipSubscribe.Event event) {
errorSubscribes.put(key, event); errorSubscribes.put(key, event);
errorTimeSubscribes.put(key, new Date()); errorTimeSubscribes.put(key, Instant.now());
} }
public void addOkSubscribe(String key, SipSubscribe.Event event) { public void addOkSubscribe(String key, SipSubscribe.Event event) {
okSubscribes.put(key, event); okSubscribes.put(key, event);
okTimeSubscribes.put(key, new Date()); okTimeSubscribes.put(key, Instant.now());
} }
public SipSubscribe.Event getErrorSubscribe(String key) { public SipSubscribe.Event getErrorSubscribe(String key) {

View File

@ -4,25 +4,21 @@ import com.genersoft.iot.vmp.gb28181.bean.CatalogData;
import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@Component @Component
public class CatalogDataCatch { public class CatalogDataCatch {
public static Map<String, CatalogData> data = new ConcurrentHashMap<>(); public static Map<String, CatalogData> data = new ConcurrentHashMap<>();
@Autowired
private DeferredResultHolder deferredResultHolder;
@Autowired @Autowired
private IVideoManagerStorage storager; private IVideoManagerStorage storager;
@ -34,7 +30,7 @@ public class CatalogDataCatch {
catalogData.setDevice(device); catalogData.setDevice(device);
catalogData.setSn(sn); catalogData.setSn(sn);
catalogData.setStatus(CatalogData.CatalogDataStatus.ready); catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
catalogData.setLastTime(new Date(System.currentTimeMillis())); catalogData.setLastTime(Instant.now());
data.put(device.getDeviceId(), catalogData); data.put(device.getDeviceId(), catalogData);
} }
} }
@ -48,7 +44,7 @@ public class CatalogDataCatch {
catalogData.setDevice(device); catalogData.setDevice(device);
catalogData.setChannelList(Collections.synchronizedList(new ArrayList<>())); catalogData.setChannelList(Collections.synchronizedList(new ArrayList<>()));
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
catalogData.setLastTime(new Date(System.currentTimeMillis())); catalogData.setLastTime(Instant.now());
data.put(deviceId, catalogData); data.put(deviceId, catalogData);
}else { }else {
// 同一个设备的通道同步请求只考虑一个其他的直接忽略 // 同一个设备的通道同步请求只考虑一个其他的直接忽略
@ -59,7 +55,7 @@ public class CatalogDataCatch {
catalogData.setDevice(device); catalogData.setDevice(device);
catalogData.setStatus(CatalogData.CatalogDataStatus.runIng); catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
catalogData.getChannelList().addAll(deviceChannelList); catalogData.getChannelList().addAll(deviceChannelList);
catalogData.setLastTime(new Date(System.currentTimeMillis())); catalogData.setLastTime(Instant.now());
} }
} }
@ -102,16 +98,13 @@ public class CatalogDataCatch {
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 @Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
private void timerTask(){ private void timerTask(){
Set<String> keys = data.keySet(); Set<String> keys = data.keySet();
Calendar calendarBefore5S = Calendar.getInstance();
calendarBefore5S.setTime(new Date());
calendarBefore5S.set(Calendar.SECOND, calendarBefore5S.get(Calendar.SECOND) - 5);
Calendar calendarBefore30S = Calendar.getInstance(); Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5));
calendarBefore30S.setTime(new Date()); Instant instantBefore30S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(30));
calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30);
for (String deviceId : keys) { for (String deviceId : keys) {
CatalogData catalogData = data.get(deviceId); CatalogData catalogData = data.get(deviceId);
if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时 只更新这一部分数据 if ( catalogData.getLastTime().isBefore(instantBefore5S)) { // 超过五秒收不到消息任务超时 只更新这一部分数据
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) { if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList()); storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
if (catalogData.getTotal() != catalogData.getChannelList().size()) { if (catalogData.getTotal() != catalogData.getChannelList().size()) {
@ -124,7 +117,7 @@ public class CatalogDataCatch {
} }
catalogData.setStatus(CatalogData.CatalogDataStatus.end); catalogData.setStatus(CatalogData.CatalogDataStatus.end);
} }
if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒如果标记为end则删除 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().isBefore(instantBefore30S)) { // 超过三十秒如果标记为end则删除
data.remove(deviceId); data.remove(deviceId);
} }
} }

View File

@ -0,0 +1,91 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* @author lin
*/
@Component
public class RecordDataCatch {
public static Map<String, RecordInfo> data = new ConcurrentHashMap<>();
@Autowired
private DeferredResultHolder deferredResultHolder;
public int put(String deviceId, String sn, int sumNum, List<RecordItem> recordItems) {
String key = deviceId + sn;
RecordInfo recordInfo = data.get(key);
if (recordInfo == null) {
recordInfo = new RecordInfo();
recordInfo.setDeviceId(deviceId);
recordInfo.setSn(sn.trim());
recordInfo.setSumNum(sumNum);
recordInfo.setRecordList(Collections.synchronizedList(new ArrayList<>()));
recordInfo.setLastTime(Instant.now());
recordInfo.getRecordList().addAll(recordItems);
data.put(key, recordInfo);
}else {
// 同一个设备的通道同步请求只考虑一个其他的直接忽略
if (!Objects.equals(sn.trim(), recordInfo.getSn())) {
return 0;
}
recordInfo.getRecordList().addAll(recordItems);
recordInfo.setLastTime(Instant.now());
}
return recordInfo.getRecordList().size();
}
@Scheduled(fixedRate = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
private void timerTask(){
Set<String> keys = data.keySet();
// 获取五秒前的时刻
Instant instantBefore5S = Instant.now().minusMillis(TimeUnit.SECONDS.toMillis(5));
for (String key : keys) {
RecordInfo recordInfo = data.get(key);
// 超过五秒收不到消息任务超时 只更新这一部分数据
if ( recordInfo.getLastTime().isBefore(instantBefore5S)) {
// 处理录像数据 返回给前端
String msgKey = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn();
WVPResult<RecordInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(0);
wvpResult.setMsg("success");
// 对数据进行排序
Collections.sort(recordInfo.getRecordList());
wvpResult.setData(recordInfo);
RequestMessage msg = new RequestMessage();
msg.setKey(msgKey);
msg.setData(wvpResult);
deferredResultHolder.invokeAllResult(msg);
data.remove(key);
}
}
}
public boolean isComplete(String deviceId, String sn) {
RecordInfo recordInfo = data.get(deviceId + sn);
return recordInfo != null && recordInfo.getRecordList().size() == recordInfo.getSumNum();
}
public RecordInfo getRecordInfo(String deviceId, String sn) {
return data.get(deviceId + sn);
}
public void remove(String deviceId, String sn) {
data.remove(deviceId + sn);
}
}

View File

@ -76,8 +76,8 @@ public class VideoStreamSessionManager {
} }
public ClientTransaction getTransactionByStream(String deviceId, String channelId, String stream){ public ClientTransaction getTransaction(String deviceId, String channelId, String stream, String callId){
SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, null, stream); SsrcTransaction ssrcTransaction = getSsrcTransaction(deviceId, channelId, callId, stream);
if (ssrcTransaction == null) { if (ssrcTransaction == null) {
return null; return null;
} }

View File

@ -1,76 +0,0 @@
package com.genersoft.iot.vmp.gb28181.transmit.callback;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.RecordInfoResponseMessageHandler;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("unchecked")
public class CheckForAllRecordsThread extends Thread {
private String key;
private RecordInfo recordInfo;
private RedisUtil redis;
private Logger logger;
private DeferredResultHolder deferredResultHolder;
public CheckForAllRecordsThread(String key, RecordInfo recordInfo) {
this.key = key;
this.recordInfo = recordInfo;
}
@Override
public void run() {
String cacheKey = this.key;
for (long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); stop > System.nanoTime();) {
List<Object> cacheKeys = redis.scan(cacheKey + "_*");
List<RecordItem> totalRecordList = new ArrayList<RecordItem>();
for (int i = 0; i < cacheKeys.size(); i++) {
totalRecordList.addAll((List<RecordItem>) redis.get(cacheKeys.get(i).toString()));
}
if (totalRecordList.size() < this.recordInfo.getSumNum()) {
logger.info("已获取" + totalRecordList.size() + "项录像数据,共" + this.recordInfo.getSumNum() + "");
} else {
logger.info("录像数据已全部获取,共 {} 项", this.recordInfo.getSumNum());
this.recordInfo.setRecordList(totalRecordList);
for (int i = 0; i < cacheKeys.size(); i++) {
redis.del(cacheKeys.get(i).toString());
}
break;
}
}
// 自然顺序排序, 元素进行升序排列
this.recordInfo.getRecordList().sort(Comparator.naturalOrder());
RequestMessage msg = new RequestMessage();
msg.setKey(DeferredResultHolder.CALLBACK_CMD_RECORDINFO + recordInfo.getDeviceId() + recordInfo.getSn());
msg.setData(recordInfo);
deferredResultHolder.invokeAllResult(msg);
logger.info("处理完成,返回结果");
RecordInfoResponseMessageHandler.threadNameList.remove(cacheKey);
}
public void setRedis(RedisUtil redis) {
this.redis = redis;
}
public void setDeferredResultHolder(DeferredResultHolder deferredResultHolder) {
this.deferredResultHolder = deferredResultHolder;
}
public void setLogger(Logger logger) {
this.logger = logger;
}
}

View File

@ -370,7 +370,7 @@ public class SIPCommander implements ISIPCommander {
// //
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n"); content.append("v=0\r\n");
content.append("o="+ sipConfig.getId()+" 0 0 IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n"); content.append("o="+ channelId+" 0 0 IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
content.append("s=Play\r\n"); content.append("s=Play\r\n");
content.append("c=IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n"); content.append("c=IN IP4 "+ mediaServerItem.getSdpIp() +"\r\n");
content.append("t=0 0\r\n"); content.append("t=0 0\r\n");
@ -389,8 +389,7 @@ public class SIPCommander implements ISIPCommander {
content.append("a=rtpmap:126 H264/90000\r\n"); content.append("a=rtpmap:126 H264/90000\r\n");
content.append("a=rtpmap:125 H264S/90000\r\n"); content.append("a=rtpmap:125 H264S/90000\r\n");
content.append("a=fmtp:125 profile-level-id=42e01e\r\n"); content.append("a=fmtp:125 profile-level-id=42e01e\r\n");
content.append("a=rtpmap:99 MP4V-ES/90000\r\n"); content.append("a=rtpmap:99 H265/90000\r\n");
content.append("a=fmtp:99 profile-level-id=3\r\n");
content.append("a=rtpmap:98 H264/90000\r\n"); content.append("a=rtpmap:98 H264/90000\r\n");
content.append("a=rtpmap:97 MPEG4/90000\r\n"); content.append("a=rtpmap:97 MPEG4/90000\r\n");
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式 if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
@ -402,16 +401,17 @@ public class SIPCommander implements ISIPCommander {
} }
}else { }else {
if("TCP-PASSIVE".equals(streamMode)) { if("TCP-PASSIVE".equals(streamMode)) {
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n"); content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
}else if ("TCP-ACTIVE".equals(streamMode)) { }else if ("TCP-ACTIVE".equals(streamMode)) {
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n"); content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
}else if("UDP".equals(streamMode)) { }else if("UDP".equals(streamMode)) {
content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 98 97\r\n"); content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 97 98 99\r\n");
} }
content.append("a=recvonly\r\n"); content.append("a=recvonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n"); content.append("a=rtpmap:96 PS/90000\r\n");
content.append("a=rtpmap:98 H264/90000\r\n"); content.append("a=rtpmap:98 H264/90000\r\n");
content.append("a=rtpmap:97 MPEG4/90000\r\n"); content.append("a=rtpmap:97 MPEG4/90000\r\n");
content.append("a=rtpmap:99 H265/90000\r\n");
if ("TCP-PASSIVE".equals(streamMode)) { // tcp被动模式 if ("TCP-PASSIVE".equals(streamMode)) { // tcp被动模式
content.append("a=setup:passive\r\n"); content.append("a=setup:passive\r\n");
content.append("a=connection:new\r\n"); content.append("a=connection:new\r\n");
@ -467,7 +467,7 @@ public class SIPCommander implements ISIPCommander {
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n"); content.append("v=0\r\n");
content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("o="+channelId+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
content.append("s=Playback\r\n"); content.append("s=Playback\r\n");
content.append("u="+channelId+":0\r\n"); content.append("u="+channelId+":0\r\n");
content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
@ -490,8 +490,7 @@ public class SIPCommander implements ISIPCommander {
content.append("a=rtpmap:126 H264/90000\r\n"); content.append("a=rtpmap:126 H264/90000\r\n");
content.append("a=rtpmap:125 H264S/90000\r\n"); content.append("a=rtpmap:125 H264S/90000\r\n");
content.append("a=fmtp:125 profile-level-id=42e01e\r\n"); content.append("a=fmtp:125 profile-level-id=42e01e\r\n");
content.append("a=rtpmap:99 MP4V-ES/90000\r\n"); content.append("a=rtpmap:99 H265/90000\r\n");
content.append("a=fmtp:99 profile-level-id=3\r\n");
content.append("a=rtpmap:98 H264/90000\r\n"); content.append("a=rtpmap:98 H264/90000\r\n");
content.append("a=rtpmap:97 MPEG4/90000\r\n"); content.append("a=rtpmap:97 MPEG4/90000\r\n");
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式 if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
@ -503,16 +502,17 @@ public class SIPCommander implements ISIPCommander {
} }
}else { }else {
if("TCP-PASSIVE".equals(streamMode)) { if("TCP-PASSIVE".equals(streamMode)) {
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n"); content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
}else if ("TCP-ACTIVE".equals(streamMode)) { }else if ("TCP-ACTIVE".equals(streamMode)) {
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n"); content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
}else if("UDP".equals(streamMode)) { }else if("UDP".equals(streamMode)) {
content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 98 97\r\n"); content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 97 98 99\r\n");
} }
content.append("a=recvonly\r\n"); content.append("a=recvonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n"); content.append("a=rtpmap:96 PS/90000\r\n");
content.append("a=rtpmap:98 H264/90000\r\n");
content.append("a=rtpmap:97 MPEG4/90000\r\n"); content.append("a=rtpmap:97 MPEG4/90000\r\n");
content.append("a=rtpmap:98 H264/90000\r\n");
content.append("a=rtpmap:99 H265/90000\r\n");
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式 if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
content.append("a=setup:passive\r\n"); content.append("a=setup:passive\r\n");
content.append("a=connection:new\r\n"); content.append("a=connection:new\r\n");
@ -577,7 +577,7 @@ public class SIPCommander implements ISIPCommander {
StringBuffer content = new StringBuffer(200); StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n"); content.append("v=0\r\n");
content.append("o="+sipConfig.getId()+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n"); content.append("o="+channelId+" 0 0 IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
content.append("s=Download\r\n"); content.append("s=Download\r\n");
content.append("u="+channelId+":0\r\n"); content.append("u="+channelId+":0\r\n");
content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n"); content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
@ -613,16 +613,17 @@ public class SIPCommander implements ISIPCommander {
} }
}else { }else {
if("TCP-PASSIVE".equals(streamMode)) { if("TCP-PASSIVE".equals(streamMode)) {
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n"); content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
}else if ("TCP-ACTIVE".equals(streamMode)) { }else if ("TCP-ACTIVE".equals(streamMode)) {
content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 98 97\r\n"); content.append("m=video "+ ssrcInfo.getPort() +" TCP/RTP/AVP 96 97 98 99\r\n");
}else if("UDP".equals(streamMode)) { }else if("UDP".equals(streamMode)) {
content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 98 97\r\n"); content.append("m=video "+ ssrcInfo.getPort() +" RTP/AVP 96 97 98 99\r\n");
} }
content.append("a=recvonly\r\n"); content.append("a=recvonly\r\n");
content.append("a=rtpmap:96 PS/90000\r\n"); content.append("a=rtpmap:96 PS/90000\r\n");
content.append("a=rtpmap:98 H264/90000\r\n");
content.append("a=rtpmap:97 MPEG4/90000\r\n"); content.append("a=rtpmap:97 MPEG4/90000\r\n");
content.append("a=rtpmap:98 H264/90000\r\n");
content.append("a=rtpmap:99 H265/90000\r\n");
if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式 if("TCP-PASSIVE".equals(streamMode)){ // tcp被动模式
content.append("a=setup:passive\r\n"); content.append("a=setup:passive\r\n");
content.append("a=connection:new\r\n"); content.append("a=connection:new\r\n");
@ -651,6 +652,17 @@ public class SIPCommander implements ISIPCommander {
(MediaServerItem mediaServerItemInUse, JSONObject json)->{ (MediaServerItem mediaServerItemInUse, JSONObject json)->{
hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream())); hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey); subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
subscribeKey.put("regist", false);
subscribeKey.put("schema", "rtmp");
// 添加流注销的订阅注销了后向设备发送bye
subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
if (transaction != null) {
logger.info("[录像]下载结束, 发送BYE");
streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
}
});
}); });
Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc()); Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
@ -683,10 +695,10 @@ public class SIPCommander implements ISIPCommander {
@Override @Override
public void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent) { public void streamByeCmd(String deviceId, String channelId, String stream, String callId, SipSubscribe.Event okEvent) {
try { try {
SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, null, stream); SsrcTransaction ssrcTransaction = streamSession.getSsrcTransaction(deviceId, channelId, callId, stream);
ClientTransaction transaction = streamSession.getTransactionByStream(deviceId, channelId, stream); ClientTransaction transaction = streamSession.getTransaction(deviceId, channelId, stream, callId);
if (transaction == null) { if (transaction == null ) {
logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId); logger.warn("[ {} -> {}]停止视频流的时候发现事务已丢失", deviceId, channelId);
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>(); SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
if (okEvent != null) { if (okEvent != null) {
@ -1663,6 +1675,7 @@ public class SIPCommander implements ISIPCommander {
sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> { sipSubscribe.addErrorSubscribe(callIdHeader.getCallId(), (eventResult -> {
errorEvent.response(eventResult); errorEvent.response(eventResult);
sipSubscribe.removeErrorSubscribe(eventResult.callId); sipSubscribe.removeErrorSubscribe(eventResult.callId);
sipSubscribe.removeOkSubscribe(eventResult.callId);
})); }));
} }
// 添加订阅 // 添加订阅
@ -1670,6 +1683,7 @@ public class SIPCommander implements ISIPCommander {
sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult ->{ sipSubscribe.addOkSubscribe(callIdHeader.getCallId(), eventResult ->{
okEvent.response(eventResult); okEvent.response(eventResult);
sipSubscribe.removeOkSubscribe(eventResult.callId); sipSubscribe.removeOkSubscribe(eventResult.callId);
sipSubscribe.removeErrorSubscribe(eventResult.callId);
}); });
} }

View File

@ -40,7 +40,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import java.util.Date; import java.time.Instant;
import java.util.Vector; import java.util.Vector;
/** /**
@ -180,16 +180,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
Long startTime = null; Long startTime = null;
Long stopTime = null; Long stopTime = null;
Date start = null; Instant start = null;
Date end = null; Instant end = null;
if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) { if (sdp.getTimeDescriptions(false) != null && sdp.getTimeDescriptions(false).size() > 0) {
TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0)); TimeDescriptionImpl timeDescription = (TimeDescriptionImpl)(sdp.getTimeDescriptions(false).get(0));
TimeField startTimeFiled = (TimeField)timeDescription.getTime(); TimeField startTimeFiled = (TimeField)timeDescription.getTime();
startTime = startTimeFiled.getStartTime(); startTime = startTimeFiled.getStartTime();
stopTime = startTimeFiled.getStopTime(); stopTime = startTimeFiled.getStopTime();
start = new Date(startTime*1000); start = Instant.ofEpochMilli(startTime*1000);
end = new Date(stopTime*1000); end = Instant.ofEpochMilli(stopTime*1000);
} }
// 获取支持的格式 // 获取支持的格式
Vector mediaDescriptions = sdp.getMediaDescriptions(true); Vector mediaDescriptions = sdp.getMediaDescriptions(true);
@ -331,12 +331,12 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
sendRtpItem.setApp("rtp"); sendRtpItem.setApp("rtp");
if ("Playback".equals(sessionName)) { if ("Playback".equals(sessionName)) {
sendRtpItem.setPlayType(InviteStreamType.PLAYBACK); sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, true, true);
sendRtpItem.setStreamId(ssrcInfo.getStream()); sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis 超时时回复 // 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.format.format(start), playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
DateUtil.format.format(end), null, result -> { DateUtil.formatter.format(end), null, result -> {
if (result.getCode() != 0){ if (result.getCode() != 0){
logger.warn("录像回放失败"); logger.warn("录像回放失败");
if (result.getEvent() != null) { if (result.getEvent() != null) {
@ -372,7 +372,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
if (mediaServerItem.isRtpEnable()) { if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId); streamId = String.format("%s_%s", device.getDeviceId(), channelId);
} }
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, true); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, true, false);
sendRtpItem.setStreamId(ssrcInfo.getStream()); sendRtpItem.setStreamId(ssrcInfo.getStream());
// 写入redis 超时时回复 // 写入redis 超时时回复
redisCatchStorage.updateSendRTPSever(sendRtpItem); redisCatchStorage.updateSendRTPSever(sendRtpItem);

View File

@ -81,7 +81,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
try { try {
RequestEventExt evtExt = (RequestEventExt) evt; RequestEventExt evtExt = (RequestEventExt) evt;
String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort(); String requestAddress = evtExt.getRemoteIpAddress() + ":" + evtExt.getRemotePort();
logger.info("[{}] 收到注册请求,开始处理", requestAddress); logger.info("[注册请求] 开始处理: {}", requestAddress);
Request request = evt.getRequest(); Request request = evt.getRequest();
ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME); ExpiresHeader expiresHeader = (ExpiresHeader) request.getHeader(Expires.NAME);
Response response = null; Response response = null;
@ -95,7 +95,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME); AuthorizationHeader authHead = (AuthorizationHeader) request.getHeader(AuthorizationHeader.NAME);
if (authHead == null) { if (authHead == null) {
logger.info("[{}] 未携带授权头 回复401", requestAddress); logger.info("[注册请求] 未携带授权头 回复401: {}", requestAddress);
response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request); response = getMessageFactory().createResponse(Response.UNAUTHORIZED, request);
new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain()); new DigestServerAuthenticationHelper().generateChallenge(getHeaderFactory(), response, sipConfig.getDomain());
sendResponse(evt, response); sendResponse(evt, response);
@ -111,7 +111,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
// 注册失败 // 注册失败
response = getMessageFactory().createResponse(Response.FORBIDDEN, request); response = getMessageFactory().createResponse(Response.FORBIDDEN, request);
response.setReasonPhrase("wrong password"); response.setReasonPhrase("wrong password");
logger.info("[{}] 密码/SIP服务器ID错误, 回复403", requestAddress); logger.info("[注册请求] 密码/SIP服务器ID错误, 回复403: {}", requestAddress);
sendResponse(evt, response); sendResponse(evt, response);
return; return;
} }
@ -176,11 +176,11 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
// 注册成功 // 注册成功
// 保存到redis // 保存到redis
if (registerFlag) { if (registerFlag) {
logger.info("[{}] 注册成功! deviceId:" + deviceId, requestAddress); logger.info("[注册成功] deviceId: {}->{}", deviceId, requestAddress);
device.setRegisterTime(DateUtil.getNow()); device.setRegisterTime(DateUtil.getNow());
deviceService.online(device); deviceService.online(device);
} else { } else {
logger.info("[{}] 注销成功! deviceId:" + deviceId, requestAddress); logger.info("[注销成功] deviceId: {}->{}" ,deviceId, requestAddress);
deviceService.offline(deviceId); deviceService.offline(deviceId);
} }
} catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) { } catch (SipException | InvalidArgumentException | NoSuchAlgorithmException | ParseException e) {
@ -192,7 +192,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
private void sendResponse(RequestEvent evt, Response response) throws InvalidArgumentException, SipException { private void sendResponse(RequestEvent evt, Response response) throws InvalidArgumentException, SipException {
ServerTransaction serverTransaction = getServerTransaction(evt); ServerTransaction serverTransaction = getServerTransaction(evt);
if (serverTransaction == null) { if (serverTransaction == null) {
logger.warn("回复失败:{}", response); logger.warn("[回复失败]{}", response);
return; return;
} }
serverTransaction.sendResponse(response); serverTransaction.sendResponse(response);

View File

@ -24,8 +24,6 @@ import javax.sip.SipException;
import javax.sip.header.ViaHeader; import javax.sip.header.ViaHeader;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
@Component @Component
public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {

View File

@ -60,10 +60,9 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME); CallIdHeader callIdHeader = (CallIdHeader)evt.getRequest().getHeader(CallIdHeader.NAME);
String NotifyType =getText(rootElement, "NotifyType"); String NotifyType =getText(rootElement, "NotifyType");
if (NotifyType.equals("121")){ if (NotifyType.equals("121")){
logger.info("媒体播放完毕,通知关流"); logger.info("[录像流]推送完毕,收到关流通知");
String channelId =getText(rootElement, "DeviceID"); String channelId =getText(rootElement, "DeviceID");
// redisCatchStorage.stopPlayback(device.getDeviceId(), channelId, null, callIdHeader.getCallId()); // 查询是设备
// redisCatchStorage.stopDownload(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
StreamInfo streamInfo = redisCatchStorage.queryDownload(device.getDeviceId(), channelId, null, callIdHeader.getCallId()); StreamInfo streamInfo = redisCatchStorage.queryDownload(device.getDeviceId(), channelId, null, callIdHeader.getCallId());
// 设置进度100% // 设置进度100%
streamInfo.setProgress(1); streamInfo.setProgress(1);

View File

@ -5,14 +5,14 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
import com.genersoft.iot.vmp.gb28181.bean.RecordItem; import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.callback.CheckForAllRecordsThread; import com.genersoft.iot.vmp.gb28181.session.RecordDataCatch;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import org.dom4j.DocumentException; import org.dom4j.DocumentException;
import org.dom4j.Element; import org.dom4j.Element;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -20,19 +20,20 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.sip.InvalidArgumentException; import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent; import javax.sip.RequestEvent;
import javax.sip.SipException; import javax.sip.SipException;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList; import java.util.*;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
* @author lin
*/
@Component @Component
public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
@ -45,11 +46,13 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
private ResponseMessageHandler responseMessageHandler; private ResponseMessageHandler responseMessageHandler;
@Autowired @Autowired
private RedisUtil redis; private RecordDataCatch recordDataCatch;
@Autowired @Autowired
private DeferredResultHolder deferredResultHolder; private DeferredResultHolder deferredResultHolder;
@Autowired @Autowired
private EventPublisher eventPublisher; private EventPublisher eventPublisher;
@ -66,32 +69,22 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
responseAck(evt, Response.OK); responseAck(evt, Response.OK);
rootElement = getRootElement(evt, device.getCharset()); rootElement = getRootElement(evt, device.getCharset());
String uuid = UUID.randomUUID().toString().replace("-", "");
RecordInfo recordInfo = new RecordInfo();
String sn = getText(rootElement, "SN"); String sn = getText(rootElement, "SN");
String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + device.getDeviceId() + sn;
recordInfo.setDeviceId(device.getDeviceId()); String sumNumStr = getText(rootElement, "SumNum");
recordInfo.setSn(sn); int sumNum = 0;
recordInfo.setName(getText(rootElement, "Name")); if (!StringUtils.isEmpty(sumNumStr)) {
if (getText(rootElement, "SumNum") == null || getText(rootElement, "SumNum") == "") { sumNum = Integer.parseInt(sumNumStr);
recordInfo.setSumNum(0);
} else {
recordInfo.setSumNum(Integer.parseInt(getText(rootElement, "SumNum")));
} }
Element recordListElement = rootElement.element("RecordList"); Element recordListElement = rootElement.element("RecordList");
if (recordListElement == null || recordInfo.getSumNum() == 0) { if (recordListElement == null || sumNum == 0) {
logger.info("无录像数据"); logger.info("无录像数据");
eventPublisher.recordEndEventPush(recordInfo); recordDataCatch.put(device.getDeviceId(), sn, sumNum, new ArrayList<>());
RequestMessage msg = new RequestMessage(); releaseRequest(device.getDeviceId(), sn);
msg.setKey(key);
msg.setData(recordInfo);
deferredResultHolder.invokeAllResult(msg);
} else { } else {
Iterator<Element> recordListIterator = recordListElement.elementIterator(); Iterator<Element> recordListIterator = recordListElement.elementIterator();
List<RecordItem> recordList = new ArrayList<RecordItem>();
if (recordListIterator != null) { if (recordListIterator != null) {
RecordItem record = new RecordItem(); List<RecordItem> recordList = new ArrayList<>();
logger.info("处理录像列表数据...");
// 遍历DeviceList // 遍历DeviceList
while (recordListIterator.hasNext()) { while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next(); Element itemRecord = recordListIterator.next();
@ -100,43 +93,31 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
logger.info("记录为空,下一个..."); logger.info("记录为空,下一个...");
continue; continue;
} }
record = new RecordItem(); RecordItem record = new RecordItem();
record.setDeviceId(getText(itemRecord, "DeviceID")); record.setDeviceId(getText(itemRecord, "DeviceID"));
record.setName(getText(itemRecord, "Name")); record.setName(getText(itemRecord, "Name"));
record.setFilePath(getText(itemRecord, "FilePath")); record.setFilePath(getText(itemRecord, "FilePath"));
record.setFileSize(getText(itemRecord, "FileSize")); record.setFileSize(getText(itemRecord, "FileSize"));
record.setAddress(getText(itemRecord, "Address")); record.setAddress(getText(itemRecord, "Address"));
record.setStartTime(
DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "StartTime"))); String startTimeStr = getText(itemRecord, "StartTime");
record.setEndTime( record.setStartTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(startTimeStr));
DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(getText(itemRecord, "EndTime")));
String endTimeStr = getText(itemRecord, "EndTime");
record.setEndTime(DateUtil.ISO8601Toyyyy_MM_dd_HH_mm_ss(endTimeStr));
record.setSecrecy(itemRecord.element("Secrecy") == null ? 0 record.setSecrecy(itemRecord.element("Secrecy") == null ? 0
: Integer.parseInt(getText(itemRecord, "Secrecy"))); : Integer.parseInt(getText(itemRecord, "Secrecy")));
record.setType(getText(itemRecord, "Type")); record.setType(getText(itemRecord, "Type"));
record.setRecorderId(getText(itemRecord, "RecorderID")); record.setRecorderId(getText(itemRecord, "RecorderID"));
recordList.add(record); recordList.add(record);
} }
recordInfo.setRecordList(recordList); int count = recordDataCatch.put(device.getDeviceId(), sn, sumNum, recordList);
logger.info("[国标录像] {}->{}: {}/{}", device.getDeviceId(), sn, count, sumNum);
} }
eventPublisher.recordEndEventPush(recordInfo);
// 改用单独线程统计已获取录像文件数量避免多包并行分别统计不完整的问题 if (recordDataCatch.isComplete(device.getDeviceId(), sn)){
String cacheKey = CACHE_RECORDINFO_KEY + device.getDeviceId() + sn; releaseRequest(device.getDeviceId(), sn);
redis.set(cacheKey + "_" + uuid, recordList, 90);
if (!threadNameList.contains(cacheKey)) {
threadNameList.add(cacheKey);
CheckForAllRecordsThread chk = new CheckForAllRecordsThread(cacheKey, recordInfo);
chk.setName(cacheKey);
chk.setDeferredResultHolder(deferredResultHolder);
chk.setRedis(redis);
chk.setLogger(logger);
chk.start();
if (logger.isDebugEnabled()) {
logger.debug("Start Thread " + cacheKey + ".");
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Thread " + cacheKey + " already started.");
}
} }
} }
} catch (SipException e) { } catch (SipException e) {
@ -154,4 +135,20 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element element) { public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element element) {
} }
public void releaseRequest(String deviceId, String sn){
String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn;
WVPResult<RecordInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(0);
wvpResult.setMsg("success");
// 对数据进行排序
Collections.sort(recordDataCatch.getRecordInfo(deviceId, sn).getRecordList());
wvpResult.setData(recordDataCatch.getRecordInfo(deviceId, sn));
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(wvpResult);
deferredResultHolder.invokeAllResult(msg);
recordDataCatch.remove(deviceId, sn);
}
} }

View File

@ -89,7 +89,7 @@ public class ZLMRunner implements CommandLineRunner {
}); });
// 获取zlm信息 // 获取zlm信息
logger.info("[zlm接入]等待默认zlm中..."); logger.info("[zlm] 等待默认zlm中...");
// 获取所有的zlm 并开启主动连接 // 获取所有的zlm 并开启主动连接
List<MediaServerItem> all = mediaServerService.getAllFromDatabase(); List<MediaServerItem> all = mediaServerService.getAllFromDatabase();

View File

@ -39,8 +39,7 @@ public class ZLMStatusEventListener {
@Async @Async
@EventListener @EventListener
public void onApplicationEvent(ZLMOnlineEvent event) { public void onApplicationEvent(ZLMOnlineEvent event) {
logger.info("[ZLM] 上线 ID" + event.getMediaServerId());
logger.info("【ZLM上线】ID" + event.getMediaServerId());
streamPushService.zlmServerOnline(event.getMediaServerId()); streamPushService.zlmServerOnline(event.getMediaServerId());
streamProxyService.zlmServerOnline(event.getMediaServerId()); streamProxyService.zlmServerOnline(event.getMediaServerId());
@ -50,7 +49,7 @@ public class ZLMStatusEventListener {
@EventListener @EventListener
public void onApplicationEvent(ZLMOfflineEvent event) { public void onApplicationEvent(ZLMOfflineEvent event) {
logger.info("ZLM离线事件触发ID" + event.getMediaServerId()); logger.info("[ZLM] 离线ID" + event.getMediaServerId());
// 处理ZLM离线 // 处理ZLM离线
mediaServerService.zlmServerOffline(event.getMediaServerId()); mediaServerService.zlmServerOffline(event.getMediaServerId());
streamProxyService.zlmServerOffline(event.getMediaServerId()); streamProxyService.zlmServerOffline(event.getMediaServerId());

View File

@ -44,7 +44,7 @@ public interface IMediaServerService {
void updateVmServer(List<MediaServerItem> mediaServerItemList); void updateVmServer(List<MediaServerItem> mediaServerItemList);
SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck); SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck, boolean isPlayback);
SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback); SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback);

View File

@ -11,24 +11,18 @@ import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask; import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
import com.genersoft.iot.vmp.gb28181.bean.SyncStatus; import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IMediaService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.DeviceMapper; import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.DateUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import javax.sip.DialogState; import java.time.Instant;
import javax.sip.TimeoutEvent;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit;
/** /**
* 设备业务目录订阅 * 设备业务目录订阅
@ -66,7 +60,7 @@ public class DeviceServiceImpl implements IDeviceService {
@Override @Override
public void online(Device device) { public void online(Device device) {
logger.info("[设备上线]deviceId" + device.getDeviceId()); logger.info("[设备上线] deviceId{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId()); Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId());
Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId()); Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
@ -101,9 +95,7 @@ public class DeviceServiceImpl implements IDeviceService {
// 刷新过期任务 // 刷新过期任务
String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId(); String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId();
dynamicTask.stop(registerExpireTaskKey); dynamicTask.stop(registerExpireTaskKey);
dynamicTask.startDelay(registerExpireTaskKey, ()->{ dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId()), device.getExpires() * 1000);
offline(device.getDeviceId());
}, device.getExpires() * 1000);
} }
@Override @Override
@ -217,18 +209,9 @@ public class DeviceServiceImpl implements IDeviceService {
@Override @Override
public boolean expire(Device device) { public boolean expire(Device device) {
Date registerTimeDate; Instant registerTimeDate = Instant.from(DateUtil.formatter.parse(device.getRegisterTime()));
try { Instant expireInstant = registerTimeDate.plusMillis(TimeUnit.SECONDS.toMillis(device.getExpires()));
registerTimeDate = DateUtil.format.parse(device.getRegisterTime()); return expireInstant.isBefore(Instant.now());
} catch (ParseException e) {
logger.error("设备时间格式化失败:{}->{} ", device.getDeviceId(), device.getRegisterTime() );
return false;
}
int expires = device.getExpires();
Calendar calendarForExpire = Calendar.getInstance();
calendarForExpire.setTime(registerTimeDate);
calendarForExpire.set(Calendar.SECOND, calendarForExpire.get(Calendar.SECOND) + expires);
return calendarForExpire.before(DateUtil.getNow());
} }
@Override @Override

View File

@ -95,7 +95,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
*/ */
@Override @Override
public void updateVmServer(List<MediaServerItem> mediaServerItemList) { public void updateVmServer(List<MediaServerItem> mediaServerItemList) {
logger.info("[缓存初始化] Media Server "); logger.info("[zlm] 缓存初始化 ");
for (MediaServerItem mediaServerItem : mediaServerItemList) { for (MediaServerItem mediaServerItem : mediaServerItemList) {
if (StringUtils.isEmpty(mediaServerItem.getId())) { if (StringUtils.isEmpty(mediaServerItem.getId())) {
continue; continue;
@ -116,8 +116,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
} }
@Override @Override
public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck) { public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, boolean ssrcCheck, boolean isPlayback) {
return openRTPServer(mediaServerItem, streamId, null, ssrcCheck,false); return openRTPServer(mediaServerItem, streamId, null, ssrcCheck,isPlayback);
} }
@Override @Override
@ -352,7 +352,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
*/ */
@Override @Override
public void zlmServerOnline(ZLMServerConfig zlmServerConfig) { public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
logger.info("[ ZLM{} ]-[ {}:{} ]正在连接", logger.info("[ZLM] 正在连接 : {} -> {}:{}",
zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId()); MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId());
@ -405,7 +405,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable())); setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
} }
publisher.zlmOnlineEventPublish(serverItem.getId()); publisher.zlmOnlineEventPublish(serverItem.getId());
logger.info("[ ZLM{} ]-[ {}:{} ]连接成功", logger.info("[ZLM] 连接成功 {} - {}:{} ",
zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort()); zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
} }
@ -483,7 +483,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
*/ */
@Override @Override
public void setZLMConfig(MediaServerItem mediaServerItem, boolean restart) { public void setZLMConfig(MediaServerItem mediaServerItem, boolean restart) {
logger.info("[ ZLM{} ]-[ {}:{} ]正在设置zlm", logger.info("[ZLM] 正在设置 {} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
String protocol = sslEnabled ? "https" : "http"; String protocol = sslEnabled ? "https" : "http";
String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort); String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
@ -527,17 +527,17 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (responseJSON != null && responseJSON.getInteger("code") == 0) { if (responseJSON != null && responseJSON.getInteger("code") == 0) {
if (restart) { if (restart) {
logger.info("[ ZLM{} ]-[ {}:{} ]设置zlm成功, 开始重启以保证配置生效", logger.info("[ZLM] 设置成功,开始重启以保证配置生效 {} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
zlmresTfulUtils.restartServer(mediaServerItem); zlmresTfulUtils.restartServer(mediaServerItem);
}else { }else {
logger.info("[ ZLM{} ]-[ {}:{} ]设置zlm成功", logger.info("[ZLM] 设置成功 {} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
} }
}else { }else {
logger.info("[ ZLM{} ]-[ {}:{} ]设置zlm失败", logger.info("[ZLM] 设置zlm失败 {} -> {}:{}",
mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort()); mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
} }

View File

@ -193,7 +193,7 @@ public class PlayServiceImpl implements IPlayService {
if (mediaServerItem.isRtpEnable()) { if (mediaServerItem.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channelId); streamId = String.format("%s_%s", device.getDeviceId(), channelId);
} }
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck()); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{ play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
if (hookEvent != null) { if (hookEvent != null) {
hookEvent.response(mediaServerItem, response); hookEvent.response(mediaServerItem, response);
@ -237,7 +237,7 @@ public class PlayServiceImpl implements IPlayService {
streamId = String.format("%s_%s", device.getDeviceId(), channelId); streamId = String.format("%s_%s", device.getDeviceId(), channelId);
} }
if (ssrcInfo == null) { if (ssrcInfo == null) {
ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck()); ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
} }
// 超时处理 // 超时处理
@ -360,7 +360,7 @@ public class PlayServiceImpl implements IPlayService {
return null; return null;
} }
MediaServerItem newMediaServerItem = getNewMediaServerItem(device); MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback); return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
} }
@ -447,7 +447,7 @@ public class PlayServiceImpl implements IPlayService {
return null; return null;
} }
MediaServerItem newMediaServerItem = getNewMediaServerItem(device); MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true); SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack); return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack);
} }

View File

@ -1,57 +1,58 @@
package com.genersoft.iot.vmp.utils; package com.genersoft.iot.vmp.utils;
import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Date; import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;
import java.util.Locale; import java.util.Locale;
/** /**
* 全局时间工具类 * 全局时间工具类
* @author swwheihei * @author lin
*/ */
public class DateUtil { public class DateUtil {
private static final String yyyy_MM_dd_T_HH_mm_ss_SSSXXX = "yyyy-MM-dd'T'HH:mm:ss"; private static final String yyyy_MM_dd_T_HH_mm_ss_SSSXXX = "yyyy-MM-dd'T'HH:mm:ss";
private static final String yyyy_MM_dd_HH_mm_ss = "yyyy-MM-dd HH:mm:ss"; public static final String yyyy_MM_dd_HH_mm_ss = "yyyy-MM-dd HH:mm:ss";
public static final SimpleDateFormat formatISO8601 = new SimpleDateFormat(yyyy_MM_dd_T_HH_mm_ss_SSSXXX, Locale.getDefault()); public static final SimpleDateFormat formatISO8601 = new SimpleDateFormat(yyyy_MM_dd_T_HH_mm_ss_SSSXXX, Locale.getDefault());
public static final SimpleDateFormat format = new SimpleDateFormat(yyyy_MM_dd_HH_mm_ss, Locale.getDefault()); public static final SimpleDateFormat format = new SimpleDateFormat(yyyy_MM_dd_HH_mm_ss, Locale.getDefault());
public static String yyyy_MM_dd_HH_mm_ssToISO8601(String formatTime) { public static final DateTimeFormatter formatterISO8601 = DateTimeFormatter.ofPattern(yyyy_MM_dd_T_HH_mm_ss_SSSXXX, Locale.getDefault()).withZone(ZoneId.systemDefault());
public static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern(yyyy_MM_dd_HH_mm_ss, Locale.getDefault()).withZone(ZoneId.systemDefault());
try { public static String yyyy_MM_dd_HH_mm_ssToISO8601(String formatTime) {
return formatISO8601.format(format.parse(formatTime)); return formatterISO8601.format(formatter.parse(formatTime));
} catch (ParseException e) {
e.printStackTrace();
}
return "";
} }
public static String ISO8601Toyyyy_MM_dd_HH_mm_ss(String formatTime) { public static String ISO8601Toyyyy_MM_dd_HH_mm_ss(String formatTime) {
return formatter.format(formatterISO8601.parse(formatTime));
try {
return format.format(formatISO8601.parse(formatTime));
} catch (ParseException e) {
e.printStackTrace();
}
return "";
} }
public static long yyyy_MM_dd_HH_mm_ssToTimestamp(String formatTime) { public static long yyyy_MM_dd_HH_mm_ssToTimestamp(String formatTime) {
//设置要读取的时间字符串格式 TemporalAccessor temporalAccessor = formatter.parse(formatTime);
Date date; Instant instant = Instant.from(temporalAccessor);
try { return instant.getEpochSecond();
date = format.parse(formatTime);
Long timestamp=date.getTime()/1000;
//转换为Date类
return timestamp;
} catch (ParseException e) {
e.printStackTrace();
}
return 0;
} }
public static String getNow() { public static String getNow() {
return format.format(System.currentTimeMillis()); LocalDateTime nowDateTime = LocalDateTime.now();
return formatter.format(nowDateTime);
}
public static boolean verification(String timeStr, DateTimeFormatter dateTimeFormatter) {
try {
LocalDate.parse(timeStr, dateTimeFormatter);
return true;
}catch (DateTimeParseException exception) {
return false;
}
} }
} }

View File

@ -5,6 +5,8 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.service.IMediaServerService; import com.genersoft.iot.vmp.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IPlayService; import com.genersoft.iot.vmp.service.IPlayService;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiImplicitParams;
@ -27,6 +29,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
import com.genersoft.iot.vmp.storager.IVideoManagerStorage; import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
import java.time.LocalDate;
import java.util.UUID; import java.util.UUID;
@Api(tags = "国标录像") @Api(tags = "国标录像")
@ -60,15 +63,32 @@ public class GBRecordController {
@ApiImplicitParam(name = "endTime", value = "结束时间", dataTypeClass = String.class), @ApiImplicitParam(name = "endTime", value = "结束时间", dataTypeClass = String.class),
}) })
@GetMapping("/query/{deviceId}/{channelId}") @GetMapping("/query/{deviceId}/{channelId}")
public DeferredResult<ResponseEntity<RecordInfo>> recordinfo(@PathVariable String deviceId,@PathVariable String channelId, String startTime, String endTime){ public DeferredResult<ResponseEntity<WVPResult<RecordInfo>>> recordinfo(@PathVariable String deviceId, @PathVariable String channelId, String startTime, String endTime){
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug(String.format("录像信息查询 API调用deviceId%s startTime%s endTime%s",deviceId, startTime, endTime)); logger.debug(String.format("录像信息查询 API调用deviceId%s startTime%s endTime%s",deviceId, startTime, endTime));
} }
DeferredResult<ResponseEntity<WVPResult<RecordInfo>>> result = new DeferredResult<>();
if (!DateUtil.verification(startTime, DateUtil.formatter)){
WVPResult<RecordInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg("startTime error, format is " + DateUtil.yyyy_MM_dd_HH_mm_ss);
ResponseEntity<WVPResult<RecordInfo>> resultResponseEntity = new ResponseEntity<>(wvpResult, HttpStatus.OK);
result.setResult(resultResponseEntity);
return result;
}
if (!DateUtil.verification(endTime, DateUtil.formatter)){
WVPResult<RecordInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg("endTime error, format is " + DateUtil.yyyy_MM_dd_HH_mm_ss);
ResponseEntity<WVPResult<RecordInfo>> resultResponseEntity = new ResponseEntity<>(wvpResult, HttpStatus.OK);
result.setResult(resultResponseEntity);
return result;
}
Device device = storager.queryVideoDevice(deviceId); Device device = storager.queryVideoDevice(deviceId);
// 指定超时时间 1分钟30秒 // 指定超时时间 1分钟30秒
DeferredResult<ResponseEntity<RecordInfo>> result = new DeferredResult<>(90*1000L);
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
int sn = (int)((Math.random()*9+1)*100000); int sn = (int)((Math.random()*9+1)*100000);
String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn; String key = DeferredResultHolder.CALLBACK_CMD_RECORDINFO + deviceId + sn;
@ -76,7 +96,10 @@ public class GBRecordController {
msg.setId(uuid); msg.setId(uuid);
msg.setKey(key); msg.setKey(key);
cmder.recordInfoQuery(device, channelId, startTime, endTime, sn, null, null, null, (eventResult -> { cmder.recordInfoQuery(device, channelId, startTime, endTime, sn, null, null, null, (eventResult -> {
msg.setData("查询录像失败, status: " + eventResult.statusCode + ", message: " + eventResult.msg ); WVPResult<RecordInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg("查询录像失败, status: " + eventResult.statusCode + ", message: " + eventResult.msg);
msg.setData(wvpResult);
resultHolder.invokeResult(msg); resultHolder.invokeResult(msg);
})); }));
@ -84,6 +107,10 @@ public class GBRecordController {
resultHolder.put(key, uuid, result); resultHolder.put(key, uuid, result);
result.onTimeout(()->{ result.onTimeout(()->{
msg.setData("timeout"); msg.setData("timeout");
WVPResult<RecordInfo> wvpResult = new WVPResult<>();
wvpResult.setCode(-1);
wvpResult.setMsg("timeout");
msg.setData(wvpResult);
resultHolder.invokeResult(msg); resultHolder.invokeResult(msg);
}); });
return result; return result;

View File

@ -28,7 +28,7 @@ spring:
poolMaxIdle: 500 poolMaxIdle: 500
# [可选] 最大的等待时间(秒) # [可选] 最大的等待时间(秒)
poolMaxWait: 5 poolMaxWait: 5
# [可选] jdbc数据库配置, 项目使用sqlite作为数据库一般不需要配置 # [必选] jdbc数据库配置
datasource: datasource:
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver

View File

@ -16,7 +16,6 @@ spring:
password: face2020 password: face2020
# [可选] 超时时间 # [可选] 超时时间
timeout: 10000 timeout: 10000
# [可选] jdbc数据库配置, 项目使用sqlite作为数据库一般不需要配置
# mysql数据源 # mysql数据源
datasource: datasource:
type: com.alibaba.druid.pool.DruidDataSource type: com.alibaba.druid.pool.DruidDataSource

View File

@ -16,7 +16,7 @@ spring:
password: ${REDIS_PWD:root} password: ${REDIS_PWD:root}
# [可选] 超时时间 # [可选] 超时时间
timeout: 10000 timeout: 10000
# [可选] jdbc数据库配置, 项目使用sqlite作为数据库一般不需要配置 # [必选] jdbc数据库配置
datasource: datasource:
# 使用mysql 打开23-28行注释 删除29-36行 # 使用mysql 打开23-28行注释 删除29-36行
name: wvp name: wvp

View File

@ -244,7 +244,7 @@ export default {
}); });
}, },
queryRecords: function (itemData) { queryRecords: function (itemData) {
var format = moment().format("YYYY-M-D"); var format = moment().format("yyyy-MM-DD");
let deviceId = this.deviceId; let deviceId = this.deviceId;
let channelId = itemData.channelId; let channelId = itemData.channelId;
this.$refs.devicePlayer.openDialog("record", deviceId, channelId, {date: format}) this.$refs.devicePlayer.openDialog("record", deviceId, channelId, {date: format})

View File

@ -453,9 +453,19 @@ export default {
method: 'get', method: 'get',
url: '/api/gb_record/query/' + this.deviceId + '/' + this.channelId + '?startTime=' + startTime + '&endTime=' + endTime url: '/api/gb_record/query/' + this.deviceId + '/' + this.channelId + '?startTime=' + startTime + '&endTime=' + endTime
}).then(function (res) { }).then(function (res) {
// console.log(res)
that.videoHistory.searchHistoryResult = res.data.recordList; if(res.data.code === 0) {
that.recordsLoading = false; //
that.videoHistory.searchHistoryResult = res.data.data.recordList;
that.recordsLoading = false;
}else {
this.$message({
showClose: true,
message: res.data.msg,
type: "error",
});
}
}).catch(function (e) { }).catch(function (e) {
console.log(e.message); console.log(e.message);
// that.videoHistory.searchHistoryResult = falsificationData.recordData; // that.videoHistory.searchHistoryResult = falsificationData.recordData;
@ -671,7 +681,11 @@ export default {
this.$axios({ this.$axios({
method: 'get', method: 'get',
url: `/api/playback/seek/${this.streamId }/` + Math.floor(this.seekTime * val / 100000) url: `/api/playback/seek/${this.streamId }/` + Math.floor(this.seekTime * val / 100000)
}).then(function (res) {}); }).then( (res)=> {
setTimeout(()=>{
this.$refs.videoPlayer.play(this.videoUrl)
}, 600)
});
} }
} }

View File

@ -172,6 +172,7 @@ export default {
isEnd: true, isEnd: true,
} }
}).then((res) => { }).then((res) => {
console.log(res)
if (res.data.code == 0) { if (res.data.code == 0) {
this.percentage = parseFloat(res.data.data.percentage)*100 this.percentage = parseFloat(res.data.data.percentage)*100
if (res.data.data[0].percentage === '1') { if (res.data.data[0].percentage === '1') {