This commit is contained in:
shikong 2024-04-30 16:14:16 +08:00
parent e00590336e
commit 949c1ee51a

View File

@ -4,6 +4,7 @@ import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@ -17,14 +18,18 @@ public class RxJavaStudyStarterTest {
Disposable disposable = Flowable.just("Hello world").subscribe(System.out::println);
disposable.dispose();
Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
Observable<Integer> integerObservable = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
for (int i = 1; i <= 5; i++) {
emitter.onNext(i);
}
emitter.onComplete();
}).subscribe(item -> System.out.println("Next: " + item));
}).observeOn(Schedulers.io());
Observable<Integer> take = Observable.fromArray(1, 2, 3, 4, 5).skip(2).take(1);
integerObservable.subscribe(item -> System.out.println("Subscribe Next: " + item));
integerObservable.subscribe(item -> log.info("[logInfo] Subscribe Next: {}", item));
Observable<Integer> take = Observable.fromArray(1, 2, 3, 4, 5)
.observeOn(Schedulers.computation()).skip(2).take(1);
take.subscribe(System.out::println);
}
}