diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..f99768d 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -16,6 +16,7 @@ package cn.nextop.rxjava.share.practices; +import cn.nextop.rxjava.share.util.Tuples; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; @@ -24,12 +25,14 @@ */ public class Practice1 { + // + /* * 举例如下: * 参数 Observable["a","b","c"] * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.zipWith(Observable.range(1, Integer.MAX_VALUE), (i, s) -> Tuples.of(s, i)); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..550e058 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -17,11 +17,14 @@ package cn.nextop.rxjava.share.practices; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; + import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; - -import java.util.Map; +import io.reactivex.functions.BiFunction; /** * @author Baoyi Chen @@ -34,7 +37,9 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.groupBy(key -> key).map(group -> { + return group.count().map(c -> new Tuple2(group.getKey(), c.intValue())).toObservable(); + }).flatMap(f -> f); } /* @@ -43,7 +48,16 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.reduceWith(new Callable>() { + @Override + public Map call() throws Exception { + return new HashMap(); + } + }, new BiFunction, String, Map>() { + @Override + public Map apply(Map t1, String t2) throws Exception { + Integer c = t1.get(t2); if(c == null) c = 0; t1.put(t2, c++); return t1; + } + }); } - } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..8b261cf 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -16,6 +16,9 @@ package cn.nextop.rxjava.share.practices; +import java.util.ArrayList; +import java.util.List; + import io.reactivex.Maybe; import io.reactivex.Observable; @@ -28,7 +31,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.iterate(observable).reduce((a,b) -> a + b); } /* @@ -41,10 +44,16 @@ public Maybe sum(Observable observable) { * * return Observable[4, 3, 6, 7, 5] 顺序无关 */ - public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } + public Observable iterate(Observable observable) { + return observable.flatMapIterable(n -> { + List l = new ArrayList<>(); + fill(l, n); return l; }).map(n -> n.value); + } + private void fill(List r, Node n) { + if (n == null) return; r.add(n); fill(r, n.left); fill(r, n.right); + } + public static class Node { public Node left; public Node right; diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..c5df3d2 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,7 @@ import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +45,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.concatMap(s -> Observable.just(s).observeOn(Schedulers.newThread())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..a786458 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,14 +16,14 @@ package cn.nextop.rxjava.share.practices; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + /** * @author Baoyi Chen */ @@ -35,7 +35,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.reduce(Long.valueOf("0"), (a, b) -> a + 1); } /* @@ -44,7 +44,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.flatMapIterable(s -> s); } /* @@ -53,7 +53,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.groupBy(s -> s).map(e -> e.getKey()); } /* @@ -62,7 +62,7 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + return source.filter(conditon::test); } /* @@ -71,7 +71,7 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + return source.skip(index).take(1).firstElement(); } /* @@ -80,7 +80,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return Observable.range(0, count).concatMap(e -> source); } /* @@ -89,7 +89,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).concatMap(s -> s); } /* @@ -98,7 +98,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).flatMap(s -> s); } /* @@ -107,7 +107,7 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(s -> Observable.just(s).delay(delay, unit)); } }