Skip to content

Commit

Permalink
remove extra changes
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <zaniu@amazon.com>
  • Loading branch information
zane-neo committed Nov 25, 2023
1 parent 7d5e8e7 commit c840012
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import lombok.Setter;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
Expand Down
51 changes: 23 additions & 28 deletions plugin/src/main/java/org/opensearch/ml/cluster/MLSyncUpCron.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.util.CollectionUtils;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.ml.autoredeploy.MLModelAutoReDeployer;
import org.opensearch.ml.common.FunctionName;
import org.opensearch.ml.common.MLModel;
import org.opensearch.ml.common.model.MLModelState;
Expand Down Expand Up @@ -65,7 +63,6 @@ public class MLSyncUpCron implements Runnable {
private volatile Boolean mlConfigInited;
@VisibleForTesting
Semaphore updateModelStateSemaphore;
private MLModelAutoReDeployer mlModelAutoReDeployer;

public MLSyncUpCron(
Client client,
Expand Down Expand Up @@ -247,20 +244,20 @@ void refreshModelState(Map<String, Set<String>> modelWorkerNodes, Map<String, Se
FunctionName functionName = FunctionName.from((String) sourceAsMap.get(MLModel.ALGORITHM_FIELD));
MLModelState state = MLModelState.from((String) sourceAsMap.get(MLModel.MODEL_STATE_FIELD));
Long lastUpdateTime = sourceAsMap.containsKey(MLModel.LAST_UPDATED_TIME_FIELD)
? (Long) sourceAsMap.get(MLModel.LAST_UPDATED_TIME_FIELD)
: null;
? (Long) sourceAsMap.get(MLModel.LAST_UPDATED_TIME_FIELD)
: null;
int planningWorkerNodeCount = sourceAsMap.containsKey(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD)
? (int) sourceAsMap.get(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD)
: 0;
? (int) sourceAsMap.get(MLModel.PLANNING_WORKER_NODE_COUNT_FIELD)
: 0;
int currentWorkerNodeCountInIndex = sourceAsMap.containsKey(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD)
? (int) sourceAsMap.get(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD)
: 0;
? (int) sourceAsMap.get(MLModel.CURRENT_WORKER_NODE_COUNT_FIELD)
: 0;
boolean deployToAllNodes = sourceAsMap.containsKey(MLModel.DEPLOY_TO_ALL_NODES_FIELD)
? (boolean) sourceAsMap.get(MLModel.DEPLOY_TO_ALL_NODES_FIELD)
: false;
? (boolean) sourceAsMap.get(MLModel.DEPLOY_TO_ALL_NODES_FIELD)
: false;
List<String> planningWorkNodes = sourceAsMap.containsKey(MLModel.PLANNING_WORKER_NODES_FIELD)
? (List<String>) sourceAsMap.get(MLModel.PLANNING_WORKER_NODES_FIELD)
: new ArrayList<>();
? (List<String>) sourceAsMap.get(MLModel.PLANNING_WORKER_NODES_FIELD)
: new ArrayList<>();
if (deployToAllNodes) {
DiscoveryNode[] eligibleNodes = nodeHelper.getEligibleNodes(functionName);
planningWorkerNodeCount = eligibleNodes.length;
Expand All @@ -273,19 +270,17 @@ void refreshModelState(Map<String, Set<String>> modelWorkerNodes, Map<String, Se
newPlanningWorkerNodes.put(modelId, eligibleNodeIds);
}
}
if (modelWorkerNodes != null && modelWorkerNodes.size() != 0) {
MLModelState mlModelState = getNewModelState(
deployingModels,
modelWorkerNodes,
modelId,
state,
lastUpdateTime,
planningWorkerNodeCount,
currentWorkerNodeCountInIndex
);
if (mlModelState != null) {
newModelStates.put(modelId, mlModelState);
}
MLModelState mlModelState = getNewModelState(
deployingModels,
modelWorkerNodes,
modelId,
state,
lastUpdateTime,
planningWorkerNodeCount,
currentWorkerNodeCountInIndex
);
if (mlModelState != null) {
newModelStates.put(modelId, mlModelState);
}
}
bulkUpdateModelState(modelWorkerNodes, newModelStates, newPlanningWorkerNodes);
Expand Down Expand Up @@ -317,8 +312,8 @@ private MLModelState getNewModelState(
if (currentWorkerNodeCount == 0
&& state != MLModelState.DEPLOY_FAILED
&& !(state == MLModelState.DEPLOYING
&& lastUpdateTime != null
&& lastUpdateTime + DEPLOY_MODEL_TASK_GRACE_TIME_IN_MS > Instant.now().toEpochMilli())) {
&& lastUpdateTime != null
&& lastUpdateTime + DEPLOY_MODEL_TASK_GRACE_TIME_IN_MS > Instant.now().toEpochMilli())) {
// If model not deployed to any node and no node is deploying the model, then set model state as DEPLOY_FAILED
return MLModelState.DEPLOY_FAILED;
}
Expand Down

0 comments on commit c840012

Please sign in to comment.