Skip to content

Commit 1194ef0

Browse files
committed
Eliminate unnecessary flushes
Signed-off-by: jansupol <jan.supol@oracle.com>
1 parent 29bb990 commit 1194ef0

File tree

6 files changed

+181
-12
lines changed

6 files changed

+181
-12
lines changed

connectors/apache5-connector/src/test/java/org/glassfish/jersey/apache5/connector/StreamingTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2022 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2022, 2025 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -129,6 +129,8 @@ private void clientCloseTest(int readTimeout) throws IOException {
129129
inputStream.close();
130130
// trigger sending another 'A' to the stream; it should fail
131131
// (indicating that the streaming has been terminated on the server)
132+
// But only the second flush causes the Exception
133+
assertEquals("OK", sendTarget.request().get().readEntity(String.class));
132134
assertEquals("NOK", sendTarget.request().get().readEntity(String.class));
133135
assertEquals(0, counter.get());
134136
}

core-common/src/main/java/org/glassfish/jersey/io/spi/FlushedCloseable.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2024 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -17,6 +17,7 @@
1717
package org.glassfish.jersey.io.spi;
1818

1919
import java.io.Closeable;
20+
import java.io.FilterOutputStream;
2021
import java.io.Flushable;
2122
import java.io.IOException;
2223
import java.io.OutputStream;
@@ -27,10 +28,9 @@
2728
* That way, {@link #flush()} method is not called twice.
2829
*
2930
* <p>
30-
* Usable by {@link javax.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}.
31-
* Usable by {@link javax.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}.
31+
* Usable by {@link jakarta.ws.rs.client.ClientRequestContext#setEntityStream(OutputStream)}.
32+
* Usable by {@link jakarta.ws.rs.container.ContainerResponseContext#setEntityStream(OutputStream)}.
3233
* </p>
33-
*
3434
* <p>
3535
* This marker interface can be useful for the customer OutputStream to know the {@code flush} did not come from
3636
* Jersey before close. By default, when the entity stream is to be closed by Jersey, {@code flush} is called first.
@@ -52,4 +52,14 @@ public interface FlushedCloseable extends Flushable, Closeable {
5252
* @throws IOException if an I/O error occurs
5353
*/
5454
public void close() throws IOException;
55+
56+
57+
/**
58+
* Determine if the stream {@link OutputStream#flush() flushes} on {@link OutputStream#close()}.
59+
* @param stream the provided {@link OutputStream}
60+
* @return {@code true} if the stream ensures to call {@link OutputStream#flush()} on {@link OutputStream#close()}.
61+
*/
62+
public static boolean flushOnClose(OutputStream stream) {
63+
return FilterOutputStream.class.isInstance(stream) || FlushedCloseable.class.isInstance(stream);
64+
}
5565
}

core-common/src/main/java/org/glassfish/jersey/message/internal/CommittingOutputStream.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010, 2024 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2010, 2025 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -26,6 +26,7 @@
2626
import org.glassfish.jersey.innate.VirtualThreadSupport;
2727
import org.glassfish.jersey.internal.LocalizationMessages;
2828
import org.glassfish.jersey.internal.guava.Preconditions;
29+
import org.glassfish.jersey.io.spi.FlushedCloseable;
2930

3031
/**
3132
* A committing output stream with optional serialized entity buffering functionality
@@ -155,6 +156,12 @@ void enableBuffering() {
155156
enableBuffering(DEFAULT_BUFFER_SIZE);
156157
}
157158

159+
/* package */ void flushOnClose() throws IOException {
160+
if (!FlushedCloseable.flushOnClose(adaptedOutput)) {
161+
flush();
162+
}
163+
}
164+
158165
/**
159166
* Determine whether the stream was already committed or not.
160167
*

core-common/src/main/java/org/glassfish/jersey/message/internal/OutboundMessageContext.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2012, 2024 Oracle and/or its affiliates. All rights reserved.
2+
* Copyright (c) 2012, 2025 Oracle and/or its affiliates. All rights reserved.
33
*
44
* This program and the accompanying materials are made available under the
55
* terms of the Eclipse Public License v. 2.0, which is available at
@@ -562,7 +562,9 @@ public void close() {
562562
if (hasEntity()) {
563563
try {
564564
final OutputStream es = getEntityStream();
565-
if (!FlushedCloseable.class.isInstance(es)) {
565+
if (CommittingOutputStream.class.isInstance(es)) {
566+
((CommittingOutputStream) es).flushOnClose();
567+
} else if (!FlushedCloseable.flushOnClose(es)) {
566568
es.flush();
567569
}
568570
es.close();

core-common/src/main/java/org/glassfish/jersey/message/internal/ReaderWriter.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public final class ReaderWriter {
5959
public static final int BUFFER_SIZE = getBufferSize();
6060

6161
/**
62-
* Whether {@linkplain BUFFER_SIZE} is to be ignored in favor of JRE's own decision.
62+
* Whether {@linkplain #BUFFER_SIZE} is to be ignored in favor of JRE's own decision.
6363
*/
6464
public static final boolean AUTOSIZE_BUFFER = getAutosizeBuffer();
6565

@@ -263,9 +263,7 @@ private static byte[] readAllBytes(InputStream inputStream) throws IOException {
263263
* @throws IOException in case of a write failure.
264264
*/
265265
public static void writeToAsString(String s, OutputStream out, MediaType type) throws IOException {
266-
Writer osw = new OutputStreamWriter(out, getCharset(type));
267-
osw.write(s);
268-
osw.flush();
266+
out.write(s.getBytes(getCharset(type)));
269267
}
270268

271269
/**
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright (c) 2024, 2025 Oracle and/or its affiliates. All rights reserved.
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0, which is available at
6+
* http://www.eclipse.org/legal/epl-2.0.
7+
*
8+
* This Source Code may also be made available under the following Secondary
9+
* Licenses when the conditions for such availability set forth in the
10+
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
11+
* version 2 with the GNU Classpath Exception, which is available at
12+
* https://www.gnu.org/software/classpath/license.html.
13+
*
14+
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
15+
*/
16+
17+
package org.glassfish.jersey.server;
18+
19+
20+
import jakarta.ws.rs.GET;
21+
import jakarta.ws.rs.Path;
22+
import jakarta.ws.rs.Produces;
23+
import jakarta.ws.rs.core.MediaType;
24+
import jakarta.ws.rs.core.Response;
25+
import jakarta.ws.rs.core.StreamingOutput;
26+
import org.glassfish.jersey.internal.MapPropertiesDelegate;
27+
import org.glassfish.jersey.io.spi.FlushedCloseable;
28+
import org.glassfish.jersey.message.MessageBodyWorkers;
29+
import org.glassfish.jersey.server.RequestContextBuilder.TestContainerRequest;
30+
import org.glassfish.jersey.server.spi.ContainerResponseWriter;
31+
import org.hamcrest.MatcherAssert;
32+
import org.hamcrest.Matchers;
33+
import org.junit.jupiter.api.Test;
34+
35+
import java.io.ByteArrayOutputStream;
36+
import java.io.IOException;
37+
import java.io.OutputStream;
38+
import java.net.URI;
39+
import java.nio.charset.StandardCharsets;
40+
import java.util.concurrent.Future;
41+
import java.util.concurrent.TimeUnit;
42+
import java.util.concurrent.atomic.AtomicInteger;
43+
44+
public class ContainerResponseWriterNoFlushTest {
45+
private static final String RESPONSE = "RESPONSE";
46+
private static AtomicInteger flushCounter = new AtomicInteger(0);
47+
private static class TestResponseOutputStream extends ByteArrayOutputStream implements FlushedCloseable {
48+
private boolean closed = false;
49+
@Override
50+
public void close() throws IOException {
51+
if (!closed) {
52+
closed = true;
53+
flush();
54+
super.close();
55+
}
56+
}
57+
58+
@Override
59+
public void flush() throws IOException {
60+
flushCounter.incrementAndGet();
61+
}
62+
}
63+
64+
private static class TestContainerWriter implements ContainerResponseWriter {
65+
TestResponseOutputStream outputStream;
66+
private final boolean buffering;
67+
68+
private TestContainerWriter(boolean buffering) {
69+
this.buffering = buffering;
70+
}
71+
72+
@Override
73+
public OutputStream writeResponseStatusAndHeaders(long contentLength, ContainerResponse responseContext)
74+
throws ContainerException {
75+
outputStream = new TestResponseOutputStream();
76+
return outputStream;
77+
}
78+
79+
@Override
80+
public boolean suspend(long timeOut, TimeUnit timeUnit, TimeoutHandler timeoutHandler) {
81+
return false;
82+
}
83+
84+
@Override
85+
public void setSuspendTimeout(long timeOut, TimeUnit timeUnit) throws IllegalStateException {
86+
}
87+
88+
@Override
89+
public void commit() {
90+
}
91+
92+
@Override
93+
public void failure(Throwable error) {
94+
throw new RuntimeException(error);
95+
}
96+
97+
@Override
98+
public boolean enableResponseBuffering() {
99+
return buffering;
100+
}
101+
}
102+
103+
@Path("/test")
104+
public static class StreamResource {
105+
106+
@GET
107+
@Path(value = "/stream")
108+
@Produces(MediaType.TEXT_PLAIN)
109+
public Response stream() {
110+
111+
StreamingOutput stream = output -> {
112+
output.write(RESPONSE.getBytes(StandardCharsets.UTF_8));
113+
};
114+
return Response.ok(stream).build();
115+
}
116+
}
117+
118+
@Test
119+
public void testWriterBuffering() {
120+
TestContainerWriter writer = new TestContainerWriter(true);
121+
testWriter(writer);
122+
}
123+
124+
@Test
125+
public void testWriterNoBuffering() {
126+
TestContainerWriter writer = new TestContainerWriter(false);
127+
testWriter(writer);
128+
}
129+
130+
private void testWriter(TestContainerWriter writer) {
131+
flushCounter.set(0);
132+
RequestContextBuilder rcb = RequestContextBuilder.from("/test/stream", "GET");
133+
134+
TestContainerRequest request = rcb.new TestContainerRequest(
135+
null, URI.create("/test/stream"), "GET", null, new MapPropertiesDelegate()) {
136+
@Override
137+
public void setWorkers(MessageBodyWorkers workers) {
138+
if (workers != null) {
139+
setWriter(writer);
140+
}
141+
super.setWorkers(workers);
142+
}
143+
};
144+
145+
ApplicationHandler applicationHandler = new ApplicationHandler(new ResourceConfig(StreamResource.class));
146+
Future<ContainerResponse> future = applicationHandler.apply(request);
147+
MatcherAssert.assertThat(writer.outputStream.toString(), Matchers.is(RESPONSE));
148+
MatcherAssert.assertThat(flushCounter.get(), Matchers.is(1));
149+
}
150+
}

0 commit comments

Comments
 (0)