Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Stability of GCS Mock API (#49592) #49597

Merged
merged 1 commit into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ protected String requestUniqueId(HttpExchange exchange) {
if ("/token".equals(exchange.getRequestURI().getPath())) {
try {
// token content is unique per node (not per request)
return Streams.readFully(exchange.getRequestBody()).utf8ToString();
return Streams.readFully(Streams.noCloseStream(exchange.getRequestBody())).utf8ToString();
} catch (IOException e) {
throw new AssertionError("Unable to read token request body", e);
}
Expand Down
16 changes: 16 additions & 0 deletions server/src/main/java/org/elasticsearch/common/io/Streams.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.BufferedReader;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand Down Expand Up @@ -219,6 +220,21 @@ public static void readAllLines(InputStream input, Consumer<String> consumer) th
}
}

/**
* Wraps an {@link InputStream} such that it's {@code close} method becomes a noop
*
* @param stream {@code InputStream} to wrap
* @return wrapped {@code InputStream}
*/
public static InputStream noCloseStream(InputStream stream) {
return new FilterInputStream(stream) {
@Override
public void close() {
// noop
}
};
}

/**
* Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
* close is called.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,20 @@
@SuppressForbidden(reason = "Uses a HttpServer to emulate a fake OAuth2 authentication service")
public class FakeOAuth2HttpHandler implements HttpHandler {

private static final byte[] BUFFER = new byte[1024];

@Override
public void handle(final HttpExchange exchange) throws IOException {
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
exchange.close();
try {
byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length);
exchange.getResponseBody().write(response);
while (exchange.getRequestBody().read(BUFFER) >= 0) ;
} finally {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
exchange.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public void handle(final HttpExchange exchange) throws IOException {
assert read == -1 : "Request body should have been empty but saw [" + read + "]";
}
try {
// Request body is closed in the finally block
final InputStream wrappedRequest = Streams.noCloseStream(exchange.getRequestBody());
if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) {
// List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list
final Map<String, String> params = new HashMap<>();
Expand Down Expand Up @@ -159,7 +161,7 @@ public void handle(final HttpExchange exchange) throws IOException {
// Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
final String uri = "/storage/v1/b/" + bucket + "/o/";
final StringBuilder batch = new StringBuilder();
for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) {
for (String line : Streams.readAllLines(new BufferedInputStream(wrappedRequest))) {
if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) {
batch.append(line).append('\n');
} else if (line.startsWith("DELETE")) {
Expand All @@ -180,7 +182,7 @@ public void handle(final HttpExchange exchange) throws IOException {

} else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) {
// Multipart upload
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(exchange.getRequestBody());
Optional<Tuple<String, BytesArray>> content = parseMultipartRequestBody(wrappedRequest);
if (content.isPresent()) {
blobs.put(content.get().v1(), content.get().v2());

Expand All @@ -199,7 +201,7 @@ public void handle(final HttpExchange exchange) throws IOException {
final String blobName = params.get("name");
blobs.put(blobName, BytesArray.EMPTY);

byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8);
byte[] response = Streams.readFully(wrappedRequest).utf8ToString().getBytes(UTF_8);
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?"
+ "uploadType=resumable"
Expand All @@ -225,7 +227,7 @@ public void handle(final HttpExchange exchange) throws IOException {
final int end = getContentRangeEnd(range);

final ByteArrayOutputStream out = new ByteArrayOutputStream();
long bytesRead = Streams.copy(exchange.getRequestBody(), out);
long bytesRead = Streams.copy(wrappedRequest, out);
int length = Math.max(end + 1, limit != null ? limit : 0);
if ((int) bytesRead > length) {
throw new AssertionError("Requesting more bytes than available for blob");
Expand All @@ -250,6 +252,8 @@ public void handle(final HttpExchange exchange) throws IOException {
exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1);
}
} finally {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
exchange.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,16 +190,26 @@ protected ErroneousHttpHandler(final HttpHandler delegate, final int maxErrorsPe

@Override
public void handle(final HttpExchange exchange) throws IOException {
final String requestId = requestUniqueId(exchange);
assert Strings.hasText(requestId);

final boolean canFailRequest = canFailRequest(exchange);
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
if (count >= maxErrorsPerRequest || canFailRequest == false) {
requests.remove(requestId);
delegate.handle(exchange);
} else {
handleAsError(exchange);
try {
final String requestId = requestUniqueId(exchange);
assert Strings.hasText(requestId);

final boolean canFailRequest = canFailRequest(exchange);
final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet();
if (count >= maxErrorsPerRequest || canFailRequest == false) {
requests.remove(requestId);
delegate.handle(exchange);
} else {
handleAsError(exchange);
}
} finally {
try {
int read = exchange.getRequestBody().read();
assert read == -1 : "Request body should have been fully read here but saw [" + read + "]";
} catch (IOException e) {
// ignored, stream is assumed to have been closed by previous handler
}
exchange.close();
}
}

Expand Down