diff --git a/src/test/java/com/oreilly/rxjava/ch1/Chapter1.java b/src/test/java/com/oreilly/rxjava/ch1/Chapter1.java index ff7c09d..f8a109a 100644 --- a/src/test/java/com/oreilly/rxjava/ch1/Chapter1.java +++ b/src/test/java/com/oreilly/rxjava/ch1/Chapter1.java @@ -1,12 +1,13 @@ package com.oreilly.rxjava.ch1; import com.oreilly.rxjava.util.Sleeper; +import io.reactivex.Completable; +import io.reactivex.Observable; +import io.reactivex.Single; +import io.reactivex.schedulers.Schedulers; import org.junit.Ignore; import org.junit.Test; -import rx.Completable; -import rx.Observable; -import rx.Single; -import rx.schedulers.Schedulers; + import java.time.Duration; import java.util.Map; @@ -25,7 +26,7 @@ public class Chapter1 { public void sample_6() throws Exception { Observable.create(s -> { s.onNext("Hello World!"); - s.onCompleted(); + s.onComplete(); }).subscribe(hello -> System.out.println(hello)); } @@ -36,7 +37,7 @@ public void sample_17() throws Exception { Observable.create(s -> { s.onNext(cache.get(SOME_KEY)); - s.onCompleted(); + s.onComplete(); }).subscribe(value -> System.out.println(value)); } @@ -48,14 +49,14 @@ public void sample_35() throws Exception { if (fromCache != null) { // emit synchronously s.onNext(fromCache); - s.onCompleted(); + s.onComplete(); } else { // fetch asynchronously getDataAsynchronously(SOME_KEY) .onResponse(v -> { putInCache(SOME_KEY, v); s.onNext(v); - s.onCompleted(); + s.onComplete(); }) .onFailure(exception -> { s.onError(exception); @@ -90,7 +91,7 @@ public void sample_81() throws Exception { s.onNext(1); s.onNext(2); s.onNext(3); - s.onCompleted(); + s.onComplete(); }); o.map(i -> "Number " + i) @@ -119,7 +120,7 @@ public void sample_108() throws Exception { s.onNext("two"); s.onNext("three"); s.onNext("four"); - s.onCompleted(); + s.onComplete(); }).start(); }); } @@ -151,7 +152,7 @@ public void sample_142() throws Exception { new Thread(() -> { s.onNext("one"); s.onNext("two"); - s.onCompleted(); + s.onComplete(); }).start(); }); @@ -159,7 +160,7 @@ public void sample_142() throws Exception { new Thread(() -> { s.onNext("three"); s.onNext("four"); - s.onCompleted(); + s.onComplete(); }).start(); }); @@ -173,7 +174,7 @@ public void sample_164() throws Exception { Observable someData = Observable.create(s -> { getDataFromServerWithCallback(args, data -> { s.onNext(data); - s.onCompleted(); + s.onComplete(); }); }); @@ -265,7 +266,8 @@ public void sample_254() throws Exception { @Test public void sample_265() throws Exception { // merge a & b into an Observable stream of 2 values - Observable a_merge_b = getDataA().mergeWith(getDataB()); + Observable a_merge_b = getDataA().mergeWith(getDataB()) + .toObservable(); } public static Single getDataA() { @@ -283,7 +285,7 @@ public void sample_277() throws Exception { Single s2 = getDataAsSingle(2); // o3 is now a stream of s1 and s2 that emits each item without waiting - Observable o3 = Single.merge(s1, s2); + Observable o3 = Single.merge(s1, s2).toObservable(); } private Single getDataAsSingle(int i) { @@ -299,7 +301,7 @@ static Completable writeToDatabase(Object data) { return Completable.create(s -> { doAsyncWrite(data, // callback for successful completion - () -> s.onCompleted(), + () -> s.onComplete(), // callback for failure with Throwable error -> s.onError(error)); });