Skip to content

Commit

Permalink
Complete implementation of automatic request and response compression.
Browse files Browse the repository at this point in the history
Make sure bulk requests work
Properly parse 403 errors from OpenSearch service (they don't follow OS format)
Ensure that every transport error with a status code is reported as an OpenSearchError

Signed-off-by: Matt Timmermans <matt@timmermans.org>
  • Loading branch information
mtimmermansTa committed Jul 10, 2022
1 parent 5ba60f9 commit fe825c4
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 74 deletions.
14 changes: 7 additions & 7 deletions java-client/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,13 @@ dependencies {
testImplementation("com.fasterxml.jackson.datatype", "jackson-datatype-jakarta-jsonp", jacksonVersion)

// For AwsSdk2Transport
"awsSdk2SupportImplementation"("software.amazon.awssdk","sdk-core","2.15+")
"awsSdk2SupportImplementation"("software.amazon.awssdk","auth","2.15+")
testImplementation("software.amazon.awssdk","sdk-core","2.15+")
testImplementation("software.amazon.awssdk","auth","2.15+")
testImplementation("software.amazon.awssdk","aws-crt-client","2.15+")
testImplementation("software.amazon.awssdk","apache-client","2.15+")
testImplementation("software.amazon.awssdk","sts","2.15+")
"awsSdk2SupportImplementation"("software.amazon.awssdk","sdk-core","[2.15,3.0)")
"awsSdk2SupportImplementation"("software.amazon.awssdk","auth","[2.15,3.0)")
testImplementation("software.amazon.awssdk","sdk-core","[2.15,3.0)")
testImplementation("software.amazon.awssdk","auth","[2.15,3.0)")
testImplementation("software.amazon.awssdk","aws-crt-client","[2.15,3.0)")
testImplementation("software.amazon.awssdk","apache-client","[2.15,3.0)")
testImplementation("software.amazon.awssdk","sts","[2.15,3.0)")

// EPL-2.0 OR BSD-3-Clause
// https://eclipse-ee4j.github.io/yasson/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,22 @@

package org.opensearch.client.transport.aws;

import jakarta.json.JsonObject;
import jakarta.json.stream.JsonParser;
import org.opensearch.client.json.JsonpDeserializer;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch._types.ErrorCause;
import org.opensearch.client.opensearch._types.ErrorResponse;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.transport.Endpoint;
import org.opensearch.client.transport.JsonEndpoint;
import org.opensearch.client.transport.OpenSearchRequestBodyBuffer;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.TransportException;
import org.opensearch.client.transport.TransportOptions;
import org.opensearch.client.transport.endpoints.BooleanEndpoint;
import org.opensearch.client.transport.endpoints.BooleanResponse;
import org.opensearch.client.util.MissingRequiredPropertyException;
import org.opensearch.client.util.OpenSearchRequestBodyBuffer;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.signer.Aws4Signer;
Expand Down Expand Up @@ -78,13 +79,20 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.zip.GZIPInputStream;


/**
* Implementation of the OpenSearchTransport interface that sends signed requests using
* the AWS v2 SDK HTTP clients, to connect to an AWS OpenSearch service using IAM authentication
*/
public class AwsSdk2Transport implements OpenSearchTransport {
/**
* By default, requests that exceed this size will be automatically compressed.
* {@link AwsSdk2TransportOptions} can be used to override this setting or disable compresson.
*/
public static final Integer DEFAULT_REQUEST_COMPRESSION_SIZE = 8192;

private static final byte[] NO_BYTES = new byte[0];
private final SdkHttpClient httpClient;
private final SdkAsyncHttpClient asyncHttpClient;
Expand Down Expand Up @@ -253,11 +261,11 @@ private <RequestT> OpenSearchRequestBodyBuffer prepareRequestBody(
.map(o -> o instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) o) : null)
.map(AwsSdk2TransportOptions::mapper)
.orElse(defaultMapper);
final Integer maxUncompressedSize = Optional.ofNullable(options)
final int maxUncompressedSize = Optional.ofNullable(options)
.map(o -> o instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) o) : null)
.map(AwsSdk2TransportOptions::requestCompressionSize)
.or(()->Optional.ofNullable(transportOptions.requestCompressionSize()))
.orElse(Integer.MAX_VALUE);
.orElse(DEFAULT_REQUEST_COMPRESSION_SIZE);

OpenSearchRequestBodyBuffer buffer = new OpenSearchRequestBodyBuffer(mapper, maxUncompressedSize);
buffer.addContent(request);
Expand Down Expand Up @@ -308,7 +316,18 @@ private <RequestT> SdkHttpFullRequest prepareRequest(
req.putHeader("Content-Encoding", encoding);
}
req.putHeader("Content-Length", String.valueOf(body.getContentLength()));
req.contentStreamProvider(() -> body.getInputStream());
req.contentStreamProvider(body::getInputStream);
}

boolean responseCompression = Optional.ofNullable(options)
.map(o -> o instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) o) : null)
.map(AwsSdk2TransportOptions::responseCompression)
.or(() -> Optional.ofNullable(transportOptions.responseCompression()))
.orElse(Boolean.TRUE);
if (responseCompression) {
req.putHeader("Accept-Encoding", "gzip");
} else {
req.removeHeader("Accept-Encoding");
}

final AwsCredentialsProvider credentials = Optional.ofNullable(options)
Expand Down Expand Up @@ -347,7 +366,7 @@ private void applyOptionsHeaders(SdkHttpFullRequest.Builder builder, TransportOp
Collection<Map.Entry<String, String>> headers = options.headers();
if (headers != null && !headers.isEmpty()) {
for (Map.Entry<String, String> header : headers) {
builder.putHeader(header.getKey(), header.getValue());
builder.appendHeader(header.getKey(), header.getValue());
}
}
}
Expand Down Expand Up @@ -404,16 +423,51 @@ private <ResponseT> CompletableFuture<ResponseT> executeAsync(
}

private <ResponseT, ErrorT> ResponseT parseResponse(
SdkHttpResponse httpResponse,
InputStream bodyStream,
Endpoint<?, ResponseT, ErrorT> endpoint,
TransportOptions options
@Nonnull SdkHttpResponse httpResponse,
@CheckForNull InputStream bodyStream,
@Nonnull Endpoint<?, ResponseT, ErrorT> endpoint,
@CheckForNull TransportOptions options
) throws IOException {
final JsonpMapper mapper = Optional.ofNullable(options)
.map(o -> o instanceof AwsSdk2TransportOptions ? ((AwsSdk2TransportOptions) o) : null)
.map(AwsSdk2TransportOptions::mapper)
.orElse(defaultMapper);

int statusCode = httpResponse.statusCode();
boolean isZipped = httpResponse.firstMatchingHeader("Content-Encoding")
.map(enc -> enc.contains("gzip"))
.orElse(Boolean.FALSE);
if (bodyStream != null && isZipped) {
bodyStream = new GZIPInputStream(bodyStream);
}

if (statusCode == 403) {
// Authentication errors from AWS do not follow OpenSearch exception format
ErrorCause.Builder cause = new ErrorCause.Builder();
cause.type("security_exception");
cause.reason("authentication/authorization failure");

if (bodyStream != null) {
try (JsonParser parser = mapper.jsonProvider().createParser(bodyStream)) {
JsonObject val = JsonpDeserializer.jsonValueDeserializer()
.deserialize(parser, mapper)
.asJsonObject();
String message = val.getString("Message", null);
if (message == null) {
message = val.getString("message", null);
}
if (message != null) {
cause.reason(message);
}
} catch (Exception e) {
// OK. We'll use default message
}
}

ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build()));
throw new OpenSearchException(error);
}

if (endpoint.isError(statusCode)) {
JsonpDeserializer<ErrorT> errorDeserializer = endpoint.errorDeserializer(statusCode);
if (errorDeserializer == null || bodyStream == null) {
Expand All @@ -426,10 +480,13 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
ErrorT error = errorDeserializer.deserialize(parser, mapper);
throw new OpenSearchException((ErrorResponse) error);
}
} catch (MissingRequiredPropertyException errorEx) {
throw new TransportException(
"Request failed with status code '" + statusCode + "'. Can't decode response"
);
} catch (Exception e) {
// can't parse the error - use a general exception
ErrorCause.Builder cause = new ErrorCause.Builder();
cause.type("http_exception");
cause.reason("server returned " + statusCode);
ErrorResponse error = ErrorResponse.of(err -> err.status(statusCode).error(cause.build()));
throw new OpenSearchException(error);
}
} else {
if (endpoint instanceof BooleanEndpoint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,40 +43,56 @@ public interface AwsSdk2TransportOptions extends TransportOptions {

/**
* Get the credentials provider to user for signing requests.
* <P>
* If this is null, then a default provider should be used -- either a provider specified
* in a more general {@link AwsSdk2TransportOptions} that applies to the request, or the
* default credential chain if there is none.
* <p>
* If this is null, then a default provider will be used -- either a provider specified
* in a more general {@link AwsSdk2TransportOptions} that applies to the request, or the
* default credential chain if there is none.
* </P>
*
* @return A credentials provider or null
* @return A credentials provider or null
*/
AwsCredentialsProvider credentials();

/**
* Get the maximum size for uncompressed requests. Requests larger than this size will
* be sent with Content-Encoding: gzip.
* <P>
* If this is null, then a default should be used -- either a value specified
* in a more general {@link AwsSdk2TransportOptions} that applies to the request, or the
* Integer.MAX_VALUE if there is none.
* <p>
* If this is null, then a default will be used -- either a value specified
* in a more general {@link AwsSdk2TransportOptions} that applies to the request, or a
* reasonable default if there is none.
* </P><P>
* If this is Integer.MAX_VALUE, then requests will not be compressed. If this is 0, then all non-empty
* request bodies will be compressed.
* If this is Integer.MAX_VALUE, then requests will not be compressed. If this is 0, then all non-empty
* request bodies will be compressed.
* </P>
* @return
*
* @return An integer size limit or null
*/
Integer requestCompressionSize();

/**
* Get the response compression enable/disable value. If this is true, then an
* Accept-Encoding: gzip header will be sent with the request. The server will
* decide whether or not to compress its responses.
* <p>
* If this is null, then a default will be used -- either a value specified
* in a more general {@link AwsSdk2TransportOptions} that applies to the request, or
* {@link Boolean#TRUE} if there is none.
* </P>
*
* @return response compression enable/disable flag, or null
*/
Boolean responseCompression();

/**
* Get mapper used for serializing and deserializing requests and responses.
* <P>
* If this is null, then a default should be used -- either a value specified
* in a more general {@link AwsSdk2TransportOptions} that applies to the request, or a
* new {@link org.opensearch.client.json.jackson.JacksonJsonpMapper} or equivalent if
* there is none.
* <p>
* If this is null, then a default will be used -- either a value specified
* in a more general {@link AwsSdk2TransportOptions} that applies to the request, or a
* new {@link org.opensearch.client.json.jackson.JacksonJsonpMapper} or equivalent if
* there is none.
* </P>
* @return
*
* @return A mapper or null
*/
JsonpMapper mapper();

Expand All @@ -88,18 +104,27 @@ static AwsSdk2TransportOptions.Builder builder() {

interface Builder extends TransportOptions.Builder {
Builder addHeader(String name, String value);

Builder setParameter(String name, String value);

Builder onWarnings(Function<List<String>, Boolean> listener);

Builder setCredentials(AwsCredentialsProvider credentials);

Builder setRequestCompressionSize(Integer size);

Builder setResponseCompression(Boolean enabled);

Builder setMapper(JsonpMapper mapper);

AwsSdk2TransportOptions build();
}

class BuilderImpl extends TransportOptions.BuilderImpl implements Builder {

protected AwsCredentialsProvider credentials;
protected Integer requestCompressionSize;
protected Boolean responseCompression;
protected JsonpMapper mapper;

public BuilderImpl() {
Expand All @@ -109,6 +134,7 @@ public BuilderImpl(AwsSdk2TransportOptions src) {
super(src);
credentials = src.credentials();
requestCompressionSize = src.requestCompressionSize();
responseCompression = src.responseCompression();
mapper = src.mapper();
}

Expand Down Expand Up @@ -148,6 +174,12 @@ public Builder setMapper(JsonpMapper mapper) {
return this;
}

@Override
public Builder setResponseCompression(Boolean enabled) {
this.responseCompression = enabled;
return this;
}

@Override
public AwsSdk2TransportOptions build() {
return new DefaultImpl(this);
Expand All @@ -158,6 +190,7 @@ class DefaultImpl extends TransportOptions.DefaultImpl implements AwsSdk2Transpo

private AwsCredentialsProvider credentials;
private Integer requestCompressionSize;
private Boolean responseCompression;
private JsonpMapper mapper;

DefaultImpl(AwsSdk2TransportOptions.BuilderImpl builder) {
Expand All @@ -177,6 +210,11 @@ public Integer requestCompressionSize() {
return requestCompressionSize;
}

@Override
public Boolean responseCompression() {
return responseCompression;
}

@Override
public JsonpMapper mapper() {
return mapper;
Expand Down
Loading

0 comments on commit fe825c4

Please sign in to comment.