新增根据 指定时间范围的 recordInfo 查询, 预下载视频文件到本地指定路径

This commit is contained in:
shikong 2024-02-06 15:51:38 +08:00
parent 67f48ad730
commit 92fb1555fd

View File

@ -4,25 +4,22 @@ import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.IoUtil; import cn.hutool.core.io.IoUtil;
import cn.hutool.core.net.url.UrlBuilder; import cn.hutool.core.net.url.UrlBuilder;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig; import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor; import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients; import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.core5.concurrent.FutureCallback;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.ByteArrayInputStream;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.InputStream; import java.io.InputStream;
@ -30,10 +27,7 @@ import java.nio.channels.FileChannel;
import java.nio.channels.FileLock; import java.nio.channels.FileLock;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Date; import java.util.Date;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
@Slf4j @Slf4j
@Service @Service
@ -46,6 +40,16 @@ public class VideoCacheManager {
private final ConcurrentMap<String, CompletableFuture<JsonResponse<String>>> tasks = new ConcurrentHashMap<>(); private final ConcurrentMap<String, CompletableFuture<JsonResponse<String>>> tasks = new ConcurrentHashMap<>();
private final PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager();
@PostConstruct
private void init(){
manager.setDefaultConnectionConfig(
ConnectionConfig.custom()
.setConnectTimeout(5, TimeUnit.MINUTES)
.build());
}
public String dateFormat(Date date){ public String dateFormat(Date date){
return DateUtil.format(date, DatePattern.PURE_DATETIME_PATTERN); return DateUtil.format(date, DatePattern.PURE_DATETIME_PATTERN);
} }
@ -54,16 +58,13 @@ public class VideoCacheManager {
return StringUtils.joinWith("-", deviceCode, dateFormat(startTime), dateFormat(endTime)); return StringUtils.joinWith("-", deviceCode, dateFormat(startTime), dateFormat(endTime));
} }
@Async(MockingExecutor.EXECUTOR_BEAN_NAME)
public void addTask(String deviceCode, Date startTime, Date endTime){ public void addTask(String deviceCode, Date startTime, Date endTime){
String name = fileName(deviceCode, startTime, endTime); String name = fileName(deviceCode, startTime, endTime);
if(tasks.get(name) != null){ if(tasks.get(name) != null){
return; return;
} }
CompletableFuture<JsonResponse<String>> future = new CompletableFuture<>(); tasks.put(name, downloadVideo(deviceCode,startTime,endTime));
tasks.put(name, future);
downloadVideo(deviceCode,startTime,endTime, future);
} }
public CompletableFuture<JsonResponse<String>> get(String deviceCode, Date startTime, Date endTime){ public CompletableFuture<JsonResponse<String>> get(String deviceCode, Date startTime, Date endTime){
@ -72,8 +73,8 @@ public class VideoCacheManager {
} }
@SneakyThrows @SneakyThrows
@Async(MockingExecutor.EXECUTOR_BEAN_NAME) protected CompletableFuture<JsonResponse<String>> downloadVideo(String deviceCode, Date startTime, Date endTime) {
protected void downloadVideo(String deviceCode, Date startTime, Date endTime, CompletableFuture<JsonResponse<String>> future) { return CompletableFuture.supplyAsync(()->{
final String url = UrlBuilder.of(deviceProxyConfig.getUrl()) final String url = UrlBuilder.of(deviceProxyConfig.getUrl())
.addPath("video") .addPath("video")
.addQuery("device_id", deviceCode) .addQuery("device_id", deviceCode)
@ -93,39 +94,25 @@ public class VideoCacheManager {
FileChannel channel = outputStream.getChannel(); FileChannel channel = outputStream.getChannel();
FileLock lock = channel.lock(); FileLock lock = channel.lock();
try (CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom().build()) { try (CloseableHttpClient client = HttpClients.custom().setConnectionManager(manager).build()) {
asyncClient.start(); HttpGet httpGet = new HttpGet(url);
SimpleHttpRequest request = SimpleRequestBuilder.get(url).build(); client.execute(httpGet, response -> {
asyncClient.execute(request, new FutureCallback<>() { InputStream stream = response.getEntity().getContent();
@SneakyThrows IoUtil.copy(stream,outputStream);
@Override return stream;
public void completed(SimpleHttpResponse response) { });
InputStream inputStream = new ByteArrayInputStream(response.getBodyBytes());
IoUtil.copy(inputStream, outputStream);
log.info("视频下载完成 => {}", file.getAbsolutePath()); log.info("视频下载完成 => {}", file.getAbsolutePath());
log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists()); log.info("文件 {}, 是否存在: {}", file.getAbsolutePath(), file.exists());
File realFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName(deviceCode, startTime, endTime) + ".mp4").toFile(); File realFile = Paths.get(deviceProxyConfig.getPreDownloadForRecordInfo().getCachePath(),fileName(deviceCode, startTime, endTime) + ".mp4").toFile();
file.renameTo(realFile); file.renameTo(realFile);
lock.release(); lock.release();
future.complete(JsonResponse.success(file.getAbsolutePath())); return JsonResponse.success(realFile.getAbsolutePath());
}
@SneakyThrows
@Override
public void failed(Exception ex) {
log.info("视频下载失败 => {}, {}", file.getAbsolutePath(), url);
lock.release();
future.completeExceptionally(ex);
}
@SneakyThrows
@Override
public void cancelled() {
lock.release();
future.completeExceptionally(new JsonException("视频下载失败"));
}
});
} }
} catch (Exception e) {
log.error("视频下载失败 => {}", e.getMessage());
file.delete();
return JsonResponse.error(e.getMessage());
} }
},executor);
} }
} }