diff --git a/pom.xml b/pom.xml index 0dc71af..aa16e74 100644 --- a/pom.xml +++ b/pom.xml @@ -27,10 +27,19 @@ 3.2.5 3.1.8 2.5.0 + + 0.1.0 + + cn.skcks.docking + gb28181 + ${gb28181.docking.version} + pom + + io.reactivex.rxjava3 rxjava diff --git a/starter/pom.xml b/starter/pom.xml index f5c34ef..61884c6 100644 --- a/starter/pom.xml +++ b/starter/pom.xml @@ -19,6 +19,12 @@ + + cn.skcks.docking.gb28181 + gb28181-sip + ${gb28181.docking.version} + + org.springdoc springdoc-openapi-starter-webmvc-api @@ -70,4 +76,4 @@ - \ No newline at end of file + diff --git a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java index 4da4e87..850c1a1 100644 --- a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java +++ b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java @@ -1,13 +1,20 @@ package cn.skcks.study.rxjava; +import cn.hutool.core.util.IdUtil; +import cn.hutool.core.util.RandomUtil; +import cn.skcks.docking.gb28181.sip.method.register.request.RegisterRequestBuilder; +import gov.nist.javax.sip.message.SIPRequest; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.functions.Consumer; import io.reactivex.rxjava3.schedulers.Schedulers; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; +import javax.sip.ListeningPoint; +import javax.sip.message.Request; import java.util.concurrent.TimeUnit; @Slf4j @@ -62,4 +69,52 @@ public class RxJavaStudyStarterTest { .observeOn(Schedulers.computation()).skip(2).take(1); take.subscribe(System.out::println); } + + @Test + void gb28181SipTest(){ + Consumer serverConsumer = request -> { + SIPRequest sipRequest = (SIPRequest) request; + String method = request.getMethod(); + log.info("收到SIP请求: {}", method); + Thread.sleep(RandomUtil.randomLong(10,500)); + log.info("处理SIP请求: {}", sipRequest.getCallId().getCallId()); + }; + + String localIp = "127.0.0.1"; + int localPort = 5060; + String localId = "44050100000000000001"; + + String serverId = "44050100002000000003"; + String serverIp = "10.10.10.20"; + int serverPort = 5060; + + String domain = "4405010000"; + + RegisterRequestBuilder requestBuilder = RegisterRequestBuilder.builder() + .localId(localId) + .localIp(localIp) + .localPort(localPort) + .targetId(serverId) + .targetIp(serverIp) + .targetPort(serverPort) + .transport(ListeningPoint.UDP) + .build(); + + Flowable requestObservable = Flowable.intervalRange(1,100, 0,10 ,TimeUnit.MILLISECONDS) + .onBackpressureBuffer() + .map(i -> { + String callId = IdUtil.fastSimpleUUID(); + return requestBuilder.createNoAuthorizationRequest(callId, 3600); + }) + .observeOn(Schedulers.computation()) + .subscribeOn(Schedulers.io()); + + requestObservable.subscribe(serverConsumer); + requestObservable.subscribe(serverConsumer); + requestObservable.subscribe(serverConsumer); + requestObservable.subscribe(serverConsumer); + requestObservable.subscribe(serverConsumer); + + requestObservable.blockingSubscribe(); + } }