Skip to content

Commit

Permalink
[Proxy] Limit replay buffer size in AdminProxyHandler (apache#10944)
Browse files Browse the repository at this point in the history
Fixes apache#10908

### Motivation

Pulsar Proxy uses a lot of heap memory when uploading large function jar files. This also leads to high GC activity since a continuous block of memory (byte array for the size of the upload) is allocated. GC will have to do compaction for the heap (which gets fragmented) to find a continuous block of memory. This is the reason why allocating large arrays are costly from GC perspective.

The buffering solution added as part of apache#5361. The solution buffers also very large uploads to memory.

### Modifications

* Limit the replay buffer size to a configurable limit which defaults to 5MB. This is configured with the `httpInputMaxReplayBufferSize` proxy configuration parameter.
* Add unit test to see that buffer size gets limited
* Add unit test for apache#5361

(cherry picked from commit 2324618)
  • Loading branch information
lhotari authored and eolivelli committed Jun 23, 2021
1 parent a8b372e commit 7fa88cc
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,26 +165,43 @@ protected HttpClient createHttpClient() throws ServletException {
// This class allows the request body to be replayed, the default implementation
// does not
protected class ReplayableProxyContentProvider extends ProxyInputStreamContentProvider {
private Boolean firstIteratorCalled = false;
static final int MIN_REPLAY_BODY_BUFFER_SIZE = 64;
private boolean bodyBufferAvailable = false;
private boolean bodyBufferMaxSizeReached = false;
private final ByteArrayOutputStream bodyBuffer;
protected ReplayableProxyContentProvider(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, InputStream input) {
private final long httpInputMaxReplayBufferSize;

protected ReplayableProxyContentProvider(HttpServletRequest request, HttpServletResponse response,
Request proxyRequest, InputStream input,
int httpInputMaxReplayBufferSize) {
super(request, response, proxyRequest, input);
bodyBuffer = new ByteArrayOutputStream(Math.max(request.getContentLength(), 0));
bodyBuffer = new ByteArrayOutputStream(
Math.min(Math.max(request.getContentLength(), MIN_REPLAY_BODY_BUFFER_SIZE),
httpInputMaxReplayBufferSize));
this.httpInputMaxReplayBufferSize = httpInputMaxReplayBufferSize;
}

@Override
public Iterator<ByteBuffer> iterator() {
if (firstIteratorCalled) {
if (bodyBufferAvailable) {
return Collections.singleton(ByteBuffer.wrap(bodyBuffer.toByteArray())).iterator();
} else {
firstIteratorCalled = true;
bodyBufferAvailable = true;
return super.iterator();
}
}

@Override
protected ByteBuffer onRead(byte[] buffer, int offset, int length) {
bodyBuffer.write(buffer, offset, length);
if (!bodyBufferMaxSizeReached) {
if (bodyBuffer.size() + length < httpInputMaxReplayBufferSize) {
bodyBuffer.write(buffer, offset, length);
} else {
bodyBufferMaxSizeReached = true;
bodyBufferAvailable = false;
bodyBuffer.reset();
}
}
return super.onRead(buffer, offset, length);
}
}
Expand Down Expand Up @@ -217,8 +234,10 @@ protected Request copyRequest(HttpRequest oldRequest, URI newURI) {

@Override
protected ContentProvider proxyRequestContent(HttpServletRequest request,
HttpServletResponse response, Request proxyRequest) throws IOException {
return new ReplayableProxyContentProvider(request, response, proxyRequest, request.getInputStream());
HttpServletResponse response, Request proxyRequest)
throws IOException {
return new ReplayableProxyContentProvider(request, response, proxyRequest, request.getInputStream(),
config.getHttpInputMaxReplayBufferSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,16 @@ public class ProxyConfiguration implements PulsarConfiguration {
)
private int httpOutputBufferSize = 32*1024;

@FieldContext(
minValue = 1,
category = CATEGORY_HTTP,
doc = "Http input buffer max size.\n\n"
+ "The maximum amount of data that will be buffered for incoming http requests "
+ "so that the request body can be replayed when the backend broker "
+ "issues a redirect response."
)
private int httpInputMaxReplayBufferSize = 5 * 1024 * 1024;

@FieldContext(
minValue = 1,
category = CATEGORY_HTTP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,56 @@
*/
package org.apache.pulsar.proxy.server;

import static org.mockito.Mockito.*;

import org.eclipse.jetty.client.api.Request;
import org.testng.Assert;
import org.testng.annotations.Test;

import java.io.InputStream;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Field;

import java.nio.ByteBuffer;
import java.util.Iterator;
import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

public class AdminProxyHandlerTest {
private AdminProxyHandler adminProxyHandler;

@BeforeClass
public void setupMocks() throws ServletException {
// given
HttpClient httpClient = mock(HttpClient.class);
adminProxyHandler = new AdminProxyHandler(mock(ProxyConfiguration.class),
mock(BrokerDiscoveryProvider.class)) {
@Override
protected HttpClient createHttpClient() throws ServletException {
return httpClient;
}
};
ServletConfig servletConfig = mock(ServletConfig.class);
when(servletConfig.getServletName()).thenReturn("AdminProxyHandler");
when(servletConfig.getServletContext()).thenReturn(mock(ServletContext.class));
adminProxyHandler.init(servletConfig);
}

@Test
public void replayableProxyContentProviderTest() throws Exception {

AdminProxyHandler adminProxyHandler = new AdminProxyHandler(mock(ProxyConfiguration.class),
mock(BrokerDiscoveryProvider.class));

HttpServletRequest request = mock(HttpServletRequest.class);
doReturn(-1).when(request).getContentLength();

try {
AdminProxyHandler.ReplayableProxyContentProvider replayableProxyContentProvider = adminProxyHandler.new ReplayableProxyContentProvider(
request, mock(HttpServletResponse.class), mock(Request.class), mock(InputStream.class));
AdminProxyHandler.ReplayableProxyContentProvider replayableProxyContentProvider =
adminProxyHandler.new ReplayableProxyContentProvider(
request, mock(HttpServletResponse.class), mock(Request.class), mock(InputStream.class),
1024);
Field field = replayableProxyContentProvider.getClass().getDeclaredField("bodyBuffer");
field.setAccessible(true);
Assert.assertEquals(((ByteArrayOutputStream) field.get(replayableProxyContentProvider)).size(), 0);
Expand All @@ -53,4 +76,68 @@ public void replayableProxyContentProviderTest() throws Exception {
}

}

@Test
public void shouldLimitReplayBodyBufferSize() throws Exception {
HttpServletRequest request = mock(HttpServletRequest.class);
int maxRequestBodySize = 1024 * 1024;
int requestBodySize = maxRequestBodySize + 1;
doReturn(requestBodySize).when(request).getContentLength();
byte[] inputBuffer = new byte[requestBodySize];

AdminProxyHandler.ReplayableProxyContentProvider replayableProxyContentProvider =
adminProxyHandler.new ReplayableProxyContentProvider(request, mock(HttpServletResponse.class),
mock(Request.class), new ByteArrayInputStream(inputBuffer),
maxRequestBodySize);

// when

// content is consumed
Iterator<ByteBuffer> byteBufferIterator = replayableProxyContentProvider.iterator();
int consumedBytes = 0;
while (byteBufferIterator.hasNext()) {
ByteBuffer byteBuffer = byteBufferIterator.next();
consumedBytes += byteBuffer.limit();
}

// then
Assert.assertEquals(consumedBytes, requestBodySize);
Field field = replayableProxyContentProvider.getClass().getDeclaredField("bodyBufferMaxSizeReached");
field.setAccessible(true);
Assert.assertEquals(((boolean) field.get(replayableProxyContentProvider)), true);
}

@Test
public void shouldReplayBodyBuffer() {
// given
HttpServletRequest request = mock(HttpServletRequest.class);
int maxRequestBodySize = 1024 * 1024;
byte[] inputBuffer = new byte[maxRequestBodySize - 1];
for (int i = 0; i < inputBuffer.length; i++) {
inputBuffer[i] = (byte) (i & 0xff);
}
doReturn(inputBuffer.length).when(request).getContentLength();

AdminProxyHandler.ReplayableProxyContentProvider replayableProxyContentProvider =
adminProxyHandler.new ReplayableProxyContentProvider(request, mock(HttpServletResponse.class),
mock(Request.class), new ByteArrayInputStream(inputBuffer),
maxRequestBodySize);

ByteBuffer consumeBuffer = ByteBuffer.allocate(maxRequestBodySize);
// content can be consumed multiple times
for (int i = 0; i < 3; i++) {
// when
consumeBuffer.clear();
Iterator<ByteBuffer> byteBufferIterator = replayableProxyContentProvider.iterator();
while (byteBufferIterator.hasNext()) {
ByteBuffer byteBuffer = byteBufferIterator.next();
consumeBuffer.put(byteBuffer);
}
consumeBuffer.flip();
byte[] consumedBytes = new byte[consumeBuffer.limit()];
consumeBuffer.get(consumedBytes);
// then
Assert.assertEquals(consumedBytes, inputBuffer);
}
}
}

0 comments on commit 7fa88cc

Please sign in to comment.