-
Notifications
You must be signed in to change notification settings - Fork 135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
dispatch ML task to ML node first #346
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
package org.opensearch.ml.task; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
|
@@ -23,6 +24,7 @@ | |
import org.opensearch.ml.action.stats.MLStatsNodesAction; | ||
import org.opensearch.ml.action.stats.MLStatsNodesRequest; | ||
import org.opensearch.ml.stats.MLNodeLevelStat; | ||
import org.opensearch.ml.utils.MLNodeUtils; | ||
|
||
import com.google.common.collect.ImmutableSet; | ||
|
||
|
@@ -49,9 +51,7 @@ public MLTaskDispatcher(ClusterService clusterService, Client client) { | |
* @param listener Action listener | ||
*/ | ||
public void dispatchTask(ActionListener<DiscoveryNode> listener) { | ||
// todo: add ML node type setting check | ||
// DiscoveryNode[] mlNodes = getEligibleMLNodes(); | ||
DiscoveryNode[] mlNodes = getEligibleDataNodes(); | ||
DiscoveryNode[] mlNodes = getEligibleNodes(); | ||
MLStatsNodesRequest MLStatsNodesRequest = new MLStatsNodesRequest(mlNodes); | ||
MLStatsNodesRequest | ||
.addNodeLevelStats(ImmutableSet.of(MLNodeLevelStat.ML_NODE_EXECUTING_TASK_COUNT, MLNodeLevelStat.ML_NODE_JVM_HEAP_USAGE)); | ||
|
@@ -107,14 +107,32 @@ public void dispatchTask(ActionListener<DiscoveryNode> listener) { | |
})); | ||
} | ||
|
||
private DiscoveryNode[] getEligibleDataNodes() { | ||
/** | ||
* Get eligible node to run ML task. If there are nodes with ml role, will return all these | ||
* ml nodes; otherwise return all data nodes. | ||
* | ||
* @return array of discovery node | ||
*/ | ||
protected DiscoveryNode[] getEligibleNodes() { | ||
Comment on lines
+110
to
+116
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it only preferable to run ML tasks in ml node? I assume ml-common can run in data node as well. Also is there any logic in the ClusterState.nodes() to evaluate if any ml node is overloaded, etc? I just wonder, in the future, if we want to add more priority based strategy here to prioritize ML node, but still use data node if ML node is heavy loaded, etc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Check the comment "If there are nodes with ml role, will return all these ml nodes; otherwise return all data nodes."
Yes, we check JVM heap usage and how many ML task running on a node. If exceeds limit, will not dispatch new ML task to that node.
I think we'd better ask user to scale the cluster by adding more ML node or switch to more powerful node type if ML node is heavy/over loaded. But this is not the one way door, we can always tune the code if cx really needs to run model on data nodes if ML node overloaded. |
||
ClusterState state = this.clusterService.state(); | ||
final List<DiscoveryNode> eligibleMLNodes = new ArrayList<>(); | ||
final List<DiscoveryNode> eligibleDataNodes = new ArrayList<>(); | ||
for (DiscoveryNode node : state.nodes()) { | ||
if (MLNodeUtils.isMLNode(node)) { | ||
eligibleMLNodes.add(node); | ||
} | ||
if (node.isDataNode()) { | ||
eligibleDataNodes.add(node); | ||
} | ||
} | ||
return eligibleDataNodes.toArray(new DiscoveryNode[0]); | ||
if (eligibleMLNodes.size() > 0) { | ||
DiscoveryNode[] mlNodes = eligibleMLNodes.toArray(new DiscoveryNode[0]); | ||
log.debug("Find {} dedicated ML nodes: {}", eligibleMLNodes.size(), Arrays.toString(mlNodes)); | ||
return mlNodes; | ||
} else { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick: This "else" should be redundant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's code style preference. People have different preference for "No-else-after-return" or not, check https://stackoverflow.com/questions/46875442/unnecessary-else-after-return-no-else-return. For me, I feel the code is more readable to keep |
||
DiscoveryNode[] dataNodes = eligibleDataNodes.toArray(new DiscoveryNode[0]); | ||
log.debug("Find no dedicated ML nodes. But have {} data nodes: {}", eligibleDataNodes.size(), Arrays.toString(dataNodes)); | ||
return dataNodes; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curiosity question: what is the purpose of
IS_ML_NODE_SETTING
before and why we don't need it now? I saw this part of logic was moved toTestHelper
class, what's the reason for that?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That part was from prototype when we tried to support ML node. But actually it's not being used in our formal release as OpenSearch core doesn't support ML role and we have to postpone that. Now OpenSearch plan to support ML role with dynamic role feature in 2.1. We can add this back but we don't need this prototype/experiment code any more. Just move it to test part.