Skip to content

Commit 6f9e1f8

Browse files
authored
[Experimental] Start without joining a cluster if a "clusterless" ClusterPlugin is loaded (#18479)
These are the core changes to allow ClusterPlugins to manage cluster state on behalf of an OpenSearch node. If a "clusterless" ClusterPlugin is loaded, then the node will start up with dummy versions of Discovery and ClusterManagerService, and will load the minimum cluster state required for startup (essentially, the node needs to see itself). From there, the ClusterPlugin can interact with ClusterApplierService to apply some cluster state to the node. (The details of constructing/receiving that cluster state is up to the plugin.) --------- Signed-off-by: Michael Froh <msfroh@apache.org>
1 parent 94143ad commit 6f9e1f8

File tree

10 files changed

+353
-45
lines changed

10 files changed

+353
-45
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2828
- Pass index settings to system ingest processor factories. ([#18708](https://github.com/opensearch-project/OpenSearch/pull/18708))
2929
- Include named queries from rescore contexts in matched_queries array ([#18697](https://github.com/opensearch-project/OpenSearch/pull/18697))
3030
- Add the configurable limit on rule cardinality ([#18663](https://github.com/opensearch-project/OpenSearch/pull/18663))
31+
- [Experimental] Start in "clusterless" mode if a clusterless ClusterPlugin is loaded ([#18479](https://github.com/opensearch-project/OpenSearch/pull/18479))
3132

3233
### Changed
3334
- Update Subject interface to use CheckedRunnable ([#18570](https://github.com/opensearch-project/OpenSearch/issues/18570))

server/src/main/java/org/opensearch/cluster/ClusterModule.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ public class ClusterModule extends AbstractModule {
144144
final Collection<AllocationDecider> deciderList;
145145
final ShardsAllocator shardsAllocator;
146146
private final ClusterManagerMetrics clusterManagerMetrics;
147+
private final Class<? extends ShardStateAction> shardStateActionClass;
147148

148149
public ClusterModule(
149150
Settings settings,
@@ -152,7 +153,8 @@ public ClusterModule(
152153
ClusterInfoService clusterInfoService,
153154
SnapshotsInfoService snapshotsInfoService,
154155
ThreadContext threadContext,
155-
ClusterManagerMetrics clusterManagerMetrics
156+
ClusterManagerMetrics clusterManagerMetrics,
157+
Class<? extends ShardStateAction> shardStateActionClass
156158
) {
157159
this.clusterPlugins = clusterPlugins;
158160
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
@@ -169,6 +171,7 @@ public ClusterModule(
169171
clusterManagerMetrics
170172
);
171173
this.clusterManagerMetrics = clusterManagerMetrics;
174+
this.shardStateActionClass = shardStateActionClass;
172175
}
173176

174177
public static List<Entry> getNamedWriteables() {
@@ -474,7 +477,11 @@ protected void configure() {
474477
bind(MetadataIndexTemplateService.class).asEagerSingleton();
475478
bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver);
476479
bind(DelayedAllocationService.class).asEagerSingleton();
477-
bind(ShardStateAction.class).asEagerSingleton();
480+
if (shardStateActionClass == ShardStateAction.class) {
481+
bind(ShardStateAction.class).asEagerSingleton();
482+
} else {
483+
bind(ShardStateAction.class).to(shardStateActionClass).asEagerSingleton();
484+
}
478485
bind(NodeMappingRefreshAction.class).asEagerSingleton();
479486
bind(MappingUpdatedAction.class).asEagerSingleton();
480487
bind(TaskResultsService.class).asEagerSingleton();
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.action.shard;
10+
11+
import org.opensearch.cluster.ClusterState;
12+
import org.opensearch.cluster.routing.IndexRoutingTable;
13+
import org.opensearch.cluster.routing.IndexShardRoutingTable;
14+
import org.opensearch.cluster.routing.RerouteService;
15+
import org.opensearch.cluster.routing.RoutingTable;
16+
import org.opensearch.cluster.routing.ShardRouting;
17+
import org.opensearch.cluster.routing.allocation.AllocationService;
18+
import org.opensearch.cluster.service.ClusterService;
19+
import org.opensearch.common.inject.Inject;
20+
import org.opensearch.core.action.ActionListener;
21+
import org.opensearch.threadpool.ThreadPool;
22+
import org.opensearch.transport.TransportService;
23+
24+
import java.util.function.Function;
25+
26+
/**
27+
* A local implementation of {@link ShardStateAction} that applies shard state changes directly to the
28+
* local cluster state. This is used in clusterless mode, where there is no cluster manager.
29+
*/
30+
public class LocalShardStateAction extends ShardStateAction {
31+
@Inject
32+
public LocalShardStateAction(
33+
ClusterService clusterService,
34+
TransportService transportService,
35+
AllocationService allocationService,
36+
RerouteService rerouteService,
37+
ThreadPool threadPool
38+
) {
39+
super(clusterService, transportService, allocationService, rerouteService, threadPool);
40+
}
41+
42+
@Override
43+
public void shardStarted(
44+
ShardRouting shardRouting,
45+
long primaryTerm,
46+
String message,
47+
ActionListener<Void> listener,
48+
ClusterState currentState
49+
) {
50+
Function<ClusterState, ClusterState> clusterStateUpdater = clusterState -> {
51+
// We're running in clusterless mode. Apply the state change directly to the local cluster state.
52+
RoutingTable routingTable = clusterState.getRoutingTable();
53+
IndexRoutingTable indexRoutingTable = routingTable.index(shardRouting.index());
54+
55+
ClusterState.Builder clusterStateBuilder = ClusterState.builder(clusterState);
56+
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(routingTable);
57+
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(shardRouting.index());
58+
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
59+
if (indexShardRoutingTable.shardId().equals(shardRouting.shardId())) {
60+
IndexShardRoutingTable.Builder indexShardRoutingTableBuilder = new IndexShardRoutingTable.Builder(
61+
indexShardRoutingTable
62+
);
63+
indexShardRoutingTableBuilder.removeShard(shardRouting);
64+
indexShardRoutingTableBuilder.addShard(shardRouting.moveToStarted());
65+
indexRoutingTableBuilder.addIndexShard(indexShardRoutingTableBuilder.build());
66+
} else {
67+
indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable);
68+
}
69+
}
70+
routingTableBuilder.add(indexRoutingTableBuilder);
71+
clusterStateBuilder.routingTable(routingTableBuilder.build());
72+
return clusterStateBuilder.build();
73+
};
74+
clusterService.getClusterApplierService()
75+
.updateClusterState("shard-started " + shardRouting.shardId(), clusterStateUpdater, (s, e) -> {});
76+
}
77+
78+
@Override
79+
public void localShardFailed(
80+
ShardRouting shardRouting,
81+
String message,
82+
Exception failure,
83+
ActionListener<Void> listener,
84+
ClusterState currentState
85+
) {
86+
// Do not send a failure to the cluster manager, as we are running in clusterless mode.
87+
}
88+
}

server/src/main/java/org/opensearch/cluster/action/shard/ShardStateAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
import java.util.function.Supplier;
8989

9090
/**
91-
* Transport action for retrieving the shard state
91+
* Registers transport actions that react to shard state changes, such as shard started or shard failed.
9292
*
9393
* @opensearch.internal
9494
*/
@@ -128,7 +128,7 @@ private static Priority parseReroutePriority(String priorityString) {
128128
}
129129

130130
private final TransportService transportService;
131-
private final ClusterService clusterService;
131+
final ClusterService clusterService;
132132
private final ThreadPool threadPool;
133133

134134
private volatile Priority followUpRerouteTaskPriority;

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,14 @@ public void onNewClusterState(
382382
submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), applyFunction, listener);
383383
}
384384

385+
public void updateClusterState(
386+
final String source,
387+
final Function<ClusterState, ClusterState> updateFunction,
388+
final ClusterApplyListener listener
389+
) {
390+
submitStateUpdateTask(source, ClusterStateTaskConfig.build(Priority.HIGH), updateFunction, listener);
391+
}
392+
385393
private void submitStateUpdateTask(
386394
final String source,
387395
final ClusterStateTaskConfig config,
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.cluster.service;
10+
11+
import org.opensearch.cluster.ClusterManagerMetrics;
12+
import org.opensearch.cluster.ClusterStateTaskConfig;
13+
import org.opensearch.cluster.ClusterStateTaskExecutor;
14+
import org.opensearch.cluster.ClusterStateTaskListener;
15+
import org.opensearch.cluster.coordination.ClusterStatePublisher;
16+
import org.opensearch.common.settings.ClusterSettings;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.node.Node;
19+
import org.opensearch.threadpool.ThreadPool;
20+
21+
import java.util.Map;
22+
23+
/**
24+
* A local implementation of {@link ClusterService} that assumes we have no cluster manager.
25+
* This is used in clusterless mode.
26+
*/
27+
public class LocalClusterService extends ClusterService {
28+
private static class DummyClusterManagerService extends ClusterManagerService {
29+
private static final ClusterManagerThrottlingStats EMPTY_THROTTLING_STATS = new ClusterManagerThrottlingStats();
30+
31+
public DummyClusterManagerService(Settings settings, ClusterSettings clusterSettings) {
32+
super(settings, clusterSettings, null, null);
33+
}
34+
35+
@Override
36+
public synchronized void setClusterStatePublisher(ClusterStatePublisher publisher) {}
37+
38+
@Override
39+
public ClusterManagerThrottlingStats getThrottlingStats() {
40+
return EMPTY_THROTTLING_STATS;
41+
}
42+
}
43+
44+
public LocalClusterService(
45+
Settings settings,
46+
ClusterSettings clusterSettings,
47+
ThreadPool threadPool,
48+
ClusterManagerMetrics clusterManagerMetrics
49+
) {
50+
super(
51+
settings,
52+
clusterSettings,
53+
new DummyClusterManagerService(settings, clusterSettings),
54+
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, clusterManagerMetrics)
55+
);
56+
}
57+
58+
@Override
59+
protected synchronized void doStart() {
60+
getClusterApplierService().start();
61+
}
62+
63+
@Override
64+
protected synchronized void doStop() {
65+
getClusterApplierService().stop();
66+
}
67+
68+
@Override
69+
protected synchronized void doClose() {
70+
getClusterApplierService().close();
71+
}
72+
73+
@Override
74+
public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(ClusterManagerTask task, boolean throttlingEnabled) {
75+
return null;
76+
}
77+
78+
@Override
79+
public <T> void submitStateUpdateTasks(
80+
final String source,
81+
final Map<T, ClusterStateTaskListener> tasks,
82+
final ClusterStateTaskConfig config,
83+
final ClusterStateTaskExecutor<T> executor
84+
) {
85+
throw new UnsupportedOperationException("Cannot submit cluster state update tasks when cluster manager service is not available");
86+
}
87+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.discovery;
10+
11+
import org.opensearch.cluster.ClusterChangedEvent;
12+
import org.opensearch.cluster.ClusterState;
13+
import org.opensearch.cluster.NodeConnectionsService;
14+
import org.opensearch.cluster.coordination.PendingClusterStateStats;
15+
import org.opensearch.cluster.coordination.PublishClusterStateStats;
16+
import org.opensearch.cluster.node.DiscoveryNode;
17+
import org.opensearch.cluster.node.DiscoveryNodes;
18+
import org.opensearch.cluster.service.ClusterApplier;
19+
import org.opensearch.cluster.service.ClusterStateStats;
20+
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
21+
import org.opensearch.core.action.ActionListener;
22+
import org.opensearch.transport.TransportService;
23+
24+
import java.io.IOException;
25+
26+
/**
27+
* Clusterless implementation of Discovery. This is only able to "discover" the local node.
28+
*/
29+
public class LocalDiscovery extends AbstractLifecycleComponent implements Discovery {
30+
private static final DiscoveryStats EMPTY_STATS = new DiscoveryStats(
31+
new PendingClusterStateStats(0, 0, 0),
32+
new PublishClusterStateStats(0, 0, 0),
33+
new ClusterStateStats()
34+
);
35+
private final TransportService transportService;
36+
private final ClusterApplier clusterApplier;
37+
38+
public LocalDiscovery(TransportService transportService, ClusterApplier clusterApplier) {
39+
this.transportService = transportService;
40+
this.clusterApplier = clusterApplier;
41+
}
42+
43+
@Override
44+
public void publish(ClusterChangedEvent clusterChangedEvent, ActionListener<Void> publishListener, AckListener ackListener) {
45+
// In clusterless mode, we should never be asked to publish a cluster state.
46+
throw new UnsupportedOperationException("Should not be called in clusterless mode");
47+
}
48+
49+
@Override
50+
protected void doStart() {
51+
DiscoveryNode localNode = transportService.getLocalNode();
52+
ClusterState bootstrapClusterState = ClusterState.builder(ClusterState.EMPTY_STATE)
53+
.nodes(DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode).build())
54+
.build();
55+
clusterApplier.setInitialState(bootstrapClusterState);
56+
}
57+
58+
@Override
59+
protected void doStop() {
60+
61+
}
62+
63+
@Override
64+
protected void doClose() throws IOException {
65+
66+
}
67+
68+
@Override
69+
public DiscoveryStats stats() {
70+
return EMPTY_STATS;
71+
}
72+
73+
@Override
74+
public void startInitialJoin() {
75+
76+
}
77+
78+
@Override
79+
public void setNodeConnectionsService(NodeConnectionsService nodeConnectionsService) {
80+
81+
}
82+
}

0 commit comments

Comments
 (0)