diff --git a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java index 06bec66..b37e014 100644 --- a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java +++ b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java @@ -6,6 +6,7 @@ import cn.skcks.docking.gb28181.sip.method.register.request.RegisterRequestBuild import cn.skcks.docking.gb28181.sip.method.register.response.RegisterResponseBuilder; import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPResponse; +import io.reactivex.rxjava3.core.BackpressureStrategy; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.disposables.Disposable; @@ -19,6 +20,7 @@ import org.springframework.boot.test.context.SpringBootTest; import javax.sip.ListeningPoint; import javax.sip.message.Request; import javax.sip.message.Response; +import java.util.Arrays; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; @@ -149,22 +151,31 @@ public class RxJavaStudyStarterTest { @SneakyThrows @Test void timeoutTest(){ - Observable observable = Observable.create(emitter -> { + Flowable observable = Flowable.create(emitter -> { Thread.sleep(50); - emitter.onNext("Hello"); + Arrays.stream("Hello".split("")).forEach(emitter::onNext); Thread.sleep(200); emitter.onNext("World"); emitter.onComplete(); - }); + }, BackpressureStrategy.LATEST); + // 缓存数据 + observable = observable.cacheWithInitialCapacity(1); + // 超时时间 100ms observable = observable.timeout(100, TimeUnit.MILLISECONDS); + // 超时时间 100ms + 返回默认值 // observable.timeout(100, TimeUnit.MILLISECONDS, Observable.empty()); - observable = observable.retry(2) - .doOnError(e -> log.error("Observable Error: ", e)) - .onErrorResumeNext(throwable -> Observable.empty()); + + // 异常或失败 最多重试2次 + observable = observable.retry(2); + observable = observable.doOnError(e -> log.error("Observable Error: ", e)) + .onErrorResumeNext(throwable -> { + log.error("Observable Error (异常恢复): ", throwable); + return Flowable.empty(); + }); observable.subscribe(item -> { - log.info("onNext: {}", item); + log.info("subscribe onNext: {}", item); }, e -> { if(e instanceof TimeoutException){ log.info("subscribe 超时");