@@ -456,6 +456,10 @@ public void onFailure(Exception e) {
456456 }
457457 }
458458
459+ IndexShard getPrimaryShard () {
460+ return replicationGroup .primary ;
461+ }
462+
459463 protected abstract PrimaryResult performOnPrimary (IndexShard primary , Request request ) throws Exception ;
460464
461465 protected abstract void performOnReplica (ReplicaRequest request , IndexShard replica ) throws Exception ;
@@ -592,7 +596,7 @@ protected PrimaryResult performOnPrimary(IndexShard primary, BulkShardRequest re
592596
593597 @ Override
594598 protected void performOnReplica (BulkShardRequest request , IndexShard replica ) throws Exception {
595- executeShardBulkOnReplica (replica , request );
599+ executeShardBulkOnReplica (request , replica , getPrimaryShard (). getPrimaryTerm (), getPrimaryShard (). getGlobalCheckpoint () );
596600 }
597601 }
598602
@@ -602,15 +606,24 @@ private TransportWriteAction.WritePrimaryResult<BulkShardRequest, BulkShardRespo
602606 ((IndexRequest ) itemRequest .request ()).process (Version .CURRENT , null , index .getName ());
603607 }
604608 }
605- final TransportWriteAction .WritePrimaryResult <BulkShardRequest , BulkShardResponse > result =
606- TransportShardBulkAction .performOnPrimary (request , primary , null ,
607- System ::currentTimeMillis , new TransportShardBulkActionTests .NoopMappingUpdatePerformer ());
609+ final PlainActionFuture <Releasable > permitAcquiredFuture = new PlainActionFuture <>();
610+ primary .acquirePrimaryOperationPermit (permitAcquiredFuture , ThreadPool .Names .SAME , request );
611+ final TransportWriteAction .WritePrimaryResult <BulkShardRequest , BulkShardResponse > result ;
612+ try (Releasable ignored = permitAcquiredFuture .actionGet ()) {
613+ result = TransportShardBulkAction .performOnPrimary (request , primary , null , System ::currentTimeMillis ,
614+ new TransportShardBulkActionTests .NoopMappingUpdatePerformer ());
615+ }
608616 TransportWriteActionTestHelper .performPostWriteActions (primary , request , result .location , logger );
609617 return result ;
610618 }
611619
612- private void executeShardBulkOnReplica (IndexShard replica , BulkShardRequest request ) throws Exception {
613- final Translog .Location location = TransportShardBulkAction .performOnReplica (request , replica );
620+ private void executeShardBulkOnReplica (BulkShardRequest request , IndexShard replica , long operationPrimaryTerm , long globalCheckpointOnPrimary ) throws Exception {
621+ final PlainActionFuture <Releasable > permitAcquiredFuture = new PlainActionFuture <>();
622+ replica .acquireReplicaOperationPermit (operationPrimaryTerm , globalCheckpointOnPrimary , permitAcquiredFuture , ThreadPool .Names .SAME , request );
623+ final Translog .Location location ;
624+ try (Releasable ignored = permitAcquiredFuture .actionGet ()) {
625+ location = TransportShardBulkAction .performOnReplica (request , replica );
626+ }
614627 TransportWriteActionTestHelper .performPostWriteActions (replica , request , location , logger );
615628 }
616629
@@ -630,8 +643,8 @@ BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws
630643 /**
631644 * indexes the given requests on the supplied replica shard
632645 */
633- void indexOnReplica (BulkShardRequest request , IndexShard replica ) throws Exception {
634- executeShardBulkOnReplica (replica , request );
646+ void indexOnReplica (BulkShardRequest request , ReplicationGroup group , IndexShard replica ) throws Exception {
647+ executeShardBulkOnReplica (request , replica , group . primary . getPrimaryTerm (), group . primary . getGlobalCheckpoint () );
635648 }
636649
637650 class GlobalCheckpointSync extends ReplicationAction <
0 commit comments