Skip to content

Commit ff244c0

Browse files
committed
Ensure translog upload completes before primary mode change & reset new primary engine
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 5322ad9 commit ff244c0

File tree

3 files changed

+48
-14
lines changed

3 files changed

+48
-14
lines changed

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -762,12 +762,20 @@ public void relocated(
762762
final String targetAllocationId,
763763
final Consumer<ReplicationTracker.PrimaryContext> consumer,
764764
final Consumer<StepListener> performSegRep,
765-
final ActionListener<Void> listener
765+
final ActionListener<Void> listener,
766+
final boolean syncTranslog
766767
) throws IllegalIndexShardStateException, IllegalStateException, InterruptedException {
767768
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
768769
try (Releasable forceRefreshes = refreshListeners.forceRefreshes()) {
769770
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
770771
forceRefreshes.close();
772+
773+
// Since all the index permits are acquired at this point, the translog buffer will not change.
774+
// It is safe to perform sync of translogs now as this will ensure for remote-backed indexes, the
775+
// translogs has been uploaded to the remote store.
776+
if (syncTranslog) {
777+
maybeSync();
778+
}
771779
// no shard operation permits are being held here, move state from started to relocated
772780
assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED
773781
: "in-flight operations in progress while moving shard state to relocated";
@@ -808,6 +816,16 @@ public void relocated(
808816
}
809817
}
810818

819+
private void maybeSync() {
820+
try {
821+
if (isSyncNeeded()) {
822+
sync();
823+
}
824+
} catch (IOException e) {
825+
logger.warn("failed to sync translog", e);
826+
}
827+
}
828+
811829
private void verifyRelocatingState() {
812830
if (state != IndexShardState.STARTED) {
813831
throw new IndexShardNotStartedException(shardId, state);

server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.opensearch.common.util.concurrent.ListenableFuture;
5959
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
6060
import org.opensearch.core.internal.io.IOUtils;
61+
import org.opensearch.index.IndexSettings;
6162
import org.opensearch.index.engine.RecoveryEngineException;
6263
import org.opensearch.index.seqno.RetentionLease;
6364
import org.opensearch.index.seqno.RetentionLeaseNotFoundException;
@@ -821,17 +822,21 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
821822
final StepListener<Void> handoffListener = new StepListener<>();
822823
if (request.isPrimaryRelocation()) {
823824
logger.trace("performing relocation hand-off");
824-
final Consumer<StepListener> forceSegRepConsumer = shard.indexSettings().isSegRepEnabled()
825+
final IndexSettings indexSettings = shard.indexSettings();
826+
final Consumer<StepListener> forceSegRepConsumer = indexSettings.isSegRepEnabled()
825827
? recoveryTarget::forceSegmentFileSync
826828
: res -> res.onResponse(null);
829+
final boolean syncTranslog = indexSettings.isRemoteTranslogStoreEnabled()
830+
&& Translog.Durability.ASYNC == indexSettings.getTranslogDurability();
827831
// TODO: make relocated async
828832
// this acquires all IndexShard operation permits and will thus delay new recoveries until it is done
829833
cancellableThreads.execute(
830834
() -> shard.relocated(
831835
request.targetAllocationId(),
832836
recoveryTarget::handoffPrimaryContext,
833837
forceSegRepConsumer,
834-
handoffListener
838+
handoffListener,
839+
syncTranslog
835840
)
836841
);
837842
/*

server/src/test/java/org/opensearch/index/shard/IndexShardTests.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,7 +1011,8 @@ public void onResponse(Void unused) {
10111011
public void onFailure(Exception e) {
10121012
fail(e.toString());
10131013
}
1014-
}
1014+
},
1015+
false
10151016
);
10161017
engineClosed = false;
10171018
break;
@@ -2039,7 +2040,8 @@ public void onResponse(Void unused) {
20392040
public void onFailure(Exception e) {
20402041
fail(e.toString());
20412042
}
2042-
}
2043+
},
2044+
false
20432045
);
20442046
} catch (InterruptedException e) {
20452047
throw new RuntimeException(e);
@@ -2086,7 +2088,8 @@ public void onResponse(Void unused) {
20862088
public void onFailure(Exception e) {
20872089
fail(e.toString());
20882090
}
2089-
}
2091+
},
2092+
false
20902093
);
20912094
} catch (InterruptedException e) {
20922095
throw new RuntimeException(e);
@@ -2190,7 +2193,8 @@ public void onFailure(Exception e) {
21902193
relocated.set(false);
21912194
fail(e.toString());
21922195
}
2193-
}
2196+
},
2197+
false
21942198
);
21952199
} catch (InterruptedException e) {
21962200
throw new RuntimeException(e);
@@ -2239,7 +2243,8 @@ public void onResponse(Void unused) {
22392243
public void onFailure(Exception e) {
22402244
fail(e.toString());
22412245
}
2242-
}
2246+
},
2247+
false
22432248
);
22442249
expectThrows(IllegalIndexShardStateException.class, () -> IndexShardTestCase.updateRoutingEntry(shard, originalRouting));
22452250
closeShards(shard);
@@ -2265,7 +2270,8 @@ public void onFailure(Exception e) {
22652270
assertTrue(ExceptionsHelper.unwrapCause(e) instanceof ReplicationFailedException);
22662271
assertEquals(e.getMessage(), "Segment replication failed");
22672272
}
2268-
}
2273+
},
2274+
false
22692275
);
22702276
closeShards(shard);
22712277
}
@@ -2290,7 +2296,8 @@ public void onResponse(Void unused) {
22902296
public void onFailure(Exception e) {
22912297
assertTrue(ExceptionsHelper.unwrapCause(e) instanceof IllegalIndexShardStateException);
22922298
}
2293-
}
2299+
},
2300+
false
22942301
);
22952302
closeShards(shard);
22962303
}
@@ -2325,7 +2332,8 @@ public void onResponse(Void unused) {
23252332
public void onFailure(Exception e) {
23262333
relocationException.set(e);
23272334
}
2328-
}
2335+
},
2336+
false
23292337
);
23302338
}
23312339
});
@@ -2396,7 +2404,8 @@ public void onFailure(Exception e) {
23962404
)
23972405
);
23982406
}
2399-
}
2407+
},
2408+
false
24002409
);
24012410

24022411
assertFalse(relocated.get());
@@ -2414,7 +2423,8 @@ public void onResponse(Void unused) {
24142423
public void onFailure(Exception e) {
24152424
fail(e.toString());
24162425
}
2417-
}
2426+
},
2427+
false
24182428
);
24192429
assertTrue(relocated.get());
24202430
closeShards(shard);
@@ -2822,7 +2832,8 @@ public void onResponse(Void unused) {
28222832
public void onFailure(Exception e) {
28232833
fail(e.toString());
28242834
}
2825-
}
2835+
},
2836+
false
28262837
);
28272838
assertTrue(shard.isRelocatedPrimary());
28282839
try {

0 commit comments

Comments
 (0)