Skip to content

Commit

Permalink
Improve Stability of GCS Mock API (#49592) (#49597)
Browse files Browse the repository at this point in the history
Same as #49518 pretty much but for GCS.
Fixing a few more spots where input stream can get closed
without being fully drained and adding assertions to make sure
it's always drained.
Moved the no-close stream wrapper to production code utilities since
there's a number of spots in production code where it's also useful
(will reuse it there in a follow-up).
  • Loading branch information
original-brownbear authored Nov 26, 2019
1 parent 26a8ca0 commit 495b543
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ protected String requestUniqueId(HttpExchange exchange) {
if ("/token".equals(exchange.getRequestURI().getPath())) {
try {
// token content is unique per node (not per request)
return Streams.readFully(exchange.getRequestBody()).utf8ToString();
return Streams.readFully(Streams.noCloseStream(exchange.getRequestBody())).utf8ToString();
} catch (IOException e) {
throw new AssertionError("Unable to read token request body", e);
}
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/elasticsearch/common/io/Streams.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.BufferedReader;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -219,6 +220,21 @@ public static void readAllLines(InputStream input, Consumer<String> consumer) th
}
}

/**
* Wraps an {@link InputStream} such that it's {@code close} method becomes a noop
*
* @param stream {@code InputStream} to wrap
* @return wrapped {@code InputStream}
*/
public static InputStream noCloseStream(InputStream stream) {
return new FilterInputStream(stream) {
@Override
public void close() {
// noop
}
};
}

/**
* Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
* close is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,20 @@
@SuppressForbidden(reason = "Uses a HttpServer to emulate a fake OAuth2 authentication service")
public class FakeOAuth2HttpHandler implements HttpHandler {

private static final byte[] BUFFER = new byte[1024];

@Override
public void handle(final HttpExchange exchange) throws IOException {
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.close();
try {
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
while (exchange.getRequestBody().read(BUFFER) >= 0) ;
} finally {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
exchange.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public void handle(final HttpExchange exchange) throws IOException {
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
}
try {
// Request body is closed in the finally block
final InputStream wrappedRequest = Streams.noCloseStream(exchange.getRequestBody());
if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
final Map<String, String> params = new HashMap<>();
Expand Down Expand Up @@ -159,7 +161,7 @@ public void handle(final HttpExchange exchange) throws IOException {
// Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
final String uri = "/storage/v1/b/" + bucket + "/o/";
final StringBuilder batch = new StringBuilder();
for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) {
for (String line : Streams.readAllLines(new BufferedInputStream(wrappedRequest))) {
if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
batch.append(line).append('\n');
} else if (line.startsWith("DELETE")) {
Expand All @@ -180,7 +182,7 @@ public void handle(final HttpExchange exchange) throws IOException {

} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
// Multipart upload
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(wrappedRequest);
if (content.isPresent()) {
blobs.put(content.get().v1(), content.get().v2());

Expand All @@ -199,7 +201,7 @@ public void handle(final HttpExchange exchange) throws IOException {
final String blobName = params.get("name");
blobs.put(blobName, BytesArray.EMPTY);

byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
byte[] response = Streams.readFully(wrappedRequest).utf8ToString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?"
+ "uploadType=resumable"
Expand All @@ -225,7 +227,7 @@ public void handle(final HttpExchange exchange) throws IOException {
final int end = getContentRangeEnd(range);

final ByteArrayOutputStream out = new ByteArrayOutputStream();
long bytesRead = Streams.copy(exchange.getRequestBody(), out);
long bytesRead = Streams.copy(wrappedRequest, out);
int length = Math.max(end + 1, limit != null ? limit : 0);
if ((int) bytesRead > length) {
throw new AssertionError("Requesting more bytes than available for blob");
Expand All @@ -250,6 +252,8 @@ public void handle(final HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
}
} finally {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
exchange.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,26 @@ protected ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPe

@Override
public void handle(final HttpExchange exchange) throws IOException {
final String requestId = requestUniqueId(exchange);
assert Strings.hasText(requestId);

final boolean canFailRequest = canFailRequest(exchange);
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
if (count >= maxErrorsPerRequest || canFailRequest == false) {
requests.remove(requestId);
delegate.handle(exchange);
} else {
handleAsError(exchange);
try {
final String requestId = requestUniqueId(exchange);
assert Strings.hasText(requestId);

final boolean canFailRequest = canFailRequest(exchange);
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
if (count >= maxErrorsPerRequest || canFailRequest == false) {
requests.remove(requestId);
delegate.handle(exchange);
} else {
handleAsError(exchange);
}
} finally {
try {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
} catch (IOException e) {
// ignored, stream is assumed to have been closed by previous handler
}
exchange.close();
}
}

Expand Down

0 comments on commit 495b543

Please sign in to comment.