Skip to content

Commit

Permalink
Refactor two-client usage. (opensearch-project#333)
Browse files Browse the repository at this point in the history
* Refactor two-client usage.

Signed-off-by: dblock <dblock@amazon.com>

* Avoid double-closing the client on cleanup.

Signed-off-by: dblock <dblock@amazon.com>

* Refactor constructors to take sync/async explicitly.

Signed-off-by: dblock <dblock@amazon.com>

* Corrected comments and cleaned up variable naming.

Signed-off-by: dblock <dblock@amazon.com>

Signed-off-by: dblock <dblock@amazon.com>
  • Loading branch information
dblock committed Jan 23, 2023
1 parent b522980 commit a0ca15c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.utils.SdkAutoCloseable;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -70,154 +71,103 @@ public class AwsSdk2Transport implements OpenSearchTransport {
public static final Integer DEFAULT_REQUEST_COMPRESSION_SIZE = 8192;

private static final byte[] NO_BYTES = new byte[0];
private final SdkHttpClient httpClient;
private final SdkAsyncHttpClient asyncHttpClient;
private final SdkAutoCloseable httpClient;
private final String host;
private final String signingServiceName;
private final Region signingRegion;
private final JsonpMapper defaultMapper;
private final AwsSdk2TransportOptions transportOptions;

/**
* Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client.
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client.
* <p>
* Note that asynchronous OpenSearch requests sent through this transport will be dispatched
* *synchronously* on the calling thread.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param asyncHttpClient Asynchronous HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkHttpClient httpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, null, host, "es", signingRegion, options);
this(asyncHttpClient, host, "es", signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client.
* <p>
* Note that asynchronous OpenSearch requests sent through this transport will be dispatched
* *synchronously* on the calling thread.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param syncHttpClient Synchronous HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkHttpClient httpClient,
@CheckForNull SdkHttpClient syncHttpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, null, host, signingServiceName, signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client
* <p>
* Note that synchronous OpenSearch requests sent through this transport will be dispatched
* using the asynchronous client, but the calling thread will block until they are complete.
*
* @param asyncHttpClient HTTP client to use for OpenSearch requests.
* @param host The target host.
* @param signingRegion The AWS region for which requests will be signed. This should typically match region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkAsyncHttpClient asyncHttpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(null, asyncHttpClient, host, "es", signingRegion, options);
this(syncHttpClient, host, "es", signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client.
* <p>
* Note that synchronous OpenSearch requests sent through this transport will be dispatched
* using the asynchronous client, but the calling thread will block until they are complete.
* Note that asynchronous OpenSearch requests sent through this transport will be dispatched
* *synchronously* on the calling thread.
*
* @param asyncHttpClient HTTP client to use for OpenSearch requests.
* @param host The target host.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param asyncHttpClient Asynchronous HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkAsyncHttpClient asyncHttpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(null, asyncHttpClient, host, signingServiceName, signingRegion, options);
this((SdkAutoCloseable) asyncHttpClient, host, signingServiceName, signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients.
* <p>
* The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client
* will be used for asynchronous HTTP requests.
* Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests.
* @param syncHttpClient Synchronous HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@CheckForNull SdkHttpClient httpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
@CheckForNull SdkHttpClient syncHttpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, asyncHttpClient, host, "es", signingRegion, options);
this((SdkAutoCloseable) syncHttpClient, host, signingServiceName, signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients.
* <p>
* The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client
* will be used for asynchronous HTTP requests.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@CheckForNull SdkHttpClient httpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
private AwsSdk2Transport(
@CheckForNull SdkAutoCloseable httpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
if (httpClient == null && asyncHttpClient == null)
{
throw new IllegalArgumentException("At least one SdkHttpClient or SdkAsyncHttpClient must be provided");
}
Objects.requireNonNull(host, "Target OpenSearch service host must not be null");
this.httpClient = httpClient;
this.asyncHttpClient = asyncHttpClient;
this.host = host;
this.signingServiceName = signingServiceName;
this.signingRegion = signingRegion;
Expand All @@ -237,11 +187,11 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody);

if (httpClient != null) {
return executeSync(clientReq, endpoint, options);
} else {
if (httpClient instanceof SdkHttpClient) {
return executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
} else if (httpClient instanceof SdkAsyncHttpClient) {
try {
return executeAsync(clientReq, requestBody, endpoint, options).get();
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options).get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
Expand All @@ -257,6 +207,8 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
} catch (InterruptedException e) {
throw new IOException("HttpRequest was interrupted", e);
}
} else {
throw new IOException("invalid httpClient: " + httpClient);
}
}

Expand All @@ -269,11 +221,13 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
try {
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody);
if (asyncHttpClient != null) {
return executeAsync(clientReq, requestBody, endpoint, options);
} else {
ResponseT result = executeSync(clientReq, endpoint, options);
if (httpClient instanceof SdkAsyncHttpClient) {
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options);
} else if (httpClient instanceof SdkHttpClient) {
ResponseT result = executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
return CompletableFuture.completedFuture(result);
} else {
throw new IOException("invalid httpClient: " + httpClient);
}
} catch (Throwable e) {
CompletableFuture<ResponseT> cf = new CompletableFuture<>();
Expand Down Expand Up @@ -418,6 +372,7 @@ private void applyOptionsHeaders(SdkHttpFullRequest.Builder builder, TransportOp
}

private <ResponseT> ResponseT executeSync(
SdkHttpClient syncHttpClient,
SdkHttpFullRequest httpRequest,
Endpoint<?, ResponseT, ?> endpoint,
TransportOptions options
Expand All @@ -427,7 +382,7 @@ private <ResponseT> ResponseT executeSync(
if (httpRequest.contentStreamProvider().isPresent()) {
executeRequest.contentStreamProvider(httpRequest.contentStreamProvider().get());
}
HttpExecuteResponse executeResponse = httpClient.prepareRequest(executeRequest.build()).call();
HttpExecuteResponse executeResponse = syncHttpClient.prepareRequest(executeRequest.build()).call();
AbortableInputStream bodyStream = null;
try {
bodyStream = executeResponse.responseBody().orElse(null);
Expand All @@ -441,6 +396,7 @@ private <ResponseT> ResponseT executeSync(
}

private <ResponseT> CompletableFuture<ResponseT> executeAsync(
SdkAsyncHttpClient asyncHttpClient,
SdkHttpFullRequest httpRequest,
@CheckForNull OpenSearchRequestBodyBuffer requestBody,
Endpoint<?, ResponseT, ?> endpoint,
Expand Down Expand Up @@ -543,7 +499,6 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode));
return response;
} else if (endpoint instanceof JsonEndpoint) {
@SuppressWarnings("unchecked")
JsonEndpoint<?, ResponseT, ?> jsonEndpoint = (JsonEndpoint<?, ResponseT, ?>) endpoint;
// Successful response
ResponseT response = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.client.opensearch.integTest.aws;


import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -42,7 +41,6 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;


public abstract class AwsSdk2TransportTestCase {
public static final String TEST_INDEX = "opensearch-java-integtest";

Expand Down Expand Up @@ -85,16 +83,14 @@ protected OpenSearchClient getClient(
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
} else {
transport = new AwsSdk2Transport(
getHttpClient(),
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
}
return new OpenSearchClient(transport);
}
Expand All @@ -111,16 +107,14 @@ protected OpenSearchAsyncClient getAsyncClient(
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
} else {
transport = new AwsSdk2Transport(
getHttpClient(),
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
}
return new OpenSearchAsyncClient(transport);
}
Expand All @@ -137,16 +131,14 @@ protected OpenSearchIndicesClient getIndexesClient(
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
} else {
transport = new AwsSdk2Transport(
getHttpClient(),
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
}
return new OpenSearchIndicesClient(transport);
}
Expand All @@ -171,13 +163,15 @@ public static void cleanupClients() throws IOException {
if (httpClient != null) {
try {
httpClient.close();
httpClient = null;
} catch (Throwable e) {
// Not our problem
}
}
if (asyncHttpClient != null) {
try {
asyncHttpClient.close();
asyncHttpClient = null;
} catch (Throwable e) {
// Not our problem
}
Expand All @@ -204,10 +198,8 @@ public void resetTestIndex(boolean async) throws Exception {
IndexState indexInfo = client.get(b -> b.index(TEST_INDEX)).get(TEST_INDEX);
if (indexInfo != null) {
indexExists = true;

}
} catch (
OpenSearchException e) {
} catch (OpenSearchException e) {
if (e.status() != 404) {
throw e;
}
Expand Down Expand Up @@ -237,17 +229,14 @@ protected SearchResponse<SimplePojo> query(OpenSearchClient client, String title
.ignoreThrottled(false)
.sort(
new SortOptions.Builder().score(o -> o.order(SortOrder.Desc)).build(),
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build()
)
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build())
.query(query);


return client.search(req.build(), SimplePojo.class);
}

protected CompletableFuture<SearchResponse<SimplePojo>> query(
OpenSearchAsyncClient client, String title, String text
) {
OpenSearchAsyncClient client, String title, String text) {
var query = Query.of(qb -> {
if (title != null) {
qb.match(mb -> mb.field("title").query(vb -> vb.stringValue(title)));
Expand All @@ -264,8 +253,7 @@ protected CompletableFuture<SearchResponse<SimplePojo>> query(
.ignoreThrottled(false)
.sort(
new SortOptions.Builder().score(o -> o.order(SortOrder.Desc)).build(),
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build()
)
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build())
.query(query);

try {
Expand Down

0 comments on commit a0ca15c

Please sign in to comment.