Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an event for headers complete #557

Merged
merged 2 commits into from
Dec 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class ResponseEventListener {
private Consumer<Throwable> contentErrorAction = cause -> { };
private Consumer<LiveHttpResponse> onCompletedAction = r -> { };
private Runnable cancelAction = () -> { };
private Runnable onHeaders = () -> { };
private Runnable whenFinishedAction = () -> { };

private volatile State state = INITIAL;
Expand Down Expand Up @@ -70,6 +71,11 @@ public ResponseEventListener whenCompleted(Consumer<LiveHttpResponse> completeAc
return this;
}

public ResponseEventListener whenHeadersComplete(Runnable action) {
this.onHeaders = requireNonNull(action);
return this;
}

/**
* Executes an action when the response terminates for any reason, normally,
* abnormally, or due to cancellation.
Expand All @@ -89,6 +95,7 @@ public Flux<LiveHttpResponse> apply() {
switch (state) {
case INITIAL:
if (event instanceof MessageHeaders) {
onHeaders.run();
state = STREAMING;
} else if (event instanceof MessageCancelled) {
cancelAction.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

public class ResponseEventListenerTest {
private AtomicBoolean cancelled;
private AtomicBoolean headers;
private AtomicReference<Throwable> responseError;
private AtomicReference<Throwable> contentError;
private AtomicReference<LiveHttpResponse> completed;
Expand All @@ -44,6 +45,7 @@ public class ResponseEventListenerTest {
@BeforeEach
public void setUp() {
cancelled = new AtomicBoolean();
headers = new AtomicBoolean();
responseError = new AtomicReference<>();
contentError = new AtomicReference<>();
completed = new AtomicReference<>();
Expand All @@ -57,6 +59,7 @@ public void doesntFireEventsThatNeverOccurred() {

Flux<LiveHttpResponse> listener = ResponseEventListener.from(publisher)
.whenCancelled(() -> cancelled.set(true))
.whenHeadersComplete(() -> headers.set(true))
.whenResponseError(cause -> responseError.set(cause))
.whenContentError(cause -> contentError.set(cause))
.whenCompleted(response -> completed.set(response))
Expand All @@ -67,6 +70,7 @@ public void doesntFireEventsThatNeverOccurred() {
.consumeNextWith(LiveHttpMessage::consume)
.then(() -> {
assertFalse(cancelled.get());
assertTrue(headers.get());
assertNull(responseError.get());
assertNull(contentError.get());
assertEquals(theResponse, completed.get());
Expand All @@ -81,6 +85,7 @@ public void firesWhenResponseIsCancelledBeforeHeaders() {

Flux<LiveHttpResponse> listener = ResponseEventListener.from(publisher)
.whenCancelled(() -> cancelled.set(true))
.whenHeadersComplete(() -> headers.set(true))
.whenFinished(() -> finished.set(true))
.apply();

Expand All @@ -89,6 +94,7 @@ public void firesWhenResponseIsCancelledBeforeHeaders() {
.thenCancel()
.verify();

assertFalse(headers.get());
assertTrue(cancelled.get());
assertTrue(finished.get());
}
Expand Down