From bfe6e37eb24c7159b7362124f866a10e2cbb70fa Mon Sep 17 00:00:00 2001 From: Julien Viet Date: Tue, 21 May 2024 22:15:59 +0200 Subject: [PATCH] Add a general purpose collecting method to ReadStream to facilitate the reduction of streams. The collecting method is a default method. --- src/main/asciidoc/streams.adoc | 12 ++++ src/main/java/examples/StreamsExamples.java | 13 +++- .../io/vertx/core/streams/ReadStream.java | 25 ++++++++ .../core/streams/ReadStreamReduceTest.java | 62 +++++++++++++++++++ 4 files changed, 110 insertions(+), 2 deletions(-) create mode 100644 src/test/java/io/vertx/core/streams/ReadStreamReduceTest.java diff --git a/src/main/asciidoc/streams.adoc b/src/main/asciidoc/streams.adoc index 6df8fc8c379..9fa7aaeab82 100644 --- a/src/main/asciidoc/streams.adoc +++ b/src/main/asciidoc/streams.adoc @@ -170,3 +170,15 @@ returns `true` if the write queue is considered full. Will be called if an exception occurs on the `WriteStream`. - {@link io.vertx.core.streams.WriteStream#drainHandler}: The handler will be called if the `WriteStream` is considered no longer full. + +=== Reducing streams + +Java collectors can reduce a `ReadStream` to a result in a similar fashion `java.util.Stream` does, yet in an asynchronous +fashion. + +[source,$lang] +---- +{@link examples.StreamsExamples#reduce1} +---- + +Note that `collect` overrides any previously handler set on the stream. diff --git a/src/main/java/examples/StreamsExamples.java b/src/main/java/examples/StreamsExamples.java index 4911e08f565..d094f0cb2c2 100644 --- a/src/main/java/examples/StreamsExamples.java +++ b/src/main/java/examples/StreamsExamples.java @@ -11,7 +11,7 @@ package examples; -import io.vertx.core.Handler; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.file.AsyncFile; @@ -23,8 +23,10 @@ import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.NetSocket; import io.vertx.core.streams.Pipe; -import io.vertx.core.streams.Pump; import io.vertx.core.streams.ReadStream; +import io.vertx.core.streams.Pump; + +import java.util.stream.Collectors; /** * @author Julien Viet @@ -169,4 +171,11 @@ public void pipe9(AsyncFile src, AsyncFile dst) { dst.end(Buffer.buffer("done")); }); } + + public void reduce1(ReadStream stream) { + // Count the number of elements + Future result = stream.collect(Collectors.counting()); + + result.onSuccess(count -> System.out.println("Stream emitted " + count + " elements")); + } } diff --git a/src/main/java/io/vertx/core/streams/ReadStream.java b/src/main/java/io/vertx/core/streams/ReadStream.java index 6ea543d1329..616e57c7274 100644 --- a/src/main/java/io/vertx/core/streams/ReadStream.java +++ b/src/main/java/io/vertx/core/streams/ReadStream.java @@ -12,14 +12,18 @@ package io.vertx.core.streams; import io.vertx.codegen.annotations.Fluent; +import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.Nullable; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.Promise; +import io.vertx.core.impl.future.PromiseInternal; import io.vertx.core.streams.impl.PipeImpl; +import java.util.function.BiConsumer; + /** * Represents a stream of items that can be read from. *

@@ -133,4 +137,25 @@ default Future pipeTo(WriteStream dst) { default void pipeTo(WriteStream dst, Handler> handler) { new PipeImpl<>(this).to(dst, handler); } + + /** + * Apply a {@code collector} to this stream, the obtained result is returned as a future. + *

+ * Handlers of this stream are affected by this operation. + * + * @return a future notified with result produced by the {@code collector} applied to this stream + */ + @GenIgnore(GenIgnore.PERMITTED_TYPE) + default Future collect(java.util.stream.Collector collector) { + PromiseInternal promise = (PromiseInternal) Promise.promise(); + A cumulation = collector.supplier().get(); + BiConsumer accumulator = collector.accumulator(); + handler(elt -> accumulator.accept(cumulation, elt)); + endHandler(v -> { + R result = collector.finisher().apply(cumulation); + promise.tryComplete(result); + }); + exceptionHandler(promise::tryFail); + return promise.future(); + } } diff --git a/src/test/java/io/vertx/core/streams/ReadStreamReduceTest.java b/src/test/java/io/vertx/core/streams/ReadStreamReduceTest.java new file mode 100644 index 00000000000..487d4e4bb0e --- /dev/null +++ b/src/test/java/io/vertx/core/streams/ReadStreamReduceTest.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.streams; + +import io.vertx.core.Future; +import io.vertx.test.core.AsyncTestBase; +import io.vertx.test.fakestream.FakeStream; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +public class ReadStreamReduceTest extends AsyncTestBase { + + private FakeStream dst; + private Object o1 = new Object(); + private Object o2 = new Object(); + private Object o3 = new Object(); + + @Override + protected void setUp() throws Exception { + super.setUp(); + dst = new FakeStream<>(); + } + + @Test + public void testCollect() { + Future> list = dst.collect(Collectors.toList()); + assertFalse(list.isComplete()); + dst.write(o1); + assertFalse(list.isComplete()); + dst.write(o2); + assertFalse(list.isComplete()); + dst.write(o3); + dst.end(); + assertTrue(list.succeeded()); + assertEquals(Arrays.asList(o1, o2, o3), list.result()); + } + + @Test + public void testFailure() { + Future> list = dst.collect(Collectors.toList()); + assertFalse(list.isComplete()); + dst.write(o1); + assertFalse(list.isComplete()); + dst.write(o2); + assertFalse(list.isComplete()); + Throwable err = new Throwable(); + dst.fail(err); + assertTrue(list.failed()); + assertSame(err, list.cause()); + } +}