Flow 订阅

This commit is contained in:
shikong 2023-08-11 15:13:16 +08:00
parent a1b1a17d23
commit 005b9b0dc1
11 changed files with 256 additions and 0 deletions

View File

@ -39,6 +39,8 @@ public class SipListenerImpl implements SipListener {
Response response = responseEvent.getResponse(); Response response = responseEvent.getResponse();
int status = response.getStatusCode(); int status = response.getStatusCode();
// log.debug();
// Success // Success
if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) { if (((status >= Response.OK) && (status < Response.MULTIPLE_CHOICES)) || status == Response.UNAUTHORIZED) {
CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME); CSeqHeader cseqHeader = (CSeqHeader) responseEvent.getResponse().getHeader(CSeqHeader.NAME);

View File

@ -0,0 +1,58 @@
package cn.skcks.docking.gb28181.core.sip.message.event;
import cn.skcks.docking.gb28181.common.json.ResponseStatus;
import cn.skcks.docking.gb28181.core.sip.message.event.custom.DeviceNotFoundEvent;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import lombok.Data;
import javax.sip.DialogTerminatedEvent;
import javax.sip.ResponseEvent;
import javax.sip.TimeoutEvent;
import java.util.EventObject;
@Data
public class SipEventItem {
private int statusCode;
private SipEventType type;
private String msg;
private String callId;
private final EventObject event;
public SipEventItem(EventObject eventObject) {
event = eventObject;
msg = ResponseStatus.UNDEFINED.getMessage();
statusCode = ResponseStatus.UNDEFINED.getCode();
if(eventObject instanceof ResponseEvent responseEvent){
SIPResponse response = (SIPResponse)responseEvent.getResponse();
type = SipEventType.Response;
if (response != null) {
msg = response.getReasonPhrase();
statusCode = response.getStatusCode();
callId = response.getCallIdHeader().getCallId();
}
} else if(eventObject instanceof TimeoutEvent timeoutEvent){
type = SipEventType.TimeOut;
msg = "消息超时未回复";
statusCode = ResponseStatus.REQUEST_TIMEOUT.getCode();
SIPRequest request;
if (timeoutEvent.isServerTransaction()) {
request = ((SIPRequest)timeoutEvent.getServerTransaction().getRequest());
} else {
request = ((SIPRequest)timeoutEvent.getClientTransaction().getRequest());
}
callId = request.getCallIdHeader().getCallId();
} else if(eventObject instanceof DialogTerminatedEvent dialogTerminatedEvent){
type = SipEventType.End;
msg = "会话已结束";
statusCode = ResponseStatus.GONE.getCode();
callId = dialogTerminatedEvent.getDialog().getCallId().getCallId();
} else if(eventObject instanceof DeviceNotFoundEvent deviceNotFoundEvent){
type = SipEventType.DeviceNotFound;
msg = "设备未找到";
statusCode = ResponseStatus.NOT_FOUND.getCode();
callId = deviceNotFoundEvent.getCallId();
}
}
}

View File

@ -0,0 +1,9 @@
package cn.skcks.docking.gb28181.core.sip.message.event;
public enum SipEventType {
TimeOut,
Response,
End,
DeviceNotFound,
CmdFail
}

View File

@ -0,0 +1,35 @@
package cn.skcks.docking.gb28181.core.sip.message.event;
import cn.skcks.docking.gb28181.core.sip.executor.DefaultSipExecutor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@Slf4j
@Data
@RequiredArgsConstructor
@Service
public class SipSubscribe {
@Qualifier(DefaultSipExecutor.EXECUTOR_BEAN_NAME)
private final Executor executor;
private SubmissionPublisher<SipEventItem> submissionPublisher;
@PostConstruct
private void init(){
submissionPublisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
}
@PreDestroy
private void destroy(){
submissionPublisher.close();
}
}

View File

@ -0,0 +1,7 @@
package cn.skcks.docking.gb28181.core.sip.message.event;
import java.util.concurrent.Flow;
public interface SipSubscriber extends Flow.Subscriber<SipEventItem> {
}

View File

@ -0,0 +1,20 @@
package cn.skcks.docking.gb28181.core.sip.message.event.custom;
import javax.sip.Dialog;
import java.util.EventObject;
public class DeviceNotFoundEvent extends EventObject {
private String callId;
public DeviceNotFoundEvent(Dialog dialog) {
super(dialog);
}
public String getCallId() {
return callId;
}
public void setCallId(String callId) {
this.callId = callId;
}
}

View File

@ -0,0 +1,64 @@
package cn.skcks.docking.gb28181.core.sip.message.event;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class SipEventTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(512);
int threadNum = Runtime.getRuntime().availableProcessors() * 2;
int taskNum = 1000;
ExecutorService executor = new ThreadPoolExecutor(threadNum, threadNum,
1, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(taskNum), // 使用有界队列避免OOM
new ThreadPoolExecutor.DiscardPolicy());
SubmissionPublisher<String> submissionPublisher = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
submissionPublisher.subscribe(new Flow.Subscriber<>() {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("建立订阅");
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
log.info("接收发送者消息 {}", item);
countDownLatch.countDown();
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
log.info("订阅结束");
}
});
AtomicInteger finalI = new AtomicInteger(1);
for (int i = 0; i < 128; i++) {
new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start();
new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start();
new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start();
new Thread(() -> submissionPublisher.submit(String.valueOf(finalI.getAndIncrement()))).start();
}
countDownLatch.await();
submissionPublisher.close();
executor.shutdown();
}
}

1
lombok.config Normal file
View File

@ -0,0 +1 @@
lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Qualifier

38
orm/.gitignore vendored Normal file
View File

@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/
### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr
### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/
### VS Code ###
.vscode/
### Mac OS ###
.DS_Store

21
orm/pom.xml Normal file
View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<groupId>cn.skcks.docking.gb28181</groupId>
<artifactId>orm</artifactId>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
</project>

View File

@ -20,6 +20,7 @@
<module>common</module> <module>common</module>
<module>api</module> <module>api</module>
<module>gb28181-service</module> <module>gb28181-service</module>
<module>orm</module>
</modules> </modules>
<properties> <properties>