初步完成下载流程, 未测试

This commit is contained in:
shikong 2023-09-10 04:33:28 +08:00
parent 330e279347
commit dfec2e8754
4 changed files with 194 additions and 63 deletions

View File

@ -2,10 +2,14 @@ package cn.skcks.docking.gb28181.wvp.dto.download;
import cn.hutool.core.date.DatePattern;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class DownloadStartReq {
@ -17,5 +21,6 @@ public class DownloadStartReq {
@JsonFormat(pattern = DatePattern.NORM_DATETIME_PATTERN)
private String endTime;
@Builder.Default
private int downloadSpeed = 4;
}

View File

@ -1,8 +1,12 @@
package cn.skcks.docking.gb28181.wvp.dto.media.proxy;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class GetDownloadTaskReq {
/**

View File

@ -5,19 +5,24 @@ import cn.hutool.core.io.IoUtil;
import cn.hutool.crypto.digest.MD5;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
import cn.skcks.docking.gb28181.wvp.dto.device.DeviceChannel;
import cn.skcks.docking.gb28181.wvp.dto.device.GetDeviceChannelsReq;
import cn.skcks.docking.gb28181.wvp.dto.device.GetDeviceChannelsResp;
import cn.skcks.docking.gb28181.wvp.dto.download.DownloadStartReq;
import cn.skcks.docking.gb28181.wvp.dto.login.WvpLoginReq;
import cn.skcks.docking.gb28181.wvp.dto.login.WvpLoginResp;
import cn.skcks.docking.gb28181.wvp.dto.media.proxy.AddDownloadTaskReq;
import cn.skcks.docking.gb28181.wvp.dto.media.proxy.GetDownloadTaskReq;
import cn.skcks.docking.gb28181.wvp.dto.media.proxy.GetDownloadTaskResp;
import cn.skcks.docking.gb28181.wvp.dto.record.QueryRecordReq;
import cn.skcks.docking.gb28181.wvp.dto.record.QueryRecordResp;
import cn.skcks.docking.gb28181.wvp.dto.stream.StreamContent;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.proxy.WvpProxyClient;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.download.DownloadService;
import cn.skcks.docking.gb28181.wvp.utils.RetryUtil;
import com.github.rholder.retry.*;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.http.HttpServletRequest;
@ -25,19 +30,16 @@ import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@Service
@ -48,57 +50,7 @@ public class WvpService {
private final DeviceService deviceService;
private final DownloadService downloadService;
/**
* 默认重试次数
*/
public final static int DEFAULT_RETRY_TIME = 3;
/**
* 默认每次重试等待时间
*/
public final static int DEFAULT_RETRY_WAIT_TIME = 3;
@SuppressWarnings("UnstableApiUsage")
private static RetryListener defaultRetryListener(String name) {
return new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
log.info("第 {} 次 执行 {} 结束", attempt.getAttemptNumber(), name);
if (attempt.hasException()) {
log.info("执行 {} 异常 {}", name, attempt.getExceptionCause().getMessage());
}
}
};
}
@SuppressWarnings("UnstableApiUsage")
private static <T> Retryer<JsonResponse<T>> getDefaultRetryer(String name) {
return RetryerBuilder.<JsonResponse<T>>newBuilder()
// 异常就重试
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(DEFAULT_RETRY_WAIT_TIME, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(DEFAULT_RETRY_TIME))
.retryIfResult(result -> result == null || (result.getCode() != 0 && result.getCode() != 200))
.withRetryListener(defaultRetryListener(name))
.build();
}
@SuppressWarnings("UnstableApiUsage")
private static Retryer<JsonResponse<?>> getDefaultGenericRetryer(String name) {
return RetryerBuilder.<JsonResponse<?>>newBuilder()
// 异常就重试
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(DEFAULT_RETRY_WAIT_TIME, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(DEFAULT_RETRY_TIME))
.retryIfResult(result -> result == null || (result.getCode() != 0 && result.getCode() != 200))
.withRetryListener(defaultRetryListener(name))
.build();
}
@SneakyThrows
private void writeErrorToResponse(HttpServletResponse response, JsonResponse<?> json) {
@ -119,7 +71,7 @@ public class WvpService {
String channelId = wvpProxyDevice.getGbDeviceChannelId();
log.info("设备编码 (deviceCode=>{}) 查询到的设备信息 国标id(gbDeviceId => {}), 通道(channelId => {})", deviceCode, deviceId, channelId);
Retryer<JsonResponse<?>> genericRetryer = getDefaultGenericRetryer("调用 wvp api 查询设备历史");
Retryer<JsonResponse<?>> genericRetryer = RetryUtil.getDefaultGenericRetryer("调用 wvp api 查询设备历史");
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
asyncContext.start(() -> {
@ -155,12 +107,13 @@ public class WvpService {
* @param deviceCode 设备编码 21位
* @param deviceId 国标设备编码 20位
* @param channelId 通道id
* @param startTime 开始时间
* @param endTime 结束时间
* @param startDateTime 开始时间
* @param endDateTime 结束时间
* @return JsonResponse 类型的执行结果 如果 null code 不为 0 200 则视为执行失败
*/
@SuppressWarnings("UnstableApiUsage")
@SneakyThrows
public JsonResponse<?> video(HttpServletResponse response, String deviceCode, String deviceId, String channelId, Date startTime, Date endTime) {
public JsonResponse<?> video(HttpServletResponse response, String deviceCode, String deviceId, String channelId, Date startDateTime, Date endDateTime) {
String passwdMd5 = MD5.create().digestHex(wvpProxyConfig.getPasswd());
WvpLoginReq loginReq = WvpLoginReq.builder()
.username(wvpProxyConfig.getUser())
@ -184,9 +137,11 @@ public class WvpService {
return JsonResponse.success(null);
}
Retryer<JsonResponse<List<QueryRecordResp.RecordListDTO>>> queryRecordRetryer = getDefaultRetryer("调用 wvp 设备历史查询 api");
String startTime = DateUtil.formatDateTime(startDateTime);
String endTime = DateUtil.formatDateTime(endDateTime);
Retryer<JsonResponse<List<QueryRecordResp.RecordListDTO>>> queryRecordRetryer = RetryUtil.getDefaultRetryer("调用 wvp 设备历史查询 api");
JsonResponse<List<QueryRecordResp.RecordListDTO>> recordListResponse = queryRecordRetryer.call(() -> {
JsonResponse<QueryRecordResp> queryRecord = wvpProxyClient.queryRecord(token, deviceId, channelId, new QueryRecordReq(DateUtil.formatDateTime(startTime), DateUtil.formatDateTime(endTime)));
JsonResponse<QueryRecordResp> queryRecord = wvpProxyClient.queryRecord(token, deviceId, channelId, new QueryRecordReq(startTime, endTime));
QueryRecordResp queryRecordData = queryRecord.getData();
if (queryRecordData == null) {
String reason = MessageFormat.format("通过 wvp 查询历史录像 失败 设备: {0}, 国标id: {1}, 通道: {2}, 错误信息: {3}", deviceCode, deviceId, channelId, queryRecord.getMsg());
@ -202,12 +157,85 @@ public class WvpService {
log.info("通过 wvp 查询到 {} 条历史录像 设备: {}, 国标id: {}, 通道: {}, 开始时间: {}, 结束时间: {}", recordList.size(), deviceCode, deviceId, channelId, startTime, endTime);
return JsonResponse.success(recordList);
});
List<QueryRecordResp.RecordListDTO> recordList = recordListResponse.getData();
recordList.forEach(record->{
log.info("{}", record);
log.debug("{}", record);
});
downloadService.download(response, "http://192.168.1.241:18979/download/recordTemp/0490d767d94ce20aedce57c862b6bfe9/rtp/59777645.mp4");
Retryer<JsonResponse<String>> downloadRetryer = RetryUtil.getDefaultRetryer("调用 wvp 设备历史视频下载 api");
JsonResponse<String> videoPathResponse = downloadRetryer.call(() -> {
JsonResponse<StreamContent> downloadStart = wvpProxyClient.downloadStart(token, deviceId, channelId, DownloadStartReq.builder()
.startTime(startTime)
.endTime(endTime)
.build());
if(downloadStart.getData() == null){
throw new JsonException(downloadStart.getMsg());
}
StreamContent downloadStartData = downloadStart.getData();
String mediaServerId = downloadStartData.getMediaServerId();
String app = downloadStartData.getApp();
String stream = downloadStartData.getStream();
log.info("开始下载 mediaServerId: {}, app: {}, stream: {}", mediaServerId, app, stream);
AtomicLong keepTime = new AtomicLong();
Double downloadProgress = 0D;
Retryer<JsonResponse<StreamContent>> downloadProgressRetryer = RetryUtil.<StreamContent>getDefaultRetryerBuilder(MessageFormat.format("查询设备(deviceCode {0}) (deviceId {1}, channel{2}) ({3} ~ {4}) 历史视频下载进度", deviceCode, deviceId, channelId, startTime, endTime))
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(1, TimeUnit.SECONDS))
.retryIfResult(result -> {
keepTime.getAndIncrement();
if (keepTime.get() > 60) {
return false;
}
if (result == null || result.getData() == null) {
return true;
}
if (!result.getData().getProgress().equals(downloadProgress)) {
keepTime.set(0);
return true;
}
return result.getData().getProgress().equals(downloadProgress) && keepTime.get() <= 60;
})
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.hasResult()) {
log.debug("第 {} 次 获取 设备(deviceCode {}) (deviceId {}, channel{}) ({} ~ {}) 历史视频下载进度 mediaServerId: {}, app: {}, stream: {}", attempt.getAttemptNumber(), deviceCode, deviceId, channelId, startTime, endTime, mediaServerId, app, stream);
} else {
log.debug("第 {} 次 获取 设备(deviceCode {}) (deviceId {}, channel{}) ({} ~ {}) 历史视频下载进度失败 mediaServerId: {}, app: {}, stream: {}", attempt.getAttemptNumber(), deviceCode, deviceId, channelId, startTime, endTime, mediaServerId, app, stream);
}
}
})
.withStopStrategy(StopStrategies.neverStop())
.build();
downloadProgressRetryer.call(() -> {
JsonResponse<StreamContent> downloadProgressResp = wvpProxyClient.downloadProgress(token, deviceId, channelId, stream);
StreamContent data = downloadProgressResp.getData();
log.debug("设备(deviceCode {}) (deviceId {}, channel{}) ({} ~ {}) 历史视频下载进度 {}% mediaServerId: {}, app: {}, stream: {}", deviceCode, deviceId, channelId, startTime, endTime, data.getProgress(), mediaServerId, app, stream);
return downloadProgressResp;
});
wvpProxyClient.downloadStop(token, deviceId, channelId, stream);
JsonResponse<String> addDownloadTask2MediaServer = wvpProxyClient.addDownloadTask2MediaServer(token, mediaServerId, new AddDownloadTaskReq(app, stream));
String taskId = addDownloadTask2MediaServer.getData();
Retryer<JsonResponse<String>> mediaServerRetryer = RetryUtil.<String>getDefaultRetryerBuilder("从 mediaServer 获取视频")
.withStopStrategy(StopStrategies.stopAfterAttempt(10))
.build();
return mediaServerRetryer.call(() -> {
JsonResponse<List<GetDownloadTaskResp>> downloadTask4MediaServer = wvpProxyClient.getDownloadTask4MediaServer(token, mediaServerId, new GetDownloadTaskReq(app, stream, taskId, true));
if (downloadTask4MediaServer.getData().size() > 0) {
return JsonResponse.success(downloadTask4MediaServer.getData().get(0).getPlayFile());
} else {
return JsonResponse.error(null);
}
});
});
String videoUrl = videoPathResponse.getData();
log.info("设备(deviceCode {}) (deviceId {}, channel{}) ({} ~ {}) 视频下载地址 {}", deviceCode, deviceId, channelId, startTime, endTime, videoUrl);
downloadService.download(response, videoUrl);
return login;
}
}

View File

@ -0,0 +1,94 @@
package cn.skcks.docking.gb28181.wvp.utils;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import com.github.rholder.retry.*;
import com.google.common.base.Predicate;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
@SuppressWarnings("UnstableApiUsage")
public class RetryUtil {
/**
* 默认重试次数
*/
public final static int DEFAULT_RETRY_TIME = 3;
/**
* 默认每次重试等待时间
*/
public final static long DEFAULT_RETRY_INTERVAL = 3;
public final static TimeUnit DEFAULT_RETRY_INTERVAL_UNIT = TimeUnit.SECONDS;
public static RetryListener defaultRetryListener(String name) {
return new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
log.info("第 {} 次 执行 {} 结束", attempt.getAttemptNumber(), name);
if (attempt.hasException()) {
log.info("执行 {} 异常 {}", name, attempt.getExceptionCause().getMessage());
}
}
};
}
public static <T> Predicate<JsonResponse<T>> defaultRetryIf(){
return result -> result == null || (result.getCode() != 0 && result.getCode() != 200);
}
public static Predicate<JsonResponse<?>> defaultGenericRetryIf(){
return result -> result == null || (result.getCode() != 0 && result.getCode() != 200);
}
public static <T> RetryerBuilder<JsonResponse<T>> getDefaultRetryerBuilder(String name) {
return RetryerBuilder.<JsonResponse<T>>newBuilder()
// 异常就重试
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(DEFAULT_RETRY_INTERVAL, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(DEFAULT_RETRY_TIME))
.retryIfResult(defaultRetryIf())
.withRetryListener(defaultRetryListener(name));
}
public static <T> Retryer<JsonResponse<T>> getDefaultRetryer(String name) {
return RetryUtil.<T>getDefaultRetryerBuilder(name).build();
}
public static RetryerBuilder<JsonResponse<?>> getDefaultGenericRetryerBuilder(String name, int retryTime, long retryInterval, TimeUnit retryTimeUnit,RetryListener retryListener) {
return RetryerBuilder.<JsonResponse<?>>newBuilder()
// 异常就重试
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(retryInterval, retryTimeUnit))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(retryTime))
.retryIfResult(defaultGenericRetryIf())
.withRetryListener(retryListener);
}
public static RetryerBuilder<JsonResponse<?>> getDefaultGenericRetryerBuilder(String name) {
return getDefaultGenericRetryerBuilder(name, DEFAULT_RETRY_TIME, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_INTERVAL_UNIT, defaultRetryListener(name));
}
public static Retryer<JsonResponse<?>> getDefaultGenericRetryer(String name, int retryTime, long retryInterval, TimeUnit retryTimeUnit, RetryListener retryListener) {
return getDefaultGenericRetryerBuilder(name, retryTime, retryInterval, retryTimeUnit, retryListener).build();
}
public static Retryer<JsonResponse<?>> getDefaultGenericRetryer(String name, int retryTime, long retryInterval, TimeUnit retryTimeUnit) {
return getDefaultGenericRetryer(name, retryTime, retryInterval, retryTimeUnit, defaultRetryListener(name));
}
public static Retryer<JsonResponse<?>> getDefaultGenericRetryer(String name, int retryTime, long retryInterval) {
return getDefaultGenericRetryer(name, retryTime, retryInterval, DEFAULT_RETRY_INTERVAL_UNIT);
}
public static Retryer<JsonResponse<?>> getDefaultGenericRetryer(String name) {
return getDefaultGenericRetryer(name, DEFAULT_RETRY_TIME, DEFAULT_RETRY_INTERVAL);
}
}