This commit is contained in:
shikong 2024-05-05 17:25:58 +08:00
parent 596a973514
commit c50667380a

View File

@ -6,6 +6,7 @@ import cn.skcks.docking.gb28181.sip.method.register.request.RegisterRequestBuild
import cn.skcks.docking.gb28181.sip.method.register.response.RegisterResponseBuilder; import cn.skcks.docking.gb28181.sip.method.register.response.RegisterResponseBuilder;
import gov.nist.javax.sip.message.SIPRequest; import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse; import gov.nist.javax.sip.message.SIPResponse;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.disposables.Disposable;
@ -19,6 +20,7 @@ import org.springframework.boot.test.context.SpringBootTest;
import javax.sip.ListeningPoint; import javax.sip.ListeningPoint;
import javax.sip.message.Request; import javax.sip.message.Request;
import javax.sip.message.Response; import javax.sip.message.Response;
import java.util.Arrays;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -149,22 +151,31 @@ public class RxJavaStudyStarterTest {
@SneakyThrows @SneakyThrows
@Test @Test
void timeoutTest(){ void timeoutTest(){
Observable<Object> observable = Observable.create(emitter -> { Flowable<Object> observable = Flowable.create(emitter -> {
Thread.sleep(50); Thread.sleep(50);
emitter.onNext("Hello"); Arrays.stream("Hello".split("")).forEach(emitter::onNext);
Thread.sleep(200); Thread.sleep(200);
emitter.onNext("World"); emitter.onNext("World");
emitter.onComplete(); emitter.onComplete();
}); }, BackpressureStrategy.LATEST);
// 缓存数据
observable = observable.cacheWithInitialCapacity(1);
// 超时时间 100ms
observable = observable.timeout(100, TimeUnit.MILLISECONDS); observable = observable.timeout(100, TimeUnit.MILLISECONDS);
// 超时时间 100ms + 返回默认值
// observable.timeout(100, TimeUnit.MILLISECONDS, Observable.empty()); // observable.timeout(100, TimeUnit.MILLISECONDS, Observable.empty());
observable = observable.retry(2)
.doOnError(e -> log.error("Observable Error: ", e)) // 异常或失败 最多重试2次
.onErrorResumeNext(throwable -> Observable.empty()); observable = observable.retry(2);
observable = observable.doOnError(e -> log.error("Observable Error: ", e))
.onErrorResumeNext(throwable -> {
log.error("Observable Error (异常恢复): ", throwable);
return Flowable.empty();
});
observable.subscribe(item -> { observable.subscribe(item -> {
log.info("onNext: {}", item); log.info("subscribe onNext: {}", item);
}, e -> { }, e -> {
if(e instanceof TimeoutException){ if(e instanceof TimeoutException){
log.info("subscribe 超时"); log.info("subscribe 超时");