diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 8541a1f5e554b..726a1f0159f3e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -339,6 +339,8 @@ Runnable getGlobalCheckpointSyncer() { private final boolean isTimeSeriesIndex; private final RemoteRefreshSegmentPressureService remoteRefreshSegmentPressureService; + private final List internalRefreshListener = new ArrayList<>(); + public IndexShard( final ShardRouting shardRouting, final IndexSettings indexSettings, @@ -813,6 +815,13 @@ public void relocated( if (syncTranslog) { maybeSync(); } + + // Ensures all in-flight remote store operations drain, before we perform the handoff. + internalRefreshListener.stream() + .filter(refreshListener -> refreshListener instanceof Closeable) + .map(refreshListener -> (Closeable) refreshListener) + .close(); + // no shard operation permits are being held here, move state from started to relocated assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED : "in-flight operations in progress while moving shard state to relocated"; @@ -3660,7 +3669,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro } }; - final List internalRefreshListener = new ArrayList<>(); + internalRefreshListener.clear(); internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric)); if (this.checkpointPublisher != null && shardRouting.primary() && indexSettings.isSegRepLocalEnabled()) { internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));