From 0d5e134943c18da6d23f3e20142fa6c5dc83bcf1 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Thu, 24 Aug 2023 12:21:22 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message/subscribe/InviteSubscribe.java | 37 +++++++++++++++++++ .../sip/message/subscribe/SipSubscribe.java | 4 ++ .../gb28181/service/play/PlayService.java | 8 +++- 3 files changed, 48 insertions(+), 1 deletion(-) create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/InviteSubscribe.java diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/InviteSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/InviteSubscribe.java new file mode 100644 index 0000000..1e57695 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/InviteSubscribe.java @@ -0,0 +1,37 @@ +package cn.skcks.docking.gb28181.core.sip.message.subscribe; + +import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; +import lombok.RequiredArgsConstructor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; + +@RequiredArgsConstructor +public class InviteSubscribe implements GenericSubscribe { + private final Executor executor; + private static final Map> publishers = new ConcurrentHashMap<>(); + + public void close() { + Helper.close(publishers); + } + + public void addPublisher(String key) { + Helper.addPublisher(executor, publishers, key); + } + + public SubmissionPublisher getPublisher(String key) { + return Helper.getPublisher(publishers, key); + } + + public void addSubscribe(String key, Flow.Subscriber subscribe) { + Helper.addSubscribe(publishers, key, subscribe); + } + + @Override + public void delPublisher(String key) { + Helper.delPublisher(publishers, key); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java index 02d5d64..eb6b81a 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java @@ -1,5 +1,6 @@ package cn.skcks.docking.gb28181.core.sip.message.subscribe; +import cn.skcks.docking.gb28181.core.sip.dto.SipTransactionInfo; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; import jakarta.annotation.PostConstruct; @@ -20,14 +21,17 @@ public class SipSubscribe { @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) private final Executor executor; private GenericSubscribe recordInfoSubscribe; + private GenericSubscribe inviteSubscribe; @PostConstruct private void init() { recordInfoSubscribe = new RecordInfoSubscribe(executor); + inviteSubscribe = new InviteSubscribe(executor); } @PreDestroy private void destroy() { + inviteSubscribe.close(); recordInfoSubscribe.close(); } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java index f2203c0..ec7bff0 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/service/play/PlayService.java @@ -6,8 +6,11 @@ import cn.skcks.docking.gb28181.core.sip.gb28181.cache.CacheUtil; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.GB28181Description; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.MediaSdpHelper; import cn.skcks.docking.gb28181.core.sip.gb28181.sdp.StreamMode; +import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import cn.skcks.docking.gb28181.core.sip.message.request.SipRequestBuilder; import cn.skcks.docking.gb28181.core.sip.message.sender.SipMessageSender; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.GenericSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.service.SipService; import cn.skcks.docking.gb28181.core.sip.utils.SipUtil; import cn.skcks.docking.gb28181.media.config.ZlmMediaConfig; @@ -45,9 +48,10 @@ public class PlayService { private final SsrcService ssrcService; private final SipService sipService; private final SipMessageSender sender; + private final SipSubscribe subscribe; /** - * + * 实时视频点播 * @param deviceId 设备id * @param channelId 通道id */ @@ -99,6 +103,8 @@ public class PlayService { Request request = SipRequestBuilder.createInviteRequest(device, channelId, description.toString(), SipUtil.generateViaTag(), SipUtil.generateFromTag(), null, ssrc, callId); sender.send(senderIp, request); + String subscribeKey = GenericSubscribe.Helper.getKey(MessageProcessor.Method.INVITE, deviceId, streamId); +// subscribe.getInviteSubscribe().addPublisher(subscribeKey); result.setResult(JsonResponse.success(StringUtils.joinWith("/", zlmMediaConfig.getUrl(),"rtp", streamId + ".live.flv"))); return result; // zlmMediaService.getRtpInfo();