Skip to content

Commit

Permalink
Support Server Sent Events (SSE) for chunked responses. (#176)
Browse files Browse the repository at this point in the history
The expectation is that the chunks are immediately sent and not buffered until the whole
response is done.

If a response is detected, the chunks are normally read and each read
chunk is immediately flushed, replicating the upstream behavior
downstream.

Supporting changes:
- add jetty as a test dependency used as servlet execution engine
- bump javax.servlet-api to version 3.1.0 which is required by jetty

Jetty 9.4 requires java 8 as minimum version.

Co-authored-by: David Smiley <david.w.smiley@gmail.com>
  • Loading branch information
matthiasblaesing and dsmiley authored Sep 29, 2020
1 parent a2f899e commit 22d4420
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@
/.settings
.classpath
.project
/nb-configuration.xml
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

Java 8 is now the minimum java version the servlet works with.

Servlet API 3.1.0 is now the minimum servlet API supported.

\#158: More extension points RE HttpClient building and cookies.
Thanks Mark Michaelis.

\#176: Flush chunked responses to support Server Sent Events (SSE).
Thanks Matthias Bläsing

# Version 1.11 2019-01-12

\#155: Add OSGI manifiest headers.
Expand Down
11 changes: 9 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@

<dependencies>

<!-- FYI tomcat 5.5 & beyond -->
<!-- Probably works with 3.0 as well -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>3.0.1</version>
<version>3.1.0</version>
<scope>provided</scope>
</dependency>

Expand Down Expand Up @@ -100,6 +100,13 @@
<version>1.2.17</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>9.4.30.v20200611</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
23 changes: 21 additions & 2 deletions src/main/java/org/mitre/dsmiley/httpproxy/ProxyServlet.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import javax.servlet.http.HttpServletResponse;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpCookie;
import java.net.URI;
Expand Down Expand Up @@ -635,8 +636,26 @@ protected void copyResponseEntity(HttpResponse proxyResponse, HttpServletRespons
throws IOException {
HttpEntity entity = proxyResponse.getEntity();
if (entity != null) {
OutputStream servletOutputStream = servletResponse.getOutputStream();
entity.writeTo(servletOutputStream);
if (entity.isChunked()) {
// Flush intermediate results before blocking on input -- needed for SSE
InputStream is = entity.getContent();
try {
byte[] buffer = new byte[10 * 1024];
int read;
OutputStream os = servletResponse.getOutputStream();
while ((read = is.read(buffer)) != -1) {
os.write(buffer, 0, read);
if (is.available() == 0) { // next is.read will block
os.flush();
}
}
} finally {
closeQuietly(is);
}
} else {
OutputStream servletOutputStream = servletResponse.getOutputStream();
entity.writeTo(servletOutputStream);
}
}
}

Expand Down
143 changes: 143 additions & 0 deletions src/test/java/org/mitre/dsmiley/httpproxy/ChunkedTransferTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright 2020 Matthias Bläsing.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.mitre.dsmiley.httpproxy;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;

public class ChunkedTransferTest {

private Server server;
private ServletHandler servletHandler;
private int serverPort;

@Before
public void setUp() throws Exception {
server = new Server(0);
servletHandler = new ServletHandler();
server.setHandler(servletHandler);
server.start();

serverPort = ((ServerConnector) server.getConnectors()[0]).getLocalPort();
}

@After
public void tearDown() throws Exception {
server.stop();
serverPort = -1;
}

@Test
public void testChunkedTransfer() throws Exception {
/*
Check that proxy requests are not buffered in the ProxyServlet, but
immediately flushed. The test works by creating a servlet, that writes
the first message and flushes the outputstream, further processing
is blocked by a count down latch.
The client now reads the first message. The message must be completely
received and further data must not be present.
After the first message is consumed, the CountDownLatch is released and
the second messsage is expected. This in turn must be completely be read.
If the CountDownLatch is not released, it will timeout and the second
message will not be send.
*/

final CountDownLatch guardForSecondRead = new CountDownLatch(1);
final byte[] data1 = "event: message\ndata: Dummy Data1\n\n".getBytes(StandardCharsets.UTF_8);
final byte[] data2 = "event: message\ndata: Dummy Data2\n\n".getBytes(StandardCharsets.UTF_8);

ServletHolder servletHolder = servletHandler.addServletWithMapping(ProxyServlet.class, "/chatProxied/*");
servletHolder.setInitParameter(ProxyServlet.P_LOG, "true");
servletHolder.setInitParameter(ProxyServlet.P_TARGET_URI, String.format("http://localhost:%d/chat/", serverPort));

ServletHolder dummyBackend = new ServletHolder(new HttpServlet() {
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
resp.setContentType("text/event-stream");
OutputStream os = resp.getOutputStream();
// Write first message for client and flush it out
os.write(data1);
os.flush();
try {
// Wait for client to request the second message by counting down the
// latch - if the latch times out, the second message will not be
// send and the corresponding assert will fail
if (guardForSecondRead.await(10, TimeUnit.SECONDS)) {
os.write(data2);
os.flush();
}
} catch (InterruptedException ex) {
throw new IOException(ex);
}
}
});
servletHandler.addServletWithMapping(dummyBackend, "/chat/*");

URL url = new URL(String.format("http://localhost:%d/chatProxied/test", serverPort));

try (InputStream is = url.openStream()) {
byte[] readData = readUntilBlocked(is);
assertTrue("No data received (message1)", readData.length > 0);
assertArrayEquals("Received data: '" + toString(readData) + "' (message1)", data1, readData);
guardForSecondRead.countDown();
readData = readUntilBlocked(is);
assertTrue("No data received (message2)", readData.length > 0);
assertArrayEquals("Received data: '" + toString(readData) + "' (message2)", data2, readData);
}
}

private static String toString(byte[] data) {
return new String(data, StandardCharsets.UTF_8);
}

private static byte[] readUntilBlocked(InputStream is) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[10 * 1024];
do {
int read = is.read(buffer);
if (read >= 0) {
baos.write(buffer, 0, read);
} else {
break;
}
} while (is.available() > 0);
return baos.toByteArray();
}
}

0 comments on commit 22d4420

Please sign in to comment.