@@ -67,6 +67,8 @@ public class AutoForceMergeManager extends AbstractLifecycleComponent {
6767 private NodeValidator nodeValidator ;
6868 private ShardValidator shardValidator ;
6969 private Integer allocatedProcessors ;
70+ private String nodeId ;
71+ private final AutoForceMergeMetrics autoForceMergeMetrics ;
7072 private ResourceTrackerProvider .ResourceTrackers resourceTrackers ;
7173 private final ForceMergeManagerSettings forceMergeManagerSettings ;
7274 private final CommonStatsFlags flags = new CommonStatsFlags (CommonStatsFlags .Flag .Segments , CommonStatsFlags .Flag .Translog );
@@ -78,14 +80,16 @@ public AutoForceMergeManager(
7880 ThreadPool threadPool ,
7981 MonitorService monitorService ,
8082 IndicesService indicesService ,
81- ClusterService clusterService
83+ ClusterService clusterService ,
84+ AutoForceMergeMetrics autoForceMergeMetrics
8285 ) {
8386 this .threadPool = threadPool ;
8487 this .osService = monitorService .osService ();
8588 this .fsService = monitorService .fsService ();
8689 this .jvmService = monitorService .jvmService ();
8790 this .clusterService = clusterService ;
8891 this .indicesService = indicesService ;
92+ this .autoForceMergeMetrics = autoForceMergeMetrics ;
8993 this .forceMergeManagerSettings = new ForceMergeManagerSettings (clusterService , this ::modifySchedulerInterval );
9094 this .task = new AsyncForceMergeTask ();
9195 this .mergingShards = new HashSet <>();
@@ -98,6 +102,7 @@ protected void doStart() {
98102 this .shardValidator = new ShardValidator ();
99103 this .allocatedProcessors = OpenSearchExecutors .allocatedProcessors (clusterService .getSettings ());
100104 this .resourceTrackers = ResourceTrackerProvider .create (threadPool );
105+ this .nodeId = clusterService .localNode ().getId ();
101106 }
102107
103108 @ Override
@@ -119,20 +124,44 @@ private void modifySchedulerInterval(TimeValue schedulerInterval) {
119124 }
120125
121126 private void triggerForceMerge () {
122- if (isValidForForceMerge () == false ) {
123- return ;
127+ long startTime = System .currentTimeMillis ();
128+ try {
129+ if (isValidForForceMerge () == false ) {
130+ return ;
131+ }
132+ executeForceMergeOnShards ();
133+ } finally {
134+ autoForceMergeMetrics .recordInHistogram (
135+ autoForceMergeMetrics .totalSchedulerExecutionTime ,
136+ (double ) System .currentTimeMillis () - startTime ,
137+ autoForceMergeMetrics .getTags (nodeId , null )
138+ );
124139 }
125- executeForceMergeOnShards ();
126140 }
127141
128142 private boolean isValidForForceMerge () {
129143 if (configurationValidator .hasWarmNodes () == false ) {
130144 resourceTrackers .stop ();
131145 logger .debug ("No warm nodes found. Skipping Auto Force merge." );
146+ autoForceMergeMetrics .incrementCounter (
147+ autoForceMergeMetrics .totalMergesSkipped ,
148+ 1.0 ,
149+ autoForceMergeMetrics .getTags (nodeId , null )
150+ );
132151 return false ;
133152 }
134153 if (nodeValidator .validate ().isAllowed () == false ) {
135154 logger .debug ("Node capacity constraints are not allowing to trigger auto ForceMerge" );
155+ autoForceMergeMetrics .incrementCounter (
156+ autoForceMergeMetrics .skipsFromNodeValidator ,
157+ 1.0 ,
158+ autoForceMergeMetrics .getTags (nodeId , null )
159+ );
160+ autoForceMergeMetrics .incrementCounter (
161+ autoForceMergeMetrics .totalMergesSkipped ,
162+ 1.0 ,
163+ autoForceMergeMetrics .getTags (nodeId , null )
164+ );
136165 return false ;
137166 }
138167 return true ;
@@ -157,11 +186,47 @@ private void executeForceMergeOnShards() {
157186
158187 private void executeForceMergeForShard (IndexShard shard ) {
159188 CompletableFuture .runAsync (() -> {
189+ long startTime = System .currentTimeMillis ();
160190 try {
161191 mergingShards .add (shard .shardId ().getId ());
192+ autoForceMergeMetrics .incrementCounter (
193+ autoForceMergeMetrics .totalMergesTriggered ,
194+ 1.0 ,
195+ autoForceMergeMetrics .getTags (nodeId , null )
196+ );
197+
198+ CommonStats stats = new CommonStats (indicesService .getIndicesQueryCache (), shard , flags );
199+ if (stats .getSegments () != null ) {
200+ autoForceMergeMetrics .incrementCounter (
201+ autoForceMergeMetrics .segmentCount ,
202+ (double ) stats .getSegments ().getCount (),
203+ autoForceMergeMetrics .getTags (nodeId , String .valueOf (shard .shardId ().getId ()))
204+ );
205+ }
206+
207+ long shardSizeInBytes = shard .store ().stats (0L ).sizeInBytes ();
208+ autoForceMergeMetrics .incrementCounter (
209+ autoForceMergeMetrics .shardSize ,
210+ (double ) shardSizeInBytes ,
211+ autoForceMergeMetrics .getTags (nodeId , String .valueOf (shard .shardId ().getId ()))
212+ );
213+
162214 shard .forceMerge (new ForceMergeRequest ().maxNumSegments (forceMergeManagerSettings .getSegmentCount ()));
215+
216+ // Record shard-specific metrics
217+ autoForceMergeMetrics .recordInHistogram (
218+ autoForceMergeMetrics .shardForceMergeLatency ,
219+ (double ) System .currentTimeMillis () - startTime ,
220+ autoForceMergeMetrics .getTags (nodeId , String .valueOf (shard .shardId ().getId ()))
221+ );
222+
163223 logger .debug ("Merging is completed successfully for the shard {}" , shard .shardId ());
164224 } catch (Exception e ) {
225+ autoForceMergeMetrics .incrementCounter (
226+ autoForceMergeMetrics .totalMergesFailed ,
227+ 1.0 ,
228+ autoForceMergeMetrics .getTags (nodeId , null )
229+ );
165230 logger .error ("Error during force merge for shard {}\n Exception: {}" , shard .shardId (), e );
166231 } finally {
167232 mergingShards .remove (shard .shardId ().getId ());
0 commit comments