Disruptor 试水

This commit is contained in:
shikong 2024-05-05 16:40:05 +08:00
parent 1eb055c662
commit 596a973514

View File

@ -3,7 +3,9 @@ package cn.skcks.study.rxjava;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.RandomUtil;
import cn.skcks.docking.gb28181.sip.method.register.request.RegisterRequestBuilder;
import cn.skcks.docking.gb28181.sip.method.register.response.RegisterResponseBuilder;
import gov.nist.javax.sip.message.SIPRequest;
import gov.nist.javax.sip.message.SIPResponse;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
@ -16,7 +18,9 @@ import org.springframework.boot.test.context.SpringBootTest;
import javax.sip.ListeningPoint;
import javax.sip.message.Request;
import javax.sip.message.Response;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
@Slf4j
@ -104,7 +108,10 @@ public class RxJavaStudyStarterTest {
.map(i -> {
String callId = IdUtil.fastSimpleUUID();
produce.getAndAdd(1);
return requestBuilder.createNoAuthorizationRequest(callId, 3600);
Request request = requestBuilder.createNoAuthorizationRequest(callId, 3600);
SIPRequest sipRequest = (SIPRequest) request;
log.info("发送SIP请求: \n{}", sipRequest);
return request;
})
.cache()
.share()
@ -121,9 +128,16 @@ public class RxJavaStudyStarterTest {
Thread.sleep(RandomUtil.randomLong(10,100));
log.info("处理SIP请求: {}", sipRequest.getCallId().getCallId());
consume.getAndAdd(1);
RegisterResponseBuilder registerResponseBuilder = RegisterResponseBuilder.builder().build();
Response passedAuthorzatioinResponse = registerResponseBuilder.createPassedAuthorzatioinResponse(request);
Observable.just(passedAuthorzatioinResponse).subscribeOn(Schedulers.io()).subscribe(response -> {
SIPResponse sipResponse = (SIPResponse) response;
log.info("响应SIP请求: {}", sipResponse.getCallId().getCallId());
log.info("响应SIP: \n{}", sipResponse);
});
};
requestFlowable.subscribe(serverConsumer);
requestFlowable.subscribe(serverConsumer);
// requestObservable.subscribe(serverConsumer);
// requestObservable.blockingSubscribe();
@ -131,4 +145,35 @@ public class RxJavaStudyStarterTest {
Thread.sleep(TimeUnit.SECONDS.toMillis(30));
log.info("生产数量: {}, 消费数量: {}", produce.get(), consume.get());
}
@SneakyThrows
@Test
void timeoutTest(){
Observable<Object> observable = Observable.create(emitter -> {
Thread.sleep(50);
emitter.onNext("Hello");
Thread.sleep(200);
emitter.onNext("World");
emitter.onComplete();
});
observable = observable.timeout(100, TimeUnit.MILLISECONDS);
// observable.timeout(100, TimeUnit.MILLISECONDS, Observable.empty());
observable = observable.retry(2)
.doOnError(e -> log.error("Observable Error: ", e))
.onErrorResumeNext(throwable -> Observable.empty());
observable.subscribe(item -> {
log.info("onNext: {}", item);
}, e -> {
if(e instanceof TimeoutException){
log.info("subscribe 超时");
} else {
log.error("subscribe Error: ", e);
}
});
observable.blockingSubscribe();
Thread.sleep(1000);
}
}