diff --git a/src/main/java/org/opensearch/sdk/SDKClient.java b/src/main/java/org/opensearch/sdk/SDKClient.java index c4b6edd6..c1cbd07f 100644 --- a/src/main/java/org/opensearch/sdk/SDKClient.java +++ b/src/main/java/org/opensearch/sdk/SDKClient.java @@ -64,6 +64,7 @@ import org.opensearch.client.indices.rollover.RolloverRequest; import org.opensearch.client.indices.rollover.RolloverResponse; import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.rest_client.RestClientTransport; @@ -75,6 +76,7 @@ public class SDKClient implements Closeable { private OpenSearchClient javaClient; private RestClient restClient; private RestHighLevelClient sdkRestClient; + private OpenSearchAsyncClient javaAsyncClient; // Used by client.execute, populated by initialize method @SuppressWarnings("rawtypes") @@ -110,12 +112,35 @@ private static RestClientBuilder builder(String hostAddress, int port) { return builder; } + /** + * Initializes an OpenSearchTransport using RestClientTransport. This is required for JavaClient and JavaAsyncClient + * + * @param hostAddress The address of OpenSearch cluster, client can connect to + * @param port The port of OpenSearch cluster + * @return The OpenSearchTransport implementation of RestClientTransport. + */ + private OpenSearchTransport initializeTransport(String hostAddress, int port) { + RestClientBuilder builder = builder(hostAddress, port); + + restClient = builder.build(); + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.registerModule(new GuavaModule()); + mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE, JsonTypeInfo.As.PROPERTY); + mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + // Create Client + OpenSearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper(mapper)); + return transport; + } + /** * Initializes an OpenSearchClient using OpenSearch JavaClient * * @param settings The Extension settings * @return The SDKClient implementation of OpenSearchClient. The user is responsible for calling - * {@link #doCloseJavaClient()} when finished with the client + * {@link #doCloseJavaClients()} when finished with the client */ public OpenSearchClient initializeJavaClient(ExtensionSettings settings) { return initializeJavaClient(settings.getOpensearchAddress(), Integer.parseInt(settings.getOpensearchPort())); @@ -127,26 +152,39 @@ public OpenSearchClient initializeJavaClient(ExtensionSettings settings) { * @param hostAddress The address of OpenSearch cluster, client can connect to * @param port The port of OpenSearch cluster * @return The SDKClient implementation of OpenSearchClient. The user is responsible for calling - * {@link #doCloseJavaClient()} when finished with the client + * {@link #doCloseJavaClients()} when finished with the client */ public OpenSearchClient initializeJavaClient(String hostAddress, int port) { - RestClientBuilder builder = builder(hostAddress, port); - - restClient = builder.build(); - - ObjectMapper mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.registerModule(new GuavaModule()); - mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE, JsonTypeInfo.As.PROPERTY); - mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false); - mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); - - // Create Client - OpenSearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper(mapper)); + OpenSearchTransport transport = initializeTransport(hostAddress, port); javaClient = new OpenSearchClient(transport); return javaClient; } + /** + * Initializes an OpenAsyncSearchClient using OpenSearch JavaClient + * + * @param settings The Extension settings + * @return The SDKClient implementation of OpenSearchAsyncClient. The user is responsible for calling + * {@link #doCloseJavaClients()} when finished with the client as JavaClient and JavaAsyncClient uses the same close method + */ + public OpenSearchAsyncClient initializeJavaAsyncClient(ExtensionSettings settings) { + return initalizeJavaAsyncClient(settings.getOpensearchAddress(), Integer.parseInt(settings.getOpensearchPort())); + } + + /** + * Initializes an OpenAsyncSearchClient using OpenSearch JavaClient + * + * @param hostAddress The address of OpenSearch cluster, client can connect to + * @param port The port of OpenSearch cluster + * @return The SDKClient implementation of OpenSearchAsyncClient. The user is responsible for calling + * {@link #doCloseJavaClients()} when finished with the client + */ + public OpenSearchAsyncClient initalizeJavaAsyncClient(String hostAddress, int port) { + OpenSearchTransport transport = initializeTransport(hostAddress, port); + javaAsyncClient = new OpenSearchAsyncClient(transport); + return javaAsyncClient; + } + /** * Initializes a SDK Rest Client wrapping the {@link RestHighLevelClient}. *
@@ -191,7 +229,7 @@ public SDKRestClient initializeRestClient(String hostAddress, int port) { * * @throws IOException if closing the restClient fails */ - public void doCloseJavaClient() throws IOException { + public void doCloseJavaClients() throws IOException { if (restClient != null) { restClient.close(); } @@ -210,7 +248,7 @@ public void doCloseHighLevelClient() throws IOException { @Override public void close() throws IOException { - doCloseJavaClient(); + doCloseJavaClients(); doCloseHighLevelClient(); } diff --git a/src/test/java/org/opensearch/sdk/TestSDKClient.java b/src/test/java/org/opensearch/sdk/TestSDKClient.java index de6cf4cf..ec834b23 100644 --- a/src/test/java/org/opensearch/sdk/TestSDKClient.java +++ b/src/test/java/org/opensearch/sdk/TestSDKClient.java @@ -26,8 +26,11 @@ import org.opensearch.client.indices.GetMappingsRequest; import org.opensearch.client.indices.PutMappingRequest; import org.opensearch.client.indices.rollover.RolloverRequest; +import org.opensearch.client.opensearch.OpenSearchAsyncClient; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.cluster.OpenSearchClusterAsyncClient; import org.opensearch.client.opensearch.cluster.OpenSearchClusterClient; +import org.opensearch.client.opensearch.indices.OpenSearchIndicesAsyncClient; import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient; import org.opensearch.sdk.SDKClient.SDKClusterAdminClient; import org.opensearch.sdk.SDKClient.SDKIndicesClient; @@ -60,7 +63,16 @@ public void testCreateJavaClient() throws Exception { assertInstanceOf(OpenSearchIndicesClient.class, javaClient.indices()); assertInstanceOf(OpenSearchClusterClient.class, javaClient.cluster()); - sdkClient.doCloseJavaClient(); + sdkClient.doCloseJavaClients(); + } + + @Test + public void testCreateJavaAsyncClient() throws Exception { + OpenSearchAsyncClient javaAsyncClient = sdkClient.initializeJavaAsyncClient(settings); + assertInstanceOf(OpenSearchIndicesAsyncClient.class, javaAsyncClient.indices()); + assertInstanceOf(OpenSearchClusterAsyncClient.class, javaAsyncClient.cluster()); + + sdkClient.doCloseJavaClients(); } @Test