Skip to content

Commit

Permalink
Integrated Async Java Client (#558)
Browse files Browse the repository at this point in the history
* Integrated Async Java Client

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Correct ordering of method

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

* Updated method name

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>

---------

Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>
(cherry picked from commit 8e0b1f6)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Mar 14, 2023
1 parent d125bb8 commit a846778
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 18 deletions.
72 changes: 55 additions & 17 deletions src/main/java/org/opensearch/sdk/SDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.opensearch.client.indices.rollover.RolloverRequest;
import org.opensearch.client.indices.rollover.RolloverResponse;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.transport.OpenSearchTransport;
import org.opensearch.client.transport.rest_client.RestClientTransport;
Expand All @@ -75,6 +76,7 @@ public class SDKClient implements Closeable {
private OpenSearchClient javaClient;
private RestClient restClient;
private RestHighLevelClient sdkRestClient;
private OpenSearchAsyncClient javaAsyncClient;

// Used by client.execute, populated by initialize method
@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -110,12 +112,35 @@ private static RestClientBuilder builder(String hostAddress, int port) {
return builder;
}

/**
* Initializes an OpenSearchTransport using RestClientTransport. This is required for JavaClient and JavaAsyncClient
*
* @param hostAddress The address of OpenSearch cluster, client can connect to
* @param port The port of OpenSearch cluster
* @return The OpenSearchTransport implementation of RestClientTransport.
*/
private OpenSearchTransport initializeTransport(String hostAddress, int port) {
RestClientBuilder builder = builder(hostAddress, port);

restClient = builder.build();
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.registerModule(new GuavaModule());
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE, JsonTypeInfo.As.PROPERTY);
mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

// Create Client
OpenSearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper(mapper));
return transport;
}

/**
* Initializes an OpenSearchClient using OpenSearch JavaClient
*
* @param settings The Extension settings
* @return The SDKClient implementation of OpenSearchClient. The user is responsible for calling
* {@link #doCloseJavaClient()} when finished with the client
* {@link #doCloseJavaClients()} when finished with the client
*/
public OpenSearchClient initializeJavaClient(ExtensionSettings settings) {
return initializeJavaClient(settings.getOpensearchAddress(), Integer.parseInt(settings.getOpensearchPort()));
Expand All @@ -127,26 +152,39 @@ public OpenSearchClient initializeJavaClient(ExtensionSettings settings) {
* @param hostAddress The address of OpenSearch cluster, client can connect to
* @param port The port of OpenSearch cluster
* @return The SDKClient implementation of OpenSearchClient. The user is responsible for calling
* {@link #doCloseJavaClient()} when finished with the client
* {@link #doCloseJavaClients()} when finished with the client
*/
public OpenSearchClient initializeJavaClient(String hostAddress, int port) {
RestClientBuilder builder = builder(hostAddress, port);

restClient = builder.build();

ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.registerModule(new GuavaModule());
mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE, JsonTypeInfo.As.PROPERTY);
mapper.configure(MapperFeature.USE_GETTERS_AS_SETTERS, false);
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

// Create Client
OpenSearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper(mapper));
OpenSearchTransport transport = initializeTransport(hostAddress, port);
javaClient = new OpenSearchClient(transport);
return javaClient;
}

/**
* Initializes an OpenAsyncSearchClient using OpenSearch JavaClient
*
* @param settings The Extension settings
* @return The SDKClient implementation of OpenSearchAsyncClient. The user is responsible for calling
* {@link #doCloseJavaClients()} when finished with the client as JavaClient and JavaAsyncClient uses the same close method
*/
public OpenSearchAsyncClient initializeJavaAsyncClient(ExtensionSettings settings) {
return initalizeJavaAsyncClient(settings.getOpensearchAddress(), Integer.parseInt(settings.getOpensearchPort()));
}

/**
* Initializes an OpenAsyncSearchClient using OpenSearch JavaClient
*
* @param hostAddress The address of OpenSearch cluster, client can connect to
* @param port The port of OpenSearch cluster
* @return The SDKClient implementation of OpenSearchAsyncClient. The user is responsible for calling
* {@link #doCloseJavaClients()} when finished with the client
*/
public OpenSearchAsyncClient initalizeJavaAsyncClient(String hostAddress, int port) {
OpenSearchTransport transport = initializeTransport(hostAddress, port);
javaAsyncClient = new OpenSearchAsyncClient(transport);
return javaAsyncClient;
}

/**
* Initializes a SDK Rest Client wrapping the {@link RestHighLevelClient}.
* <p>
Expand Down Expand Up @@ -191,7 +229,7 @@ public SDKRestClient initializeRestClient(String hostAddress, int port) {
*
* @throws IOException if closing the restClient fails
*/
public void doCloseJavaClient() throws IOException {
public void doCloseJavaClients() throws IOException {
if (restClient != null) {
restClient.close();
}
Expand All @@ -210,7 +248,7 @@ public void doCloseHighLevelClient() throws IOException {

@Override
public void close() throws IOException {
doCloseJavaClient();
doCloseJavaClients();
doCloseHighLevelClient();
}

Expand Down
14 changes: 13 additions & 1 deletion src/test/java/org/opensearch/sdk/TestSDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@
import org.opensearch.client.indices.GetMappingsRequest;
import org.opensearch.client.indices.PutMappingRequest;
import org.opensearch.client.indices.rollover.RolloverRequest;
import org.opensearch.client.opensearch.OpenSearchAsyncClient;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.cluster.OpenSearchClusterAsyncClient;
import org.opensearch.client.opensearch.cluster.OpenSearchClusterClient;
import org.opensearch.client.opensearch.indices.OpenSearchIndicesAsyncClient;
import org.opensearch.client.opensearch.indices.OpenSearchIndicesClient;
import org.opensearch.sdk.SDKClient.SDKClusterAdminClient;
import org.opensearch.sdk.SDKClient.SDKIndicesClient;
Expand Down Expand Up @@ -60,7 +63,16 @@ public void testCreateJavaClient() throws Exception {
assertInstanceOf(OpenSearchIndicesClient.class, javaClient.indices());
assertInstanceOf(OpenSearchClusterClient.class, javaClient.cluster());

sdkClient.doCloseJavaClient();
sdkClient.doCloseJavaClients();
}

@Test
public void testCreateJavaAsyncClient() throws Exception {
OpenSearchAsyncClient javaAsyncClient = sdkClient.initializeJavaAsyncClient(settings);
assertInstanceOf(OpenSearchIndicesAsyncClient.class, javaAsyncClient.indices());
assertInstanceOf(OpenSearchClusterAsyncClient.class, javaAsyncClient.cluster());

sdkClient.doCloseJavaClients();
}

@Test
Expand Down

0 comments on commit a846778

Please sign in to comment.