From 18b8c78b30fb13e475dbbd1324d3d02c7caef2f8 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Wed, 23 Aug 2023 00:05:10 +0800 Subject: [PATCH] =?UTF-8?q?SipSubscribe=20=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/api/record/RecordController.java | 7 +- .../core/sip/listener/SipListenerImpl.java | 2 +- .../core/sip/message/event/SipEventItem.java | 58 ---------------- .../core/sip/message/event/SipEventType.java | 9 --- .../core/sip/message/event/SipSubscriber.java | 7 -- .../event/custom/DeviceNotFoundEvent.java | 20 ------ .../message/subscribe/GenericSubscribe.java | 46 +++++++++++++ .../subscribe/RecordInfoSubscribe.java | 32 +++++++++ .../{event => subscribe}/SipSubscribe.java | 16 ++--- .../core/sip/message/event/SipEventTest.java | 67 +++++++++++++++---- 10 files changed, 146 insertions(+), 118 deletions(-) delete mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventItem.java delete mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventType.java delete mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscriber.java delete mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/custom/DeviceNotFoundEvent.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/GenericSubscribe.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/RecordInfoSubscribe.java rename gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/{event => subscribe}/SipSubscribe.java (59%) diff --git a/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java b/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java index b60dd10..b9cbe28 100644 --- a/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java +++ b/api/src/main/java/cn/skcks/docking/gb28181/api/record/RecordController.java @@ -13,6 +13,7 @@ import org.springdoc.core.models.GroupedOpenApi; import org.springframework.context.annotation.Bean; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; @Tag(name="历史录像") @RestController @@ -27,8 +28,10 @@ public class RecordController { } @GetJson("/getInfo") - public JsonResponse getInfo(@ParameterObject @Validated GetInfoDTO dto){ + public DeferredResult> getInfo(@ParameterObject @Validated GetInfoDTO dto){ recordService.requestRecordInfo(dto.getDeviceId()); - return JsonResponse.success(null); + DeferredResult> result = new DeferredResult<>(); + result.setResult(JsonResponse.success(null)); + return result; } } diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java index 8dae9ab..38c6a2d 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java @@ -1,7 +1,7 @@ package cn.skcks.docking.gb28181.core.sip.listener; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; -import cn.skcks.docking.gb28181.core.sip.message.event.SipSubscribe; +import cn.skcks.docking.gb28181.core.sip.message.subscribe.SipSubscribe; import cn.skcks.docking.gb28181.core.sip.message.processor.MessageProcessor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventItem.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventItem.java deleted file mode 100644 index a65ea74..0000000 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventItem.java +++ /dev/null @@ -1,58 +0,0 @@ -package cn.skcks.docking.gb28181.core.sip.message.event; - -import cn.skcks.docking.gb28181.common.json.ResponseStatus; -import cn.skcks.docking.gb28181.core.sip.message.event.custom.DeviceNotFoundEvent; -import gov.nist.javax.sip.message.SIPRequest; -import gov.nist.javax.sip.message.SIPResponse; -import lombok.Data; - -import javax.sip.DialogTerminatedEvent; -import javax.sip.ResponseEvent; -import javax.sip.TimeoutEvent; -import java.util.EventObject; - -@Data -public class SipEventItem { - private int statusCode; - private SipEventType type; - private String msg; - private String callId; - private final EventObject event; - - public SipEventItem(EventObject eventObject) { - event = eventObject; - msg = ResponseStatus.UNDEFINED.getMessage(); - statusCode = ResponseStatus.UNDEFINED.getCode(); - if(eventObject instanceof ResponseEvent responseEvent){ - SIPResponse response = (SIPResponse)responseEvent.getResponse(); - type = SipEventType.Response; - if (response != null) { - msg = response.getReasonPhrase(); - statusCode = response.getStatusCode(); - callId = response.getCallIdHeader().getCallId(); - } - } else if(eventObject instanceof TimeoutEvent timeoutEvent){ - type = SipEventType.TimeOut; - msg = "消息超时未回复"; - statusCode = ResponseStatus.REQUEST_TIMEOUT.getCode(); - - SIPRequest request; - if (timeoutEvent.isServerTransaction()) { - request = ((SIPRequest)timeoutEvent.getServerTransaction().getRequest()); - } else { - request = ((SIPRequest)timeoutEvent.getClientTransaction().getRequest()); - } - callId = request.getCallIdHeader().getCallId(); - } else if(eventObject instanceof DialogTerminatedEvent dialogTerminatedEvent){ - type = SipEventType.End; - msg = "会话已结束"; - statusCode = ResponseStatus.GONE.getCode(); - callId = dialogTerminatedEvent.getDialog().getCallId().getCallId(); - } else if(eventObject instanceof DeviceNotFoundEvent deviceNotFoundEvent){ - type = SipEventType.DeviceNotFound; - msg = "设备未找到"; - statusCode = ResponseStatus.NOT_FOUND.getCode(); - callId = deviceNotFoundEvent.getCallId(); - } - } -} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventType.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventType.java deleted file mode 100644 index c793aa2..0000000 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventType.java +++ /dev/null @@ -1,9 +0,0 @@ -package cn.skcks.docking.gb28181.core.sip.message.event; - -public enum SipEventType { - TimeOut, - Response, - End, - DeviceNotFound, - CmdFail -} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscriber.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscriber.java deleted file mode 100644 index d943af8..0000000 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscriber.java +++ /dev/null @@ -1,7 +0,0 @@ -package cn.skcks.docking.gb28181.core.sip.message.event; - -import java.util.concurrent.Flow; - -public interface SipSubscriber extends Flow.Subscriber { - -} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/custom/DeviceNotFoundEvent.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/custom/DeviceNotFoundEvent.java deleted file mode 100644 index 56407b2..0000000 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/custom/DeviceNotFoundEvent.java +++ /dev/null @@ -1,20 +0,0 @@ -package cn.skcks.docking.gb28181.core.sip.message.event.custom; - -import javax.sip.Dialog; -import java.util.EventObject; - -public class DeviceNotFoundEvent extends EventObject { - private String callId; - - public DeviceNotFoundEvent(Dialog dialog) { - super(dialog); - } - - public String getCallId() { - return callId; - } - - public void setCallId(String callId) { - this.callId = callId; - } -} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/GenericSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/GenericSubscribe.java new file mode 100644 index 0000000..8263a7f --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/GenericSubscribe.java @@ -0,0 +1,46 @@ +package cn.skcks.docking.gb28181.core.sip.message.subscribe; + +import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; + +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; + +public interface GenericSubscribe { + void close(); + + void addPublisher(String key); + + SubmissionPublisher getPublisher(String key); + + void addSubscribe(String key,Flow.Subscriber subscribe); + + class Helper { + public static void close(Map> publishers){ + publishers.values().forEach(SubmissionPublisher::close); + publishers.clear(); + } + + public static void addPublisher(Executor executor, Map> publishers, String key){ + SubmissionPublisher publisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); + publishers.put(key, publisher); + } + + public static void addPublisher(Executor executor, Map> publishers, String key, int bufferSize){ + SubmissionPublisher publisher = new SubmissionPublisher<>(executor, bufferSize); + publishers.put(key, publisher); + } + + public static SubmissionPublisher getPublisher(Map> publishers, String key){ + return publishers.get(key); + } + + public static void addSubscribe(Map> publishers, String key,Flow.Subscriber subscribe){ + SubmissionPublisher publisher = getPublisher(publishers, key); + if(publisher != null){ + publisher.subscribe(subscribe); + } + } + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/RecordInfoSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/RecordInfoSubscribe.java new file mode 100644 index 0000000..bf1a051 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/RecordInfoSubscribe.java @@ -0,0 +1,32 @@ +package cn.skcks.docking.gb28181.core.sip.message.subscribe; + +import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; +import lombok.RequiredArgsConstructor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; + +@RequiredArgsConstructor +public class RecordInfoSubscribe implements GenericSubscribe { + private final Executor executor; + private static final Map> publishers = new ConcurrentHashMap<>(); + + public void close() { + Helper.close(publishers); + } + + public void addPublisher(String key) { + Helper.addPublisher(executor, publishers, key); + } + + public SubmissionPublisher getPublisher(String key) { + return Helper.getPublisher(publishers, key); + } + + public void addSubscribe(String key, Flow.Subscriber subscribe) { + Helper.addSubscribe(publishers, key, subscribe); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java similarity index 59% rename from gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscribe.java rename to gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java index 3a90754..02d5d64 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscribe.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/subscribe/SipSubscribe.java @@ -1,6 +1,7 @@ -package cn.skcks.docking.gb28181.core.sip.message.event; +package cn.skcks.docking.gb28181.core.sip.message.subscribe; import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; +import cn.skcks.docking.gb28181.core.sip.message.processor.message.types.recordinfo.reponse.dto.RecordInfoResponseDTO; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Data; @@ -10,8 +11,6 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import java.util.concurrent.Executor; -import java.util.concurrent.Flow; -import java.util.concurrent.SubmissionPublisher; @Slf4j @Data @@ -20,16 +19,15 @@ import java.util.concurrent.SubmissionPublisher; public class SipSubscribe { @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) private final Executor executor; - - private SubmissionPublisher publisher; + private GenericSubscribe recordInfoSubscribe; @PostConstruct - private void init(){ - publisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); + private void init() { + recordInfoSubscribe = new RecordInfoSubscribe(executor); } @PreDestroy - private void destroy(){ - publisher.close(); + private void destroy() { + recordInfoSubscribe.close(); } } diff --git a/gb28181-service/src/test/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventTest.java b/gb28181-service/src/test/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventTest.java index 7a33959..6e5a111 100644 --- a/gb28181-service/src/test/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventTest.java +++ b/gb28181-service/src/test/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventTest.java @@ -2,16 +2,16 @@ package cn.skcks.docking.gb28181.core.sip.message.event; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class SipEventTest { public static void main(String[] args) throws InterruptedException { - CountDownLatch countDownLatch = new CountDownLatch(512); - + CountDownLatch countDownLatch = new CountDownLatch(1); int threadNum = Runtime.getRuntime().availableProcessors() * 2; - int taskNum = 1000; ExecutorService executor = new ThreadPoolExecutor(threadNum, threadNum, @@ -20,21 +20,20 @@ public class SipEventTest { new ThreadPoolExecutor.DiscardPolicy()); SubmissionPublisher submissionPublisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); - submissionPublisher.subscribe(new Flow.Subscriber<>() { + List list = new ArrayList<>(); + Flow.Subscriber subscriber = new Flow.Subscriber<>() { Flow.Subscription subscription; - @Override public void onSubscribe(Flow.Subscription subscription) { - log.info("建立订阅"); this.subscription = subscription; - subscription.request(1); + log.info("建立订阅"); + subscription.request(5); } @Override public void onNext(String item) { - log.info("接收发送者消息 {}", item); - countDownLatch.countDown(); - subscription.request(1); + list.add(item); + subscription.request(5); } @Override @@ -44,21 +43,65 @@ public class SipEventTest { @Override public void onComplete() { + subscription.cancel(); + countDownLatch.countDown(); log.info("订阅结束"); } - }); - + }; + submissionPublisher.subscribe(subscriber); AtomicInteger finalI = new AtomicInteger(1); + + new Thread(()->{ + try { + Thread.sleep(500); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + subscriber.onComplete(); + }).start(); + for (int i = 0; i < 128; i++) { new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start(); new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start(); new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start(); new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start(); + Thread.sleep(10); } countDownLatch.await(); submissionPublisher.close(); + + list.parallelStream().forEach(item -> log.info("接收发送者消息 {}", item)); + + CountDownLatch countDownLatch2 = new CountDownLatch(1); + submissionPublisher.subscribe(new Flow.Subscriber<>() { + Flow.Subscription subscription; + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + log.info("建立订阅"); + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(String item) { + log.info("{}", item); + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + subscription.cancel(); + countDownLatch2.countDown(); + log.info("订阅结束"); + } + }); + countDownLatch2.await(); executor.shutdown(); } }