Skip to content

AbstractListenerWriteFlushProcessor: Ensure the last flush will be performed #1526

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,29 @@ protected void flushingFailed(Throwable t) {
*/
protected abstract void flush() throws IOException;

/**
* Whether writing is possible.
*/
protected abstract boolean isWritePossible();

/**
* Whether flushing is pending.
*/
protected abstract boolean isFlushPending();

/**
* Listeners can call this to notify when flushing is possible.
*/
protected final void onFlushPossible() {
this.state.get().onFlushPossible(this);
}

private void flushIfPossible() {
if (isWritePossible()) {
onFlushPossible();
}
}


private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
Expand Down Expand Up @@ -181,7 +204,12 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
return;
}
if (processor.subscriberCompleted) {
if (processor.changeState(this, COMPLETED)) {
if (processor.isFlushPending()) {
// Ensure the final flush
processor.changeState(this, FLUSHING);
processor.flushIfPossible();
}
else if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
}
Expand All @@ -198,6 +226,28 @@ public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
}
},

FLUSHING {
public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
try {
processor.flush();
}
catch (IOException ex) {
processor.flushingFailed(ex);
return;
}
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
}
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
// ignore
}
@Override
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
// ignore
}
},

COMPLETED {
@Override
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
Expand Down Expand Up @@ -235,6 +285,10 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
// ignore
}

public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
// ignore
}


private static class WriteSubscriber implements Subscriber<Void> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -52,6 +51,8 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons

private final HttpServletResponse response;

private final ServletOutputStream outputStream;

private final int bufferSize;

@Nullable
Expand All @@ -73,6 +74,7 @@ public ServletServerHttpResponse(HttpServletResponse response, AsyncContext asyn
Assert.isTrue(bufferSize > 0, "Buffer size must be greater than 0");

this.response = response;
this.outputStream = response.getOutputStream();
this.bufferSize = bufferSize;

asyncContext.addListener(new ResponseAsyncListener());
Expand Down Expand Up @@ -147,7 +149,7 @@ protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlu
* @return the number of bytes written
*/
protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
ServletOutputStream outputStream = response.getOutputStream();
ServletOutputStream outputStream = this.outputStream;
InputStream input = dataBuffer.asInputStream();
int bytesWritten = 0;
byte[] buffer = new byte[this.bufferSize];
Expand All @@ -160,7 +162,7 @@ protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
}

private void flush() throws IOException {
ServletOutputStream outputStream = this.response.getOutputStream();
ServletOutputStream outputStream = this.outputStream;
if (outputStream.isReady()) {
try {
outputStream.flush();
Expand All @@ -176,6 +178,10 @@ private void flush() throws IOException {
}
}

private boolean isWritePossible() {
return this.outputStream.isReady();
}


private final class ResponseAsyncListener implements AsyncListener {

Expand Down Expand Up @@ -233,6 +239,12 @@ public void onWritePossible() throws IOException {
if (processor != null) {
processor.onWritePossible();
}
else {
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
if (flushProcessor != null) {
flushProcessor.onFlushPossible();
}
}
}

@Override
Expand All @@ -242,6 +254,13 @@ public void onError(Throwable ex) {
processor.cancel();
processor.onError(ex);
}
else {
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
if (flushProcessor != null) {
flushProcessor.cancel();
flushProcessor.onError(ex);
}
}
}
}

Expand All @@ -250,15 +269,9 @@ private class ResponseBodyFlushProcessor extends AbstractListenerWriteFlushProce

@Override
protected Processor<? super DataBuffer, Void> createWriteProcessor() {
try {
ServletOutputStream outputStream = response.getOutputStream();
ResponseBodyProcessor processor = new ResponseBodyProcessor(outputStream);
bodyProcessor = processor;
return processor;
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
ResponseBodyProcessor processor = new ResponseBodyProcessor();
bodyProcessor = processor;
return processor;
}

@Override
Expand All @@ -268,20 +281,24 @@ protected void flush() throws IOException {
}
ServletServerHttpResponse.this.flush();
}
}

@Override
protected boolean isWritePossible() {
return ServletServerHttpResponse.this.isWritePossible();
}

private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
@Override
protected boolean isFlushPending() {
return flushOnNext;
}
}

private final ServletOutputStream outputStream;

public ResponseBodyProcessor(ServletOutputStream outputStream) {
this.outputStream = outputStream;
}
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {

@Override
protected boolean isWritePossible() {
return this.outputStream.isReady();
return ServletServerHttpResponse.this.isWritePossible();
}

@Override
Expand All @@ -306,7 +323,7 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
}
flush();
}
boolean ready = this.outputStream.isReady();
boolean ready = ServletServerHttpResponse.this.isWritePossible();
if (this.logger.isTraceEnabled()) {
this.logger.trace("write: " + dataBuffer + " ready: " + ready);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,20 @@ private ResponseBodyProcessor createBodyProcessor() {
return new ResponseBodyProcessor(this.responseChannel);
}

private boolean isWritePossible() {
if (this.responseChannel == null) {
this.responseChannel = this.exchange.getResponseChannel();
}
if (this.responseChannel.isWriteResumed()) {
return true;
} else {
this.responseChannel.resumeWrites();
return false;
}
}


private static class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {

private final StreamSinkChannel channel;

Expand All @@ -164,12 +176,7 @@ public ResponseBodyProcessor(StreamSinkChannel channel) {

@Override
protected boolean isWritePossible() {
if (this.channel.isWriteResumed()) {
return true;
} else {
this.channel.resumeWrites();
return false;
}
return UndertowServerHttpResponse.this.isWritePossible();
}

@Override
Expand Down Expand Up @@ -264,6 +271,16 @@ protected void flushingFailed(Throwable t) {
cancel();
onError(t);
}

@Override
protected boolean isWritePossible() {
return UndertowServerHttpResponse.this.isWritePossible();
}

@Override
protected boolean isFlushPending() {
return false;
}
}

}