From b5068617bb0adfcf2971ceb848b624e73b33f03a Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Thu, 21 Dec 2023 15:33:13 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=20=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E8=A7=86=E9=A2=91=E8=8E=B7=E5=8F=96=20=E6=94=B9=E4=B8=BA=20?= =?UTF-8?q?=E8=B0=83=E7=94=A8=20wvp=20=E6=8E=A5=E5=8F=A3=20=E8=8E=B7?= =?UTF-8?q?=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../wvp/api/video/VideoController.java | 4 +- .../gb28181/wvp/config/WvpProxyConfig.java | 13 ++ .../gb28181/wvp/dto/stream/StreamContent.java | 3 + .../gb28181/wvp/proxy/WvpProxyClient.java | 11 ++ .../gb28181/wvp/service/wvp/WvpService.java | 136 ++++++++++++++++-- .../src/main/resources/application.yml | 7 +- 6 files changed, 161 insertions(+), 13 deletions(-) diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java index 4969b7c..3bfd87e 100644 --- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java @@ -82,14 +82,14 @@ public class VideoController { @GetMapping(value = "/device/realtime") @ResponseBody public DeferredResult> realtime(@ParameterObject RealtimeVideoReq req) { - return gb28181DownloadService.realtimeVideoUrl(req.getDeviceCode()); + return wvpService.realtimeVideoUrl(req.getDeviceCode()); } @Operation(summary = "关闭实时视频") @GetMapping(value = "/device/realtime/close") @ResponseBody public JsonResponse close(@ParameterObject RealtimeVideoReq req) { - gb28181DownloadService.closeRealtimeVideoNow(req.getDeviceCode()); + wvpService.closeRealtimeVideo(req.getDeviceCode()); return JsonResponse.success(null); } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/WvpProxyConfig.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/WvpProxyConfig.java index c3abd48..e672d27 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/WvpProxyConfig.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/config/WvpProxyConfig.java @@ -4,6 +4,9 @@ import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; +import java.util.ArrayList; +import java.util.List; + @Configuration @ConfigurationProperties(prefix = "proxy.wvp") @Data @@ -21,4 +24,14 @@ public class WvpProxyConfig { * 是否使用 ffmpeg 编/解码, 否则使用内置 javacv */ private Boolean useFfmpeg = false; + + /** + * 需要通过 wvp 代理的 (wvp的上级) 上级平台 + */ + private List parents = new ArrayList<>(); + + /** + * 用于生成 代理 wvp 的 视频流 ws-flv 地址 + */ + private String proxyMediaUrl = ""; } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/dto/stream/StreamContent.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/dto/stream/StreamContent.java index 7f60084..760f8af 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/dto/stream/StreamContent.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/dto/stream/StreamContent.java @@ -2,12 +2,15 @@ package cn.skcks.docking.gb28181.wvp.dto.stream; import cn.hutool.core.date.DatePattern; import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonNaming; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.format.annotation.DateTimeFormat; import java.util.List; +@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class) @NoArgsConstructor @Data public class StreamContent { diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/proxy/WvpProxyClient.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/proxy/WvpProxyClient.java index 3c7b1e0..4fe0e7b 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/proxy/WvpProxyClient.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/proxy/WvpProxyClient.java @@ -21,6 +21,7 @@ import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestParam; import java.util.List; @@ -111,4 +112,14 @@ public interface WvpProxyClient { JsonResponse> getDownloadTask4MediaServer(@RequestHeader("access-token") String token, @PathVariable String mediaServerId, @SpringQueryMap GetDownloadTaskReq req); + + @GetMapping("/api/play/start/{deviceId}/{channelId}") + JsonResponse playStart(@RequestHeader("access-token") String token, + @PathVariable String deviceId, + @PathVariable String channelId); + @GetMapping("/api/play/stop/{deviceId}/{channelId}") + JsonResponse playStop(@RequestHeader("access-token") String token, + @PathVariable String deviceId, + @PathVariable String channelId, + @RequestParam boolean isSubStream); } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java index 43f9338..5117215 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/wvp/WvpService.java @@ -1,5 +1,7 @@ package cn.skcks.docking.gb28181.wvp.service.wvp; +import cn.hutool.cache.CacheUtil; +import cn.hutool.cache.impl.TimedCache; import cn.hutool.core.date.DateUnit; import cn.hutool.core.date.DateUtil; import cn.hutool.core.io.IoUtil; @@ -21,8 +23,10 @@ import cn.skcks.docking.gb28181.wvp.dto.record.QueryRecordReq; import cn.skcks.docking.gb28181.wvp.dto.record.QueryRecordResp; import cn.skcks.docking.gb28181.wvp.dto.stream.StreamContent; import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDevice; +import cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.model.WvpProxyDocking; import cn.skcks.docking.gb28181.wvp.proxy.WvpProxyClient; import cn.skcks.docking.gb28181.wvp.service.device.DeviceService; +import cn.skcks.docking.gb28181.wvp.service.docking.DockingService; import cn.skcks.docking.gb28181.wvp.service.download.DownloadService; import cn.skcks.docking.gb28181.wvp.service.video.VideoService; import cn.skcks.docking.gb28181.wvp.utils.RetryUtil; @@ -33,16 +37,18 @@ import jakarta.servlet.http.HttpServletResponse; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.http.MediaType; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; +import org.springframework.web.context.request.async.DeferredResult; import java.nio.charset.StandardCharsets; import java.text.MessageFormat; import java.util.Date; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; @Slf4j @@ -54,6 +60,10 @@ public class WvpService { private final DeviceService deviceService; private final DownloadService downloadService; private final VideoService videoService; + private final DockingService dockingService; + private final TimedCache cache = CacheUtil.newTimedCache(TimeUnit.HOURS.toMillis(1)); + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final ConcurrentMap> playing = new ConcurrentHashMap<>(); public void header(HttpServletResponse response) { response.setContentType("video/mp4"); @@ -125,14 +135,7 @@ public class WvpService { */ @SneakyThrows public JsonResponse video(HttpServletResponse response, String deviceCode, String deviceId, String channelId, Date startDateTime, Date endDateTime) { - String passwdMd5 = MD5.create().digestHex(wvpProxyConfig.getPasswd()); - WvpLoginReq loginReq = WvpLoginReq.builder() - .username(wvpProxyConfig.getUser()) - .password(passwdMd5) - .build(); - JsonResponse login = wvpProxyClient.login(loginReq); - String token = login.getData().getAccessToken(); - log.info("wvp 登录成功 token => {}", token); + String token = login(); log.debug("通过 wvp 查询设备 国标id(gbDeviceId => {}) 通道信息", deviceId); JsonResponse deviceChannels = wvpProxyClient.getDeviceChannels( @@ -190,7 +193,7 @@ public class WvpService { downloadFromPlayback(response, token, deviceId, channelId, startTime, endTime); } - return login; + return JsonResponse.success(null); } private void downloadFromWvpAssist(HttpServletResponse response, String token, String deviceCode, String deviceId, String channelId, String startTime, String endTime){ @@ -300,4 +303,117 @@ public class WvpService { }); }); } + + @SneakyThrows + public Optional getDeviceChannelByDeviceCode(String token, String deviceCode) { + WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null); + if (device == null) { + throw new JsonException("设备不存在"); + } + String channelId = device.getGbDeviceChannelId(); + + WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null); + if (docking == null) { + throw new JsonException("设备(通道)不存在"); + } + + Optional deviceChannel = Optional.empty(); + for (String deviceId : wvpProxyConfig.getParents()) { + log.debug("通过 wvp 查询设备 国标id(gbDeviceId => {}) 通道信息", deviceId); + JsonResponse deviceChannels = wvpProxyClient.getDeviceChannels( + token, + deviceId, + GetDeviceChannelsReq.builder() + .query(channelId) + .count(Integer.MAX_VALUE) + .build()); + if (deviceChannels.getData() != null && deviceChannels.getData().getTotal() > 0) { + deviceChannel = deviceChannels.getData() + .getList() + .parallelStream() + .filter(item -> item.getChannelId().equalsIgnoreCase(channelId)) + .findFirst(); + if (deviceChannel.isPresent()) { + break; + } + } + } + return deviceChannel; + } + + public DeferredResult> realtimeVideoUrl(String deviceCode) { + DeferredResult> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(60)); + result.onTimeout(() -> { + log.error("timeout"); + result.setResult(JsonResponse.error("请求超时")); + }); + + ScheduledFuture schedule = playing.get(deviceCode); + if(schedule != null){ + schedule.cancel(true); + } + + String token = login(); + Optional deviceChannel = getDeviceChannelByDeviceCode(token, deviceCode); + if (deviceChannel.isEmpty()) { + result.setResult(JsonResponse.error(MessageFormat.format("未能获取 设备: {0} 的通道信息", deviceCode))); + return result; + } + + DeviceChannel dc = deviceChannel.get(); + log.info("设备通道信息 => {}", dc); + + try { + StreamContent data = wvpProxyClient.playStart(token, dc.getDeviceId(), dc.getChannelId()).getData(); + log.info("实时流信息 {}", data); + + String url = StringUtils.joinWith("/", + wvpProxyConfig.getProxyMediaUrl(), data.getApp(), + StringUtils.joinWith(".", data.getStream(), "live", "flv")); + + // 定时任务 + schedule = scheduledExecutorService.schedule(()->{ + log.info("[定时任务] 关闭设备(deviceCode => {}) 视频", deviceCode); + playing.remove(deviceCode); + closeRealtimeVideo(deviceCode); + }, 3, TimeUnit.MINUTES); + playing.put(deviceCode, schedule); + + result.setResult(JsonResponse.success(url)); + } catch (Exception e) { + log.error("点播失败", e); + result.setResult(JsonResponse.error("点播失败")); + return result; + } + + return result; + } + + public void closeRealtimeVideo(String deviceCode){ + String token = login(); + Optional deviceChannel = getDeviceChannelByDeviceCode(token, deviceCode); + if (deviceChannel.isEmpty()) { + return; + } + + DeviceChannel dc = deviceChannel.get(); + log.info("设备通道信息 => {}", dc); + + wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(),false); + wvpProxyClient.playStop(token, dc.getDeviceId(), dc.getChannelId(),true); + } + + private String login(){ + return cache.get("token",false,()->{ + String passwdMd5 = MD5.create().digestHex(wvpProxyConfig.getPasswd()); + WvpLoginReq loginReq = WvpLoginReq.builder() + .username(wvpProxyConfig.getUser()) + .password(passwdMd5) + .build(); + JsonResponse login = wvpProxyClient.login(loginReq); + String token = login.getData().getAccessToken(); + log.info("wvp 登录成功 token => {}", token); + return token; + }); + } } diff --git a/gb28181-wvp-proxy-starter/src/main/resources/application.yml b/gb28181-wvp-proxy-starter/src/main/resources/application.yml index 33204b1..52bba6a 100644 --- a/gb28181-wvp-proxy-starter/src/main/resources/application.yml +++ b/gb28181-wvp-proxy-starter/src/main/resources/application.yml @@ -46,7 +46,7 @@ media: proxy: wvp: - url: http://192.168.1.241:18978 + url: http://192.168.3.12:18978 user: admin passwd: admin use-wvp-assist: false @@ -54,6 +54,10 @@ proxy: enable: false # 是否使用 ffmpeg 编/解码, 否则使用内置 javacv use-ffmpeg: false + parents: + - 44050100002000000003 + - 44050100001180000001 + proxy-media-url: 'wss://192.168.1.241:9022/mf-config/media' gb28181: sip: id: 44050100002000000005 @@ -68,6 +72,7 @@ proxy: use-playback-to-download: false # 用于替换 返回的 url 值, 可用 nginx 或 caddy 代理 zlm # proxy-media-url: 'http://10.10.10.200/media' + proxy-media-url: 'https://192.168.1.241:9022/mf-config/media' device-api: offset: forward: 30s