新增根据 指定时间范围的 recordInfo 查询, 预下载视频文件到本地指定路径

This commit is contained in:
shikong 2024-02-06 14:54:50 +08:00
parent d799f10420
commit d6bd3e1f80
6 changed files with 300 additions and 2 deletions

View File

@ -118,6 +118,11 @@
<artifactId>commons-exec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -1,6 +1,8 @@
package cn.skcks.docking.gb28181.mocking.config.sip;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ -26,4 +28,18 @@ public class DeviceProxyConfig {
* 代理该时间段内的历史视频查询请求
*/
private Duration proxyTimeRange = Duration.ofMinutes(5);
/**
* 预下载历史视频的配置
*/
private PreDownloadForRecordInfo preDownloadForRecordInfo = new PreDownloadForRecordInfo();
@NoArgsConstructor
@AllArgsConstructor
@Data
public static class PreDownloadForRecordInfo {
private Boolean enable = true;
private Duration timeRange = Duration.ofMinutes(5);
private String cachePath = "./record";
}
}

View File

@ -2,15 +2,18 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.requ
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordListDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder;
import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender;
import cn.skcks.docking.gb28181.mocking.core.sip.service.VideoCacheManager;
import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice;
import cn.skcks.docking.gb28181.mocking.service.device.DeviceService;
import gov.nist.javax.sip.message.SIPRequest;
@ -23,6 +26,7 @@ import javax.sip.header.CallIdHeader;
import javax.sip.header.FromHeader;
import javax.sip.message.Response;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@ -32,6 +36,8 @@ import java.util.List;
public class RecordInfoRequestProcessor {
private final SipSender sender;
private final DeviceService deviceService;
private final DeviceProxyConfig deviceProxyConfig;
private final VideoCacheManager videoCacheManager;
public void process(SIPRequest request, byte[] content) {
String senderIp = request.getLocalAddress().getHostAddress();
@ -39,16 +45,40 @@ public class RecordInfoRequestProcessor {
RecordInfoRequestDTO recordInfoRequestDTO = XmlUtils.parse(content, RecordInfoRequestDTO.class);
String id = recordInfoRequestDTO.getDeviceId();
deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> {
if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){
sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
} else {
sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
}
}, () -> {
deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> {
if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){
sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
} else {
sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
}
}, () -> {
log.error("未能找到 deviceId: {} 的相关信息", id);
sender.sendResponse(senderIp, transport, notFound(request));
});
});
}
private boolean preDownloadVideo(String deviceCode, RecordInfoRequestDTO recordInfoRequestDTO){
if(!deviceProxyConfig.getPreDownloadForRecordInfo().getEnable()){
return true;
}
Date startTime = recordInfoRequestDTO.getStartTime();
Date endTime = recordInfoRequestDTO.getEndTime();
if(DateUtil.between(startTime,endTime, DateUnit.SECOND) > deviceProxyConfig.getProxyTimeRange().getSeconds()){
return true;
}
// 添加预下载任务
videoCacheManager.addTask(deviceCode,startTime,endTime);
return videoCacheManager.get(deviceCode,startTime,endTime).isDone();
}
private void sendRecordInfo(MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO, SIPRequest request, String senderIp, String transport) {
@ -103,6 +133,28 @@ public class RecordInfoRequestProcessor {
});
}
private void sendEmptyRecordInfo(MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO, SIPRequest request, String senderIp, String transport) {
sender.sendResponse(senderIp, transport, ok(request));
RecordInfoResponseDTO recordInfoResponseDTO = new RecordInfoResponseDTO();
recordInfoResponseDTO.setSn(recordInfoRequestDTO.getSn());
recordInfoResponseDTO.setDeviceId(device.getGbChannelId());
recordInfoResponseDTO.setName(device.getName());
recordInfoResponseDTO.setSumNum(0L);
RecordListDTO recordListDTO = RecordListDTO.builder()
.recordList(Collections.emptyList())
.num(0)
.build();
recordInfoResponseDTO.setRecordList(recordListDTO);
FromHeader fromHeader = request.getFromHeader();
sender.sendRequest((provider, ip, port) -> {
CallIdHeader callIdHeader = provider.getNewCallId();
return SipRequestBuilder.createMessageRequest(device,
ip, port, 1, XmlUtils.toXml(recordInfoResponseDTO), fromHeader.getTag(), callIdHeader);
});
}
private SipSender.SendResponse ok(SIPRequest request) {
return (provider, ip, port) -> SipResponseBuilder.response(request, Response.OK,
"OK");

View File

@ -0,0 +1,130 @@
package cn.skcks.docking.gb28181.mocking.core.sip.service;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.net.url.UrlBuilder;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Paths;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@Slf4j
@Service
@RequiredArgsConstructor
public class VideoCacheManager {
private final DeviceProxyConfig deviceProxyConfig;
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private final ConcurrentMap<String, CompletableFuture<JsonResponse<String>>> tasks = new ConcurrentHashMap<>();
public String dateFormat(Date date){
return DateUtil.format(date, DatePattern.PURE_DATETIME_PATTERN);
}
public String fileName(String deviceCode, Date startTime, Date endTime){
return StringUtils.joinWith("-", deviceCode, dateFormat(startTime), dateFormat(endTime));
}
public void addTask(String deviceCode, Date startTime, Date endTime){
String name = fileName(deviceCode, startTime, endTime);
if(tasks.get(name) != null){
return;
}
CompletableFuture<JsonResponse<String>> future = new CompletableFuture<>();
tasks.put(name, future);
downloadVideo(deviceCode,startTime,endTime, future);
}
public CompletableFuture<JsonResponse<String>> get(String deviceCode, Date startTime, Date endTime){
String name = fileName(deviceCode, startTime, endTime);
return tasks.get(name);
}
@SneakyThrows
@Async(MockingExecutor.EXECUTOR_BEAN_NAME)
protected void downloadVideo(String deviceCode, Date startTime, Date endTime, CompletableFuture<JsonResponse<String>> future) {
final String url = UrlBuilder.of(deviceProxyConfig.getUrl())
.addPath("video")
.addQuery("device_id", deviceCode)
.addQuery("begin_time", dateFormat(startTime))
.addQuery("end_time", dateFormat(endTime))
.addQuery("useDownload", true).build();
File file = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName(deviceCode, startTime, endTime) + ".mp4.tmp").toFile();
log.info("文件存储路径 => {}", file.getAbsolutePath());
log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists());
if(file.exists()){
file.delete();
log.info("删除已存但未完成下载的文件 => {}", file.getAbsolutePath());
}
try (FileOutputStream outputStream = new FileOutputStream(file)) {
FileChannel channel = outputStream.getChannel();
FileLock lock = channel.lock();
try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom().build()) {
asyncClient.start();
SimpleHttpRequest request = SimpleRequestBuilder.get(url).build();
asyncClient.execute(request, new FutureCallback<>() {
@SneakyThrows
@Override
public void completed(SimpleHttpResponse response) {
InputStream inputStream = new ByteArrayInputStream(response.getBodyBytes());
IoUtil.copy(inputStream, outputStream);
log.info("视频下载完成 => {}", file.getAbsolutePath());
log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists());
File realFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName(deviceCode, startTime, endTime) + ".mp4").toFile();
file.renameTo(realFile);
lock.release();
future.complete(JsonResponse.success(file.getAbsolutePath()));
}
@SneakyThrows
@Override
public void failed(Exception ex) {
log.info("视频下载失败 => {}, {}", file.getAbsolutePath(), url);
lock.release();
future.completeExceptionally(ex);
}
@SneakyThrows
@Override
public void cancelled() {
lock.release();
future.completeExceptionally(new JsonException("视频下载失败"));
}
});
}
}
}
}

View File

@ -1,12 +1,34 @@
package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.catalog.dto;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.net.url.UrlBuilder;
import cn.hutool.core.util.IdUtil;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.junit.jupiter.api.Test;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.web.client.RequestCallback;
import org.springframework.web.client.RestTemplate;
import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@Slf4j
public class XmlTest {
@ -27,4 +49,70 @@ public class XmlTest {
log.info("{}", XmlUtils.toXml(catalogDeviceListDTO));
}
@Test
void restTemplate() {
final String url = UrlBuilder.of("http://192.168.2.3:18183")
.addPath("video")
.addQuery("device_id", "72439149X18C04DE739F3")
.addQuery("begin_time", "20240206000500")
.addQuery("end_time", "20240206001000")
.addQuery("useDownload", true).build();
log.info("请求地址 => {}", url);
Path path = Paths.get(System.getProperty("java.io.tmpdir"), IdUtil.fastSimpleUUID() + ".mp4");
log.info("文件存储路径 => {}", path.toAbsolutePath());
// 定义请求头的接收类型
RequestCallback requestCallback = request -> request.getHeaders()
.setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL));
RestTemplate restTemplate = new RestTemplate();
// 对响应进行流式处理而不是将其全部加载到内存中
restTemplate.execute(url, HttpMethod.GET, requestCallback, clientHttpResponse -> {
Files.copy(clientHttpResponse.getBody(), path);
return null;
});
}
@Test
void httpClient() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
final String url = "http://192.168.2.3:18183/video?end_time=20240206001000&begin_time=20240206000500&device_id=72439149X18C04DE739F3&useDownload=true";
File file = Paths.get(System.getProperty("java.io.tmpdir"), IdUtil.fastSimpleUUID() + ".mp4").toFile();
log.info("文件存储路径 => {}", file.getAbsolutePath());
log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists());
try (FileOutputStream outputStream = new FileOutputStream(file)) {
FileChannel channel = outputStream.getChannel();
FileLock lock = channel.lock();
try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom().build()) {
asyncClient.start();
SimpleHttpRequest request = SimpleRequestBuilder.get(url).build();
asyncClient.execute(request, new FutureCallback<>() {
@SneakyThrows
@Override
public void completed(SimpleHttpResponse response) {
InputStream inputStream = new ByteArrayInputStream(response.getBodyBytes());
IoUtil.copy(inputStream, outputStream);
log.info("视频下载完成 => {}", file.getAbsolutePath());
log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists());
lock.release();
countDownLatch.countDown();
}
@Override
public void failed(Exception ex) {
log.info("视频下载失败 => {}, {}", file.getAbsolutePath(), url);
countDownLatch.countDown();
}
@Override
public void cancelled() {
countDownLatch.countDown();
}
});
}
}
countDownLatch.await();
}
}

View File

@ -83,6 +83,13 @@ proxy:
# 代理的视频接口地址, 用于获取历史视频
# 参数 device_id, begin_time, end_time
url: http://10.10.10.20:18186
use-download-to-playback: true
proxy-video-in-time-range: true
proxy-time-range: 5m
pre-download-for-record-info:
time-range: 5m
cache-path: ./record
enable: true
ffmpeg-support:
task:
# 最大同时推流任务数, <= 0 时不做限制