diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java index 7277586..ad2eb2b 100644 --- a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/VideoController.java @@ -4,6 +4,7 @@ import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import cn.skcks.docking.gb28181.common.json.JsonResponse; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; +import cn.skcks.docking.gb28181.wvp.api.video.dto.RealtimeVideoReq; import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoMp4Req; import cn.skcks.docking.gb28181.wvp.api.video.dto.VideoReq; import cn.skcks.docking.gb28181.wvp.config.Gb28181DeviceVideoApiConfig; @@ -76,4 +77,27 @@ public class VideoController { log.info("偏移后的时间范围 {} ~ {}", startTime, endTime); return gb28181DownloadService.videoUrl(req.getDeviceCode(), startTime, endTime); } + + @Operation(summary = "获取实时视频 (返回视频url)") + @GetMapping(value = "/device/realtime") + @ResponseBody + public DeferredResult> realtime(@ParameterObject RealtimeVideoReq req) { + return gb28181DownloadService.realtimeVideoUrl(req.getDeviceCode()); + } + + @Operation(summary = "获取实时视频 (返回视频url)") + @GetMapping(value = "/device/realtime/renew") + @ResponseBody + public JsonResponse renew(@ParameterObject RealtimeVideoReq req) { + gb28181DownloadService.renewRealtimeVideoUrl(req.getDeviceCode()); + return JsonResponse.success(null); + } + + @Operation(summary = "获取实时视频 (返回视频url)") + @GetMapping(value = "/device/realtime/close") + @ResponseBody + public JsonResponse close(@ParameterObject RealtimeVideoReq req) { + gb28181DownloadService.autoCloseReadtimeVideo(req.getDeviceCode()); + return JsonResponse.success(null); + } } diff --git a/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/dto/RealtimeVideoReq.java b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/dto/RealtimeVideoReq.java new file mode 100644 index 0000000..0bd012c --- /dev/null +++ b/gb28181-wvp-proxy-api/src/main/java/cn/skcks/docking/gb28181/wvp/api/video/dto/RealtimeVideoReq.java @@ -0,0 +1,14 @@ +package cn.skcks.docking.gb28181.wvp.api.video.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotBlank; +import lombok.Data; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +@Data +public class RealtimeVideoReq { + @NotBlank(message = "设备编码 不能为空") + @Schema(description = "设备编码") + private String deviceCode; +} diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index 28631f7..daf4361 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -65,6 +65,8 @@ import java.util.Optional; import java.util.Vector; import java.util.concurrent.*; +import static cn.skcks.docking.gb28181.wvp.orm.mybatis.dynamic.mapper.WvpProxyDeviceDynamicSqlSupport.deviceCode; + @Slf4j @Service @RequiredArgsConstructor @@ -81,6 +83,11 @@ public class Gb28181DownloadService { private final WvpProxyConfig wvpProxyConfig; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap>> requestMap = new ConcurrentHashMap<>(); + private final ConcurrentMap> realtimeMap = new ConcurrentHashMap<>(); + private final ConcurrentMap realtimeVideoInfoMap = new ConcurrentHashMap<>(); + + @Getter + private final RealTime realtime = new RealTime(); @NoArgsConstructor @AllArgsConstructor @@ -186,21 +193,7 @@ public class Gb28181DownloadService { result.setResult(JsonResponse.error("设备(通道)不存在")); return result; } - requestMap.computeIfPresent(deviceCode,(key,requestResult)->{ - if(!requestResult.hasResult()){ - requestResult.setResult(JsonResponse.error("同一设备重复请求, 本次请求结束")); - String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); - String existCallId = RedisUtil.StringOps.get(cacheKey); - if(StringUtils.isNotBlank(existCallId)){ - String deviceIp = docking.getIp(); - int devicePort = Integer.parseInt(docking.getPort()); - sender.sendRequest((provider,localIp,localPort)-> - SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId)); - RedisUtil.KeyOps.delete(cacheKey); - } - } - return null; - }); + closeExistRequest(deviceCode, device, docking); requestMap.put(deviceCode, result); // 间隔一定时间(200ms) 给设备足够的时间结束前次请求 @@ -233,6 +226,115 @@ public class Gb28181DownloadService { return result; } + @SneakyThrows + public DeferredResult> realtimeVideoUrl(String deviceCode){ + DeferredResult> result = new DeferredResult<>(TimeUnit.SECONDS.toMillis(60)); + result.onTimeout(()->{ + result.setResult(JsonResponse.error("请求超时")); + }); + WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null); + if(device == null){ + result.setResult(JsonResponse.error("设备不存在")); + return result; + } + WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null); + if(docking == null){ + result.setResult(JsonResponse.error("设备(通道)不存在")); + return result; + } + closeExistRequest(deviceCode, device, docking); + requestMap.put(deviceCode, result); + + // 间隔一定时间(200ms) 给设备足够的时间结束前次请求 + scheduledExecutorService.schedule(()->{ + realtime.realtime(deviceCode).whenComplete((videoInfo, e)->{ + log.info("获取媒体信息 {}", videoInfo); + String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); + realtimeVideoInfoMap.put(cacheKey, videoInfo); + String existCallId = RedisUtil.StringOps.get(cacheKey); + autoCloseReadtimeVideo(docking,device,videoInfo,cacheKey,existCallId); + String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ? + StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()): + videoInfo.getUrl(); + result.setResult(JsonResponse.success(url)); + }); + }, 200, TimeUnit.MILLISECONDS); + return result; + } + + public void renewRealtimeVideoUrl(String deviceCode){ + WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null); + if(device == null){ + return; + } + WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null); + if(docking == null){ + return; + } + closeExistRequest(deviceCode, device, docking); + String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); + realtimeMap.computeIfPresent(cacheKey,(key, scheduledFuture) -> { + scheduledFuture.cancel(true); + return null; + }); + + autoCloseReadtimeVideo(deviceCode); + } + + public void autoCloseReadtimeVideo(String deviceCode){ + WvpProxyDevice device = deviceService.getDeviceByDeviceCode(deviceCode).orElse(null); + if(device == null){ + return; + } + WvpProxyDocking docking = dockingService.getDeviceByDeviceCode(device.getGbDeviceId()).orElse(null); + if(docking == null){ + return; + } + closeExistRequest(deviceCode, device, docking); + String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); + String existCallId = RedisUtil.StringOps.get(cacheKey); + realtimeVideoInfoMap.computeIfPresent(cacheKey, (key, videoInfo) -> { + autoCloseReadtimeVideo(docking,device,videoInfo,cacheKey,existCallId); + return null; + }); + } + + private void closeExistRequest(String deviceCode, WvpProxyDevice device, WvpProxyDocking docking) { + requestMap.computeIfPresent(deviceCode,(key,requestResult)->{ + if(!requestResult.hasResult()){ + requestResult.setResult(JsonResponse.error("同一设备重复请求, 本次请求结束")); + String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); + String existCallId = RedisUtil.StringOps.get(cacheKey); + if(StringUtils.isNotBlank(existCallId)){ + String deviceIp = docking.getIp(); + int devicePort = Integer.parseInt(docking.getPort()); + sender.sendRequest((provider,localIp,localPort)-> + SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId)); + RedisUtil.KeyOps.delete(cacheKey); + } + } + return null; + }); + } + + public void autoCloseReadtimeVideo(WvpProxyDocking docking,WvpProxyDevice device,Gb28181DownloadService.VideoInfo videoInfo,String cacheKey, String existCallId){ + ScheduledFuture schedule = scheduledExecutorService.schedule(() -> { + log.info("结束实时视频 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId()); + String deviceIp = docking.getIp(); + int devicePort = Integer.parseInt(docking.getPort()); + if (StringUtils.isNotBlank(existCallId)) { + sender.sendRequest((provider, localIp, localPort) -> + SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId)); + } + RedisUtil.KeyOps.delete(cacheKey); + zlmMediaService.closeRtpServer(CloseRtpServer.builder() + .streamId(videoInfo.streamId) + .build()); + }, 60, TimeUnit.SECONDS); + + realtimeMap.put(cacheKey,schedule); + } + @SneakyThrows public void videoStream(HttpServletRequest request, HttpServletResponse response, String deviceCode, Date startTime, Date endTime) { AsyncContext asyncContext = request.startAsync(); @@ -271,6 +373,84 @@ public class Gb28181DownloadService { } } + private class RealTime { + @SneakyThrows + public CompletableFuture realtime(String deviceCode) { + Optional deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode); + if (deviceByDeviceCode.isEmpty()) { + String reason = MessageFormat.format("未能找到 设备编码 为 {0} 的设备", deviceCode); + log.error("{}",reason); + throw new JsonException(reason); + } else { + WvpProxyDevice device = deviceByDeviceCode.get(); + return realtime(device.getGbDeviceId(), device.getGbDeviceChannelId()); + } + } + + @SneakyThrows + public CompletableFuture realtime(String gbDeviceId, String channel){ + CompletableFuture result = new CompletableFuture<>(); + Optional deviceByGbDeviceId = dockingService.getDeviceByGbDeviceId(gbDeviceId); + if(deviceByGbDeviceId.isEmpty()){ + log.info("未能找到 国标编码 {} 的注册信息", gbDeviceId); + result.complete(null); + return result; + } + Optional deviceByGbDeviceIdAndChannel = deviceService.getDeviceByGbDeviceIdAndChannel(gbDeviceId, channel); + if (deviceByGbDeviceIdAndChannel.isEmpty()) { + log.info("未能找到 编码 {}, 通道 {} 的设备", gbDeviceId, channel); + result.complete(null); + return result; + } + + WvpProxyDevice device = deviceByGbDeviceIdAndChannel.get(); + WvpProxyDocking docking = deviceByGbDeviceId.get(); + + String streamId = MediaSdpHelper.getStreamId(gbDeviceId, channel, IdUtil.getSnowflakeNextIdStr()); + int isTcp = proxySipConfig.getStreamMode() == StreamMode.UDP ? 0 : 1; + StreamMode streamMode = proxySipConfig.getStreamMode(); + String ip = zlmMediaConfig.getIp(); + int port = openRtpServer(streamId, isTcp); + if(port <= 0){ + log.error("zlm 暂无可用端口"); + result.complete(null); + return result; + } + String ssrc = ssrcService.getPlaySsrc(); + GB28181Description gb28181Description = MediaSdpHelper.play(gbDeviceId, channel, Connection.IP4, ip, port, ssrc, streamMode); + sender.sendRequest(inviteRequest(docking, device, gb28181Description, ssrc, streamId, result)); + return result; + } + + + @SneakyThrows + public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, String ssrc, String streamId, CompletableFuture result) { + String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); + String existCallId = RedisUtil.StringOps.get(cacheKey); + + // 限制单个设备/通道 只能 点播 一路 + if(StringUtils.isNotBlank(existCallId)){ + String deviceIp = docking.getIp(); + int devicePort = Integer.parseInt(docking.getPort()); + sender.sendRequest((provider,localIp,localPort)-> + SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId)); + RedisUtil.KeyOps.delete(cacheKey); + log.info("关闭已存在的 点播请求 {} {}", cacheKey, existCallId); + Thread.sleep(500); + } + + return (provider, ip, port) -> { + CallIdHeader callId = provider.getNewCallId(); + String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); + subscribe.getInviteSubscribe().addPublisher(subscribeKey); + Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, 60, TimeUnit.SECONDS); + subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); + RedisUtil.StringOps.set(cacheKey, callId.getCallId()); + return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); + }; + } + } + @SneakyThrows public CompletableFuture download(String deviceCode, Date startTime, Date endTime) { Optional deviceByDeviceCode = deviceService.getDeviceByDeviceCode(deviceCode);