From 22d4420465ed9c9e9ae2e2f6215501ca6339ace6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Matthias=20Bl=C3=A4sing?= Date: Tue, 29 Sep 2020 05:49:08 +0200 Subject: [PATCH] Support Server Sent Events (SSE) for chunked responses. (#176) The expectation is that the chunks are immediately sent and not buffered until the whole response is done. If a response is detected, the chunks are normally read and each read chunk is immediately flushed, replicating the upstream behavior downstream. Supporting changes: - add jetty as a test dependency used as servlet execution engine - bump javax.servlet-api to version 3.1.0 which is required by jetty Jetty 9.4 requires java 8 as minimum version. Co-authored-by: David Smiley --- .gitignore | 1 + CHANGES.md | 5 + pom.xml | 11 +- .../mitre/dsmiley/httpproxy/ProxyServlet.java | 23 ++- .../httpproxy/ChunkedTransferTest.java | 143 ++++++++++++++++++ 5 files changed, 179 insertions(+), 4 deletions(-) create mode 100644 src/test/java/org/mitre/dsmiley/httpproxy/ChunkedTransferTest.java diff --git a/.gitignore b/.gitignore index 04b5ba0a..142cc258 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ /.settings .classpath .project +/nb-configuration.xml diff --git a/CHANGES.md b/CHANGES.md index e92232a6..8fb076a8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -2,9 +2,14 @@ Java 8 is now the minimum java version the servlet works with. +Servlet API 3.1.0 is now the minimum servlet API supported. + \#158: More extension points RE HttpClient building and cookies. Thanks Mark Michaelis. +\#176: Flush chunked responses to support Server Sent Events (SSE). +Thanks Matthias Bläsing + # Version 1.11 2019-01-12 \#155: Add OSGI manifiest headers. diff --git a/pom.xml b/pom.xml index e3ee7fb7..c7431f66 100644 --- a/pom.xml +++ b/pom.xml @@ -58,11 +58,11 @@ - + javax.servlet javax.servlet-api - 3.0.1 + 3.1.0 provided @@ -100,6 +100,13 @@ 1.2.17 test + + + org.eclipse.jetty + jetty-servlet + 9.4.30.v20200611 + test + diff --git a/src/main/java/org/mitre/dsmiley/httpproxy/ProxyServlet.java b/src/main/java/org/mitre/dsmiley/httpproxy/ProxyServlet.java index 36833cab..3d47332d 100644 --- a/src/main/java/org/mitre/dsmiley/httpproxy/ProxyServlet.java +++ b/src/main/java/org/mitre/dsmiley/httpproxy/ProxyServlet.java @@ -44,6 +44,7 @@ import javax.servlet.http.HttpServletResponse; import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.HttpCookie; import java.net.URI; @@ -635,8 +636,26 @@ protected void copyResponseEntity(HttpResponse proxyResponse, HttpServletRespons throws IOException { HttpEntity entity = proxyResponse.getEntity(); if (entity != null) { - OutputStream servletOutputStream = servletResponse.getOutputStream(); - entity.writeTo(servletOutputStream); + if (entity.isChunked()) { + // Flush intermediate results before blocking on input -- needed for SSE + InputStream is = entity.getContent(); + try { + byte[] buffer = new byte[10 * 1024]; + int read; + OutputStream os = servletResponse.getOutputStream(); + while ((read = is.read(buffer)) != -1) { + os.write(buffer, 0, read); + if (is.available() == 0) { // next is.read will block + os.flush(); + } + } + } finally { + closeQuietly(is); + } + } else { + OutputStream servletOutputStream = servletResponse.getOutputStream(); + entity.writeTo(servletOutputStream); + } } } diff --git a/src/test/java/org/mitre/dsmiley/httpproxy/ChunkedTransferTest.java b/src/test/java/org/mitre/dsmiley/httpproxy/ChunkedTransferTest.java new file mode 100644 index 00000000..02e00abe --- /dev/null +++ b/src/test/java/org/mitre/dsmiley/httpproxy/ChunkedTransferTest.java @@ -0,0 +1,143 @@ +/* + * Copyright 2020 Matthias Bläsing. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.mitre.dsmiley.httpproxy; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertTrue; + +public class ChunkedTransferTest { + + private Server server; + private ServletHandler servletHandler; + private int serverPort; + + @Before + public void setUp() throws Exception { + server = new Server(0); + servletHandler = new ServletHandler(); + server.setHandler(servletHandler); + server.start(); + + serverPort = ((ServerConnector) server.getConnectors()[0]).getLocalPort(); + } + + @After + public void tearDown() throws Exception { + server.stop(); + serverPort = -1; + } + + @Test + public void testChunkedTransfer() throws Exception { + /* + Check that proxy requests are not buffered in the ProxyServlet, but + immediately flushed. The test works by creating a servlet, that writes + the first message and flushes the outputstream, further processing + is blocked by a count down latch. + + The client now reads the first message. The message must be completely + received and further data must not be present. + + After the first message is consumed, the CountDownLatch is released and + the second messsage is expected. This in turn must be completely be read. + + If the CountDownLatch is not released, it will timeout and the second + message will not be send. + */ + + final CountDownLatch guardForSecondRead = new CountDownLatch(1); + final byte[] data1 = "event: message\ndata: Dummy Data1\n\n".getBytes(StandardCharsets.UTF_8); + final byte[] data2 = "event: message\ndata: Dummy Data2\n\n".getBytes(StandardCharsets.UTF_8); + + ServletHolder servletHolder = servletHandler.addServletWithMapping(ProxyServlet.class, "/chatProxied/*"); + servletHolder.setInitParameter(ProxyServlet.P_LOG, "true"); + servletHolder.setInitParameter(ProxyServlet.P_TARGET_URI, String.format("http://localhost:%d/chat/", serverPort)); + + ServletHolder dummyBackend = new ServletHolder(new HttpServlet() { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + resp.setContentType("text/event-stream"); + OutputStream os = resp.getOutputStream(); + // Write first message for client and flush it out + os.write(data1); + os.flush(); + try { + // Wait for client to request the second message by counting down the + // latch - if the latch times out, the second message will not be + // send and the corresponding assert will fail + if (guardForSecondRead.await(10, TimeUnit.SECONDS)) { + os.write(data2); + os.flush(); + } + } catch (InterruptedException ex) { + throw new IOException(ex); + } + } + }); + servletHandler.addServletWithMapping(dummyBackend, "/chat/*"); + + URL url = new URL(String.format("http://localhost:%d/chatProxied/test", serverPort)); + + try (InputStream is = url.openStream()) { + byte[] readData = readUntilBlocked(is); + assertTrue("No data received (message1)", readData.length > 0); + assertArrayEquals("Received data: '" + toString(readData) + "' (message1)", data1, readData); + guardForSecondRead.countDown(); + readData = readUntilBlocked(is); + assertTrue("No data received (message2)", readData.length > 0); + assertArrayEquals("Received data: '" + toString(readData) + "' (message2)", data2, readData); + } + } + + private static String toString(byte[] data) { + return new String(data, StandardCharsets.UTF_8); + } + + private static byte[] readUntilBlocked(InputStream is) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + byte[] buffer = new byte[10 * 1024]; + do { + int read = is.read(buffer); + if (read >= 0) { + baos.write(buffer, 0, read); + } else { + break; + } + } while (is.available() > 0); + return baos.toByteArray(); + } +}