From 005b9b0dc1fb84d905aaeff90adb5c95e5b6bfa5 Mon Sep 17 00:00:00 2001 From: shikong <919411476@qq.com> Date: Fri, 11 Aug 2023 15:13:16 +0800 Subject: [PATCH] =?UTF-8?q?Flow=20=E8=AE=A2=E9=98=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/sip/listener/SipListenerImpl.java | 2 + .../core/sip/message/event/SipEventItem.java | 58 +++++++++++++++++ .../core/sip/message/event/SipEventType.java | 9 +++ .../core/sip/message/event/SipSubscribe.java | 35 ++++++++++ .../core/sip/message/event/SipSubscriber.java | 7 ++ .../event/custom/DeviceNotFoundEvent.java | 20 ++++++ .../core/sip/message/event/SipEventTest.java | 64 +++++++++++++++++++ lombok.config | 1 + orm/.gitignore | 38 +++++++++++ orm/pom.xml | 21 ++++++ pom.xml | 1 + 11 files changed, 256 insertions(+) create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventItem.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventType.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscribe.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscriber.java create mode 100644 gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/custom/DeviceNotFoundEvent.java create mode 100644 gb28181-service/src/test/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventTest.java create mode 100644 lombok.config create mode 100644 orm/.gitignore create mode 100644 orm/pom.xml diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java index 8a075d7..6bd27b5 100644 --- a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/listener/SipListenerImpl.java @@ -39,6 +39,8 @@ public class SipListenerImpl implements SipListener { Response response = responseEvent.getResponse(); int status = response.getStatusCode(); + // log.debug(); + // Success if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventItem.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventItem.java new file mode 100644 index 0000000..a65ea74 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventItem.java @@ -0,0 +1,58 @@ +package cn.skcks.docking.gb28181.core.sip.message.event; + +import cn.skcks.docking.gb28181.common.json.ResponseStatus; +import cn.skcks.docking.gb28181.core.sip.message.event.custom.DeviceNotFoundEvent; +import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; +import lombok.Data; + +import javax.sip.DialogTerminatedEvent; +import javax.sip.ResponseEvent; +import javax.sip.TimeoutEvent; +import java.util.EventObject; + +@Data +public class SipEventItem { + private int statusCode; + private SipEventType type; + private String msg; + private String callId; + private final EventObject event; + + public SipEventItem(EventObject eventObject) { + event = eventObject; + msg = ResponseStatus.UNDEFINED.getMessage(); + statusCode = ResponseStatus.UNDEFINED.getCode(); + if(eventObject instanceof ResponseEvent responseEvent){ + SIPResponse response = (SIPResponse)responseEvent.getResponse(); + type = SipEventType.Response; + if (response != null) { + msg = response.getReasonPhrase(); + statusCode = response.getStatusCode(); + callId = response.getCallIdHeader().getCallId(); + } + } else if(eventObject instanceof TimeoutEvent timeoutEvent){ + type = SipEventType.TimeOut; + msg = "消息超时未回复"; + statusCode = ResponseStatus.REQUEST_TIMEOUT.getCode(); + + SIPRequest request; + if (timeoutEvent.isServerTransaction()) { + request = ((SIPRequest)timeoutEvent.getServerTransaction().getRequest()); + } else { + request = ((SIPRequest)timeoutEvent.getClientTransaction().getRequest()); + } + callId = request.getCallIdHeader().getCallId(); + } else if(eventObject instanceof DialogTerminatedEvent dialogTerminatedEvent){ + type = SipEventType.End; + msg = "会话已结束"; + statusCode = ResponseStatus.GONE.getCode(); + callId = dialogTerminatedEvent.getDialog().getCallId().getCallId(); + } else if(eventObject instanceof DeviceNotFoundEvent deviceNotFoundEvent){ + type = SipEventType.DeviceNotFound; + msg = "设备未找到"; + statusCode = ResponseStatus.NOT_FOUND.getCode(); + callId = deviceNotFoundEvent.getCallId(); + } + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventType.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventType.java new file mode 100644 index 0000000..c793aa2 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventType.java @@ -0,0 +1,9 @@ +package cn.skcks.docking.gb28181.core.sip.message.event; + +public enum SipEventType { + TimeOut, + Response, + End, + DeviceNotFound, + CmdFail +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscribe.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscribe.java new file mode 100644 index 0000000..c40910a --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscribe.java @@ -0,0 +1,35 @@ +package cn.skcks.docking.gb28181.core.sip.message.event; + +import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.Data; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Service; + +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; + +@Slf4j +@Data +@RequiredArgsConstructor +@Service +public class SipSubscribe { + @Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME) + private final Executor executor; + + private SubmissionPublisher submissionPublisher; + + @PostConstruct + private void init(){ + submissionPublisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); + } + + @PreDestroy + private void destroy(){ + submissionPublisher.close(); + } +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscriber.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscriber.java new file mode 100644 index 0000000..d943af8 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/SipSubscriber.java @@ -0,0 +1,7 @@ +package cn.skcks.docking.gb28181.core.sip.message.event; + +import java.util.concurrent.Flow; + +public interface SipSubscriber extends Flow.Subscriber { + +} diff --git a/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/custom/DeviceNotFoundEvent.java b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/custom/DeviceNotFoundEvent.java new file mode 100644 index 0000000..56407b2 --- /dev/null +++ b/gb28181-service/src/main/java/cn/skcks/docking/gb28181/core/sip/message/event/custom/DeviceNotFoundEvent.java @@ -0,0 +1,20 @@ +package cn.skcks.docking.gb28181.core.sip.message.event.custom; + +import javax.sip.Dialog; +import java.util.EventObject; + +public class DeviceNotFoundEvent extends EventObject { + private String callId; + + public DeviceNotFoundEvent(Dialog dialog) { + super(dialog); + } + + public String getCallId() { + return callId; + } + + public void setCallId(String callId) { + this.callId = callId; + } +} diff --git a/gb28181-service/src/test/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventTest.java b/gb28181-service/src/test/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventTest.java new file mode 100644 index 0000000..7a33959 --- /dev/null +++ b/gb28181-service/src/test/java/cn/skcks/docking/gb28181/core/sip/message/event/SipEventTest.java @@ -0,0 +1,64 @@ +package cn.skcks.docking.gb28181.core.sip.message.event; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +@Slf4j +public class SipEventTest { + public static void main(String[] args) throws InterruptedException { + CountDownLatch countDownLatch = new CountDownLatch(512); + + int threadNum = Runtime.getRuntime().availableProcessors() * 2; + + int taskNum = 1000; + + ExecutorService executor = new ThreadPoolExecutor(threadNum, threadNum, + 1, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(taskNum), // 使用有界队列,避免OOM + new ThreadPoolExecutor.DiscardPolicy()); + + SubmissionPublisher submissionPublisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize()); + submissionPublisher.subscribe(new Flow.Subscriber<>() { + Flow.Subscription subscription; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + log.info("建立订阅"); + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(String item) { + log.info("接收发送者消息 {}", item); + countDownLatch.countDown(); + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + + } + + @Override + public void onComplete() { + log.info("订阅结束"); + } + }); + + + AtomicInteger finalI = new AtomicInteger(1); + for (int i = 0; i < 128; i++) { + new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start(); + new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start(); + new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start(); + new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start(); + } + + countDownLatch.await(); + submissionPublisher.close(); + executor.shutdown(); + } +} diff --git a/lombok.config b/lombok.config new file mode 100644 index 0000000..53a4a72 --- /dev/null +++ b/lombok.config @@ -0,0 +1 @@ +lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier diff --git a/orm/.gitignore b/orm/.gitignore new file mode 100644 index 0000000..5ff6309 --- /dev/null +++ b/orm/.gitignore @@ -0,0 +1,38 @@ +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### IntelliJ IDEA ### +.idea/modules.xml +.idea/jarRepositories.xml +.idea/compiler.xml +.idea/libraries/ +*.iws +*.iml +*.ipr + +### Eclipse ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ + +### Mac OS ### +.DS_Store \ No newline at end of file diff --git a/orm/pom.xml b/orm/pom.xml new file mode 100644 index 0000000..2d44e20 --- /dev/null +++ b/orm/pom.xml @@ -0,0 +1,21 @@ + + + 4.0.0 + + cn.skcks.docking + gb28181 + 0.0.1-SNAPSHOT + + + cn.skcks.docking.gb28181 + orm + + + 17 + 17 + UTF-8 + + + diff --git a/pom.xml b/pom.xml index c7be6bc..43ed37f 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,7 @@ common api gb28181-service + orm