From 79d5d387370ecd88a88a6ed6092b35718aa1f1c5 Mon Sep 17 00:00:00 2001
From: shikong <919411476@qq.com>
Date: Wed, 1 May 2024 01:24:49 +0800
Subject: [PATCH] =?UTF-8?q?=E6=B5=8B=E8=AF=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.idea/compiler.xml | 7 +--
.idea/jarRepositories.xml | 5 ++
.idea/misc.xml | 1 -
.../study/rxjava/RxJavaStudyStarterTest.java | 51 ++++++++++++++-----
4 files changed, 48 insertions(+), 16 deletions(-)
diff --git a/.idea/compiler.xml b/.idea/compiler.xml
index 51d4a87..10a8f5b 100644
--- a/.idea/compiler.xml
+++ b/.idea/compiler.xml
@@ -13,9 +13,9 @@
-
-
-
+
+
+
@@ -24,6 +24,7 @@
diff --git a/.idea/jarRepositories.xml b/.idea/jarRepositories.xml
index 5a2f139..943b25e 100644
--- a/.idea/jarRepositories.xml
+++ b/.idea/jarRepositories.xml
@@ -16,5 +16,10 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
index 26901cf..af50359 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,3 @@
-
diff --git a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java
index 9323155..4da4e87 100644
--- a/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java
+++ b/starter/src/test/java/cn/skcks/study/rxjava/RxJavaStudyStarterTest.java
@@ -2,13 +2,14 @@ package cn.skcks.study.rxjava;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
-import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
+import java.util.concurrent.TimeUnit;
+
@Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class RxJavaStudyStarterTest {
@@ -18,18 +19,44 @@ public class RxJavaStudyStarterTest {
Disposable disposable = Flowable.just("Hello world").subscribe(System.out::println);
disposable.dispose();
- Observable integerObservable = Observable.create((ObservableOnSubscribe) emitter -> {
- for (int i = 1; i <= 5; i++) {
- emitter.onNext(i);
- Thread.sleep((long) (Math.random() * 100));
- }
- emitter.onComplete();
- }).subscribeOn(Schedulers.io())
- .observeOn(Schedulers.newThread());
+ // Observable longObservable = Observable.create((ObservableOnSubscribe) emitter -> {
+ // for (long i = 1L; i <= 5L; i++) {
+ // emitter.onNext(i);
+ // Thread.sleep((long) (Math.random() * 100));
+ // }
+ // emitter.onComplete();
+ // }).subscribeOn(Schedulers.io())
+ // .observeOn(Schedulers.newThread())
+ // .doOnError(e -> log.error("Error: ", e));
- integerObservable.subscribe(item -> System.out.println("println: Subscribe Next: " + item));
- integerObservable.subscribe(item -> log.info("[logInfo] Subscribe Next: {}", item));
- integerObservable.blockingSubscribe();
+ Observable intervalObservable = Observable.interval(20, TimeUnit.MILLISECONDS)
+ .take(50)
+ .subscribeOn(Schedulers.io())
+ .observeOn(Schedulers.newThread())
+ .doOnError(e -> log.error("Error: ", e));
+
+ Disposable delayDisposable = intervalObservable.subscribe(item -> {
+ try {
+ log.debug("println: Subscribe Next: {}", item);
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ log.error("执行中断 ", e);
+ }
+
+ }, e -> {
+ log.error("Error: ", e);
+ }, () -> log.info("Complete"));
+
+ intervalObservable.subscribe(item -> log.info("[logInfo] Subscribe Next: {}", item));
+ new Thread(()->{
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ignored) {
+
+ }
+ delayDisposable.dispose();
+ }).start();
+ intervalObservable.blockingSubscribe();
Observable take = Observable.fromArray(1, 2, 3, 4, 5)
.observeOn(Schedulers.computation()).skip(2).take(1);