Skip to content

Commit

Permalink
AbstractListenerWriteFlushProcessor: Ensure the last flush will be pe…
Browse files Browse the repository at this point in the history
…rformed

When writing Publisher<Publisher<T>>, a flush operation is performed onComplete
for every Publisher. If the flush operation is not able to be performed immediately
it will be retried before starting to process data provided by the next Publisher.
For the last Publisher the implementation needs to ensure that the flush will
be performed only then whole operation will complete.

Issue: SPR-15949
  • Loading branch information
violetagg authored and bclozel committed Sep 15, 2017
1 parent 2510db0 commit ec2218c
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,29 @@ protected void flushingFailed(Throwable t) {
*/
protected abstract void flush() throws IOException;

/**
* Whether writing is possible.
*/
protected abstract boolean isWritePossible();

/**
* Whether flushing is pending.
*/
protected abstract boolean isFlushPending();

/**
* Listeners can call this to notify when flushing is possible.
*/
protected final void onFlushPossible() {
this.state.get().onFlushPossible(this);
}

private void flushIfPossible() {
if (isWritePossible()) {
onFlushPossible();
}
}


private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
Expand Down Expand Up @@ -181,7 +204,12 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
return;
}
if (processor.subscriberCompleted) {
if (processor.changeState(this, COMPLETED)) {
if (processor.isFlushPending()) {
// Ensure the final flush
processor.changeState(this, FLUSHING);
processor.flushIfPossible();
}
else if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
}
Expand All @@ -198,6 +226,28 @@ public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
}
},

FLUSHING {
public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
try {
processor.flush();
}
catch (IOException ex) {
processor.flushingFailed(ex);
return;
}
if (processor.changeState(this, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
}
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
// ignore
}
@Override
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
// ignore
}
},

COMPLETED {
@Override
public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor, Publisher<? extends T> publisher) {
Expand Down Expand Up @@ -235,6 +285,10 @@ public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor)
// ignore
}

public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {
// ignore
}


private static class WriteSubscriber implements Subscriber<Void> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -52,6 +51,8 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons

private final HttpServletResponse response;

private final ServletOutputStream outputStream;

private final int bufferSize;

@Nullable
Expand All @@ -73,6 +74,7 @@ public ServletServerHttpResponse(HttpServletResponse response, AsyncContext asyn
Assert.isTrue(bufferSize > 0, "Buffer size must be greater than 0");

this.response = response;
this.outputStream = response.getOutputStream();
this.bufferSize = bufferSize;

asyncContext.addListener(new ResponseAsyncListener());
Expand Down Expand Up @@ -147,7 +149,7 @@ protected Processor<? super Publisher<? extends DataBuffer>, Void> createBodyFlu
* @return the number of bytes written
*/
protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
ServletOutputStream outputStream = response.getOutputStream();
ServletOutputStream outputStream = this.outputStream;
InputStream input = dataBuffer.asInputStream();
int bytesWritten = 0;
byte[] buffer = new byte[this.bufferSize];
Expand All @@ -160,7 +162,7 @@ protected int writeToOutputStream(DataBuffer dataBuffer) throws IOException {
}

private void flush() throws IOException {
ServletOutputStream outputStream = this.response.getOutputStream();
ServletOutputStream outputStream = this.outputStream;
if (outputStream.isReady()) {
try {
outputStream.flush();
Expand All @@ -176,6 +178,10 @@ private void flush() throws IOException {
}
}

private boolean isWritePossible() {
return this.outputStream.isReady();
}


private final class ResponseAsyncListener implements AsyncListener {

Expand Down Expand Up @@ -233,6 +239,12 @@ public void onWritePossible() throws IOException {
if (processor != null) {
processor.onWritePossible();
}
else {
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
if (flushProcessor != null) {
flushProcessor.onFlushPossible();
}
}
}

@Override
Expand All @@ -242,6 +254,13 @@ public void onError(Throwable ex) {
processor.cancel();
processor.onError(ex);
}
else {
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
if (flushProcessor != null) {
flushProcessor.cancel();
flushProcessor.onError(ex);
}
}
}
}

Expand All @@ -250,15 +269,9 @@ private class ResponseBodyFlushProcessor extends AbstractListenerWriteFlushProce

@Override
protected Processor<? super DataBuffer, Void> createWriteProcessor() {
try {
ServletOutputStream outputStream = response.getOutputStream();
ResponseBodyProcessor processor = new ResponseBodyProcessor(outputStream);
bodyProcessor = processor;
return processor;
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
ResponseBodyProcessor processor = new ResponseBodyProcessor();
bodyProcessor = processor;
return processor;
}

@Override
Expand All @@ -268,20 +281,24 @@ protected void flush() throws IOException {
}
ServletServerHttpResponse.this.flush();
}
}

@Override
protected boolean isWritePossible() {
return ServletServerHttpResponse.this.isWritePossible();
}

private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
@Override
protected boolean isFlushPending() {
return flushOnNext;
}
}

private final ServletOutputStream outputStream;

public ResponseBodyProcessor(ServletOutputStream outputStream) {
this.outputStream = outputStream;
}
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {

@Override
protected boolean isWritePossible() {
return this.outputStream.isReady();
return ServletServerHttpResponse.this.isWritePossible();
}

@Override
Expand All @@ -306,7 +323,7 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
}
flush();
}
boolean ready = this.outputStream.isReady();
boolean ready = ServletServerHttpResponse.this.isWritePossible();
if (this.logger.isTraceEnabled()) {
this.logger.trace("write: " + dataBuffer + " ready: " + ready);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,20 @@ private ResponseBodyProcessor createBodyProcessor() {
return new ResponseBodyProcessor(this.responseChannel);
}

private boolean isWritePossible() {
if (this.responseChannel == null) {
this.responseChannel = this.exchange.getResponseChannel();
}
if (this.responseChannel.isWriteResumed()) {
return true;
} else {
this.responseChannel.resumeWrites();
return false;
}
}


private static class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {

private final StreamSinkChannel channel;

Expand All @@ -164,12 +176,7 @@ public ResponseBodyProcessor(StreamSinkChannel channel) {

@Override
protected boolean isWritePossible() {
if (this.channel.isWriteResumed()) {
return true;
} else {
this.channel.resumeWrites();
return false;
}
return UndertowServerHttpResponse.this.isWritePossible();
}

@Override
Expand Down Expand Up @@ -264,6 +271,16 @@ protected void flushingFailed(Throwable t) {
cancel();
onError(t);
}

@Override
protected boolean isWritePossible() {
return UndertowServerHttpResponse.this.isWritePossible();
}

@Override
protected boolean isFlushPending() {
return false;
}
}

}

0 comments on commit ec2218c

Please sign in to comment.