This commit is contained in:
shikong 2024-05-01 01:24:49 +08:00
parent 8dbf5293ba
commit 79d5d38737
4 changed files with 48 additions and 16 deletions

View File

@ -13,9 +13,9 @@
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" />
<processorPath useClasspath="false">
<entry name="E:/Maven/repository/org/projectlombok/lombok/1.18.32/lombok-1.18.32.jar" />
<entry name="E:/Maven/repository/org/projectlombok/lombok-mapstruct-binding/0.2.0/lombok-mapstruct-binding-0.2.0.jar" />
<entry name="E:/Maven/repository/org/springframework/boot/spring-boot-configuration-processor/3.2.5/spring-boot-configuration-processor-3.2.5.jar" />
<entry name="D:/Repository/maven/org/projectlombok/lombok/1.18.32/lombok-1.18.32.jar" />
<entry name="D:/Repository/maven/org/projectlombok/lombok-mapstruct-binding/0.2.0/lombok-mapstruct-binding-0.2.0.jar" />
<entry name="D:/Repository/maven/org/springframework/boot/spring-boot-configuration-processor/3.2.5/spring-boot-configuration-processor-3.2.5.jar" />
</processorPath>
<module name="rxjava-study-starter" />
</profile>
@ -24,6 +24,7 @@
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_OVERRIDE">
<module name="RxJavaStudy" options="-parameters" />
<module name="rxjava-study" options="-parameters" />
<module name="rxjava-study-starter" options="-parameters" />
</option>
</component>

View File

@ -16,5 +16,10 @@
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Central Repository" />
<option name="url" value="http://10.10.10.200:18081/repository/maven-public/" />
</remote-repository>
</component>
</project>

View File

@ -1,4 +1,3 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CodeInsightWorkspaceSettings">
<option name="optimizeImportsOnTheFly" value="true" />

View File

@ -2,13 +2,14 @@ package cn.skcks.study.rxjava;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.TimeUnit;
@Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class RxJavaStudyStarterTest {
@ -18,18 +19,44 @@ public class RxJavaStudyStarterTest {
Disposable disposable = Flowable.just("Hello world").subscribe(System.out::println);
disposable.dispose();
Observable<Integer> integerObservable = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 1; i <= 5; i++) {
emitter.onNext(i);
Thread.sleep((long) (Math.random() * 100));
}
emitter.onComplete();
}).subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread());
// Observable<Long> longObservable = Observable.create((ObservableOnSubscribe<Long>) emitter -> {
// for (long i = 1L; i <= 5L; i++) {
// emitter.onNext(i);
// Thread.sleep((long) (Math.random() * 100));
// }
// emitter.onComplete();
// }).subscribeOn(Schedulers.io())
// .observeOn(Schedulers.newThread())
// .doOnError(e -> log.error("Error: ", e));
integerObservable.subscribe(item -> System.out.println("println: Subscribe Next: " + item));
integerObservable.subscribe(item -> log.info("[logInfo] Subscribe Next: {}", item));
integerObservable.blockingSubscribe();
Observable<Long> intervalObservable = Observable.interval(20, TimeUnit.MILLISECONDS)
.take(50)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.doOnError(e -> log.error("Error: ", e));
Disposable delayDisposable = intervalObservable.subscribe(item -> {
try {
log.debug("println: Subscribe Next: {}", item);
Thread.sleep(100);
} catch (InterruptedException e) {
log.error("执行中断 ", e);
}
}, e -> {
log.error("Error: ", e);
}, () -> log.info("Complete"));
intervalObservable.subscribe(item -> log.info("[logInfo] Subscribe Next: {}", item));
new Thread(()->{
try {
Thread.sleep(200);
} catch (InterruptedException ignored) {
}
delayDisposable.dispose();
}).start();
intervalObservable.blockingSubscribe();
Observable<Integer> take = Observable.fromArray(1, 2, 3, 4, 5)
.observeOn(Schedulers.computation()).skip(2).take(1);