diff --git a/kool/src/main/java/org/davidmoten/kool/Stream.java b/kool/src/main/java/org/davidmoten/kool/Stream.java index e352319..a05eae3 100644 --- a/kool/src/main/java/org/davidmoten/kool/Stream.java +++ b/kool/src/main/java/org/davidmoten/kool/Stream.java @@ -444,7 +444,7 @@ static Stream merge(Stream... streams) { static Stream interval(long duration, TimeUnit unit) { return range(1, Integer.MAX_VALUE).doOnNext(x -> unit.sleep(duration)).prepend(0); } - + static InputStream inputStream(Stream stream) { return StreamUtils.toInputStream(stream); } @@ -671,15 +671,23 @@ default Stream prepend(T[] values) { default Stream prepend(StreamIterable values) { return new Concat(values, this); } + + default Stream prepend(Iterable values) { + return prepend(Stream.from(values)); + } default Stream concatWith(StreamIterable values) { return new Concat(this, values); } + + default Stream concatWith(Iterable values) { + return concatWith(Stream.from(values)); + } default Stream flatMap(Function> function) { return new FlatMap(function, this); } - + default Stream flatMap(BiConsumer> generator, Consumer> onFinish) { return new FlatMapGenerator(generator, onFinish, this); @@ -752,6 +760,15 @@ default Stream doAfterDispose(Action action) { default Stream doOnEmpty(Action action) { return new DoOnEmpty(this, action); } + + default Stream doOnComplete(Consumer countAction) { + return defer(() -> { + long[] count = new long[1]; + return this // + .doOnNext(t -> count[0]++) // + .doOnComplete(() -> countAction.accept(count[0])); + }); + } default Maybe last() { return new Last(this); diff --git a/kool/src/test/java/org/davidmoten/kool/StreamTest.java b/kool/src/test/java/org/davidmoten/kool/StreamTest.java index ae12aa4..2417d5e 100644 --- a/kool/src/test/java/org/davidmoten/kool/StreamTest.java +++ b/kool/src/test/java/org/davidmoten/kool/StreamTest.java @@ -92,6 +92,11 @@ private static void checkTrue(Consumer consumer) { public void testPrepend() { Stream.of(1, 2, 3).prepend(0).test().assertValuesOnly(0, 1, 2, 3); } + + @Test + public void testPrependIterable() { + Stream.of(1, 2, 3).prepend(Arrays.asList(5, 6, 7)).test().assertValuesOnly(5, 6, 7, 1, 2, 3); + } @Test public void testPrependMany() { @@ -425,9 +430,14 @@ public Integer next() { } @Test - public void testConcat() { + public void testConcatWith() { Stream.of(1, 2).concatWith(Stream.of(3, 4)).test().assertValuesOnly(1, 2, 3, 4); } + + @Test + public void testConcatWithIterable() { + Stream.of(1, 2).concatWith(Arrays.asList(3, 4)).test().assertValuesOnly(1, 2, 3, 4); + } @Test public void testConcatEmpties() { @@ -448,6 +458,15 @@ public void testDoOnComplete() { .forEach(); assertEquals(Lists.newArrayList(1, 2, 3), list); } + + @Test + public void testDoOnCompleteCount() { + List list = new ArrayList<>(); + Stream.of(1, 1, 1) // + .doOnComplete(count -> list.add(count.intValue())) // + .forEach(); + assertEquals(Lists.newArrayList(3), list); + } @Test public void testZip() {