diff --git a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java index 979283a..06bec66 100644 --- a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java +++ b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java @@ -3,7 +3,9 @@ package cn.skcks.study.rxjava; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.RandomUtil; import cn.skcks.docking.gb28181.sip.method.register.request.RegisterRequestBuilder; +import cn.skcks.docking.gb28181.sip.method.register.response.RegisterResponseBuilder; import gov.nist.javax.sip.message.SIPRequest; +import gov.nist.javax.sip.message.SIPResponse; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; @@ -16,7 +18,9 @@ import org.springframework.boot.test.context.SpringBootTest; import javax.sip.ListeningPoint; import javax.sip.message.Request; +import javax.sip.message.Response; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @Slf4j @@ -104,7 +108,10 @@ public class RxJavaStudyStarterTest { .map(i -> { String callId = IdUtil.fastSimpleUUID(); produce.getAndAdd(1); - return requestBuilder.createNoAuthorizationRequest(callId, 3600); + Request request = requestBuilder.createNoAuthorizationRequest(callId, 3600); + SIPRequest sipRequest = (SIPRequest) request; + log.info("发送SIP请求: \n{}", sipRequest); + return request; }) .cache() .share() @@ -121,9 +128,16 @@ public class RxJavaStudyStarterTest { Thread.sleep(RandomUtil.randomLong(10,100)); log.info("处理SIP请求: {}", sipRequest.getCallId().getCallId()); consume.getAndAdd(1); + + RegisterResponseBuilder registerResponseBuilder = RegisterResponseBuilder.builder().build(); + Response passedAuthorzatioinResponse = registerResponseBuilder.createPassedAuthorzatioinResponse(request); + Observable.just(passedAuthorzatioinResponse).subscribeOn(Schedulers.io()).subscribe(response -> { + SIPResponse sipResponse = (SIPResponse) response; + log.info("响应SIP请求: {}", sipResponse.getCallId().getCallId()); + log.info("响应SIP: \n{}", sipResponse); + }); }; - requestFlowable.subscribe(serverConsumer); requestFlowable.subscribe(serverConsumer); // requestObservable.subscribe(serverConsumer); // requestObservable.blockingSubscribe(); @@ -131,4 +145,35 @@ public class RxJavaStudyStarterTest { Thread.sleep(TimeUnit.SECONDS.toMillis(30)); log.info("生产数量: {}, 消费数量: {}", produce.get(), consume.get()); } + + @SneakyThrows + @Test + void timeoutTest(){ + Observable observable = Observable.create(emitter -> { + Thread.sleep(50); + emitter.onNext("Hello"); + Thread.sleep(200); + emitter.onNext("World"); + emitter.onComplete(); + }); + + observable = observable.timeout(100, TimeUnit.MILLISECONDS); + // observable.timeout(100, TimeUnit.MILLISECONDS, Observable.empty()); + observable = observable.retry(2) + .doOnError(e -> log.error("Observable Error: ", e)) + .onErrorResumeNext(throwable -> Observable.empty()); + + observable.subscribe(item -> { + log.info("onNext: {}", item); + }, e -> { + if(e instanceof TimeoutException){ + log.info("subscribe 超时"); + } else { + log.error("subscribe Error: ", e); + } + }); + + observable.blockingSubscribe(); + Thread.sleep(1000); + } }