调整/调试 重试逻辑

This commit is contained in:
shikong 2023-09-09 20:53:52 +08:00
parent 85962d1373
commit 45ac5d5d56
5 changed files with 170 additions and 25 deletions

View File

@ -1,6 +1,7 @@
package cn.skcks.docking.gb28181.wvp.api.video;
import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig;
import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoReq;
import cn.skcks.docking.gb28181.wvp.config.SwaggerConfig;
import cn.skcks.docking.gb28181.wvp.service.video.RecordService;
import cn.skcks.docking.gb28181.wvp.service.wvp.WvpService;
@ -10,6 +11,7 @@ import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springdoc.core.annotations.ParameterObject;
import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
@ -36,7 +38,7 @@ public class VideoController {
@Operation(summary = "获取视频 (目前仅供测试)")
@GetMapping(produces = MediaType.APPLICATION_OCTET_STREAM_VALUE)
@ResponseBody
public void video(HttpServletRequest request, HttpServletResponse response) {
wvpService.video(request,response);
public void video(HttpServletRequest request, HttpServletResponse response, @ParameterObject VideoReq req) {
wvpService.video(request,response,req.getDeviceCode(), req.getStartTime(), req.getEndTime());
}
}

View File

@ -0,0 +1,31 @@
package cn.skcks.docking.gb28181.wvp.api.video.dto;
import cn.hutool.core.date.DatePattern;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.format.annotation.DateTimeFormat;
import java.util.Date;
@NoArgsConstructor
@Data
public class VideoReq {
@NotBlank(message = "设备编码 不能为空")
@Schema(description = "设备编码")
private String deviceCode;
@Schema(description = "开始时间",example = "20230909000000")
@NotBlank(message = "开始时间 不能为空")
@DateTimeFormat(pattern= DatePattern.PURE_DATETIME_PATTERN)
@JsonFormat(pattern = DatePattern.PURE_DATETIME_PATTERN)
private Date startTime;
@Schema(description = "结束时间",example = "20230909000500")
@NotBlank(message = "结束时间 不能为空")
@DateTimeFormat(pattern= DatePattern.PURE_DATETIME_PATTERN)
@JsonFormat(pattern = DatePattern.PURE_DATETIME_PATTERN)
private Date endTime;
}

View File

@ -1,6 +1,7 @@
package cn.skcks.docking.gb28181.wvp.service.download;
import cn.hutool.core.io.IoUtil;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
@ -11,10 +12,13 @@ import org.apache.hc.client5.http.classic.methods.HttpHead;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
@Slf4j
@ -56,9 +60,7 @@ public class DownloadService {
}
@SneakyThrows
public void download(HttpServletRequest request, HttpServletResponse response, String url) {
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
public void download(AsyncContext asyncContext , HttpServletResponse response, String url) {
asyncContext.start(() -> {
try {
response.setHeader("Accept-Ranges", "none");
@ -72,7 +74,14 @@ public class DownloadService {
}
@SneakyThrows
private void download(HttpServletResponse response, String url) {
public void download(HttpServletRequest request, HttpServletResponse response, String url) {
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
download(asyncContext, response, url);
}
@SneakyThrows
public void download(HttpServletResponse response, String url) {
OutputStream outputStream = response.getOutputStream();
try (CloseableHttpClient client = HttpClients.custom().build()) {

View File

@ -1,23 +1,37 @@
package cn.skcks.docking.gb28181.wvp.service.wvp;
import cn.hutool.core.io.IoUtil;
import cn.hutool.crypto.digest.MD5;
import cn.skcks.docking.gb28181.common.json.JsonException;
import cn.skcks.docking.gb28181.common.json.JsonResponse;
import cn.skcks.docking.gb28181.common.json.JsonUtils;
import cn.skcks.docking.gb28181.wvp.config.WvpProxyConfig;
import cn.skcks.docking.gb28181.wvp.dto.device.DeviceChannel;
import cn.skcks.docking.gb28181.wvp.dto.device.GetDeviceChannelsReq;
import cn.skcks.docking.gb28181.wvp.dto.device.GetDeviceChannelsResp;
import cn.skcks.docking.gb28181.wvp.dto.login.WvpLoginReq;
import cn.skcks.docking.gb28181.wvp.dto.login.WvpLoginResp;
import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice;
import cn.skcks.docking.gb28181.wvp.proxy.WvpProxyClient;
import cn.skcks.docking.gb28181.wvp.service.device.DeviceService;
import cn.skcks.docking.gb28181.wvp.service.download.DownloadService;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.*;
import jakarta.servlet.AsyncContext;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Service;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
@ -26,34 +40,123 @@ import java.util.concurrent.TimeUnit;
public class WvpService {
private final WvpProxyClient wvpProxyClient;
private final WvpProxyConfig wvpProxyConfig;
private final DeviceService deviceService;
private final DownloadService downloadService;
@SneakyThrows
public void video(HttpServletRequest request, HttpServletResponse response) {
Retryer<JsonResponse<?>> retryer = RetryerBuilder.<JsonResponse<?>>newBuilder()
/**
* 默认重试次数
*/
public final static int DEFAULT_RETRY_TIME = 3;
/**
* 默认每次重试等待时间
*/
public final static int DEFAULT_RETRY_WAIT_TIME = 3;
@SuppressWarnings("UnstableApiUsage")
private static RetryListener defaultRetryListener(){
return new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
log.info("第 {} 次 执行结束",attempt.getAttemptNumber());
if(attempt.hasException()){
log.info("异常 {}", attempt.getExceptionCause().getMessage());
}
}
};
}
@SuppressWarnings("UnstableApiUsage")
private static <T> Retryer<JsonResponse<T>> getDefaultRetryer() {
return RetryerBuilder.<JsonResponse<T>>newBuilder()
// 异常就重试
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(5, TimeUnit.SECONDS))
.withWaitStrategy(WaitStrategies.fixedWait(DEFAULT_RETRY_WAIT_TIME, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(5))
.retryIfResult(result -> result == null || (result.getCode() != 0 && result.getCode() != 200))
.withStopStrategy(StopStrategies.stopAfterAttempt(DEFAULT_RETRY_TIME))
.retryIfResult(result -> result != null && (result.getCode() != 0 || result.getCode() != 200))
.withRetryListener(defaultRetryListener())
.build();
}
@SuppressWarnings("UnstableApiUsage")
private static Retryer<JsonResponse<?>> getDefaultGenericRetryer() {
return RetryerBuilder.<JsonResponse<?>>newBuilder()
// 异常就重试
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(DEFAULT_RETRY_WAIT_TIME, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(DEFAULT_RETRY_TIME))
.retryIfResult(result -> result != null && (result.getCode() != 0 || result.getCode() != 200))
.withRetryListener(defaultRetryListener())
.build();
}
@SneakyThrows
private void writeErrorToResponse(HttpServletResponse response, JsonResponse<?> json) {
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
response.setCharacterEncoding(StandardCharsets.UTF_8.name());
response.setContentType(MediaType.APPLICATION_JSON_VALUE);
IoUtil.writeUtf8(response.getOutputStream(), false, json);
}
@SneakyThrows
public void video(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime) {
WvpProxyDevice wvpProxyDevice = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null);
if (wvpProxyDevice == null) {
writeErrorToResponse(response, JsonResponse.error("设备不存在"));
return;
}
String deviceId = wvpProxyDevice.getGbDeviceId();
String channelId = wvpProxyDevice.getGbDeviceChannelId();
log.info("设备编码 (deviceCode=>{}) 查询到的设备信息 国标id(gbDeviceId => {}), 通道(channelId => {})", deviceCode, deviceId, channelId);
Retryer<JsonResponse<?>> genericRetryer = getDefaultGenericRetryer();
String passwdMd5 = MD5.create().digestHex(wvpProxyConfig.getPasswd());
WvpLoginReq loginReq = WvpLoginReq.builder()
.username(wvpProxyConfig.getUser())
.password(passwdMd5)
.build();
retryer.call(()->{
JsonResponse<WvpLoginResp> login = wvpProxyClient.login(loginReq);
String accessToken = login.getData().getAccessToken();
log.info("wvp 登录成功 accessToken => {}", accessToken);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
asyncContext.start(() -> {
HttpServletResponse asyncResponse = (HttpServletResponse) asyncContext.getResponse();
try {
genericRetryer.call(() -> {
JsonResponse<WvpLoginResp> login = wvpProxyClient.login(loginReq);
String accessToken = login.getData().getAccessToken();
log.info("wvp 登录成功 accessToken => {}", accessToken);
downloadService.download(request,response,"http://192.168.1.241:18979/download/recordTemp/0490d767d94ce20aedce57c862b6bfe9/rtp/59777645.mp4");
return login;
log.debug("通过 wvp 查询设备 国标id(gbDeviceId => {}) 通道信息", deviceId);
JsonResponse<GetDeviceChannelsResp> deviceChannels = wvpProxyClient.getDeviceChannels(accessToken, deviceId, GetDeviceChannelsReq.builder().build());
if (deviceChannels.getData() == null || deviceChannels.getData().getTotal() == 0) {
writeErrorToResponse(asyncResponse, JsonResponse.error(MessageFormat.format("未能获取 设备:{0}, 国标id: {1}, 的通道信息", deviceCode, deviceId)));
return JsonResponse.success(null);
}
List<DeviceChannel> list = deviceChannels.getData().getList();
log.info("通过 wvp 获取到 查询设备 国标id(gbDeviceId => {}), 通道数量 => {}", deviceId, list.size());
DeviceChannel deviceChannel = list.parallelStream().filter(item -> item.getChannelId().equalsIgnoreCase(channelId)).findFirst().orElse(null);
if (deviceChannel == null) {
writeErrorToResponse(asyncResponse, JsonResponse.error(MessageFormat.format("未查询到 设备:{0}, 国标id: {1}, 通道: {2} 信息", deviceCode, deviceId, channelId)));
return JsonResponse.success(null);
}
downloadService.download(asyncResponse, "http://192.168.1.241:18979/download/recordTemp/0490d767d94ce20aedce57c862b6bfe9/rtp/59777645.mp4");
return login;
});
} catch (RetryException e) {
String reason = MessageFormat.format("查询失败, 已重试 {0} 次", e.getNumberOfFailedAttempts());
log.error(reason);
writeErrorToResponse(asyncResponse, JsonResponse.error(reason));
} catch (Exception e) {
writeErrorToResponse(asyncResponse, JsonResponse.error(e.getMessage()));
} finally {
asyncContext.complete();
}
});
}
}

View File

@ -25,8 +25,8 @@ spring:
username: root
password: 123456a
url: jdbc:mysql://192.168.1.241:3306/gb28181_docking_platform?createDatabaseIfNotExist=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
# profiles:
# active: local
profiles:
active: local
cloud:
openfeign:
httpclient: