Skip to content

Commit

Permalink
CompositeFuture now delegates handler management to a Promise<Composi…
Browse files Browse the repository at this point in the history
…teFuture> in order to benefit from multiple handler support built-in Promise. - fixes #3242
  • Loading branch information
vietj committed Jan 9, 2020
1 parent df5d135 commit 8b395ad
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 78 deletions.
127 changes: 49 additions & 78 deletions src/main/java/io/vertx/core/impl/CompositeFutureImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.vertx.core.Future;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Handler;
import io.vertx.core.Promise;

import java.util.function.Function;

Expand All @@ -23,129 +24,111 @@
*/
public class CompositeFutureImpl implements CompositeFuture, Handler<AsyncResult<CompositeFuture>> {

private static final Handler<AsyncResult<CompositeFuture>> NO_HANDLER = c -> {};

public static CompositeFuture all(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
for (int i = 0; i < len; i++) {
results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
for (Future<?> result : results) {
result.setHandler(ar -> {
if (ar.succeeded()) {
synchronized (composite) {
composite.count++;
if (!composite.isComplete() && composite.count == len) {
handler = composite.setCompleted(null);
if (composite.isComplete() || composite.count < len) {
return;
}
}
composite.complete();
} else {
synchronized (composite) {
if (!composite.isComplete()) {
handler = composite.setCompleted(ar.cause());
if (composite.isComplete()) {
return;
}
}
}
if (handler != null) {
handler.handle(composite);
composite.fail(ar.cause());
}
});
}
if (len == 0) {
composite.setCompleted(null);
composite.complete();
}
return composite;
}

public static CompositeFuture any(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
for (int i = 0;i < len;i++) {
results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
for (Future<?> result : results) {
result.setHandler(ar -> {
if (ar.succeeded()) {
synchronized (composite) {
if (!composite.isComplete()) {
handler = composite.setCompleted(null);
if (composite.isComplete()) {
return;
}
}
composite.complete();
} else {
synchronized (composite) {
composite.count++;
if (!composite.isComplete() && composite.count == len) {
handler = composite.setCompleted(ar.cause());
if (composite.isComplete() || composite.count < len) {
return;
}
}
}
if (handler != null) {
handler.handle(composite);
composite.fail(ar.cause());
}
});
}
if (results.length == 0) {
composite.setCompleted(null);
composite.complete();
}
return composite;
}

private static final Function<CompositeFuture, Throwable> ALL = cf -> {
private static final Function<CompositeFuture, Object> ALL = cf -> {
int size = cf.size();
for (int i = 0;i < size;i++) {
if (!cf.succeeded(i)) {
return cf.cause(i);
}
}
return null;
return cf;
};

public static CompositeFuture join(Future<?>... results) {
return join(ALL, results);
}

private static CompositeFuture join(Function<CompositeFuture, Throwable> pred, Future<?>... results) {
private static CompositeFuture join(Function<CompositeFuture, Object> pred, Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
for (int i = 0; i < len; i++) {
results[i].setHandler(ar -> {
Handler<AsyncResult<CompositeFuture>> handler = null;
for (Future<?> result : results) {
result.setHandler(ar -> {
synchronized (composite) {
composite.count++;
if (!composite.isComplete() && composite.count == len) {
// Take decision here
Throwable failure = pred.apply(composite);
handler = composite.setCompleted(failure);
if (composite.isComplete() || composite.count < len) {
return;
}
}
if (handler != null) {
handler.handle(composite);
}
composite.doComplete(pred.apply(composite));
});
}
if (len == 0) {
composite.setCompleted(null);
composite.doComplete(composite);
}
return composite;
}

private final Future[] results;
private int count;
private boolean completed;
private Throwable cause;
private Handler<AsyncResult<CompositeFuture>> handler;
private Object result;
private Promise<CompositeFuture> promise;

private CompositeFutureImpl(Future<?>... results) {
this.results = results;
this.promise = Promise.promise();
}

@Override
public CompositeFuture setHandler(Handler<AsyncResult<CompositeFuture>> handler) {
boolean call;
synchronized (this) {
this.handler = handler;
call = completed;
}
if (call) {
handler.handle(this);
}
promise.future().onComplete(handler);
return this;
}

Expand Down Expand Up @@ -188,66 +171,60 @@ public int size() {

@Override
public synchronized boolean isComplete() {
return completed;
return result != null;
}

@Override
public synchronized boolean succeeded() {
return completed && cause == null;
return result == this;
}

@Override
public synchronized boolean failed() {
return completed && cause != null;
return result instanceof Throwable;
}

@Override
public synchronized Throwable cause() {
return completed && cause != null ? cause : null;
return result instanceof Throwable ? (Throwable) result : null;
}

@Override
public synchronized CompositeFuture result() {
return completed && cause == null ? this : null;
return result == this ? this : null;
}

@Override
public void complete() {
if (!tryComplete()) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
throw new IllegalStateException("Result is already complete: " + (result == this ? "succeeded" : "failed"));
}
}

@Override
public void complete(CompositeFuture result) {
if (!tryComplete(result)) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
throw new IllegalStateException("Result is already complete: " + (result == this ? "succeeded" : "failed"));
}
}

@Override
public void fail(Throwable cause) {
if (!tryFail(cause)) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
throw new IllegalStateException("Result is already complete: " + (result == this ? "succeeded" : "failed"));
}
}

@Override
public void fail(String failureMessage) {
if (!tryFail(failureMessage)) {
throw new IllegalStateException("Result is already complete: " + (this.cause == null ? "succeeded" : "failed"));
throw new IllegalStateException("Result is already complete: " + (result == this ? "succeeded" : "failed"));
}
}

@Override
public boolean tryComplete(CompositeFuture result) {
Handler<AsyncResult<CompositeFuture>> handler = setCompleted(null);
if (handler != null) {
handler.handle(this);
return true;
} else {
return false;
}
return doComplete(result);
}

@Override
Expand All @@ -257,29 +234,23 @@ public boolean tryComplete() {

@Override
public boolean tryFail(Throwable cause) {
Handler<AsyncResult<CompositeFuture>> handler = setCompleted(cause);
if (handler != null) {
handler.handle(this);
return true;
} else {
return false;
}
return doComplete(cause);
}

@Override
public boolean tryFail(String failureMessage) {
return tryFail(new NoStackTraceThrowable(failureMessage));
}

private Handler<AsyncResult<CompositeFuture>> setCompleted(Throwable cause) {
private boolean doComplete(Object result) {
synchronized (this) {
if (completed) {
return null;
if (this.result != null) {
return false;
}
this.completed = true;
this.cause = cause;
return handler != null ? handler : NO_HANDLER;
this.result = result;
}
promise.handle(this);
return true;
}

@Override
Expand Down
19 changes: 19 additions & 0 deletions src/test/java/io/vertx/core/FutureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,25 @@ public void testCompositeFutureToList() {
assertEquals(Arrays.asList("foo", 4), composite.list());
}

@Test
public void testCompositeFutureMulti() {
Promise<String> p1 = Promise.promise();
Future<String> f1 = p1.future();
Promise<Integer> p2 = Promise.promise();
Future<Integer> f2 = p2.future();
CompositeFuture composite = CompositeFuture.all(f1, f2);
AtomicInteger count = new AtomicInteger();
composite.onComplete(ar -> {
count.compareAndSet(0, 1);
});
composite.onComplete(ar -> {
count.compareAndSet(1, 2);
});
p1.complete("foo");
p2.complete(4);
assertEquals(2, count.get());
}

@Test
public void testComposeSuccessToSuccess() {
Promise<String> p1 = Promise.promise();
Expand Down

0 comments on commit 8b395ad

Please sign in to comment.