特定时间范围的历史视频查询请求 预拉取视频响应

改为异步等待完成再返回 如果1分钟内未能完成再 返回 空录像
This commit is contained in:
shikong 2024-03-07 10:47:18 +08:00
parent b42b8be747
commit 4b55f7fc54

View File

@ -4,9 +4,11 @@ import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.xml.XmlUtils;
import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.query.dto.RecordInfoRequestDTO;
import cn.skcks.docking.gb28181.mocking.config.sip.DeviceProxyConfig;
import cn.skcks.docking.gb28181.mocking.core.sip.executor.MockingExecutor;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoItemDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordInfoResponseDTO;
import cn.skcks.docking.gb28181.mocking.core.sip.message.processor.message.request.recordinfo.dto.RecordListDTO;
@ -20,6 +22,7 @@ import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import javax.sip.header.CallIdHeader;
@ -29,6 +32,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
@Slf4j
@RequiredArgsConstructor
@ -38,24 +45,27 @@ public class RecordInfoRequestProcessor {
private final DeviceService deviceService;
private final DeviceProxyConfig deviceProxyConfig;
private final VideoCacheManager videoCacheManager;
@Qualifier(MockingExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
public void process(SIPRequest request, byte[] content) {
String senderIp = request.getLocalAddress().getHostAddress();
String transport = request.getTopmostViaHeader().getTransport();
RecordInfoRequestDTO recordInfoRequestDTO = XmlUtils.parse(content, RecordInfoRequestDTO.class);
String id = recordInfoRequestDTO.getDeviceId();
deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> {
if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){
sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
} else {
sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
processRecordInfoRespWithVideoCacheTask(request, senderIp, transport, device, recordInfoRequestDTO);
}
}, () -> {
deviceService.getDeviceByGbChannelId(id).ifPresentOrElse((device) -> {
if(preDownloadVideo(device.getDeviceCode(), recordInfoRequestDTO)){
sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
} else {
sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
processRecordInfoRespWithVideoCacheTask(request, senderIp, transport, device, recordInfoRequestDTO);
}
}, () -> {
log.error("未能找到 deviceId: {} 的相关信息", id);
@ -64,6 +74,24 @@ public class RecordInfoRequestProcessor {
});
}
private void processRecordInfoRespWithVideoCacheTask(SIPRequest request, String senderIp, String transport, MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO){
CompletableFuture<Runnable> future = CompletableFuture.supplyAsync(()-> {
try {
preDownloadVideoTask(device.getDeviceCode(), recordInfoRequestDTO).get();
return ()-> sendRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
} catch (InterruptedException | ExecutionException e) {
log.error("preDownloadVideoTask error",e);
return() -> sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport);
}
}, executor);
future.completeOnTimeout(() -> sendEmptyRecordInfo(device, recordInfoRequestDTO, request, senderIp, transport),
1, TimeUnit.MINUTES);
future.thenApplyAsync(fn->{
fn.run();
return null;
});
}
private boolean preDownloadVideo(String deviceCode, RecordInfoRequestDTO recordInfoRequestDTO){
if(!deviceProxyConfig.getPreDownloadForRecordInfo().getEnable()){
return true;
@ -75,10 +103,16 @@ public class RecordInfoRequestProcessor {
return true;
}
return preDownloadVideoTask(deviceCode,recordInfoRequestDTO).isDone();
}
private CompletableFuture<JsonResponse<String>> preDownloadVideoTask(String deviceCode, RecordInfoRequestDTO recordInfoRequestDTO){
Date startTime = recordInfoRequestDTO.getStartTime();
Date endTime = recordInfoRequestDTO.getEndTime();
// 添加预下载任务
videoCacheManager.addTask(deviceCode,startTime,endTime);
return videoCacheManager.get(deviceCode,startTime,endTime).isDone();
return videoCacheManager.get(deviceCode,startTime,endTime);
}
private void sendRecordInfo(MockingDevice device, RecordInfoRequestDTO recordInfoRequestDTO, SIPRequest request, String senderIp, String transport) {