Skip to content

Commit

Permalink
Use remote cluster to persist model group
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunkumargiri committed Mar 25, 2024
1 parent 6684d33 commit c80179c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 59 deletions.
122 changes: 65 additions & 57 deletions plugin/src/main/java/org/opensearch/ml/indices/MLIndicesHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -69,75 +71,81 @@ public void initMLConfigIndex(ActionListener<Boolean> listener) {
public void initMLIndexIfAbsent(MLIndex index, ActionListener<Boolean> listener) {
String indexName = index.getIndexName();
String mapping = index.getMapping();
Client osClient = client.getRemoteClusterClient("remote");

try (ThreadContext.StoredContext threadContext = client.threadPool().getThreadContext().stashContext()) {
ActionListener<Boolean> internalListener = ActionListener.runBefore(listener, () -> threadContext.restore());
if (!clusterService.state().metadata().hasIndex(indexName)) {
ActionListener<CreateIndexResponse> actionListener = ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
log.info("create index:{}", indexName);
internalListener.onResponse(true);
} else {
internalListener.onResponse(false);
}
}, e -> {
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
client.admin().indices().create(request, actionListener);
} else {
log.debug("index:{} is already created", indexName);
if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) {
shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> {
if (r) {
// return true if should update index
client
.admin()
.indices()
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest();
updateSettingRequest.indices(indexName).settings(indexSettings);
client
.admin()
.indices()
.updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener
.onFailure(new MLException("Failed to update index setting for: " + indexName));
}
}, exception -> {
log.error("Failed to update index setting for: " + indexName, exception);
internalListener.onFailure(exception);
}));
} else {
internalListener.onFailure(new MLException("Failed to update index: " + indexName));
}
}, exception -> {
log.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
})
);
} else {
// no need to update index if it does not exist or the version is already up-to-date.
indexMappingUpdated.get(indexName).set(true);
GetIndexRequest getIndexRequest = new GetIndexRequest();
getIndexRequest.indices(indexName);
// TODO: Update cluster state with remote cluster state to avoid fetching index details with every call.
osClient.admin().indices().getIndex(getIndexRequest, ActionListener.wrap(getIndexResponse -> {
if (getIndexResponse == null && getIndexResponse.getIndices().length == 0) {
ActionListener<CreateIndexResponse> actionListener = ActionListener.wrap(r -> {
if (r.isAcknowledged()) {
log.info("create index:{}", indexName);
internalListener.onResponse(true);
} else {
internalListener.onResponse(false);
}
}, e -> {
log.error("Failed to update index mapping", e);
log.error("Failed to create index " + indexName, e);
internalListener.onFailure(e);
}));
});
CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(mapping).settings(indexSettings);
osClient.admin().indices().create(request, actionListener);
} else {
log.debug("index:{} is already created", indexName);
if (indexMappingUpdated.containsKey(indexName) && !indexMappingUpdated.get(indexName).get()) {
shouldUpdateIndex(indexName, index.getVersion(), ActionListener.wrap(r -> {
if (r) {
// return true if should update index
osClient
.admin()
.indices()
.putMapping(
new PutMappingRequest().indices(indexName).source(mapping, XContentType.JSON),
ActionListener.wrap(response -> {
if (response.isAcknowledged()) {
UpdateSettingsRequest updateSettingRequest = new UpdateSettingsRequest();
updateSettingRequest.indices(indexName).settings(indexSettings);
osClient
.admin()
.indices()
.updateSettings(updateSettingRequest, ActionListener.wrap(updateResponse -> {
if (response.isAcknowledged()) {
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
} else {
internalListener
.onFailure(new MLException("Failed to update index setting for: " + indexName));
}
}, exception -> {
log.error("Failed to update index setting for: " + indexName, exception);
internalListener.onFailure(exception);
}));
} else {
internalListener.onFailure(new MLException("Failed to update index: " + indexName));
}
}, exception -> {
log.error("Failed to update index " + indexName, exception);
internalListener.onFailure(exception);
})
);
} else {
// no need to update index if it does not exist or the version is already up-to-date.
indexMappingUpdated.get(indexName).set(true);
internalListener.onResponse(true);
}
}, e -> {
log.error("Failed to update index mapping", e);
internalListener.onFailure(e);
}));
} else {
// No need to update index if it's not ML system index or it's already updated.
internalListener.onResponse(true);
}
}
}, ex-> {}));
} catch (Exception e) {
log.error("Failed to init index " + indexName, e);
listener.onFailure(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public void createModelGroup(MLRegisterModelGroupInput input, ActionListener<Str
try {
String modelName = input.getName();
User user = RestActionUtils.getUserContext(client);

try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
ActionListener<String> wrappedListener = ActionListener.runBefore(listener, () -> context.restore());
validateUniqueModelGroupName(input.getName(), ActionListener.wrap(modelGroups -> {
Expand Down Expand Up @@ -116,7 +117,8 @@ public void createModelGroup(MLRegisterModelGroupInput input, ActionListener<Str
);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

client.index(indexRequest, ActionListener.wrap(r -> {
Client osClient = client.getRemoteClusterClient("remote");
osClient.index(indexRequest, ActionListener.wrap(r -> {
log.debug("Indexed model group doc successfully {}", modelName);
wrappedListener.onResponse(r.getId());
}, e -> {
Expand Down Expand Up @@ -189,7 +191,8 @@ public void validateUniqueModelGroupName(String name, ActionListener<SearchRespo
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(query);
SearchRequest searchRequest = new SearchRequest(ML_MODEL_GROUP_INDEX).source(searchSourceBuilder);

client
Client osClient = client.getRemoteClusterClient("remote");
osClient
.search(
searchRequest,
ActionListener.runBefore(ActionListener.wrap(modelGroups -> { listener.onResponse(modelGroups); }, e -> {
Expand Down

0 comments on commit c80179c

Please sign in to comment.