Skip to content

Commit 3082f87

Browse files
committed
Add data tiers (hot, warm, cold, frozen) as custom node roles (elastic#60994)
This commit adds the `data_hot`, `data_warm`, `data_cold`, and `data_frozen` node roles to the x-pack plugin. These roles are intended to be the base for the formalization of data tiers in Elasticsearch. These roles all act as data nodes (meaning shards can be allocated to them). Nodes with the existing `data` role acts as though they have all of the roles configured (it is a hot, warm, cold, and frozen node). This also includes a custom `AllocationDecider` that allows the user to configure the following settings on a cluster level: - `cluster.routing.allocation.require._tier` - `cluster.routing.allocation.include._tier` - `cluster.routing.allocation.exclude._tier` And in index settings: - `index.routing.allocation.require._tier` - `index.routing.allocation.include._tier` - `index.routing.allocation.exclude._tier` Relates to elastic#60848
1 parent 8c488de commit 3082f87

File tree

10 files changed

+893
-9
lines changed

10 files changed

+893
-9
lines changed

server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.io.stream.StreamInput;
2525
import org.elasticsearch.common.io.stream.StreamOutput;
2626
import org.elasticsearch.common.io.stream.Writeable;
27+
import org.elasticsearch.common.settings.Setting;
2728
import org.elasticsearch.common.settings.Settings;
2829
import org.elasticsearch.common.transport.TransportAddress;
2930
import org.elasticsearch.common.xcontent.ToXContentFragment;
@@ -83,8 +84,13 @@ public static boolean isMasterNode(Settings settings) {
8384
return hasRole(settings, DiscoveryNodeRole.MASTER_ROLE);
8485
}
8586

87+
/**
88+
* Due to the way that plugins may not be available when settings are being initialized,
89+
* not all roles may be available from a static/initializing context such as a {@link Setting}
90+
* default value function. In that case, be warned that this may not include all plugin roles.
91+
*/
8692
public static boolean isDataNode(final Settings settings) {
87-
return hasRole(settings, DiscoveryNodeRole.DATA_ROLE);
93+
return getRolesFromSettings(settings).stream().anyMatch(DiscoveryNodeRole::canContainData);
8894
}
8995

9096
public static boolean isIngestNode(Settings settings) {
@@ -383,7 +389,7 @@ public Map<String, String> getAttributes() {
383389
* Should this node hold data (shards) or not.
384390
*/
385391
public boolean isDataNode() {
386-
return roles.contains(DiscoveryNodeRole.DATA_ROLE);
392+
return roles.stream().anyMatch(DiscoveryNodeRole::canContainData);
387393
}
388394

389395
/**

server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodeRole.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,13 @@ private DiscoveryNodeRole(final boolean isKnownRole, final String roleName, fina
8484

8585
public abstract Setting<Boolean> legacySetting();
8686

87+
/**
88+
* Indicates whether a node with the given role can contain data. Defaults to false and can be overridden
89+
*/
90+
public boolean canContainData() {
91+
return false;
92+
}
93+
8794
@Override
8895
public final boolean equals(Object o) {
8996
if (this == o) return true;
@@ -124,6 +131,10 @@ public Setting<Boolean> legacySetting() {
124131
return Setting.boolSetting("node.data", true, Property.Deprecated, Property.NodeScope);
125132
}
126133

134+
@Override
135+
public boolean canContainData() {
136+
return true;
137+
}
127138
};
128139

129140
/**

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@
190190
import java.util.Arrays;
191191
import java.util.Collection;
192192
import java.util.Collections;
193+
import java.util.LinkedHashSet;
193194
import java.util.List;
194195
import java.util.Map;
195196
import java.util.Optional;
@@ -345,8 +346,11 @@ protected Node(final Environment initialEnvironment,
345346
this.environment = new Environment(settings, initialEnvironment.configFile(), Node.NODE_LOCAL_STORAGE_SETTING.get(settings));
346347
Environment.assertEquivalent(initialEnvironment, this.environment);
347348
nodeEnvironment = new NodeEnvironment(tmpSettings, environment);
348-
logger.info("node name [{}], node ID [{}], cluster name [{}]",
349-
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value());
349+
logger.info("node name [{}], node ID [{}], cluster name [{}], roles {}",
350+
NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId(), ClusterName.CLUSTER_NAME_SETTING.get(tmpSettings).value(),
351+
DiscoveryNode.getRolesFromSettings(settings).stream()
352+
.map(DiscoveryNodeRole::roleName)
353+
.collect(Collectors.toCollection(LinkedHashSet::new)));
350354
resourcesToClose.add(nodeEnvironment);
351355
localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());
352356

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -766,11 +766,11 @@ private static String getRoleSuffix(Settings settings) {
766766
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE)) {
767767
suffix = suffix + DiscoveryNodeRole.MASTER_ROLE.roleNameAbbreviation();
768768
}
769-
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE)) {
769+
if (DiscoveryNode.isDataNode(settings)) {
770770
suffix = suffix + DiscoveryNodeRole.DATA_ROLE.roleNameAbbreviation();
771771
}
772772
if (DiscoveryNode.hasRole(settings, DiscoveryNodeRole.MASTER_ROLE) == false
773-
&& DiscoveryNode.hasRole(settings, DiscoveryNodeRole.DATA_ROLE) == false) {
773+
&& DiscoveryNode.isDataNode(settings) == false) {
774774
suffix = suffix + "c";
775775
}
776776
}
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.cluster.routing.allocation;
8+
9+
import org.elasticsearch.cluster.metadata.IndexMetadata;
10+
import org.elasticsearch.cluster.node.DiscoveryNode;
11+
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
12+
import org.elasticsearch.cluster.routing.RoutingNode;
13+
import org.elasticsearch.cluster.routing.ShardRouting;
14+
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
15+
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
16+
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
17+
import org.elasticsearch.common.Strings;
18+
import org.elasticsearch.common.settings.ClusterSettings;
19+
import org.elasticsearch.common.settings.Setting;
20+
import org.elasticsearch.common.settings.Settings;
21+
import org.elasticsearch.xpack.core.DataTier;
22+
23+
import java.util.Arrays;
24+
import java.util.Set;
25+
import java.util.stream.Collectors;
26+
27+
/**
28+
* The {@code DataTierAllocationDecider} is a custom allocation decider that behaves similar to the
29+
* {@link org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider}, however it
30+
* is specific to the {@code _tier} setting for both the cluster and index level.
31+
*/
32+
public class DataTierAllocationDecider extends AllocationDecider {
33+
34+
public static final String NAME = "data_tier";
35+
36+
public static final String CLUSTER_ROUTING_REQUIRE = "cluster.routing.allocation.require._tier";
37+
public static final String CLUSTER_ROUTING_INCLUDE = "cluster.routing.allocation.include._tier";
38+
public static final String CLUSTER_ROUTING_EXCLUDE = "cluster.routing.allocation.exclude._tier";
39+
public static final String INDEX_ROUTING_REQUIRE = "index.routing.allocation.require._tier";
40+
public static final String INDEX_ROUTING_INCLUDE = "index.routing.allocation.include._tier";
41+
public static final String INDEX_ROUTING_EXCLUDE = "index.routing.allocation.exclude._tier";
42+
43+
public static final Setting<String> CLUSTER_ROUTING_REQUIRE_SETTING = Setting.simpleString(CLUSTER_ROUTING_REQUIRE,
44+
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
45+
public static final Setting<String> CLUSTER_ROUTING_INCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_INCLUDE,
46+
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
47+
public static final Setting<String> CLUSTER_ROUTING_EXCLUDE_SETTING = Setting.simpleString(CLUSTER_ROUTING_EXCLUDE,
48+
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.NodeScope);
49+
public static final Setting<String> INDEX_ROUTING_REQUIRE_SETTING = Setting.simpleString(INDEX_ROUTING_REQUIRE,
50+
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
51+
public static final Setting<String> INDEX_ROUTING_INCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_INCLUDE,
52+
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
53+
public static final Setting<String> INDEX_ROUTING_EXCLUDE_SETTING = Setting.simpleString(INDEX_ROUTING_EXCLUDE,
54+
DataTierAllocationDecider::validateTierSetting, Setting.Property.Dynamic, Setting.Property.IndexScope);
55+
56+
private static void validateTierSetting(String setting) {
57+
if (Strings.hasText(setting)) {
58+
Set<String> invalidTiers = Arrays.stream(setting.split(","))
59+
.filter(tier -> DataTier.validTierName(tier) == false)
60+
.collect(Collectors.toSet());
61+
if (invalidTiers.size() > 0) {
62+
throw new IllegalArgumentException("invalid tier names: " + invalidTiers);
63+
}
64+
}
65+
}
66+
67+
private volatile String clusterRequire = null;
68+
private volatile String clusterInclude = null;
69+
private volatile String clusterExclude = null;
70+
71+
public DataTierAllocationDecider(ClusterSettings clusterSettings) {
72+
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_REQUIRE_SETTING, s -> this.clusterRequire = s);
73+
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_INCLUDE_SETTING, s -> this.clusterInclude = s);
74+
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_EXCLUDE_SETTING, s -> this.clusterExclude = s);
75+
}
76+
77+
@Override
78+
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
79+
return shouldFilter(shardRouting, node.node(), allocation);
80+
}
81+
82+
@Override
83+
public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) {
84+
return shouldFilter(indexMetadata, node.node(), allocation);
85+
}
86+
87+
@Override
88+
public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
89+
return shouldFilter(shardRouting, node.node(), allocation);
90+
}
91+
92+
@Override
93+
public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) {
94+
Decision decision = shouldClusterFilter(node, allocation);
95+
if (decision != null) {
96+
return decision;
97+
}
98+
99+
decision = shouldIndexFilter(indexMetadata, node, allocation);
100+
if (decision != null) {
101+
return decision;
102+
}
103+
104+
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
105+
}
106+
107+
private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
108+
Decision decision = shouldClusterFilter(node, allocation);
109+
if (decision != null) {
110+
return decision;
111+
}
112+
113+
decision = shouldIndexFilter(allocation.metadata().getIndexSafe(shardRouting.index()), node, allocation);
114+
if (decision != null) {
115+
return decision;
116+
}
117+
118+
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
119+
}
120+
121+
private Decision shouldFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) {
122+
Decision decision = shouldClusterFilter(node, allocation);
123+
if (decision != null) {
124+
return decision;
125+
}
126+
127+
decision = shouldIndexFilter(indexMd, node, allocation);
128+
if (decision != null) {
129+
return decision;
130+
}
131+
132+
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require tier filters");
133+
}
134+
135+
private Decision shouldIndexFilter(IndexMetadata indexMd, DiscoveryNode node, RoutingAllocation allocation) {
136+
Settings indexSettings = indexMd.getSettings();
137+
String indexRequire = INDEX_ROUTING_REQUIRE_SETTING.get(indexSettings);
138+
String indexInclude = INDEX_ROUTING_INCLUDE_SETTING.get(indexSettings);
139+
String indexExclude = INDEX_ROUTING_EXCLUDE_SETTING.get(indexSettings);
140+
141+
if (Strings.hasText(indexRequire)) {
142+
if (allocationAllowed(OpType.AND, indexRequire, node) == false) {
143+
return allocation.decision(Decision.NO, NAME, "node does not match all index setting [%s] tier filters [%s]",
144+
INDEX_ROUTING_REQUIRE, indexRequire);
145+
}
146+
}
147+
if (Strings.hasText(indexInclude)) {
148+
if (allocationAllowed(OpType.OR, indexInclude, node) == false) {
149+
return allocation.decision(Decision.NO, NAME, "node does not match any index setting [%s] tier filters [%s]",
150+
INDEX_ROUTING_INCLUDE, indexInclude);
151+
}
152+
}
153+
if (Strings.hasText(indexExclude)) {
154+
if (allocationAllowed(OpType.OR, indexExclude, node)) {
155+
return allocation.decision(Decision.NO, NAME, "node matches any index setting [%s] tier filters [%s]",
156+
INDEX_ROUTING_EXCLUDE, indexExclude);
157+
}
158+
}
159+
return null;
160+
}
161+
162+
private Decision shouldClusterFilter(DiscoveryNode node, RoutingAllocation allocation) {
163+
if (Strings.hasText(clusterRequire)) {
164+
if (allocationAllowed(OpType.AND, clusterRequire, node) == false) {
165+
return allocation.decision(Decision.NO, NAME, "node does not match all cluster setting [%s] tier filters [%s]",
166+
CLUSTER_ROUTING_REQUIRE, clusterRequire);
167+
}
168+
}
169+
if (Strings.hasText(clusterInclude)) {
170+
if (allocationAllowed(OpType.OR, clusterInclude, node) == false) {
171+
return allocation.decision(Decision.NO, NAME, "node does not match any cluster setting [%s] tier filters [%s]",
172+
CLUSTER_ROUTING_INCLUDE, clusterInclude);
173+
}
174+
}
175+
if (Strings.hasText(clusterExclude)) {
176+
if (allocationAllowed(OpType.OR, clusterExclude, node)) {
177+
return allocation.decision(Decision.NO, NAME, "node matches any cluster setting [%s] tier filters [%s]",
178+
CLUSTER_ROUTING_EXCLUDE, clusterExclude);
179+
}
180+
}
181+
return null;
182+
}
183+
184+
private enum OpType {
185+
AND,
186+
OR
187+
}
188+
189+
private static boolean allocationAllowed(OpType opType, String tierSetting, DiscoveryNode node) {
190+
String[] values = Strings.tokenizeToStringArray(tierSetting, ",");
191+
for (String value : values) {
192+
// generic "data" roles are considered to have all tiers
193+
if (node.getRoles().contains(DiscoveryNodeRole.DATA_ROLE) ||
194+
node.getRoles().stream().map(DiscoveryNodeRole::roleName).collect(Collectors.toSet()).contains(value)) {
195+
if (opType == OpType.OR) {
196+
return true;
197+
}
198+
} else {
199+
if (opType == OpType.AND) {
200+
return false;
201+
}
202+
}
203+
}
204+
if (opType == OpType.OR) {
205+
return false;
206+
} else {
207+
return true;
208+
}
209+
}
210+
}

0 commit comments

Comments
 (0)