Skip to content

Commit 4562b30

Browse files
committed
Merge branch 'master' into multiple-settings-validation
* master:
  - Refactor PathTrie and RestController to use a single trie for all methods (elastic#25459)
  - Switch indices read-only if a node runs out of disk space (elastic#25541)
2 parents 01f0dbf + 30b5ca7 commit 4562b30

File tree

24 files changed

+844
-311
lines changed

24 files changed

+844
-311
lines changed

core/src/main/java/org/elasticsearch/cluster/ClusterInfoService.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,4 @@ public interface ClusterInfoService {
2727

2828
/** The latest cluster information */
2929
ClusterInfo getClusterInfo();
30-
31-
/** Add a listener that will be called every time new information is gathered */
32-
void addListener(Listener listener);
33-
34-
/**
35-
* Interface for listeners to implement in order to perform actions when
36-
* new information about the cluster has been gathered
37-
*/
38-
interface Listener {
39-
void onNewInfo(ClusterInfo info);
40-
}
4130
}

core/src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,4 @@ private EmptyClusterInfoService() {
3636
public ClusterInfo getClusterInfo() {
3737
return ClusterInfo.EMPTY;
3838
}
39-
40-
@Override
41-
public void addListener(Listener listener) {
42-
// no-op, no new info is ever gathered, so adding listeners is useless
43-
}
4439
}

core/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.concurrent.CopyOnWriteArrayList;
5353
import java.util.concurrent.CountDownLatch;
5454
import java.util.concurrent.TimeUnit;
55+
import java.util.function.Consumer;
5556

5657
/**
5758
* InternalClusterInfoService provides the ClusterInfoService interface,
@@ -86,9 +87,10 @@ public class InternalClusterInfoService extends AbstractComponent
8687
private final ClusterService clusterService;
8788
private final ThreadPool threadPool;
8889
private final NodeClient client;
89-
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
90+
private final Consumer<ClusterInfo> listener;
9091

91-
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) {
92+
public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client,
93+
Consumer<ClusterInfo> listener) {
9294
super(settings);
9395
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
9496
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
@@ -109,6 +111,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
109111
this.clusterService.addLocalNodeMasterListener(this);
110112
// Add to listen for state changes (when nodes are added)
111113
this.clusterService.addListener(this);
114+
this.listener = listener;
112115
}
113116

114117
private void setEnabled(boolean enabled) {
@@ -201,11 +204,6 @@ public ClusterInfo getClusterInfo() {
201204
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath);
202205
}
203206

204-
@Override
205-
public void addListener(Listener listener) {
206-
this.listeners.add(listener);
207-
}
208-
209207
/**
210208
* Class used to submit {@link #maybeRefresh()} on the
211209
* {@link InternalClusterInfoService} threadpool, these jobs will
@@ -362,21 +360,17 @@ public void onFailure(Exception e) {
362360
logger.warn("Failed to update shard information for ClusterInfoUpdateJob within {} timeout", fetchTimeout);
363361
}
364362
ClusterInfo clusterInfo = getClusterInfo();
365-
for (Listener l : listeners) {
366-
try {
367-
l.onNewInfo(clusterInfo);
368-
} catch (Exception e) {
369-
logger.info("Failed executing ClusterInfoService listener", e);
370-
}
363+
try {
364+
listener.accept(clusterInfo);
365+
} catch (Exception e) {
366+
logger.info("Failed executing ClusterInfoService listener", e);
371367
}
372368
return clusterInfo;
373369
}
374370

375371
static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
376372
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
377-
MetaData meta = state.getMetaData();
378373
for (ShardStats s : stats) {
379-
IndexMetaData indexMeta = meta.index(s.getShardRouting().index());
380374
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
381375
long size = s.getStats().getStore().sizeInBytes();
382376
String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting());

core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,23 @@
1919

2020
package org.elasticsearch.cluster.routing.allocation;
2121

22+
import java.util.HashSet;
2223
import java.util.Set;
24+
import java.util.function.Supplier;
2325

2426
import com.carrotsearch.hppc.ObjectLookupContainer;
2527
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2628
import org.elasticsearch.client.Client;
2729
import org.elasticsearch.cluster.ClusterInfo;
28-
import org.elasticsearch.cluster.ClusterInfoService;
30+
import org.elasticsearch.cluster.ClusterState;
2931
import org.elasticsearch.cluster.DiskUsage;
32+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
33+
import org.elasticsearch.cluster.metadata.IndexMetaData;
34+
import org.elasticsearch.cluster.routing.RoutingNode;
35+
import org.elasticsearch.cluster.routing.ShardRouting;
3036
import org.elasticsearch.common.Strings;
3137
import org.elasticsearch.common.collect.ImmutableOpenMap;
3238
import org.elasticsearch.common.component.AbstractComponent;
33-
import org.elasticsearch.common.inject.Inject;
3439
import org.elasticsearch.common.settings.ClusterSettings;
3540
import org.elasticsearch.common.settings.Settings;
3641
import org.elasticsearch.common.util.set.Sets;
@@ -40,29 +45,30 @@
4045
* reroute if it does. Also responsible for logging about nodes that have
4146
* passed the disk watermarks
4247
*/
43-
public class DiskThresholdMonitor extends AbstractComponent implements ClusterInfoService.Listener {
48+
public class DiskThresholdMonitor extends AbstractComponent {
4449
private final DiskThresholdSettings diskThresholdSettings;
4550
private final Client client;
4651
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
47-
52+
private final Supplier<ClusterState> clusterStateSupplier;
4853
private long lastRunNS;
4954

50-
// TODO: remove injection when ClusterInfoService is not injected
51-
@Inject
52-
public DiskThresholdMonitor(Settings settings, ClusterSettings clusterSettings,
53-
ClusterInfoService infoService, Client client) {
55+
public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
56+
Client client) {
5457
super(settings);
58+
this.clusterStateSupplier = clusterStateSupplier;
5559
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
5660
this.client = client;
57-
infoService.addListener(this);
5861
}
5962

6063
/**
6164
* Warn about the given disk usage if the low or high watermark has been passed
6265
*/
6366
private void warnAboutDiskIfNeeded(DiskUsage usage) {
6467
// Check absolute disk values
65-
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
68+
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes()) {
69+
logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only",
70+
diskThresholdSettings.getFreeBytesThresholdFloodStage(), usage);
71+
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
6672
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
6773
diskThresholdSettings.getFreeBytesThresholdHigh(), usage);
6874
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes()) {
@@ -72,6 +78,9 @@ private void warnAboutDiskIfNeeded(DiskUsage usage) {
7278

7379
// Check percentage disk values
7480
if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
81+
logger.warn("floodstage disk watermark [{}] exceeded on {}, all indices on this node will marked read-only",
82+
Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdFloodStage(), "%"), usage);
83+
} else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
7584
logger.warn("high disk watermark [{}] exceeded on {}, shards will be relocated away from this node",
7685
Strings.format1Decimals(100.0 - diskThresholdSettings.getFreeDiskThresholdHigh(), "%"), usage);
7786
} else if (usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) {
@@ -80,7 +89,7 @@ private void warnAboutDiskIfNeeded(DiskUsage usage) {
8089
}
8190
}
8291

83-
@Override
92+
8493
public void onNewInfo(ClusterInfo info) {
8594
ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
8695
if (usages != null) {
@@ -95,12 +104,21 @@ public void onNewInfo(ClusterInfo info) {
95104
nodeHasPassedWatermark.remove(node);
96105
}
97106
}
98-
107+
ClusterState state = clusterStateSupplier.get();
108+
Set<String> indicesToMarkReadOnly = new HashSet<>();
99109
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
100110
String node = entry.key;
101111
DiskUsage usage = entry.value;
102112
warnAboutDiskIfNeeded(usage);
103-
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
113+
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
114+
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
115+
RoutingNode routingNode = state.getRoutingNodes().node(node);
116+
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
117+
for (ShardRouting routing : routingNode) {
118+
indicesToMarkReadOnly.add(routing.index().getName());
119+
}
120+
}
121+
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
104122
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
105123
if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
106124
lastRunNS = System.nanoTime();
@@ -136,9 +154,23 @@ public void onNewInfo(ClusterInfo info) {
136154
}
137155
if (reroute) {
138156
logger.info("rerouting shards: [{}]", explanation);
139-
// Execute an empty reroute, but don't block on the response
140-
client.admin().cluster().prepareReroute().execute();
157+
reroute();
158+
}
159+
indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
160+
if (indicesToMarkReadOnly.isEmpty() == false) {
161+
markIndicesReadOnly(indicesToMarkReadOnly);
141162
}
142163
}
143164
}
165+
166+
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
167+
// set read-only block but don't block on the response
168+
client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)).
169+
setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute();
170+
}
171+
172+
protected void reroute() {
173+
// Execute an empty reroute, but don't block on the response
174+
client.admin().cluster().prepareReroute().execute();
175+
}
144176
}

core/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdSettings.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ public class DiskThresholdSettings {
4949
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.high"),
5050
new HighDiskWatermarkValidator(),
5151
Setting.Property.Dynamic, Setting.Property.NodeScope);
52+
public static final Setting<String> CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING =
53+
new Setting<>("cluster.routing.allocation.disk.watermark.floodstage", "95%",
54+
(s) -> validWatermarkSetting(s, "cluster.routing.allocation.disk.watermark.floodstage"),
55+
Setting.Property.Dynamic, Setting.Property.NodeScope);
5256
public static final Setting<Boolean> CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING =
5357
Setting.boolSetting("cluster.routing.allocation.disk.include_relocations", true,
5458
Setting.Property.Dynamic, Setting.Property.NodeScope);;
@@ -65,17 +69,23 @@ public class DiskThresholdSettings {
6569
private volatile boolean includeRelocations;
6670
private volatile boolean enabled;
6771
private volatile TimeValue rerouteInterval;
72+
private volatile String floodStageRaw;
73+
private volatile Double freeDiskThresholdFloodStage;
74+
private volatile ByteSizeValue freeBytesThresholdFloodStage;
6875

6976
public DiskThresholdSettings(Settings settings, ClusterSettings clusterSettings) {
7077
final String lowWatermark = CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.get(settings);
7178
final String highWatermark = CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.get(settings);
79+
final String floodStage = CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.get(settings);
7280
setHighWatermark(highWatermark);
7381
setLowWatermark(lowWatermark);
82+
setFloodStageRaw(floodStage);
7483
this.includeRelocations = CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING.get(settings);
7584
this.rerouteInterval = CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(settings);
7685
this.enabled = CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING.get(settings);
7786
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING, this::setLowWatermark);
7887
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING, this::setHighWatermark);
88+
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING, this::setFloodStageRaw);
7989
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS_SETTING, this::setIncludeRelocations);
8090
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING, this::setRerouteInterval);
8191
clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING, this::setEnabled);
@@ -175,7 +185,15 @@ private void setHighWatermark(String highWatermark) {
175185
this.highWatermarkRaw = highWatermark;
176186
this.freeDiskThresholdHigh = 100.0 - thresholdPercentageFromWatermark(highWatermark);
177187
this.freeBytesThresholdHigh = thresholdBytesFromWatermark(highWatermark,
178-
CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING.getKey());
188+
CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING.getKey());
189+
}
190+
191+
private void setFloodStageRaw(String floodStageRaw) {
192+
// Watermark is expressed in terms of used data, but we need "free" data watermark
193+
this.floodStageRaw = floodStageRaw;
194+
this.freeDiskThresholdFloodStage = 100.0 - thresholdPercentageFromWatermark(floodStageRaw);
195+
this.freeBytesThresholdFloodStage = thresholdBytesFromWatermark(floodStageRaw,
196+
CLUSTER_ROUTING_ALLOCATION_DISK_FLOOD_STAGE_WATERMARK_SETTING.getKey());
179197
}
180198

181199
/**
@@ -208,6 +226,18 @@ public ByteSizeValue getFreeBytesThresholdHigh() {
208226
return freeBytesThresholdHigh;
209227
}
210228

229+
public Double getFreeDiskThresholdFloodStage() {
230+
return freeDiskThresholdFloodStage;
231+
}
232+
233+
public ByteSizeValue getFreeBytesThresholdFloodStage() {
234+
return freeBytesThresholdFloodStage;
235+
}
236+
237+
public String getFloodStageRaw() {
238+
return floodStageRaw;
239+
}
240+
211241
public boolean includeRelocations() {
212242
return includeRelocations;
213243
}

0 commit comments

Comments (0)