Skip to content

Commit

Permalink
Fix cluster level restart model not auto redeploy issue
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <zaniu@amazon.com>
  • Loading branch information
zane-neo committed Nov 14, 2023
1 parent bc39114 commit 9a50c61
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import lombok.Setter;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
Expand Down Expand Up @@ -68,6 +69,9 @@ public class MLModelAutoReDeployer {

private final SearchRequestBuilderFactory searchRequestBuilderFactory;

/**
 * Pending callback that {@code startCronjobAndClearListener()} fires with {@code true}
 * and then resets to {@code null}. Installed from outside this class through the
 * Lombok-generated setter; {@code null} means there is nothing to notify.
 */
@Setter
private ActionListener<Boolean> startCronJobListener;

public MLModelAutoReDeployer(
ClusterService clusterService,
Client client,
Expand Down Expand Up @@ -126,6 +130,7 @@ Consumer<Boolean> undeployModelsOnDataNodesConsumer() {
public void buildAutoReloadArrangement(List<String> addedNodes, String clusterManagerNodeId) {
if (!enableAutoReDeployModel) {
log.info("Model auto reload configuration is false, not performing auto reloading!");
startCronjobAndClearListener();
return;
}
String localNodeId = clusterService.localNode().getId();
Expand All @@ -142,10 +147,12 @@ public void buildAutoReloadArrangement(List<String> addedNodes, String clusterMa
public void redeployAModel() {
if (!enableAutoReDeployModel) {
log.info("Model auto reload configuration is false, not performing auto reloading!");
startCronjobAndClearListener();
return;
}
if (modelAutoRedeployArrangements.size() == 0) {
log.info("No models needs to be auto redeployed!");
startCronjobAndClearListener();
return;
}
ModelAutoRedeployArrangement modelAutoRedeployArrangement = modelAutoRedeployArrangements.poll();
Expand Down Expand Up @@ -176,9 +183,10 @@ private void triggerAutoDeployModels(List<String> addedNodes) {
});
redeployAModel();
}
},
e -> { log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e); }
);
}, e -> {
log.error("Failed to query need auto redeploy models, no action will be performed, addedNodes are: {}", addedNodes, e);
startCronjobAndClearListener();
});

queryRunningModels(listener);
}
Expand Down Expand Up @@ -296,6 +304,14 @@ private void triggerModelRedeploy(ModelAutoRedeployArrangement modelAutoRedeploy
client.execute(MLDeployModelAction.INSTANCE, deployModelRequest, listener);
}

/**
 * Notifies the pending {@code startCronJobListener} with {@code true} and then drops the
 * reference to it. Does nothing unless the local node is the cluster manager node and a
 * listener is currently set.
 */
private void startCronjobAndClearListener() {
    // Only the cluster manager node is allowed to fire the callback.
    if (!clusterService.localNode().isClusterManagerNode()) {
        return;
    }
    // Nothing registered (or already fired and cleared).
    if (startCronJobListener == null) {
        return;
    }
    startCronJobListener.onResponse(true);
    // Clear after notifying so later calls find no listener to fire.
    startCronJobListener = null;
}

@Data
@Builder
static class ModelAutoRedeployArrangement {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.common.lifecycle.LifecycleListener;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
import org.opensearch.ml.engine.encryptor.Encryptor;
import org.opensearch.ml.indices.MLIndicesHandler;
Expand All @@ -22,7 +23,6 @@

import lombok.extern.log4j.Log4j2;

import java.util.Arrays;
import java.util.List;

@Log4j2
Expand Down Expand Up @@ -70,11 +70,20 @@ public MLCommonsClusterManagerEventListener(

/**
 * Invoked when this node becomes cluster manager. Registers a listener on
 * {@code mlModelAutoReDeployer} that starts the sync-model-routing cron if it has not
 * been started yet, and triggers {@code buildAutoReloadArrangement} for the local node.
 *
 * NOTE(review): this text comes from a diff rendering without +/- markers. The immediate
 * {@code buildAutoReloadArrangement} call and the direct cron start below may be the
 * removed revision, with the {@code threadPool.schedule(...)} call as its replacement.
 * Confirm against the actual file before relying on the statement sequence shown here.
 */
@Override
public void onClusterManager() {
// Success and failure handlers are identical: either outcome starts the cron once,
// guarded by the null check on syncModelRoutingCron.
ActionListener<Boolean> listener = ActionListener.wrap(r -> {
if (syncModelRoutingCron == null) {
startSyncModelRoutingCron();
}
}, e -> {
if (syncModelRoutingCron == null) {
startSyncModelRoutingCron();
}
});
// Hand the callback to the redeployer so it can signal when the cron should start.
mlModelAutoReDeployer.setStartCronJobListener(listener);
String localNodeId = clusterService.localNode().getId();
// The local node id is passed both as the single "added node" and as the cluster manager node id.
mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId);
if (syncModelRoutingCron == null) {
startSyncModelRoutingCron();
}
// Same call as above, submitted via threadPool.schedule with a jobInterval-seconds TimeValue
// and GENERAL_THREAD_POOL (presumably delay and executor name — verify the schedule signature).
threadPool.schedule(() -> mlModelAutoReDeployer.buildAutoReloadArrangement(List.of(localNodeId), localNodeId),
TimeValue.timeValueSeconds(jobInterval),
GENERAL_THREAD_POOL);
}

private void startSyncModelRoutingCron() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ public void run() {
Set<String> workerNodes = deployingModels.computeIfAbsent(modelId, it -> new HashSet<>());
workerNodes.add(nodeId);
}
} else {

}

String[] runningDeployModelTaskIds = response.getRunningDeployModelTaskIds();
Expand Down

0 comments on commit 9a50c61

Please sign in to comment.