2121import org .apache .logging .log4j .message .ParameterizedMessage ;
2222import org .apache .logging .log4j .util .Supplier ;
2323import org .elasticsearch .ElasticsearchException ;
24+ import org .elasticsearch .Version ;
2425import org .elasticsearch .action .ActionListener ;
2526import org .elasticsearch .action .admin .indices .flush .FlushRequest ;
2627import org .elasticsearch .action .admin .indices .flush .SyncedFlushResponse ;
4445import org .elasticsearch .index .Index ;
4546import org .elasticsearch .index .IndexNotFoundException ;
4647import org .elasticsearch .index .IndexService ;
48+ import org .elasticsearch .index .engine .CommitStats ;
4749import org .elasticsearch .index .engine .Engine ;
4850import org .elasticsearch .index .shard .IndexEventListener ;
4951import org .elasticsearch .index .shard .IndexShard ;
@@ -199,10 +201,10 @@ private void innerAttemptSyncedFlush(final ShardId shardId, final ClusterState s
199201 return ;
200202 }
201203
202- final ActionListener <Map <String , Engine . CommitId >> commitIdsListener = new ActionListener <Map <String , Engine . CommitId >>() {
204+ final ActionListener <Map <String , PreSyncedFlushResponse >> presyncListener = new ActionListener <Map <String , PreSyncedFlushResponse >>() {
203205 @ Override
204- public void onResponse (final Map <String , Engine . CommitId > commitIds ) {
205- if (commitIds .isEmpty ()) {
206+ public void onResponse (final Map <String , PreSyncedFlushResponse > presyncResponses ) {
207+ if (presyncResponses .isEmpty ()) {
206208 actionListener .onResponse (new ShardsSyncedFlushResult (shardId , totalShards , "all shards failed to commit on pre-sync" ));
207209 return ;
208210 }
@@ -216,7 +218,7 @@ public void onResponse(InFlightOpsResponse response) {
216218 } else {
217219 // 3. now send the sync request to all the shards
218220 String syncId = UUIDs .randomBase64UUID ();
219- sendSyncRequests (syncId , activeShards , state , commitIds , shardId , totalShards , actionListener );
221+ sendSyncRequests (syncId , activeShards , state , presyncResponses , shardId , totalShards , actionListener );
220222 }
221223 }
222224
@@ -236,7 +238,7 @@ public void onFailure(Exception e) {
236238 };
237239
238240 // 1. send pre-sync flushes to all replicas
239- sendPreSyncRequests (activeShards , state , shardId , commitIdsListener );
241+ sendPreSyncRequests (activeShards , state , shardId , presyncListener );
240242 } catch (Exception e ) {
241243 actionListener .onFailure (e );
242244 }
@@ -299,28 +301,49 @@ public String executor() {
299301 }
300302 }
301303
304+ private int numDocsOnPrimary (List <ShardRouting > shards , Map <String , PreSyncedFlushResponse > preSyncResponses ) {
305+ for (ShardRouting shard : shards ) {
306+ if (shard .primary ()) {
307+ final PreSyncedFlushResponse resp = preSyncResponses .get (shard .currentNodeId ());
308+ if (resp != null ) {
309+ return resp .numDocs ;
310+ }
311+ }
312+ }
313+ return PreSyncedFlushResponse .UNKNOWN_NUM_DOCS ;
314+ }
302315
303- void sendSyncRequests (final String syncId , final List <ShardRouting > shards , ClusterState state , Map <String , Engine . CommitId > expectedCommitIds ,
316+ void sendSyncRequests (final String syncId , final List <ShardRouting > shards , ClusterState state , Map <String , PreSyncedFlushResponse > preSyncResponses ,
304317 final ShardId shardId , final int totalShards , final ActionListener <ShardsSyncedFlushResult > listener ) {
305318 final CountDown countDown = new CountDown (shards .size ());
306319 final Map <ShardRouting , ShardSyncedFlushResponse > results = ConcurrentCollections .newConcurrentMap ();
320+ final int numDocsOnPrimary = numDocsOnPrimary (shards , preSyncResponses );
307321 for (final ShardRouting shard : shards ) {
308322 final DiscoveryNode node = state .nodes ().get (shard .currentNodeId ());
309323 if (node == null ) {
310324 logger .trace ("{} is assigned to an unknown node. skipping for sync id [{}]. shard routing {}" , shardId , syncId , shard );
311325 results .put (shard , new ShardSyncedFlushResponse ("unknown node" ));
312- contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
326+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
313327 continue ;
314328 }
315- final Engine . CommitId expectedCommitId = expectedCommitIds .get (shard .currentNodeId ());
316- if (expectedCommitId == null ) {
329+ final PreSyncedFlushResponse preSyncedResponse = preSyncResponses .get (shard .currentNodeId ());
330+ if (preSyncedResponse == null ) {
317331 logger .trace ("{} can't resolve expected commit id for current node, skipping for sync id [{}]. shard routing {}" , shardId , syncId , shard );
318332 results .put (shard , new ShardSyncedFlushResponse ("no commit id from pre-sync flush" ));
319- contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
333+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
334+ continue ;
335+ }
336+ if (preSyncedResponse .numDocs != numDocsOnPrimary
337+ && preSyncedResponse .numDocs != PreSyncedFlushResponse .UNKNOWN_NUM_DOCS && numDocsOnPrimary != PreSyncedFlushResponse .UNKNOWN_NUM_DOCS ) {
338+ logger .warn ("{} can't to issue sync id [{}] for out of sync replica [{}] with num docs [{}]; num docs on primary [{}]" ,
339+ shardId , syncId , shard , preSyncedResponse .numDocs , numDocsOnPrimary );
340+ results .put (shard , new ShardSyncedFlushResponse ("out of sync replica; " +
341+ "num docs on replica [" + preSyncedResponse .numDocs + "]; num docs on primary [" + numDocsOnPrimary + "]" ));
342+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
320343 continue ;
321344 }
322345 logger .trace ("{} sending synced flush request to {}. sync id [{}]." , shardId , shard , syncId );
323- transportService .sendRequest (node , SYNCED_FLUSH_ACTION_NAME , new ShardSyncedFlushRequest (shard .shardId (), syncId , expectedCommitId ),
346+ transportService .sendRequest (node , SYNCED_FLUSH_ACTION_NAME , new ShardSyncedFlushRequest (shard .shardId (), syncId , preSyncedResponse . commitId ),
324347 new TransportResponseHandler <ShardSyncedFlushResponse >() {
325348 @ Override
326349 public ShardSyncedFlushResponse newInstance () {
@@ -332,14 +355,14 @@ public void handleResponse(ShardSyncedFlushResponse response) {
332355 ShardSyncedFlushResponse existing = results .put (shard , response );
333356 assert existing == null : "got two answers for node [" + node + "]" ;
334357 // count after the assert so we won't decrement twice in handleException
335- contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
358+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
336359 }
337360
338361 @ Override
339362 public void handleException (TransportException exp ) {
340363 logger .trace ((Supplier <?>) () -> new ParameterizedMessage ("{} error while performing synced flush on [{}], skipping" , shardId , shard ), exp );
341364 results .put (shard , new ShardSyncedFlushResponse (exp .getMessage ()));
342- contDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
365+ countDownAndSendResponseIfDone (syncId , shards , shardId , totalShards , listener , countDown , results );
343366 }
344367
345368 @ Override
@@ -351,8 +374,8 @@ public String executor() {
351374
352375 }
353376
354- private void contDownAndSendResponseIfDone (String syncId , List <ShardRouting > shards , ShardId shardId , int totalShards ,
355- ActionListener <ShardsSyncedFlushResult > listener , CountDown countDown , Map <ShardRouting , ShardSyncedFlushResponse > results ) {
377+ private void countDownAndSendResponseIfDone (String syncId , List <ShardRouting > shards , ShardId shardId , int totalShards ,
378+ ActionListener <ShardsSyncedFlushResult > listener , CountDown countDown , Map <ShardRouting , ShardSyncedFlushResponse > results ) {
356379 if (countDown .countDown ()) {
357380 assert results .size () == shards .size ();
358381 listener .onResponse (new ShardsSyncedFlushResult (shardId , syncId , totalShards , results ));
@@ -362,16 +385,16 @@ private void contDownAndSendResponseIfDone(String syncId, List<ShardRouting> sha
362385 /**
363386 * send presync requests to all started copies of the given shard
364387 */
365- void sendPreSyncRequests (final List <ShardRouting > shards , final ClusterState state , final ShardId shardId , final ActionListener <Map <String , Engine . CommitId >> listener ) {
388+ void sendPreSyncRequests (final List <ShardRouting > shards , final ClusterState state , final ShardId shardId , final ActionListener <Map <String , PreSyncedFlushResponse >> listener ) {
366389 final CountDown countDown = new CountDown (shards .size ());
367- final ConcurrentMap <String , Engine . CommitId > commitIds = ConcurrentCollections .newConcurrentMap ();
390+ final ConcurrentMap <String , PreSyncedFlushResponse > presyncResponses = ConcurrentCollections .newConcurrentMap ();
368391 for (final ShardRouting shard : shards ) {
369392 logger .trace ("{} sending pre-synced flush request to {}" , shardId , shard );
370393 final DiscoveryNode node = state .nodes ().get (shard .currentNodeId ());
371394 if (node == null ) {
372395 logger .trace ("{} shard routing {} refers to an unknown node. skipping." , shardId , shard );
373396 if (countDown .countDown ()) {
374- listener .onResponse (commitIds );
397+ listener .onResponse (presyncResponses );
375398 }
376399 continue ;
377400 }
@@ -383,19 +406,19 @@ public PreSyncedFlushResponse newInstance() {
383406
384407 @ Override
385408 public void handleResponse (PreSyncedFlushResponse response ) {
386- Engine . CommitId existing = commitIds .putIfAbsent (node .getId (), response . commitId () );
409+ PreSyncedFlushResponse existing = presyncResponses .putIfAbsent (node .getId (), response );
387410 assert existing == null : "got two answers for node [" + node + "]" ;
388411 // count after the assert so we won't decrement twice in handleException
389412 if (countDown .countDown ()) {
390- listener .onResponse (commitIds );
413+ listener .onResponse (presyncResponses );
391414 }
392415 }
393416
394417 @ Override
395418 public void handleException (TransportException exp ) {
396419 logger .trace ((Supplier <?>) () -> new ParameterizedMessage ("{} error while performing pre synced flush on [{}], skipping" , shardId , shard ), exp );
397420 if (countDown .countDown ()) {
398- listener .onResponse (commitIds );
421+ listener .onResponse (presyncResponses );
399422 }
400423 }
401424
@@ -411,9 +434,11 @@ private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest
411434 IndexShard indexShard = indicesService .indexServiceSafe (request .shardId ().getIndex ()).getShard (request .shardId ().id ());
412435 FlushRequest flushRequest = new FlushRequest ().force (false ).waitIfOngoing (true );
413436 logger .trace ("{} performing pre sync flush" , request .shardId ());
414- Engine .CommitId commitId = indexShard .flush (flushRequest );
415- logger .trace ("{} pre sync flush done. commit id {}" , request .shardId (), commitId );
416- return new PreSyncedFlushResponse (commitId );
437+ indexShard .flush (flushRequest );
438+ final CommitStats commitStats = indexShard .commitStats ();
439+ final Engine .CommitId commitId = commitStats .getRawCommitId ();
440+ logger .trace ("{} pre sync flush done. commit id {}, num docs {}" , request .shardId (), commitId , commitStats .getNumDocs ());
441+ return new PreSyncedFlushResponse (commitId , commitStats .getNumDocs ());
417442 }
418443
419444 private ShardSyncedFlushResponse performSyncedFlush (ShardSyncedFlushRequest request ) {
@@ -483,30 +508,45 @@ public ShardId shardId() {
483508 * Response for first step of synced flush (flush) for one shard copy
484509 */
485510 static final class PreSyncedFlushResponse extends TransportResponse {
511+ static final int UNKNOWN_NUM_DOCS = -1 ;
486512
487513 Engine .CommitId commitId ;
514+ int numDocs ;
488515
489516 PreSyncedFlushResponse () {
490517 }
491518
492- PreSyncedFlushResponse (Engine .CommitId commitId ) {
519+ PreSyncedFlushResponse (Engine .CommitId commitId , int numDocs ) {
493520 this .commitId = commitId ;
521+ this .numDocs = numDocs ;
494522 }
495523
496- public Engine .CommitId commitId () {
524+ Engine .CommitId commitId () {
497525 return commitId ;
498526 }
499527
528+ int numDocs () {
529+ return numDocs ;
530+ }
531+
500532 @ Override
501533 public void readFrom (StreamInput in ) throws IOException {
502534 super .readFrom (in );
503535 commitId = new Engine .CommitId (in );
536+ if (in .getVersion ().onOrAfter (Version .V_7_0_0_alpha1 )) {
537+ numDocs = in .readInt ();
538+ } else {
539+ numDocs = UNKNOWN_NUM_DOCS ;
540+ }
504541 }
505542
506543 @ Override
507544 public void writeTo (StreamOutput out ) throws IOException {
508545 super .writeTo (out );
509546 commitId .writeTo (out );
547+ if (out .getVersion ().onOrAfter (Version .V_7_0_0_alpha1 )) {
548+ out .writeInt (numDocs );
549+ }
510550 }
511551 }
512552
0 commit comments