55 */
66package org .elasticsearch .xpack .core .indexlifecycle ;
77
8+ import org .apache .logging .log4j .LogManager ;
9+ import org .apache .logging .log4j .Logger ;
810import org .elasticsearch .action .ActionListener ;
11+ import org .elasticsearch .action .admin .indices .segments .IndexSegments ;
912import org .elasticsearch .action .admin .indices .segments .IndicesSegmentsRequest ;
13+ import org .elasticsearch .action .admin .indices .segments .ShardSegments ;
1014import org .elasticsearch .client .Client ;
1115import org .elasticsearch .cluster .metadata .IndexMetaData ;
16+ import org .elasticsearch .cluster .routing .ShardRouting ;
1217import org .elasticsearch .common .ParseField ;
1318import org .elasticsearch .common .Strings ;
1419import org .elasticsearch .common .xcontent .ConstructingObjectParser ;
1722
1823import java .io .IOException ;
1924import java .util .Arrays ;
25+ import java .util .List ;
26+ import java .util .Map ;
2027import java .util .Objects ;
21- import java .util .stream .StreamSupport ;
28+ import java .util .stream .Collectors ;
2229
2330/**
2431 * This {@link Step} evaluates whether force_merge was successful by checking the segment count.
2532 */
2633public class SegmentCountStep extends AsyncWaitStep {
34+
35+ private static final Logger logger = LogManager .getLogger (SegmentCountStep .class );
2736 public static final String NAME = "segment-count" ;
2837
2938 private final int maxNumSegments ;
@@ -41,10 +50,19 @@ public int getMaxNumSegments() {
4150 public void evaluateCondition (IndexMetaData indexMetaData , Listener listener ) {
4251 getClient ().admin ().indices ().segments (new IndicesSegmentsRequest (indexMetaData .getIndex ().getName ()),
4352 ActionListener .wrap (response -> {
44- long numberShardsLeftToMerge =
45- StreamSupport .stream (response .getIndices ().get (indexMetaData .getIndex ().getName ()).spliterator (), false )
46- .filter (iss -> Arrays .stream (iss .getShards ()).anyMatch (p -> p .getSegments ().size () > maxNumSegments )).count ();
47- listener .onResponse (numberShardsLeftToMerge == 0 , new Info (numberShardsLeftToMerge ));
53+ IndexSegments segments = response .getIndices ().get (indexMetaData .getIndex ().getName ());
54+ List <ShardSegments > unmergedShards = segments .getShards ().values ().stream ()
55+ .flatMap (iss -> Arrays .stream (iss .getShards ()))
56+ .filter (shardSegments -> shardSegments .getSegments ().size () > maxNumSegments )
57+ .collect (Collectors .toList ());
58+ if (unmergedShards .size () > 0 ) {
59+ Map <ShardRouting , Integer > unmergedShardCounts = unmergedShards .stream ()
60+ .collect (Collectors .toMap (ShardSegments ::getShardRouting , ss -> ss .getSegments ().size ()));
61+ logger .info ("[{}] best effort force merge to [{}] segments did not succeed for {} shards: {}" ,
62+ indexMetaData .getIndex ().getName (), maxNumSegments , unmergedShards .size (), unmergedShardCounts );
63+ }
64+ // Force merging is best effort, so always return true that the condition has been met.
65+ listener .onResponse (true , new Info (unmergedShards .size ()));
4866 }, listener ::onFailure ));
4967 }
5068
@@ -90,8 +108,12 @@ public long getNumberShardsLeftToMerge() {
90108 @ Override
91109 public XContentBuilder toXContent (XContentBuilder builder , Params params ) throws IOException {
92110 builder .startObject ();
93- builder .field (MESSAGE .getPreferredName (),
94- "Waiting for [" + numberShardsLeftToMerge + "] shards " + "to forcemerge" );
111+ if (numberShardsLeftToMerge == 0 ) {
112+ builder .field (MESSAGE .getPreferredName (), "all shards force merged successfully" );
113+ } else {
114+ builder .field (MESSAGE .getPreferredName (),
115+ "[" + numberShardsLeftToMerge + "] shards did not successfully force merge" );
116+ }
95117 builder .field (SHARDS_TO_MERGE .getPreferredName (), numberShardsLeftToMerge );
96118 builder .endObject ();
97119 return builder ;
0 commit comments