From 71b75038e2c347f6a182aa75eef9c3772ce84c34 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Mon, 16 Oct 2023 17:22:02 +0800 Subject: [PATCH] =?UTF-8?q?=E5=90=8C=E4=B8=80=E8=AE=BE=E5=A4=87=E9=99=90?= =?UTF-8?q?=E5=88=B6=20=E7=82=B9=E6=92=AD=E6=95=B0=20=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E7=82=B9=E6=92=AD=E5=88=99=E5=85=B3=E9=97=AD=E5=B7=B2=E6=9C=89?= =?UTF-8?q?=E7=82=B9=E6=92=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/Gb28181DownloadService.java | 27 ++++++++++++++++--- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java index 58e97cb..b496113 100644 --- a/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java +++ b/gb28181-wvp-proxy-service/src/main/java/cn/skcks/docking/gb28181/wvp/service/gb28181/Gb28181DownloadService.java @@ -8,6 +8,8 @@ import cn.hutool.core.io.IoUtil; import cn.hutool.core.util.IdUtil; import cn.skcks.docking.gb28181.common.json.JsonException; import cn.skcks.docking.gb28181.common.json.JsonResponse; +import cn.skcks.docking.gb28181.common.redis.RedisUtil; +import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.constant.GB28181Constant; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; @@ -274,18 +276,34 @@ public class Gb28181DownloadService { return result; } + @SneakyThrows public SipSender.SendRequest inviteRequest(WvpProxyDocking docking, WvpProxyDevice device, GB28181Description description, MediaSdpHelper.Action action, String ssrc, String streamId, CompletableFuture result, long time) { + String cacheKey = CacheUtil.getKey(docking.getGbDeviceId(), device.getGbDeviceChannelId()); + String existCallId = RedisUtil.StringOps.get(cacheKey); + + // 限制单个设备/通道 只能 点播 一路 + if(StringUtils.isNotBlank(existCallId)){ + String deviceIp = docking.getIp(); + int devicePort = Integer.parseInt(docking.getPort()); + sender.sendRequest((provider,localIp,localPort)-> + SipRequestBuilder.createByeRequest(deviceIp, devicePort, device.getGbDeviceChannelId(), SipUtil.generateFromTag(), null, existCallId)); + RedisUtil.KeyOps.delete(cacheKey); + log.info("关闭已存在的 点播请求 {} {}", cacheKey,existCallId); + Thread.sleep(200); + } + return (provider, ip, port) -> { CallIdHeader callId = provider.getNewCallId(); String subscribeKey = GenericSubscribe.Helper.getKey(Request.INVITE, callId.getCallId()); subscribe.getInviteSubscribe().addPublisher(subscribeKey); - Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS); + Flow.Subscriber subscriber = inviteSubscriber(docking,device,subscribeKey,cacheKey, ssrc, streamId, result, time + 60, TimeUnit.SECONDS); subscribe.getInviteSubscribe().addSubscribe(subscribeKey, subscriber); + RedisUtil.StringOps.set(cacheKey, callId.getCallId()); return SipRequestBuilder.createInviteRequest(ip, port, docking, device.getGbDeviceChannelId(), description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); }; } - public Flow.Subscriber inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey,String ssrc,String streamId,CompletableFuture result, long time, TimeUnit unit){ + public Flow.Subscriber inviteSubscriber(WvpProxyDocking docking, WvpProxyDevice device, String subscribeKey, String cacheKey, String ssrc,String streamId,CompletableFuture result, long time, TimeUnit unit){ ScheduledFuture[] schedule = new ScheduledFuture[1]; Flow.Subscriber subscriber = new Flow.Subscriber<>() { private Flow.Subscription subscription; @@ -313,7 +331,7 @@ public class Gb28181DownloadService { String toTag = item.getToTag(); String key = GenericSubscribe.Helper.getKey(Request.BYE, callId); subscribe.getByeSubscribe().addPublisher(key); - subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key,streamId, time, unit)); + subscribe.getByeSubscribe().addSubscribe(key, byeSubscriber(key,cacheKey, streamId, time, unit)); return SipRequestBuilder.createAckRequest(Response.OK, ip, port, docking, device.getGbDeviceChannelId(), fromTag, toTag, callId); })); result.complete(new VideoInfo(videoUrl(streamId), callId, device)); @@ -341,7 +359,7 @@ public class Gb28181DownloadService { return subscriber; } - public Flow.Subscriber byeSubscriber(String key,String streamId, long time, TimeUnit unit){ + public Flow.Subscriber byeSubscriber(String key,String cacheKey,String streamId, long time, TimeUnit unit){ ScheduledFuture[] schedule = new ScheduledFuture[1]; Flow.Subscriber subscriber = new Flow.Subscriber<>() { @Override @@ -353,6 +371,7 @@ public class Gb28181DownloadService { @SneakyThrows @Override public void onNext(SIPRequest request) { + RedisUtil.KeyOps.delete(cacheKey); String transport = request.getTopmostViaHeader().getTransport(); String ip = request.getLocalAddress().getHostAddress(); sender.getProvider(transport,ip)