添加重试

This commit is contained in:
shikong 2023-12-25 14:10:31 +08:00
parent b5068617bb
commit f1db3e3d1d

View File

@ -61,7 +61,7 @@ public class WvpService {
private final DownloadService downloadService; private final DownloadService downloadService;
private final VideoService videoService; private final VideoService videoService;
private final DockingService dockingService; private final DockingService dockingService;
private final TimedCache<String,String> cache = CacheUtil.newTimedCache(TimeUnit.HOURS.toMillis(1)); private final TimedCache<String, String> cache = CacheUtil.newTimedCache(TimeUnit.HOURS.toMillis(1));
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private final ConcurrentMap<String, ScheduledFuture<?>> playing = new ConcurrentHashMap<>(); private final ConcurrentMap<String, ScheduledFuture<?>> playing = new ConcurrentHashMap<>();
@ -102,7 +102,7 @@ public class WvpService {
} catch (RetryException e) { } catch (RetryException e) {
Attempt<?> failedAttempt = e.getLastFailedAttempt(); Attempt<?> failedAttempt = e.getLastFailedAttempt();
String reason; String reason;
if(failedAttempt.hasException()){ if (failedAttempt.hasException()) {
reason = MessageFormat.format("调用 wvp api 查询设备: {0} 历史失败, 已重试 {1} 次, 异常: {2}", deviceCode, e.getNumberOfFailedAttempts(), failedAttempt.getExceptionCause().getMessage()); reason = MessageFormat.format("调用 wvp api 查询设备: {0} 历史失败, 已重试 {1} 次, 异常: {2}", deviceCode, e.getNumberOfFailedAttempts(), failedAttempt.getExceptionCause().getMessage());
} else { } else {
reason = MessageFormat.format("调用 wvp api 查询设备: {0} 历史失败, 已重试 {1} 次", deviceCode, e.getNumberOfFailedAttempts()); reason = MessageFormat.format("调用 wvp api 查询设备: {0} 历史失败, 已重试 {1} 次", deviceCode, e.getNumberOfFailedAttempts());
@ -125,12 +125,12 @@ public class WvpService {
* <p>如果返回值为 null code 不为 0 200 则视为执行失败</p> * <p>如果返回值为 null code 不为 0 200 则视为执行失败</p>
* <p>如果排除异常 也视为执行失败</p> * <p>如果排除异常 也视为执行失败</p>
* *
* @param response 异步响应 * @param response 异步响应
* @param deviceCode 设备编码 21位 * @param deviceCode 设备编码 21位
* @param deviceId 国标设备编码 20位 * @param deviceId 国标设备编码 20位
* @param channelId 通道id * @param channelId 通道id
* @param startDateTime 开始时间 * @param startDateTime 开始时间
* @param endDateTime 结束时间 * @param endDateTime 结束时间
* @return JsonResponse 类型的执行结果 如果 null code 不为 0 200 则视为执行失败 * @return JsonResponse 类型的执行结果 如果 null code 不为 0 200 则视为执行失败
*/ */
@SneakyThrows @SneakyThrows
@ -176,16 +176,16 @@ public class WvpService {
}); });
List<QueryRecordResp.RecordListDTO> recordList = recordListResponse.getData(); List<QueryRecordResp.RecordListDTO> recordList = recordListResponse.getData();
recordList.forEach(record->{ recordList.forEach(record -> {
log.debug("{}", record); log.debug("{}", record);
}); });
Boolean useWvpAssist = wvpProxyConfig.getUseWvpAssist(); Boolean useWvpAssist = wvpProxyConfig.getUseWvpAssist();
log.info("准备下载 deviceCode: {}, deviceId: {}, channelId:{}, ({}~{}) 历史视频, 通过 wvp-assist: {}", deviceCode, deviceId, channelId, startTime, endTime, useWvpAssist); log.info("准备下载 deviceCode: {}, deviceId: {}, channelId:{}, ({}~{}) 历史视频, 通过 wvp-assist: {}", deviceCode, deviceId, channelId, startTime, endTime, useWvpAssist);
if(useWvpAssist){ if (useWvpAssist) {
try{ try {
downloadFromWvpAssist(response, token, deviceCode, deviceId, channelId, startTime, endTime); downloadFromWvpAssist(response, token, deviceCode, deviceId, channelId, startTime, endTime);
} catch (Exception e){ } catch (Exception e) {
log.warn("尝试通过 wvp-assist 下载视频失败, 尝试通过 视频回放 拉取视频"); log.warn("尝试通过 wvp-assist 下载视频失败, 尝试通过 视频回放 拉取视频");
downloadFromPlayback(response, token, deviceId, channelId, startTime, endTime); downloadFromPlayback(response, token, deviceId, channelId, startTime, endTime);
} }
@ -196,7 +196,7 @@ public class WvpService {
return JsonResponse.success(null); return JsonResponse.success(null);
} }
private void downloadFromWvpAssist(HttpServletResponse response, String token, String deviceCode, String deviceId, String channelId, String startTime, String endTime){ private void downloadFromWvpAssist(HttpServletResponse response, String token, String deviceCode, String deviceId, String channelId, String startTime, String endTime) {
JsonResponse<String> videoPathResponse = downloadFromWvpAssist(deviceCode, deviceId, channelId, startTime, endTime, token); JsonResponse<String> videoPathResponse = downloadFromWvpAssist(deviceCode, deviceId, channelId, startTime, endTime, token);
String videoUrl = videoPathResponse.getData(); String videoUrl = videoPathResponse.getData();
log.info("设备(deviceCode {}) (deviceId {}, channel{}) ({} ~ {}) 视频下载地址 {}", deviceCode, deviceId, channelId, startTime, endTime, videoUrl); log.info("设备(deviceCode {}) (deviceId {}, channel{}) ({} ~ {}) 视频下载地址 {}", deviceCode, deviceId, channelId, startTime, endTime, videoUrl);
@ -204,7 +204,7 @@ public class WvpService {
} }
@SneakyThrows @SneakyThrows
private void downloadFromPlayback(HttpServletResponse response, String token, String deviceId, String channelId, String startTime, String endTime){ private void downloadFromPlayback(HttpServletResponse response, String token, String deviceId, String channelId, String startTime, String endTime) {
Retryer<JsonResponse<StreamContent>> playBackRetryer = RetryUtil Retryer<JsonResponse<StreamContent>> playBackRetryer = RetryUtil
.<StreamContent>getDefaultRetryerBuilder("通过回放获取实时视频流下载", 100, TimeUnit.MILLISECONDS, 5) .<StreamContent>getDefaultRetryerBuilder("通过回放获取实时视频流下载", 100, TimeUnit.MILLISECONDS, 5)
.build(); .build();
@ -341,6 +341,8 @@ public class WvpService {
return deviceChannel; return deviceChannel;
} }
@SneakyThrows
@SuppressWarnings("UnstableApiUsage")
public DeferredResult<JsonResponse<String>> realtimeVideoUrl(String deviceCode) { public DeferredResult<JsonResponse<String>> realtimeVideoUrl(String deviceCode) {
DeferredResult<JsonResponse<String>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(60)); DeferredResult<JsonResponse<String>> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(60));
result.onTimeout(() -> { result.onTimeout(() -> {
@ -349,12 +351,30 @@ public class WvpService {
}); });
ScheduledFuture<?> schedule = playing.get(deviceCode); ScheduledFuture<?> schedule = playing.get(deviceCode);
if(schedule != null){ if (schedule != null) {
schedule.cancel(true); schedule.cancel(true);
} }
String token = login(); String token = login();
Optional<DeviceChannel> deviceChannel = getDeviceChannelByDeviceCode(token, deviceCode); Retryer<Optional<DeviceChannel>> defaultGenericRetryer = RetryerBuilder.<Optional<DeviceChannel>>newBuilder()
// 异常就重试
.retryIfException()
.retryIfRuntimeException()
// 重试间隔
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
// 重试次数
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.hasException()) {
log.info("异常 {}", attempt.getExceptionCause().getMessage());
cache.remove("token");
}
}
}).build();
Optional<DeviceChannel> deviceChannel = defaultGenericRetryer.call(() -> getDeviceChannelByDeviceCode(token, deviceCode));
if (deviceChannel.isEmpty()) { if (deviceChannel.isEmpty()) {
result.setResult(JsonResponse.error(MessageFormat.format("未能获取 设备: {0} 的通道信息", deviceCode))); result.setResult(JsonResponse.error(MessageFormat.format("未能获取 设备: {0} 的通道信息", deviceCode)));
return result; return result;
@ -372,7 +392,7 @@ public class WvpService {
StringUtils.joinWith(".", data.getStream(), "live", "flv")); StringUtils.joinWith(".", data.getStream(), "live", "flv"));
// 定时任务 // 定时任务
schedule = scheduledExecutorService.schedule(()->{ schedule = scheduledExecutorService.schedule(() -> {
log.info("[定时任务] 关闭设备(deviceCode => {}) 视频", deviceCode); log.info("[定时任务] 关闭设备(deviceCode => {}) 视频", deviceCode);
playing.remove(deviceCode); playing.remove(deviceCode);
closeRealtimeVideo(deviceCode); closeRealtimeVideo(deviceCode);
@ -389,9 +409,9 @@ public class WvpService {
return result; return result;
} }
public void closeRealtimeVideo(String deviceCode){ public void closeRealtimeVideo(String deviceCode) {
String token = login(); String token = login();
Optional<DeviceChannel> deviceChannel = getDeviceChannelByDeviceCode(token, deviceCode); Optional<DeviceChannel> deviceChannel = getDeviceChannelByDeviceCode(token, deviceCode);
if (deviceChannel.isEmpty()) { if (deviceChannel.isEmpty()) {
return; return;
} }
@ -399,12 +419,12 @@ public class WvpService {
DeviceChannel dc = deviceChannel.get(); DeviceChannel dc = deviceChannel.get();
log.info("设备通道信息 => {}", dc); log.info("设备通道信息 => {}", dc);
wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(),false); wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(), false);
wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(),true); wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(), true);
} }
private String login(){ private String login() {
return cache.get("token",false,()->{ return cache.get("token", false, () -> {
String passwdMd5 = MD5.create().digestHex(wvpProxyConfig.getPasswd()); String passwdMd5 = MD5.create().digestHex(wvpProxyConfig.getPasswd());
WvpLoginReq loginReq = WvpLoginReq.builder() WvpLoginReq loginReq = WvpLoginReq.builder()
.username(wvpProxyConfig.getUser()) .username(wvpProxyConfig.getUser())