This commit is contained in:
shikong 2024-05-01 05:13:01 +08:00
parent 79d5d38737
commit 85b48056f6
3 changed files with 71 additions and 1 deletions

View File

@ -27,10 +27,19 @@
<springboot.version>3.2.5</springboot.version> <springboot.version>3.2.5</springboot.version>
<rxjava.version>3.1.8</rxjava.version> <rxjava.version>3.1.8</rxjava.version>
<springdoc.version>2.5.0</springdoc.version> <springdoc.version>2.5.0</springdoc.version>
<gb28181.docking.version>0.1.0</gb28181.docking.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
<dependencies> <dependencies>
<dependency>
<groupId>cn.skcks.docking</groupId>
<artifactId>gb28181</artifactId>
<version>${gb28181.docking.version}</version>
<type>pom</type>
</dependency>
<dependency> <dependency>
<groupId>io.reactivex.rxjava3</groupId> <groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId> <artifactId>rxjava</artifactId>

View File

@ -19,6 +19,12 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>cn.skcks.docking.gb28181</groupId>
<artifactId>gb28181-sip</artifactId>
<version>${gb28181.docking.version}</version>
</dependency>
<dependency> <dependency>
<groupId>org.springdoc</groupId> <groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-api</artifactId> <artifactId>springdoc-openapi-starter-webmvc-api</artifactId>
@ -70,4 +76,4 @@
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
</project> </project>

View File

@ -1,13 +1,20 @@
package cn.skcks.study.rxjava; package cn.skcks.study.rxjava;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import cn.skcks.docking.gb28181.sip.method.register.request.RegisterRequestBuilder;
import gov.nist.javax.sip.message.SIPRequest;
import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import javax.sip.ListeningPoint;
import javax.sip.message.Request;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@ -62,4 +69,52 @@ public class RxJavaStudyStarterTest {
.observeOn(Schedulers.computation()).skip(2).take(1); .observeOn(Schedulers.computation()).skip(2).take(1);
take.subscribe(System.out::println); take.subscribe(System.out::println);
} }
@Test
void gb28181SipTest(){
Consumer<Request> serverConsumer = request -> {
SIPRequest sipRequest = (SIPRequest) request;
String method = request.getMethod();
log.info("收到SIP请求: {}", method);
Thread.sleep(RandomUtil.randomLong(10,500));
log.info("处理SIP请求: {}", sipRequest.getCallId().getCallId());
};
String localIp = "127.0.0.1";
int localPort = 5060;
String localId = "44050100000000000001";
String serverId = "44050100002000000003";
String serverIp = "10.10.10.20";
int serverPort = 5060;
String domain = "4405010000";
RegisterRequestBuilder requestBuilder = RegisterRequestBuilder.builder()
.localId(localId)
.localIp(localIp)
.localPort(localPort)
.targetId(serverId)
.targetIp(serverIp)
.targetPort(serverPort)
.transport(ListeningPoint.UDP)
.build();
Flowable<Request> requestObservable = Flowable.intervalRange(1,100, 0,10 ,TimeUnit.MILLISECONDS)
.onBackpressureBuffer()
.map(i -> {
String callId = IdUtil.fastSimpleUUID();
return requestBuilder.createNoAuthorizationRequest(callId, 3600);
})
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io());
requestObservable.subscribe(serverConsumer);
requestObservable.subscribe(serverConsumer);
requestObservable.subscribe(serverConsumer);
requestObservable.subscribe(serverConsumer);
requestObservable.subscribe(serverConsumer);
requestObservable.blockingSubscribe();
}
} }