From f1db3e3d1d29cc782bd1dfea4a26c5f9a7812bfa Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Mon, 25 Dec 2023 14:10:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/wvp/service/wvp/WvpService.java | 66 ++++++++++++------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java index 5117215..95a5c6f 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java @@ -61,7 +61,7 @@ public class WvpService { private final DownloadService downloadService; private final VideoService videoService; private final DockingService dockingService; - private final TimedCache cache = CacheUtil.newTimedCache(TimeUnit.HOURS.toMillis(1)); + private final TimedCache cache = CacheUtil.newTimedCache(TimeUnit.HOURS.toMillis(1)); private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap> playing = new ConcurrentHashMap<>(); @@ -102,7 +102,7 @@ public class WvpService { } catch (RetryException e) { Attempt failedAttempt = e.getLastFailedAttempt(); String reason; - if(failedAttempt.hasException()){ + if (failedAttempt.hasException()) { reason = MessageFormat.format("调用 wvp api 查询设备: {0} 历史失败, 已重试 {1} 次, 异常: {2}", deviceCode, e.getNumberOfFailedAttempts(), failedAttempt.getExceptionCause().getMessage()); } else { reason = MessageFormat.format("调用 wvp api 查询设备: {0} 历史失败, 已重试 {1} 次", deviceCode, e.getNumberOfFailedAttempts()); @@ -125,12 +125,12 @@ public class WvpService { *

如果返回值为 null 或 code 不为 0 或 200 则视为执行失败

*

如果排除异常 也视为执行失败

* - * @param response 异步响应 - * @param deviceCode 设备编码 21位 - * @param deviceId 国标设备编码 20位 - * @param channelId 通道id - * @param startDateTime 开始时间 - * @param endDateTime 结束时间 + * @param response 异步响应 + * @param deviceCode 设备编码 21位 + * @param deviceId 国标设备编码 20位 + * @param channelId 通道id + * @param startDateTime 开始时间 + * @param endDateTime 结束时间 * @return JsonResponse 类型的执行结果 如果 为 null 或 code 不为 0 或 200 则视为执行失败 */ @SneakyThrows @@ -176,16 +176,16 @@ public class WvpService { }); List recordList = recordListResponse.getData(); - recordList.forEach(record->{ + recordList.forEach(record -> { log.debug("{}", record); }); Boolean useWvpAssist = wvpProxyConfig.getUseWvpAssist(); log.info("准备下载 deviceCode: {}, deviceId: {}, channelId:{}, ({}~{}) 历史视频, 通过 wvp-assist: {}", deviceCode, deviceId, channelId, startTime, endTime, useWvpAssist); - if(useWvpAssist){ - try{ + if (useWvpAssist) { + try { downloadFromWvpAssist(response, token, deviceCode, deviceId, channelId, startTime, endTime); - } catch (Exception e){ + } catch (Exception e) { log.warn("尝试通过 wvp-assist 下载视频失败, 尝试通过 视频回放 拉取视频"); downloadFromPlayback(response, token, deviceId, channelId, startTime, endTime); } @@ -196,7 +196,7 @@ public class WvpService { return JsonResponse.success(null); } - private void downloadFromWvpAssist(HttpServletResponse response, String token, String deviceCode, String deviceId, String channelId, String startTime, String endTime){ + private void downloadFromWvpAssist(HttpServletResponse response, String token, String deviceCode, String deviceId, String channelId, String startTime, String endTime) { JsonResponse videoPathResponse = downloadFromWvpAssist(deviceCode, deviceId, channelId, startTime, endTime, token); String videoUrl = videoPathResponse.getData(); log.info("设备(deviceCode {}) (deviceId {}, channel{}) ({} ~ {}) 视频下载地址 {}", deviceCode, deviceId, channelId, startTime, endTime, videoUrl); @@ -204,7 +204,7 @@ public class WvpService { } @SneakyThrows - private void downloadFromPlayback(HttpServletResponse response, String token, String deviceId, String channelId, String startTime, String endTime){ + private void downloadFromPlayback(HttpServletResponse response, String token, String deviceId, String channelId, String startTime, String endTime) { Retryer> playBackRetryer = RetryUtil .getDefaultRetryerBuilder("通过回放获取实时视频流下载", 100, TimeUnit.MILLISECONDS, 5) .build(); @@ -341,6 +341,8 @@ public class WvpService { return deviceChannel; } + @SneakyThrows + @SuppressWarnings("UnstableApiUsage") public DeferredResult> realtimeVideoUrl(String deviceCode) { DeferredResult> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(60)); result.onTimeout(() -> { @@ -349,12 +351,30 @@ public class WvpService { }); ScheduledFuture schedule = playing.get(deviceCode); - if(schedule != null){ + if (schedule != null) { schedule.cancel(true); } String token = login(); - Optional deviceChannel = getDeviceChannelByDeviceCode(token, deviceCode); + Retryer> defaultGenericRetryer = RetryerBuilder.>newBuilder() + // 异常就重试 + .retryIfException() + .retryIfRuntimeException() + // 重试间隔 + .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS)) + // 重试次数 + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .withRetryListener(new RetryListener() { + @Override + public void onRetry(Attempt attempt) { + if (attempt.hasException()) { + log.info("异常 {}", attempt.getExceptionCause().getMessage()); + cache.remove("token"); + } + } + }).build(); + Optional deviceChannel = defaultGenericRetryer.call(() -> getDeviceChannelByDeviceCode(token, deviceCode)); + if (deviceChannel.isEmpty()) { result.setResult(JsonResponse.error(MessageFormat.format("未能获取 设备: {0} 的通道信息", deviceCode))); return result; @@ -372,7 +392,7 @@ public class WvpService { StringUtils.joinWith(".", data.getStream(), "live", "flv")); // 定时任务 - schedule = scheduledExecutorService.schedule(()->{ + schedule = scheduledExecutorService.schedule(() -> { log.info("[定时任务] 关闭设备(deviceCode => {}) 视频", deviceCode); playing.remove(deviceCode); closeRealtimeVideo(deviceCode); @@ -389,9 +409,9 @@ public class WvpService { return result; } - public void closeRealtimeVideo(String deviceCode){ + public void closeRealtimeVideo(String deviceCode) { String token = login(); - Optional deviceChannel = getDeviceChannelByDeviceCode(token, deviceCode); + Optional deviceChannel = getDeviceChannelByDeviceCode(token, deviceCode); if (deviceChannel.isEmpty()) { return; } @@ -399,12 +419,12 @@ public class WvpService { DeviceChannel dc = deviceChannel.get(); log.info("设备通道信息 => {}", dc); - wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(),false); - wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(),true); + wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(), false); + wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(), true); } - private String login(){ - return cache.get("token",false,()->{ + private String login() { + return cache.get("token", false, () -> { String passwdMd5 = MD5.create().digestHex(wvpProxyConfig.getPasswd()); WvpLoginReq loginReq = WvpLoginReq.builder() .username(wvpProxyConfig.getUser())