新增可配置 获取历史视频之前 是否 先发起 RecordInfo 请求

This commit is contained in:
shikong 2024-02-05 16:43:43 +08:00
parent 028c178ef5
commit 071e2a2491
9 changed files with 199 additions and 24 deletions

View File

@ -4,7 +4,7 @@ import cn.skcks.docking.gb28181.annotation.web.JsonMapping;
import cn.skcks.docking.gb28181.annotation.web.methods.GetJson;
import cn.skcks.docking.gb28181.annotation.web.methods.PostJson;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig;
import cn.skcks.docking.gb28181.wvp.service.catalog.CatalogService;
import cn.skcks.docking.gb28181.wvp.service.device.control.DeviceControlService;
@ -57,8 +57,7 @@ public class Gb28181Controller {
}
@PostJson("/recordInfo")
public JsonResponse<RecordInfoRequestDTO> recordInfo(RecordInfoDTO dto){
recordInfoService.requestRecordInfo(dto);
return JsonResponse.success(null);
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> recordInfo(RecordInfoDTO dto){
return recordInfoService.requestRecordInfo(dto);
}
}

View File

@ -2,7 +2,6 @@ package cn.skcks.docking.gb28181.wvp.config;
import cn.skcks.docking.gb28181.config.sip.SipConfig;
import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
@ -11,6 +10,7 @@ import org.springframework.stereotype.Component;
import javax.sip.ListeningPoint;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Component
@ConfigurationProperties(prefix = "proxy.gb28181.sip", ignoreInvalidFields = true)
@ -44,6 +44,14 @@ public class ProxySipConfig {
*/
private String proxyMediaUrl = "";
/**
* 调用 视频下载之前 是否使用 recordInfo 查询
*/
private boolean useRecordInfoQueryBeforeDownload = true;
private int retryRecordInfoQueryBeforeDownloadTimes = 20;
private long retryRecordInfoQueryBeforeDownloadInterval = 3;
private TimeUnit retryRecordInfoQueryBeforeDownloadIntervalUnit = TimeUnit.SECONDS;
@Bean
public SipConfig sipConfig(){
SipConfig sipConfig = new SipConfig();

View File

@ -25,6 +25,7 @@ import cn.skcks.docking.gb28181.media.proxy.ZlmMediaService;
import cn.skcks.docking.gb28181.sdp.GB28181Description;
import cn.skcks.docking.gb28181.sdp.GB28181SDPBuilder;
import cn.skcks.docking.gb28181.sdp.media.MediaStreamMode;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.service.ssrc.SsrcService;
import cn.skcks.docking.gb28181.wvp.config.ProxySipConfig;
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
@ -32,11 +33,18 @@ import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.service.record.RecordInfoService;
import cn.skcks.docking.gb28181.wvp.service.record.dto.RecordInfoDTO;
import cn.skcks.docking.gb28181.wvp.service.video.VideoService;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.response.SipResponseBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import cn.skcks.docking.gb28181.wvp.utils.RetryUtil;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import gov.nist.javax.sdp.MediaDescriptionImpl;
import gov.nist.javax.sdp.fields.TimeField;
import gov.nist.javax.sdp.fields.URIField;
@ -62,6 +70,7 @@ import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.Vector;
import java.util.concurrent.*;
@ -80,6 +89,8 @@ public class Gb28181DownloadService {
private final SipSubscribe subscribe;
private final VideoService videoService;
private final WvpProxyConfig wvpProxyConfig;
private final RecordInfoService recordInfoService;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<String, DeferredResult<JsonResponse<String>>> requestMap = new ConcurrentHashMap<>();
@ -150,22 +161,59 @@ public class Gb28181DownloadService {
}
@SneakyThrows
@SuppressWarnings({"UnstableApiUsage", "unchecked"})
public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime, Boolean fileHeader, Boolean useDownload) {
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
asyncContext.start(()->{
HttpServletResponse asyncResponse = (HttpServletResponse)asyncContext.getResponse();
try{
download(deviceCode, startTime,endTime, useDownload).whenComplete((videoInfo, e)->{
writeFileHeader(response,deviceCode,startTime,endTime,fileHeader);
if(e != null){
if(proxySipConfig.isUseRecordInfoQueryBeforeDownload()){
String name = MessageFormat.format("{0} {1}-{2}", deviceCode, startTime, endTime);
Retryer<JsonResponse<List<RecordInfoItemVO>>> retryer = RetryerBuilder.<JsonResponse<List<RecordInfoItemVO>>>newBuilder()
// 异常就重试
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(proxySipConfig.getRetryRecordInfoQueryBeforeDownloadInterval(), proxySipConfig.getRetryRecordInfoQueryBeforeDownloadIntervalUnit()))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(proxySipConfig.getRetryRecordInfoQueryBeforeDownloadTimes()))
.retryIfResult((result) -> {
log.info("{}", result);
return result == null ||
result.getCode() != Response.OK ||
result.getData() == null ||
result.getData().isEmpty();
})
.withRetryListener(RetryUtil.defaultRetryListener(name)).build();
retryer.call(()->{
CompletableFuture<JsonResponse<List<RecordInfoItemVO>>> future = new CompletableFuture<>();
// 发起设备录像查询
DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestedRecordInfo =
recordInfoService.requestRecordInfo(new RecordInfoDTO(deviceCode, startTime, endTime, "", 0, "all"));
requestedRecordInfo.setResultHandler(result -> {
future.complete((JsonResponse<List<RecordInfoItemVO>>) result);
});
requestedRecordInfo.onError((throwable)->{
future.complete(JsonResponse.error(throwable.getMessage()));
});
return future.get();
});
}
download(deviceCode, startTime, endTime, useDownload).whenComplete((videoInfo, e) -> {
writeFileHeader(response, deviceCode, startTime, endTime, fileHeader);
if (e != null) {
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
} else if(videoInfo == null){
} else if (videoInfo == null) {
writeErrorToResponse(asyncResponse, JsonResponse.error("下载失败"));
} else if(wvpProxyConfig.getUseFfmpeg()){
videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60,videoInfo.getDevice(),videoInfo.getCallId());
} else if (wvpProxyConfig.getUseFfmpeg()) {
videoService.ffmpegRecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime, endTime, DateUnit.SECOND) + 60, videoInfo.getDevice(), videoInfo.getCallId());
} else {
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime,endTime,DateUnit.SECOND) + 60);
videoService.javaCVrecord(asyncResponse, videoInfo.getUrl(), DateUtil.between(startTime, endTime, DateUnit.SECOND) + 60);
}
asyncContext.complete();
});

View File

@ -1,8 +1,12 @@
package cn.skcks.docking.gb28181.wvp.service.record;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.gb28181.constant.CmdType;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.request.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking;
@ -11,16 +15,17 @@ import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.service.record.dto.RecordInfoDTO;
import cn.skcks.docking.gb28181.wvp.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.wvp.sip.sender.SipSender;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.RecordSubscribe;
import cn.skcks.docking.gb28181.wvp.sip.subscribe.SipSubscribe;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.context.request.async.DeferredResult;
import java.text.MessageFormat;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Slf4j
@Service
@ -28,12 +33,11 @@ import java.util.concurrent.ScheduledExecutorService;
public class RecordInfoService {
private final SipSender sipSender;
private final SipSubscribe sipSubscribe;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final DockingService dockingService;
private final DeviceService deviceService;
@SneakyThrows
public void requestRecordInfo(RecordInfoDTO dto){
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestRecordInfo(RecordInfoDTO dto){
String deviceCode = dto.getDeviceCode();
Optional<WvpProxyDevice> deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);
if (deviceByDeviceCode.isEmpty()) {
@ -42,20 +46,23 @@ public class RecordInfoService {
throw new JsonException(reason);
} else {
WvpProxyDevice device = deviceByDeviceCode.get();
requestRecordInfo(device.getGbDeviceId(), device.getGbDeviceChannelId(), dto);
return requestRecordInfo(device.getGbDeviceId(), device.getGbDeviceChannelId(), dto);
}
}
public void requestRecordInfo(String gbDeviceId, String channel, RecordInfoDTO dto){
public DeferredResult<JsonResponse<List<RecordInfoItemVO>>> requestRecordInfo(String gbDeviceId, String channel, RecordInfoDTO dto){
DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result = new DeferredResult<>();
Optional<WvpProxyDocking> deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId);
if(deviceByGbDeviceId.isEmpty()){
log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId);
return;
result.setResult(JsonResponse.error(MessageFormat.format("未能找到 设备编码 为 {0} 的设备", gbDeviceId)));
return result;
}
Optional<WvpProxyDevice> deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel);
if (deviceByGbDeviceIdAndChannel.isEmpty()) {
log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel);
return;
result.setResult(JsonResponse.error(MessageFormat.format("未能找到 编码 {0}, 通道 {1} 的设备", gbDeviceId, channel)));
return result;
}
WvpProxyDocking device = deviceByGbDeviceId.get();
String sn = String.valueOf((int) (Math.random() * 9 + 1) * 100000);
@ -69,7 +76,12 @@ public class RecordInfoService {
.filePath(dto.getFilePath())
.indistinctQuery(0)
.build();
String key = GenericSubscribe.Helper.getKey(CmdType.RECORD_INFO, channel, sn);
sipSubscribe.getMessageSubscribe().addPublisher(key);
sipSubscribe.getMessageSubscribe().addSubscribe(key, new RecordSubscribe(sipSubscribe, key, result, gbDeviceId));
sipSender.sendRequest((provider, ip, port)-> SipRequestBuilder.createMessageRequest(device,ip,port,SipRequestBuilder.getCSeq(), XmlUtils.toXml(recordInfoRequestDTO), SipUtil.generateViaTag(),
SipUtil.generateFromTag(), provider.getNewCallId()));
return result;
}
}

View File

@ -3,11 +3,15 @@ package cn.skcks.docking.gb28181.wvp.service.record.dto;
import cn.hutool.core.date.DatePattern;
import cn.skcks.docking.gb28181.constant.GB28181Constant;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class RecordInfoDTO {
/**

View File

@ -10,6 +10,7 @@ import cn.skcks.docking.gb28181.core.sip.message.processor.message.request.dto.M
import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.utils.SipUtil;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.wvp.service.docking.DockingService;
import cn.skcks.docking.gb28181.wvp.sip.message.message.catalog.dto.CatalogResponseDTO;
import cn.skcks.docking.gb28181.wvp.sip.message.message.notify.MediaStatusRequestDTO;
@ -81,6 +82,13 @@ public class MessageRequestProcessor implements MessageProcessor {
if(StringUtils.equalsAnyIgnoreCase(messageDto.getCmdType(), CmdType.KEEPALIVE)){
response = ok;
// 更新设备在线状态
} else if(messageDto.getCmdType().equalsIgnoreCase(cn.skcks.docking.gb28181.constant.CmdType.RECORD_INFO)) {
response = ok;
RecordInfoResponseDTO dto = XmlUtils.parse(content, RecordInfoResponseDTO.class, GB28181Constant.CHARSET);
String key = GenericSubscribe.Helper.getKey(cn.skcks.docking.gb28181.constant.CmdType.RECORD_INFO, dto.getDeviceId(), dto.getSn());
Optional.ofNullable(subscribe.getMessageSubscribe().getPublisher(key))
.ifPresentOrElse(publisher -> publisher.submit(request),
() -> log.warn("对应订阅 {} 已结束, 异常数据 => {}", key, dto));
} else if(messageDto.getCmdType().equalsIgnoreCase(CmdType.CATALOG)){
response = ok;
CatalogResponseDTO dto = XmlUtils.parse(content, CatalogResponseDTO.class, GB28181Constant.CHARSET);

View File

@ -0,0 +1,81 @@
package cn.skcks.docking.gb28181.wvp.sip.subscribe;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.service.record.convertor.RecordConvertor;
import cn.skcks.docking.gb28181.service.record.vo.RecordInfoItemVO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.sip.manscdp.recordinfo.response.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.sip.utils.MANSCDPUtils;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class RecordSubscribe implements Flow.Subscriber<SIPRequest>{
private final SipSubscribe subscribe;
private final String key;
private final DeferredResult<JsonResponse<List<RecordInfoItemVO>>> result;
private final String deviceId;
private final List<RecordInfoItemDTO> list = new ArrayList<>();
private final AtomicLong atomicSum = new AtomicLong(0);
private final AtomicLong atomicNum = new AtomicLong(0);
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
log.debug("建立订阅 => {}", key);
subscription.request(1);
}
@Override
public void onNext(SIPRequest item) {
RecordInfoResponseDTO data = MANSCDPUtils.parse(item.getRawContent(), RecordInfoResponseDTO.class);
atomicSum.set(Math.max(data.getSumNum(), atomicNum.get()));
atomicNum.addAndGet(data.getRecordList().getNum());
list.addAll(data.getRecordList().getRecordList());
long num = atomicNum.get();
long sum = atomicSum.get();
if(num > sum){
log.warn("检测到 设备 => {}, 未按规范实现, 订阅 => {}, 期望总数为 => {}, 已接收数量 => {}", deviceId, key, atomicSum.get(), atomicNum.get());
} else {
log.info("获取订阅 => {}, {}/{}", key, atomicNum.get(), atomicSum.get());
}
if (num >= sum) {
// 针对某些不按规范的设备
// 如果已获取数量 >= 约定的总数
// 就执行定时任务, 500ms 内未收到新的数据视为已结束
subscribe.getMessageSubscribe().refreshPublisher(key,500, TimeUnit.MILLISECONDS);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
result.setResult(JsonResponse.success(RecordConvertor.INSTANCE.dto2Vo(sortedRecordList(list))));
log.debug("订阅结束 => {}", key);
subscribe.getMessageSubscribe().delPublisher(key);
}
private List<RecordInfoItemDTO> sortedRecordList(List<RecordInfoItemDTO> list){
return list.stream().sorted((a,b)-> DateUtil.compare(a.getStartTime(),b.getStartTime())).collect(Collectors.toList());
}
}

View File

@ -2,7 +2,9 @@ package cn.skcks.docking.gb28181.wvp.sip.subscribe;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericTimeoutSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.InviteSubscribe;
import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipRequestSubscribe;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import jakarta.annotation.PostConstruct;
@ -14,6 +16,8 @@ import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@Slf4j
@Data
@ -22,15 +26,18 @@ import java.util.concurrent.Executor;
public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
private GenericSubscribe<SIPRequest> catalogSubscribe;
private GenericSubscribe<SIPResponse> inviteSubscribe;
private GenericSubscribe<SIPRequest> byeSubscribe;
private GenericTimeoutSubscribe<SIPRequest> messageSubscribe;
@PostConstruct
private void init() {
catalogSubscribe = new CatalogSubscribe(executor);
inviteSubscribe = new InviteSubscribe(executor);
byeSubscribe = new ByeSubscribe(executor);
messageSubscribe = new SipRequestSubscribe(executor, scheduledExecutorService);
}
@PreDestroy
@ -38,5 +45,6 @@ public class SipSubscribe {
catalogSubscribe.close();
inviteSubscribe.close();
byeSubscribe.close();
messageSubscribe.close();
}
}

View File

@ -43,7 +43,8 @@ media:
proxy:
wvp:
url: http://127.0.0.1:18978
#url: http://127.0.0.1:18978
url: http://192.168.3.12:18978
user: admin
passwd: admin
use-ffmpeg: true
@ -55,6 +56,7 @@ proxy:
- 44050100002000000003
- 44050100001180000001
- 44050100001320000001
- 44050100001110000010
# 用于生成 代理 wvp 的 视频流 ws-flv 地址
#proxy-media-url: 'wss://192.168.1.241:9022/mf-config/media'
proxy-media-url: 'ws://10.10.10.200:5080'
@ -62,16 +64,21 @@ proxy:
realtime-video-duration: 15m
gb28181:
sip:
id: 44050100002000000003
id: 44050100002000000005
domain: 4405010000
password: 123456
port: 5063
ip:
- 10.10.10.20
# - 10.10.10.20
- 192.168.0.195
stream-mode: udp
use-playback-to-download: false
proxy-media-url: 'https://10.10.10.200:18181/media'
# - 192.168.1.241
use-record-info-query-before-download: true
retry-record-info-query-before-download-interval: 3
retry-record-info-query-before-download-times: 20
retry-record-info-query-before-download-interval-unit: seconds
# - 192.168.1.241
device-api:
offset:
forward: 0s