Skip to content

Commit

Permalink
Replace Writer.awaitDemandAsync() with Writer.onDemand()
Browse files Browse the repository at this point in the history
.. which is less confusing
  • Loading branch information
trustin committed Jul 19, 2016
1 parent a7d214f commit 8fd6ec1
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public boolean write(Supplier<? extends HttpObject> o) {
}

@Override
public CompletableFuture<Void> awaitDemandAsync() {
return delegate.awaitDemandAsync();
public CompletableFuture<Void> onDemand(Runnable task) {
return delegate.onDemand(task);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,17 @@ public boolean write(Supplier<? extends T> supplier) {
}

@Override
public CompletableFuture<Void> awaitDemandAsync() {
public CompletableFuture<Void> onDemand(Runnable task) {
requireNonNull(task, "task");

final AwaitDemandFuture f = new AwaitDemandFuture();
if (!isOpen()) {
f.completeExceptionally(ClosedPublisherException.get());
return f;
} else {
pushObject(f);
}

pushObject(f);
return f;
return f.thenRun(task);
}

private void pushObject(Object obj) {
Expand Down Expand Up @@ -285,6 +287,7 @@ private boolean notifySubscriber(Subscriber<Object> subscriber, Queue<Object> qu

protected void onRemoval(T obj) {}

@Override
public CompletableFuture<Void> closeFuture() {
return closeFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.linecorp.armeria.common.reactivestreams;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

public interface Writer<T> {
Expand All @@ -26,11 +27,7 @@ public interface Writer<T> {
boolean write(T o);
boolean write(Supplier<? extends T> o);

CompletableFuture<Void> awaitDemandAsync();

default void awaitDemand() {
awaitDemandAsync().join();
}
CompletableFuture<Void> onDemand(Runnable task);

void close();
void close(Throwable cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,7 @@ private static void stream(Writer<HttpObject> writer, long size, int chunkSize)
return;
}

writer.awaitDemandAsync()
.thenRun(() -> stream(writer, remaining, (int) Math.min(remaining, chunkSize)))
writer.onDemand(() -> stream(writer, remaining, (int) Math.min(remaining, chunkSize)))
.exceptionally(cause -> {
logger.warn("Unexpected exception:", cause);
writer.close(cause);
Expand Down

0 comments on commit 8fd6ec1

Please sign in to comment.