Skip to content

Commit 4d058ce

Browse files
violetaggpoutsma
authored andcommitted
Resume Undertow writes only when data is available
1 parent 55833dc commit 4d058ce

File tree

3 files changed

+40
-8
lines changed

3 files changed

+40
-8
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,18 @@ protected void receiveData(T data) {
150150
*/
151151
protected abstract boolean write(T data) throws IOException;
152152

153+
/**
154+
* Suspend writing. Defaults to no-op.
155+
*/
156+
protected void suspendWriting() {
157+
}
158+
159+
/**
160+
* Invoked when writing is complete. Defaults to no-op.
161+
*/
162+
protected void writingComplete() {
163+
}
164+
153165

154166
private boolean changeState(State oldState, State newState) {
155167
return this.state.compareAndSet(oldState, newState);
@@ -223,6 +235,7 @@ public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
223235
@Override
224236
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
225237
if (processor.changeState(this, COMPLETED)) {
238+
processor.writingComplete();
226239
processor.resultPublisher.publishComplete();
227240
}
228241
}
@@ -248,10 +261,12 @@ public <T> void onWritePossible(AbstractListenerWriteProcessor<T> processor) {
248261
processor.releaseData();
249262
if (!processor.subscriberCompleted) {
250263
processor.changeState(WRITING, REQUESTED);
264+
processor.suspendWriting();
251265
processor.subscription.request(1);
252266
}
253267
else {
254268
processor.changeState(WRITING, COMPLETED);
269+
processor.writingComplete();
255270
processor.resultPublisher.publishComplete();
256271
}
257272
}
@@ -311,6 +326,7 @@ public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
311326

312327
public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
313328
if (processor.changeState(this, COMPLETED)) {
329+
processor.writingComplete();
314330
processor.resultPublisher.publishError(ex);
315331
}
316332
}

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,11 @@ protected boolean write(DataBuffer dataBuffer) throws IOException {
306306
return false;
307307
}
308308
}
309+
310+
@Override
311+
protected void writingComplete() {
312+
bodyProcessor = null;
313+
}
309314
}
310315

311316
}

spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,7 @@ private ResponseBodyProcessor createBodyProcessor() {
138138
if (this.responseChannel == null) {
139139
this.responseChannel = this.exchange.getResponseChannel();
140140
}
141-
ResponseBodyProcessor bodyProcessor = new ResponseBodyProcessor( this.responseChannel);
142-
bodyProcessor.registerListener();
143-
return bodyProcessor;
141+
return new ResponseBodyProcessor(this.responseChannel);
144142
}
145143

146144

@@ -153,16 +151,18 @@ private static class ResponseBodyProcessor extends AbstractListenerWriteProcesso
153151
public ResponseBodyProcessor(StreamSinkChannel channel) {
154152
Assert.notNull(channel, "StreamSinkChannel must not be null");
155153
this.channel = channel;
156-
}
157-
158-
public void registerListener() {
159154
this.channel.getWriteSetter().set(c -> onWritePossible());
160-
this.channel.resumeWrites();
155+
this.channel.suspendWrites();
161156
}
162157

163158
@Override
164159
protected boolean isWritePossible() {
165-
return false;
160+
if (this.channel.isWriteResumed()) {
161+
return true;
162+
} else {
163+
this.channel.resumeWrites();
164+
return false;
165+
}
166166
}
167167

168168
@Override
@@ -214,6 +214,17 @@ protected void releaseData() {
214214
protected boolean isDataEmpty(DataBuffer dataBuffer) {
215215
return (dataBuffer.readableByteCount() == 0);
216216
}
217+
218+
@Override
219+
protected void suspendWriting() {
220+
this.channel.suspendWrites();
221+
}
222+
223+
@Override
224+
protected void writingComplete() {
225+
this.channel.getWriteSetter().set(null);
226+
this.channel.resumeWrites();
227+
}
217228
}
218229

219230

0 commit comments

Comments
 (0)