From 92dee06429eda5f56f0ab85d3ccb276c385511b7 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Fri, 15 Dec 2023 09:42:20 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E6=97=B6=E7=82=B9=E6=92=AD=E4=BF=A1?= =?UTF-8?q?=E6=81=AF=E7=BC=93=E5=AD=98=E5=88=B0=20redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/Gb28181DownloadService.java | 38 ++++++++++--------- .../request/request/ByeRequestProcessor.java | 10 ++++- 2 files changed, 29 insertions(+), 19 deletions(-) diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index 220642f..81a6471 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -8,6 +8,7 @@ import cn.hutool.core.io.IoUtil; import cn.hutool.core.util.IdUtil; import cn.skcks.docking.gb28181.common.json.JsonException; import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.common.json.JsonUtils; import cn.skcks.docking.gb28181.common.redis.RedisUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; @@ -41,7 +42,6 @@ import gov.nist.javax.sdp.fields.TimeField; import gov.nist.javax.sdp.fields.URIField; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; -import jakarta.annotation.PreDestroy; import jakarta.servlet.AsyncContext; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; @@ -82,8 +82,6 @@ public class Gb28181DownloadService { private final WvpProxyConfig wvpProxyConfig; private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); private final ConcurrentMap>> requestMap = new ConcurrentHashMap<>(); - private final ConcurrentMap realtimeVideo = new ConcurrentHashMap<>(); - private final ConcurrentMap realtimeVideoInfoMap = new ConcurrentHashMap<>(); @Getter private final RealTime realtime = new RealTime(); @@ -241,7 +239,8 @@ public class Gb28181DownloadService { result.setResult(JsonResponse.error("设备(通道)不存在")); return result; } - String existUrl = realtimeVideo.get(deviceCode); + + String existUrl = RedisUtil.StringOps.get(CacheUtil.getKey(deviceCode, MediaSdpHelper.Action.PLAY.getAction())); if(Optional.ofNullable(existUrl).isPresent()){ result.setResult(JsonResponse.success(existUrl)); return result; @@ -249,14 +248,18 @@ public class Gb28181DownloadService { // 间隔一定时间(200ms) 给设备足够的时间结束前次请求 scheduledExecutorService.schedule(()->{ realtime.realtime(deviceCode).whenComplete((videoInfo, e)->{ + if(videoInfo == null){ + result.setResult(JsonResponse.error("媒体信息获取失败")); + return; + } log.info("获取媒体信息 {}", videoInfo); String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); - realtimeVideoInfoMap.put(cacheKey, videoInfo); + RedisUtil.StringOps.set(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(),videoInfo.getCallId()), JsonUtils.toJson(videoInfo)); String url = StringUtils.isNotBlank(proxySipConfig.getProxyMediaUrl()) ? StringUtils.replace(videoInfo.getUrl(), zlmMediaConfig.getUrl(), proxySipConfig.getProxyMediaUrl()): videoInfo.getUrl(); - realtimeVideo.put(deviceCode, url); + RedisUtil.StringOps.set(CacheUtil.getKey(deviceCode, MediaSdpHelper.Action.PLAY.getAction()), url); result.setResult(JsonResponse.success(url)); }); }, 200, TimeUnit.MILLISECONDS); @@ -294,15 +297,17 @@ public class Gb28181DownloadService { String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); String existCallId = RedisUtil.StringOps.get(cacheKey); - realtimeVideoInfoMap.computeIfPresent(cacheKey, (key, videoInfo) -> { + String infoKey = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), existCallId); + VideoInfo videoInfo = JsonUtils.parse(RedisUtil.StringOps.get(infoKey), VideoInfo.class); + if(videoInfo != null){ + RedisUtil.KeyOps.delete(infoKey); closeRealtimeVideoNow(docking,device,videoInfo,cacheKey,existCallId); - return null; - }); + } } public void closeRealtimeVideoNow(WvpProxyDocking docking, WvpProxyDevice device, Gb28181DownloadService.VideoInfo videoInfo, String cacheKey, String existCallId){ log.info("结束实时视频 发送 bye 关闭 {} {}", videoInfo.getDevice().getGbDeviceChannelId(), videoInfo.getCallId()); - realtimeVideo.remove(device.getDeviceCode()); + RedisUtil.KeyOps.delete(CacheUtil.getKey(device.getDeviceCode(), MediaSdpHelper.Action.PLAY.getAction())); String deviceIp = docking.getIp(); int devicePort = Integer.parseInt(docking.getPort()); if (StringUtils.isNotBlank(existCallId)) { @@ -606,6 +611,12 @@ public class Gb28181DownloadService { RedisUtil.KeyOps.delete(cacheKey); String transport = request.getTopmostViaHeader().getTransport(); String ip = request.getLocalAddress().getHostAddress(); + if(time <= 0) { + String callId = request.getCallId().getCallId(); + String infoKey = CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), callId); + RedisUtil.KeyOps.delete(infoKey); + } + sender.getProvider(transport,ip) .sendResponse(SipResponseBuilder.response(request, Response.OK, "OK")); onComplete(); @@ -632,11 +643,4 @@ public class Gb28181DownloadService { } return subscriber; } - - @PreDestroy - private void destroy(){ - realtimeVideo.forEach((key,item)->{ - closeRealtimeVideoNow(key); - }); - } } diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java index 9611563..d350c01 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/sip/message/bye/request/request/ByeRequestProcessor.java @@ -1,5 +1,8 @@ package cn.skcks.docking.gb28181.wvp.sip.message.bye.request.request; +import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; +import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.listener.SipListener; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; @@ -46,7 +49,10 @@ public class ByeRequestProcessor implements MessageProcessor { Optional.ofNullable(subscribe.getByeSubscribe().getPublisher(key)) .ifPresentOrElse( publisher -> publisher.submit(request), - () -> sender.sendResponse(ip, transport, ((provider, ip1, port) -> - SipResponseBuilder.response(request, Response.OK, "OK")))); + () -> { + RedisUtil.KeyOps.delete(CacheUtil.getKey(MediaSdpHelper.Action.PLAY.getAction(), callId)); + sender.sendResponse(ip, transport, ((provider, ip1, port) -> + SipResponseBuilder.response(request, Response.OK, "OK"))); + }); } }