Skip to content

Commit 1da3cc1

Browse files
author
Hendrik Muhs
authored
prevent assignment if any node is older than 7.4 (#48055) (#48116)
disable task assignment of transforms if any node uses version 7.2 or 7.3 (mixed cluster). fixes #48019
1 parent 8f4512e commit 1da3cc1

File tree

2 files changed

+90
-2
lines changed

2 files changed

+90
-2
lines changed

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutor.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.logging.log4j.Logger;
1111
import org.apache.lucene.util.SetOnce;
1212
import org.elasticsearch.ResourceNotFoundException;
13+
import org.elasticsearch.Version;
1314
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.LatchedActionListener;
1516
import org.elasticsearch.action.support.IndicesOptions;
@@ -96,8 +97,18 @@ public PersistentTasksCustomMetaData.Assignment getAssignment(DataFrameTransform
9697
logger.debug(reason);
9798
return new PersistentTasksCustomMetaData.Assignment(null, reason);
9899
}
100+
101+
// see gh#48019 disable assignment if any node is using 7.2 or 7.3
102+
if (clusterState.getNodes().getMinNodeVersion().before(Version.V_7_4_0)) {
103+
String reason = "Not starting transform [" + params.getId() + "], " +
104+
"because cluster contains nodes with version older than 7.4.0";
105+
logger.debug(reason);
106+
return new PersistentTasksCustomMetaData.Assignment(null, reason);
107+
}
108+
99109
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, (node) ->
100-
node.isDataNode() && node.getVersion().onOrAfter(params.getVersion())
110+
node.isDataNode() &&
111+
node.getVersion().onOrAfter(params.getVersion())
101112
);
102113
return discoveryNode == null ? NO_NODE_FOUND : new PersistentTasksCustomMetaData.Assignment(discoveryNode.getId(), "");
103114
}

x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void testNodeVersionAssignment() {
7676
buildNewFakeTransportAddress(),
7777
Collections.emptyMap(),
7878
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
79-
Version.V_7_2_0))
79+
Version.V_7_4_0))
8080
.add(new DiscoveryNode("current-data-node-with-2-tasks",
8181
buildNewFakeTransportAddress(),
8282
Collections.emptyMap(),
@@ -123,6 +123,83 @@ dataFrameTransformsCheckpointService, mock(SchedulerEngine.class),
123123
equalTo("past-data-node-1"));
124124
}
125125

126+
public void testDoNotSelectOldNodes() {
127+
MetaData.Builder metaData = MetaData.builder();
128+
RoutingTable.Builder routingTable = RoutingTable.builder();
129+
addIndices(metaData, routingTable);
130+
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
131+
.addTask("transform-task-1",
132+
DataFrameTransform.NAME,
133+
new DataFrameTransform("transform-task-1", Version.CURRENT, null),
134+
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-task", ""));
135+
136+
PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();
137+
138+
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, pTasks);
139+
140+
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder()
141+
.add(new DiscoveryNode("old-data-node-1",
142+
buildNewFakeTransportAddress(),
143+
Collections.emptyMap(),
144+
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
145+
Version.V_7_2_0))
146+
.add(new DiscoveryNode("current-data-node-with-1-task",
147+
buildNewFakeTransportAddress(),
148+
Collections.emptyMap(),
149+
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
150+
Version.CURRENT))
151+
.add(new DiscoveryNode("non-data-node-1",
152+
buildNewFakeTransportAddress(),
153+
Collections.emptyMap(),
154+
Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
155+
Version.CURRENT));
156+
157+
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name"))
158+
.nodes(nodes);
159+
csBuilder.routingTable(routingTable.build());
160+
csBuilder.metaData(metaData);
161+
162+
ClusterState cs = csBuilder.build();
163+
Client client = mock(Client.class);
164+
DataFrameAuditor mockAuditor = mock(DataFrameAuditor.class);
165+
DataFrameTransformsConfigManager transformsConfigManager = new DataFrameTransformsConfigManager(client, xContentRegistry());
166+
DataFrameTransformsCheckpointService transformCheckpointService = new DataFrameTransformsCheckpointService(client,
167+
transformsConfigManager, mockAuditor);
168+
ClusterSettings cSettings = new ClusterSettings(Settings.EMPTY,
169+
Collections.singleton(DataFrameTransformTask.NUM_FAILURE_RETRIES_SETTING));
170+
ClusterService clusterService = mock(ClusterService.class);
171+
when(clusterService.getClusterSettings()).thenReturn(cSettings);
172+
when(clusterService.state()).thenReturn(DataFrameInternalIndexTests.STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE);
173+
DataFrameTransformPersistentTasksExecutor executor = new DataFrameTransformPersistentTasksExecutor(client,
174+
transformsConfigManager,
175+
transformCheckpointService, mock(SchedulerEngine.class),
176+
new DataFrameAuditor(client, ""),
177+
mock(ThreadPool.class),
178+
clusterService,
179+
Settings.EMPTY);
180+
181+
// old-data-node-1 prevents assignment
182+
assertNull(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT, null), cs).getExecutorNode());
183+
184+
// remove the old 7.2 node
185+
nodes = DiscoveryNodes.builder()
186+
.add(new DiscoveryNode("current-data-node-with-1-task",
187+
buildNewFakeTransportAddress(),
188+
Collections.emptyMap(),
189+
new HashSet<>(Arrays.asList(DiscoveryNodeRole.DATA_ROLE, DiscoveryNodeRole.MASTER_ROLE)),
190+
Version.CURRENT))
191+
.add(new DiscoveryNode("non-data-node-1",
192+
buildNewFakeTransportAddress(),
193+
Collections.emptyMap(),
194+
Collections.singleton(DiscoveryNodeRole.MASTER_ROLE),
195+
Version.CURRENT));
196+
csBuilder.nodes(nodes);
197+
cs = csBuilder.build();
198+
199+
assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(),
200+
equalTo("current-data-node-with-1-task"));
201+
}
202+
126203
public void testVerifyIndicesPrimaryShardsAreActive() {
127204
MetaData.Builder metaData = MetaData.builder();
128205
RoutingTable.Builder routingTable = RoutingTable.builder();

0 commit comments

Comments
 (0)