From d6bd3e1f80bf6c9b62246483256dc81ed0fb3774 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Tue, 6 Feb 2024 14:54:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=A0=B9=E6=8D=AE=20?= =?UTF-8?q?=E6=8C=87=E5=AE=9A=E6=97=B6=E9=97=B4=E8=8C=83=E5=9B=B4=E7=9A=84?= =?UTF-8?q?=20recordInfo=20=E6=9F=A5=E8=AF=A2,=20=E9=A2=84=E4=B8=8B?= =?UTF-8?q?=E8=BD=BD=E8=A7=86=E9=A2=91=E6=96=87=E4=BB=B6=E5=88=B0=E6=9C=AC?= =?UTF-8?q?=E5=9C=B0=E6=8C=87=E5=AE=9A=E8=B7=AF=E5=BE=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gb28181-mocking-service/pom.xml | 5 + .../mocking/config/sip/DeviceProxyConfig.java | 16 +++ .../RecordInfoRequestProcessor.java | 56 +++++++- .../core/sip/service/VideoCacheManager.java | 130 ++++++++++++++++++ .../message/request/catalog/dto/XmlTest.java | 88 ++++++++++++ .../src/main/resources/application-local.yml | 7 + 6 files changed, 300 insertions(+), 2 deletions(-) create mode 100644 gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/service/VideoCacheManager.java diff --git a/gb28181-mocking-service/pom.xml b/gb28181-mocking-service/pom.xml index 6616ffc..e233a4e 100644 --- a/gb28181-mocking-service/pom.xml +++ b/gb28181-mocking-service/pom.xml @@ -118,6 +118,11 @@ commons-exec 1.3 + + + org.apache.httpcomponents.client5 + httpclient5 + diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/DeviceProxyConfig.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/DeviceProxyConfig.java index 44735e2..781ec08 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/DeviceProxyConfig.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/config/sip/DeviceProxyConfig.java @@ -1,6 +1,8 @@ package cn.skcks.docking.gb28181.mocking.config.sip; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @@ -26,4 +28,18 @@ public class DeviceProxyConfig { * 代理该时间段内的历史视频查询请求 */ private Duration proxyTimeRange = Duration.ofMinutes(5); + + /** + * 预下载历史视频的配置 + */ + private PreDownloadForRecordInfo preDownloadForRecordInfo = new PreDownloadForRecordInfo(); + + @NoArgsConstructor + @AllArgsConstructor + @Data + public static class PreDownloadForRecordInfo { + private Boolean enable = true; + private Duration timeRange = Duration.ofMinutes(5); + private String cachePath = "./record"; + } } diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/recordinfo/RecordInfoRequestProcessor.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/recordinfo/RecordInfoRequestProcessor.java index c749dbb..6fc2e9d 100644 --- a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/recordinfo/RecordInfoRequestProcessor.java +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/recordinfo/RecordInfoRequestProcessor.java @@ -2,15 +2,18 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.requ import cn.hutool.core.collection.ListUtil; import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.skcks.docking.gb28181.common.xml.XmlUtils; import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO; +import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoItemDTO; import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoResponseDTO; import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordListDTO; import cn.skcks.docking.gb28181.mocking.core.sip.request.SipRequestBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.response.SipResponseBuilder; import cn.skcks.docking.gb28181.mocking.core.sip.sender.SipSender; +import cn.skcks.docking.gb28181.mocking.core.sip.service.VideoCacheManager; import cn.skcks.docking.gb28181.mocking.orm.mybatis.dynamic.model.MockingDevice; import cn.skcks.docking.gb28181.mocking.service.device.DeviceService; import gov.nist.javax.sip.message.SIPRequest; @@ -23,6 +26,7 @@ import javax.sip.header.CallIdHeader; import javax.sip.header.FromHeader; import javax.sip.message.Response; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; @@ -32,6 +36,8 @@ import java.util.List; public class RecordInfoRequestProcessor { private final SipSender sender; private final DeviceService deviceService; + private final DeviceProxyConfig deviceProxyConfig; + private final VideoCacheManager videoCacheManager; public void process(SIPRequest request, byte[] content) { String senderIp = request.getLocalAddress().getHostAddress(); @@ -39,16 +45,40 @@ public class RecordInfoRequestProcessor { RecordInfoRequestDTO recordInfoRequestDTO = XmlUtils.parse(content, RecordInfoRequestDTO.class); String id = recordInfoRequestDTO.getDeviceId(); deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> { - sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport); + if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){ + sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport); + } else { + sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport); + } }, () -> { deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> { - sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport); + if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){ + sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport); + } else { + sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport); + } }, () -> { log.error("未能找到 deviceId: {} 的相关信息", id); sender.sendResponse(senderIp, transport, notFound(request)); }); }); + } + private boolean preDownloadVideo(String deviceCode, RecordInfoRequestDTO recordInfoRequestDTO){ + if(!deviceProxyConfig.getPreDownloadForRecordInfo().getEnable()){ + return true; + } + + Date startTime = recordInfoRequestDTO.getStartTime(); + Date endTime = recordInfoRequestDTO.getEndTime(); + if(DateUtil.between(startTime,endTime, DateUnit.SECOND) > deviceProxyConfig.getProxyTimeRange().getSeconds()){ + return true; + } + + // 添加预下载任务 + videoCacheManager.addTask(deviceCode,startTime,endTime); + + return videoCacheManager.get(deviceCode,startTime,endTime).isDone(); } private void sendRecordInfo(MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO, SIPRequest request, String senderIp, String transport) { @@ -103,6 +133,28 @@ public class RecordInfoRequestProcessor { }); } + private void sendEmptyRecordInfo(MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO, SIPRequest request, String senderIp, String transport) { + sender.sendResponse(senderIp, transport, ok(request)); + + RecordInfoResponseDTO recordInfoResponseDTO = new RecordInfoResponseDTO(); + recordInfoResponseDTO.setSn(recordInfoRequestDTO.getSn()); + recordInfoResponseDTO.setDeviceId(device.getGbChannelId()); + recordInfoResponseDTO.setName(device.getName()); + recordInfoResponseDTO.setSumNum(0L); + RecordListDTO recordListDTO = RecordListDTO.builder() + .recordList(Collections.emptyList()) + .num(0) + .build(); + recordInfoResponseDTO.setRecordList(recordListDTO); + + FromHeader fromHeader = request.getFromHeader(); + sender.sendRequest((provider, ip, port) -> { + CallIdHeader callIdHeader = provider.getNewCallId(); + return SipRequestBuilder.createMessageRequest(device, + ip, port, 1, XmlUtils.toXml(recordInfoResponseDTO), fromHeader.getTag(), callIdHeader); + }); + } + private SipSender.SendResponse ok(SIPRequest request) { return (provider, ip, port) -> SipResponseBuilder.response(request, Response.OK, "OK"); diff --git a/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/service/VideoCacheManager.java b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/service/VideoCacheManager.java new file mode 100644 index 0000000..c349152 --- /dev/null +++ b/gb28181-mocking-service/src/main/java/cn/skcks/docking/gb28181/mocking/core/sip/service/VideoCacheManager.java @@ -0,0 +1,130 @@ +package cn.skcks.docking.gb28181.mocking.core.sip.service; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.net.url.UrlBuilder; +import cn.skcks.docking.gb28181.common.json.JsonException; +import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; +import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.Paths; +import java.util.Date; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; + +@Slf4j +@Service +@RequiredArgsConstructor +public class VideoCacheManager { + private final DeviceProxyConfig deviceProxyConfig; + + @Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME) + private final Executor executor; + + private final ConcurrentMap>> tasks = new ConcurrentHashMap<>(); + + public String dateFormat(Date date){ + return DateUtil.format(date, DatePattern.PURE_DATETIME_PATTERN); + } + + public String fileName(String deviceCode, Date startTime, Date endTime){ + return StringUtils.joinWith("-", deviceCode, dateFormat(startTime), dateFormat(endTime)); + } + + public void addTask(String deviceCode, Date startTime, Date endTime){ + String name = fileName(deviceCode, startTime, endTime); + if(tasks.get(name) != null){ + return; + } + + CompletableFuture> future = new CompletableFuture<>(); + tasks.put(name, future); + downloadVideo(deviceCode,startTime,endTime, future); + } + + public CompletableFuture> get(String deviceCode, Date startTime, Date endTime){ + String name = fileName(deviceCode, startTime, endTime); + return tasks.get(name); + } + + @SneakyThrows + @Async(MockingExecutor.EXECUTOR_BEAN_NAME) + protected void downloadVideo(String deviceCode, Date startTime, Date endTime, CompletableFuture> future) { + final String url = UrlBuilder.of(deviceProxyConfig.getUrl()) + .addPath("video") + .addQuery("device_id", deviceCode) + .addQuery("begin_time", dateFormat(startTime)) + .addQuery("end_time", dateFormat(endTime)) + .addQuery("useDownload", true).build(); + File file = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName(deviceCode, startTime, endTime) + ".mp4.tmp").toFile(); + log.info("文件存储路径 => {}", file.getAbsolutePath()); + log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists()); + + if(file.exists()){ + file.delete(); + log.info("删除已存但未完成下载的文件 => {}", file.getAbsolutePath()); + } + + try (FileOutputStream outputStream = new FileOutputStream(file)) { + FileChannel channel = outputStream.getChannel(); + FileLock lock = channel.lock(); + + try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom().build()) { + asyncClient.start(); + SimpleHttpRequest request = SimpleRequestBuilder.get(url).build(); + asyncClient.execute(request, new FutureCallback<>() { + @SneakyThrows + @Override + public void completed(SimpleHttpResponse response) { + InputStream inputStream = new ByteArrayInputStream(response.getBodyBytes()); + IoUtil.copy(inputStream, outputStream); + log.info("视频下载完成 => {}", file.getAbsolutePath()); + log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists()); + File realFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName(deviceCode, startTime, endTime) + ".mp4").toFile(); + file.renameTo(realFile); + lock.release(); + future.complete(JsonResponse.success(file.getAbsolutePath())); + } + + @SneakyThrows + @Override + public void failed(Exception ex) { + log.info("视频下载失败 => {}, {}", file.getAbsolutePath(), url); + lock.release(); + future.completeExceptionally(ex); + } + + @SneakyThrows + @Override + public void cancelled() { + lock.release(); + future.completeExceptionally(new JsonException("视频下载失败")); + } + }); + } + } + } +} diff --git a/gb28181-mocking-service/src/test/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/catalog/dto/XmlTest.java b/gb28181-mocking-service/src/test/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/catalog/dto/XmlTest.java index 5d192a1..fc2ad42 100644 --- a/gb28181-mocking-service/src/test/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/catalog/dto/XmlTest.java +++ b/gb28181-mocking-service/src/test/java/cn/skcks/docking/gb28181/mocking/core/sip/message/processor/message/request/catalog/dto/XmlTest.java @@ -1,12 +1,34 @@ package cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.catalog.dto; +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.net.url.UrlBuilder; +import cn.hutool.core.util.IdUtil; import cn.skcks.docking.gb28181.common.xml.XmlUtils; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.core5.concurrent.FutureCallback; import org.junit.jupiter.api.Test; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.web.client.RequestCallback; +import org.springframework.web.client.RestTemplate; +import java.io.*; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.concurrent.CountDownLatch; @Slf4j public class XmlTest { @@ -27,4 +49,70 @@ public class XmlTest { log.info("{}", XmlUtils.toXml(catalogDeviceListDTO)); } + + @Test + void restTemplate() { + final String url = UrlBuilder.of("http://192.168.2.3:18183") + .addPath("video") + .addQuery("device_id", "72439149X18C04DE739F3") + .addQuery("begin_time", "20240206000500") + .addQuery("end_time", "20240206001000") + .addQuery("useDownload", true).build(); + log.info("请求地址 => {}", url); + Path path = Paths.get(System.getProperty("java.io.tmpdir"), IdUtil.fastSimpleUUID() + ".mp4"); + log.info("文件存储路径 => {}", path.toAbsolutePath()); + + // 定义请求头的接收类型 + RequestCallback requestCallback = request -> request.getHeaders() + .setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL)); + + RestTemplate restTemplate = new RestTemplate(); + // 对响应进行流式处理而不是将其全部加载到内存中 + restTemplate.execute(url, HttpMethod.GET, requestCallback, clientHttpResponse -> { + Files.copy(clientHttpResponse.getBody(), path); + return null; + }); + } + + @Test + void httpClient() throws IOException, InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(1); + final String url = "http://192.168.2.3:18183/video?end_time=20240206001000&begin_time=20240206000500&device_id=72439149X18C04DE739F3&useDownload=true"; + File file = Paths.get(System.getProperty("java.io.tmpdir"), IdUtil.fastSimpleUUID() + ".mp4").toFile(); + log.info("文件存储路径 => {}", file.getAbsolutePath()); + log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists()); + try (FileOutputStream outputStream = new FileOutputStream(file)) { + FileChannel channel = outputStream.getChannel(); + FileLock lock = channel.lock(); + + try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom().build()) { + asyncClient.start(); + SimpleHttpRequest request = SimpleRequestBuilder.get(url).build(); + asyncClient.execute(request, new FutureCallback<>() { + @SneakyThrows + @Override + public void completed(SimpleHttpResponse response) { + InputStream inputStream = new ByteArrayInputStream(response.getBodyBytes()); + IoUtil.copy(inputStream, outputStream); + log.info("视频下载完成 => {}", file.getAbsolutePath()); + log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists()); + lock.release(); + countDownLatch.countDown(); + } + + @Override + public void failed(Exception ex) { + log.info("视频下载失败 => {}, {}", file.getAbsolutePath(), url); + countDownLatch.countDown(); + } + + @Override + public void cancelled() { + countDownLatch.countDown(); + } + }); + } + } + countDownLatch.await(); + } } diff --git a/gb28181-mocking-starter/src/main/resources/application-local.yml b/gb28181-mocking-starter/src/main/resources/application-local.yml index 09fee04..f174d90 100644 --- a/gb28181-mocking-starter/src/main/resources/application-local.yml +++ b/gb28181-mocking-starter/src/main/resources/application-local.yml @@ -83,6 +83,13 @@ proxy: # 代理的视频接口地址, 用于获取历史视频 # 参数 device_id, begin_time, end_time url: http://10.10.10.20:18186 + use-download-to-playback: true + proxy-video-in-time-range: true + proxy-time-range: 5m + pre-download-for-record-info: + time-range: 5m + cache-path: ./record + enable: true ffmpeg-support: task: # 最大同时推流任务数, <= 0 时不做限制