Skip to content

Commit b256549

Browse files
authored
Staggered merges metrics (#19226)
Signed-off-by: Sukriti Sinha <sukriiti@amazon.com>
1 parent 1103b3f commit b256549

File tree

5 files changed

+630
-6
lines changed

5 files changed

+630
-6
lines changed

server/src/main/java/org/opensearch/index/autoforcemerge/AutoForceMergeManager.java

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.HashSet;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.Optional;
4243
import java.util.Set;
4344
import java.util.concurrent.CompletableFuture;
4445
import java.util.stream.Collectors;
@@ -67,6 +68,8 @@ public class AutoForceMergeManager extends AbstractLifecycleComponent {
6768
private NodeValidator nodeValidator;
6869
private ShardValidator shardValidator;
6970
private Integer allocatedProcessors;
71+
private String nodeId;
72+
private final AutoForceMergeMetrics autoForceMergeMetrics;
7073
private ResourceTrackerProvider.ResourceTrackers resourceTrackers;
7174
private ForceMergeManagerSettings forceMergeManagerSettings;
7275
private final CommonStatsFlags flags = new CommonStatsFlags(CommonStatsFlags.Flag.Segments, CommonStatsFlags.Flag.Translog);
@@ -78,14 +81,16 @@ public AutoForceMergeManager(
7881
ThreadPool threadPool,
7982
MonitorService monitorService,
8083
IndicesService indicesService,
81-
ClusterService clusterService
84+
ClusterService clusterService,
85+
AutoForceMergeMetrics autoForceMergeMetrics
8286
) {
8387
this.threadPool = threadPool;
8488
this.osService = monitorService.osService();
8589
this.fsService = monitorService.fsService();
8690
this.jvmService = monitorService.jvmService();
8791
this.clusterService = clusterService;
8892
this.indicesService = indicesService;
93+
this.autoForceMergeMetrics = autoForceMergeMetrics;
8994
this.mergingShards = new HashSet<>();
9095
}
9196

@@ -98,6 +103,7 @@ protected void doStart() {
98103
this.shardValidator = new ShardValidator();
99104
this.allocatedProcessors = OpenSearchExecutors.allocatedProcessors(clusterService.getSettings());
100105
this.resourceTrackers = ResourceTrackerProvider.create(threadPool);
106+
this.nodeId = clusterService.localNode().getId();
101107
}
102108

103109
@Override
@@ -119,20 +125,39 @@ private void modifySchedulerInterval(TimeValue schedulerInterval) {
119125
}
120126

121127
private void triggerForceMerge() {
122-
if (isValidForForceMerge() == false) {
123-
return;
128+
long startTime = System.currentTimeMillis();
129+
try {
130+
if (isValidForForceMerge() == false) {
131+
return;
132+
}
133+
executeForceMergeOnShards();
134+
} finally {
135+
autoForceMergeMetrics.recordInHistogram(
136+
autoForceMergeMetrics.schedulerExecutionTime,
137+
(double) System.currentTimeMillis() - startTime,
138+
autoForceMergeMetrics.getTags(Optional.of(nodeId), Optional.empty())
139+
);
124140
}
125-
executeForceMergeOnShards();
126141
}
127142

128143
private boolean isValidForForceMerge() {
129144
if (configurationValidator.hasWarmNodes() == false) {
130145
resourceTrackers.stop();
131146
logger.debug("No warm nodes found. Skipping Auto Force merge.");
147+
autoForceMergeMetrics.incrementCounter(
148+
autoForceMergeMetrics.skipsFromConfigValidator,
149+
1.0,
150+
autoForceMergeMetrics.getTags(Optional.of(nodeId), Optional.empty())
151+
);
132152
return false;
133153
}
134154
if (nodeValidator.validate().isAllowed() == false) {
135155
logger.debug("Node capacity constraints are not allowing to trigger auto ForceMerge");
156+
autoForceMergeMetrics.incrementCounter(
157+
autoForceMergeMetrics.skipsFromNodeValidator,
158+
1.0,
159+
autoForceMergeMetrics.getTags(Optional.of(nodeId), Optional.empty())
160+
);
136161
return false;
137162
}
138163
return true;
@@ -157,13 +182,47 @@ private void executeForceMergeOnShards() {
157182

158183
private void executeForceMergeForShard(IndexShard shard) {
159184
CompletableFuture.runAsync(() -> {
185+
long startTime = System.currentTimeMillis();
160186
try {
161187
mergingShards.add(shard.shardId().getId());
188+
autoForceMergeMetrics.incrementCounter(
189+
autoForceMergeMetrics.mergesTriggered,
190+
1.0,
191+
autoForceMergeMetrics.getTags(Optional.of(nodeId), Optional.empty())
192+
);
193+
194+
CommonStats stats = new CommonStats(indicesService.getIndicesQueryCache(), shard, flags);
195+
if (stats.getSegments() != null) {
196+
autoForceMergeMetrics.incrementCounter(
197+
autoForceMergeMetrics.segmentCount,
198+
(double) stats.getSegments().getCount(),
199+
autoForceMergeMetrics.getTags(Optional.of(nodeId), Optional.of(String.valueOf(shard.shardId().getId())))
200+
);
201+
}
202+
203+
long shardSizeInBytes = shard.store().stats(0L).sizeInBytes();
204+
autoForceMergeMetrics.incrementCounter(
205+
autoForceMergeMetrics.shardSize,
206+
(double) shardSizeInBytes,
207+
autoForceMergeMetrics.getTags(Optional.of(nodeId), Optional.of(String.valueOf(shard.shardId().getId())))
208+
);
209+
162210
shard.forceMerge(new ForceMergeRequest().maxNumSegments(forceMergeManagerSettings.getSegmentCount()));
211+
163212
logger.debug("Merging is completed successfully for the shard {}", shard.shardId());
164213
} catch (Exception e) {
214+
autoForceMergeMetrics.incrementCounter(
215+
autoForceMergeMetrics.mergesFailed,
216+
1.0,
217+
autoForceMergeMetrics.getTags(Optional.of(nodeId), Optional.empty())
218+
);
165219
logger.error("Error during force merge for shard {}\nException: {}", shard.shardId(), e);
166220
} finally {
221+
autoForceMergeMetrics.recordInHistogram(
222+
autoForceMergeMetrics.shardMergeLatency,
223+
(double) System.currentTimeMillis() - startTime,
224+
autoForceMergeMetrics.getTags(Optional.of(nodeId), Optional.of(String.valueOf(shard.shardId().getId())))
225+
);
167226
mergingShards.remove(shard.shardId().getId());
168227
}
169228
}, threadPool.executor(ThreadPool.Names.FORCE_MERGE));
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.index.autoforcemerge;
10+
11+
import org.opensearch.telemetry.metrics.Counter;
12+
import org.opensearch.telemetry.metrics.Histogram;
13+
import org.opensearch.telemetry.metrics.MetricsRegistry;
14+
import org.opensearch.telemetry.metrics.tags.Tags;
15+
16+
import java.util.Objects;
17+
import java.util.Optional;
18+
19+
/**
20+
* Class containing metrics (counters/latency) specific to Auto Force merges.
21+
*
22+
* @opensearch.internal
23+
*/
24+
public class AutoForceMergeMetrics {
25+
26+
private static final String LATENCY_METRIC_UNIT_MS = "ms";
27+
private static final String COUNTER_METRICS_UNIT = "1";
28+
private static final String SIZE_METRIC_UNIT = "bytes";
29+
30+
public static final String NODE_ID = "node_id";
31+
public static final String SHARD_ID = "shard_id";
32+
33+
public final Histogram schedulerExecutionTime;
34+
public final Counter mergesTriggered;
35+
public final Counter skipsFromConfigValidator;
36+
public final Counter skipsFromNodeValidator;
37+
public final Counter mergesFailed;
38+
39+
// Shard specific metrics
40+
public final Histogram shardMergeLatency;
41+
public final Counter shardSize;
42+
public final Counter segmentCount;
43+
44+
public AutoForceMergeMetrics(MetricsRegistry metricsRegistry) {
45+
schedulerExecutionTime = metricsRegistry.createHistogram(
46+
"auto_force_merge.scheduler.execution_time",
47+
"Histogram for tracking total scheduler execution time.",
48+
LATENCY_METRIC_UNIT_MS
49+
);
50+
51+
mergesTriggered = metricsRegistry.createCounter(
52+
"auto_force_merge.merges.triggered",
53+
"Counter for number of force merges triggered.",
54+
COUNTER_METRICS_UNIT
55+
);
56+
57+
skipsFromConfigValidator = metricsRegistry.createCounter(
58+
"auto_force_merge.merges.skipped.config_validator",
59+
"Counter for number of force merges skipped due to Configuration Validator.",
60+
COUNTER_METRICS_UNIT
61+
);
62+
63+
skipsFromNodeValidator = metricsRegistry.createCounter(
64+
"auto_force_merge.merges.skipped.node_validator",
65+
"Counter for number of force merges skipped due to Node Validator.",
66+
COUNTER_METRICS_UNIT
67+
);
68+
69+
mergesFailed = metricsRegistry.createCounter(
70+
"auto_force_merge.merges.failed",
71+
"Counter for number of force merges failed.",
72+
COUNTER_METRICS_UNIT
73+
);
74+
75+
shardMergeLatency = metricsRegistry.createHistogram(
76+
"auto_force_merge.shard.merge_latency",
77+
"Histogram for tracking time taken by force merge on individual shards.",
78+
LATENCY_METRIC_UNIT_MS
79+
);
80+
81+
shardSize = metricsRegistry.createCounter(
82+
"auto_force_merge.shard.size",
83+
"Counter for tracking shard size during force merge operations.",
84+
SIZE_METRIC_UNIT
85+
);
86+
87+
segmentCount = metricsRegistry.createCounter(
88+
"auto_force_merge.shard.segment_count",
89+
"Counter for tracking segment count during force merge operations.",
90+
COUNTER_METRICS_UNIT
91+
);
92+
}
93+
94+
public void recordInHistogram(Histogram histogram, Double value, Optional<Tags> tags) {
95+
if (Objects.isNull(tags) || tags.isEmpty()) {
96+
histogram.record(value);
97+
return;
98+
}
99+
histogram.record(value, tags.get());
100+
}
101+
102+
public void incrementCounter(Counter counter, Double value, Optional<Tags> tags) {
103+
if (Objects.isNull(tags) || tags.isEmpty()) {
104+
counter.add(value);
105+
return;
106+
}
107+
counter.add(value, tags.get());
108+
}
109+
110+
public Optional<Tags> getTags(Optional<String> nodeId, Optional<String> shardId) {
111+
Tags tags = Tags.create();
112+
113+
if (shardId.isPresent()) {
114+
tags.addTag(SHARD_ID, shardId.get());
115+
} else if (nodeId.isPresent()) {
116+
tags.addTag(NODE_ID, nodeId.get());
117+
}
118+
119+
return Optional.of(tags);
120+
}
121+
}

server/src/main/java/org/opensearch/node/Node.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@
161161
import org.opensearch.index.SegmentReplicationStatsTracker;
162162
import org.opensearch.index.analysis.AnalysisRegistry;
163163
import org.opensearch.index.autoforcemerge.AutoForceMergeManager;
164+
import org.opensearch.index.autoforcemerge.AutoForceMergeMetrics;
164165
import org.opensearch.index.compositeindex.CompositeIndexSettings;
165166
import org.opensearch.index.engine.EngineFactory;
166167
import org.opensearch.index.engine.MergedSegmentWarmerFactory;
@@ -1208,7 +1209,14 @@ protected Node(final Environment initialEnvironment, Collection<PluginInfo> clas
12081209
workloadGroupService
12091210
);
12101211

1211-
this.autoForceMergeManager = new AutoForceMergeManager(threadPool, monitorService, indicesService, clusterService);
1212+
final AutoForceMergeMetrics autoForceMergeMetrics = new AutoForceMergeMetrics(metricsRegistry);
1213+
this.autoForceMergeManager = new AutoForceMergeManager(
1214+
threadPool,
1215+
monitorService,
1216+
indicesService,
1217+
clusterService,
1218+
autoForceMergeMetrics
1219+
);
12121220

12131221
final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
12141222
.stream()

0 commit comments

Comments
 (0)