Skip to content

Commit 08fe462

Browse files
author
Kamal Nayan
committed
Made the segment sync to remote async
Signed-off-by: Kamal Nayan <askkamal@amazon.com> Updated the runAfterRefreshWithPermit code and fixed Tests Signed-off-by: Kamal Nayan <askkamal@amazon.com>
1 parent d723db8 commit 08fe462

File tree

10 files changed

+108
-11
lines changed

10 files changed

+108
-11
lines changed

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2197,21 +2197,61 @@ public void waitForRemoteStoreSync() throws IOException {
21972197
waitForRemoteStoreSync(() -> {});
21982198
}
21992199

2200+
public void waitForRemoteStoreSync(Runnable onProgress) throws IOException {
2201+
waitForRemoteStoreSync(
2202+
onProgress,
2203+
getRecoverySettings().internalRemoteUploadTimeout(),
2204+
TimeValue.timeValueSeconds(30)
2205+
);
2206+
}
2207+
2208+
/**
2209+
* Waits for remote store sync with custom timeout and polling interval.
2210+
*
2211+
* @param timeout Maximum duration to wait for sync completion
2212+
* @param pollInterval Time between sync status checks
2213+
* @throws IOException If sync fails to complete within timeout
2214+
*/
2215+
public void waitForRemoteStoreSyncWithTimeout(TimeValue timeout, TimeValue pollInterval) throws IOException {
2216+
if (indexSettings.isAssignedOnRemoteNode() == false) {
2217+
return;
2218+
}
2219+
2220+
waitForRemoteStoreSync(
2221+
() -> {},
2222+
timeout,
2223+
pollInterval
2224+
);
2225+
}
2226+
2227+
/**
2228+
* Quick check for remote store sync using default short timeout (5s) and polling interval (100ms).
2229+
*
2230+
* @throws IOException If sync fails to complete within timeout
2231+
*/
2232+
public void waitForRemoteStoreSyncWithTimeout() throws IOException {
2233+
waitForRemoteStoreSyncWithTimeout(
2234+
TimeValue.timeValueSeconds(5),
2235+
TimeValue.timeValueMillis(100)
2236+
);
2237+
}
2238+
22002239
/*
22012240
Blocks the calling thread, waiting for the remote store to get synced till internal Remote Upload Timeout
22022241
Calls onProgress on seeing an increased file count on remote
22032242
Throws IOException if the remote store is not synced within the timeout
22042243
*/
2205-
public void waitForRemoteStoreSync(Runnable onProgress) throws IOException {
2244+
private void waitForRemoteStoreSync(Runnable onProgress, TimeValue remoteUploadTimeout, TimeValue waitBetweenChecks) throws IOException {
22062245
assert indexSettings.isAssignedOnRemoteNode();
22072246
RemoteSegmentStoreDirectory directory = getRemoteDirectory();
22082247
int segmentUploadeCount = 0;
2209-
if (shardRouting.primary() == false) {
2248+
if (shardRouting.primary() == false || remoteUploadTimeout == null || waitBetweenChecks == null) {
22102249
return;
22112250
}
2251+
22122252
long startNanos = System.nanoTime();
22132253

2214-
while (System.nanoTime() - startNanos < getRecoverySettings().internalRemoteUploadTimeout().nanos()) {
2254+
while (System.nanoTime() - startNanos < remoteUploadTimeout.nanos()) {
22152255
try {
22162256
if (isRemoteSegmentStoreInSync()) {
22172257
return;
@@ -2222,7 +2262,7 @@ public void waitForRemoteStoreSync(Runnable onProgress) throws IOException {
22222262
segmentUploadeCount = directory.getSegmentsUploadedToRemoteStore().size();
22232263
}
22242264
try {
2225-
Thread.sleep(TimeValue.timeValueSeconds(30).millis());
2265+
Thread.sleep(waitBetweenChecks.millis());
22262266
} catch (InterruptedException ie) {
22272267
throw new OpenSearchException("Interrupted waiting for completion of [{}]", ie);
22282268
}

server/src/main/java/org/opensearch/index/shard/ReleasableRetryableRefreshListener.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,13 @@ public ReleasableRetryableRefreshListener(ThreadPool threadPool) {
5959
}
6060

6161
@Override
62-
public final void afterRefresh(boolean didRefresh) throws IOException {
62+
public void afterRefresh(boolean didRefresh) throws IOException {
6363
if (closed.get()) {
6464
return;
6565
}
6666
runAfterRefreshExactlyOnce(didRefresh);
67-
runAfterRefreshWithPermit(didRefresh, () -> {});
67+
runAfterRefreshWithPermit(didRefresh, () -> {
68+
});
6869
}
6970

7071
/**
@@ -157,7 +158,7 @@ protected boolean isRetryEnabled() {
157158
* The synchronised block ensures that if there is a retry or afterRefresh waiting, then it waits until the previous
158159
* execution finishes.
159160
*/
160-
private synchronized void runAfterRefreshWithPermit(boolean didRefresh, Runnable runFinally) {
161+
protected synchronized void runAfterRefreshWithPermit(boolean didRefresh, Runnable runFinally) {
161162
if (closed.get()) {
162163
return;
163164
}

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,20 @@ protected void runAfterRefreshExactlyOnce(boolean didRefresh) {
145145
}
146146
}
147147

148+
@Override
149+
public final void afterRefresh(boolean didRefresh) throws IOException {
150+
if (isClosed()) {
151+
return;
152+
}
153+
runAfterRefreshExactlyOnce(didRefresh);
154+
this.indexShard.getThreadPool().schedule(
155+
() -> runAfterRefreshWithPermit(didRefresh, () -> {
156+
}),
157+
new TimeValue(1, TimeUnit.SECONDS),
158+
getRemoteRefreshThreadPoolName()
159+
);
160+
}
161+
148162
/**
149163
* Upload new segment files created as part of the last refresh to the remote segment store.
150164
* This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded.
@@ -380,6 +394,11 @@ protected String getRetryThreadPoolName() {
380394
return ThreadPool.Names.REMOTE_REFRESH_RETRY;
381395
}
382396

397+
private String getRemoteRefreshThreadPoolName()
398+
{
399+
return ThreadPool.Names.REMOTE_REFRESH_SEGMENT_SYNC;
400+
}
401+
383402
private boolean isRefreshAfterCommit() throws IOException {
384403
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
385404
return (lastCommittedLocalSegmentFileName != null

server/src/main/java/org/opensearch/threadpool/ThreadPool.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ public static class Names {
124124
public static final String REMOTE_STATE_READ = "remote_state_read";
125125
public static final String INDEX_SEARCHER = "index_searcher";
126126
public static final String REMOTE_STATE_CHECKSUM = "remote_state_checksum";
127+
public static final String REMOTE_REFRESH_SEGMENT_SYNC = "remote_refresh_segment_sync";
127128
}
128129

129130
static Set<String> scalingThreadPoolKeys = new HashSet<>(Arrays.asList("max", "core"));
@@ -201,6 +202,7 @@ public static ThreadPoolType fromType(String type) {
201202
map.put(Names.REMOTE_STATE_READ, ThreadPoolType.FIXED);
202203
map.put(Names.INDEX_SEARCHER, ThreadPoolType.RESIZABLE);
203204
map.put(Names.REMOTE_STATE_CHECKSUM, ThreadPoolType.FIXED);
205+
map.put(Names.REMOTE_REFRESH_SEGMENT_SYNC, ThreadPoolType.SCALING);
204206
THREAD_POOL_TYPES = Collections.unmodifiableMap(map);
205207
}
206208

@@ -325,6 +327,10 @@ public ThreadPool(
325327
Names.REMOTE_STATE_CHECKSUM,
326328
new FixedExecutorBuilder(settings, Names.REMOTE_STATE_CHECKSUM, ClusterStateChecksum.COMPONENT_SIZE, 1000)
327329
);
330+
builders.put(
331+
Names.REMOTE_REFRESH_SEGMENT_SYNC,
332+
new ScalingExecutorBuilder(Names.REMOTE_REFRESH_SEGMENT_SYNC, 1, halfProc, TimeValue.timeValueMinutes(5))
333+
);
328334

329335
for (final ExecutorBuilder<?> builder : customBuilders) {
330336
if (builders.containsKey(builder.name())) {

server/src/test/java/org/opensearch/index/shard/IndexShardTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2855,6 +2855,7 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException {
28552855
indexDoc(source, "_doc", "1");
28562856
indexDoc(source, "_doc", "2");
28572857
source.refresh("test");
2858+
source.waitForRemoteStoreSyncWithTimeout();
28582859
assertTrue("At lease one remote sync should have been completed", source.isRemoteSegmentStoreInSync());
28592860
assertDocs(source, "1", "2");
28602861
indexDoc(source, "_doc", "3");
@@ -2866,7 +2867,7 @@ public void testSyncSegmentsFromGivenRemoteSegmentStore() throws IOException {
28662867

28672868
indexDoc(source, "_doc", "4");
28682869
source.refresh("test");
2869-
2870+
source.waitForRemoteStoreSyncWithTimeout();
28702871
long primaryTerm;
28712872
long commitGeneration;
28722873
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = source.getSegmentInfosSnapshot()) {
@@ -3027,6 +3028,7 @@ public void testRestoreSearchOnlyShardFromStore() throws IOException {
30273028
indexDoc(primary, "_doc", "1");
30283029
indexDoc(primary, "_doc", "2");
30293030
primary.refresh("test");
3031+
primary.waitForRemoteStoreSyncWithTimeout();
30303032
assertDocs(primary, "1", "2");
30313033

30323034
ShardRouting searchReplicaShardRouting = TestShardRouting.newShardRouting(
@@ -3073,6 +3075,7 @@ public void testRestoreSearchOnlyShardFromStoreOnNewNode() throws IOException {
30733075
indexDoc(primary, "_doc", "1");
30743076
indexDoc(primary, "_doc", "2");
30753077
primary.refresh("test");
3078+
primary.waitForRemoteStoreSyncWithTimeout();
30763079
assertDocs(primary, "1", "2");
30773080

30783081
// Setting the RecoverySource to ExistingStoreRecoverySource to simulate a shard initializing on a new node
@@ -3111,6 +3114,7 @@ public void testSearchShardDoesNotStartIfCorruptedMarkerIsPresent() throws Excep
31113114
indexDoc(primary, "_doc", "1");
31123115
indexDoc(primary, "_doc", "2");
31133116
primary.refresh("test");
3117+
primary.waitForRemoteStoreSyncWithTimeout();
31143118
assertDocs(primary, "1", "2");
31153119

31163120
// start search replica

server/src/test/java/org/opensearch/index/shard/RemoteIndexShardTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception {
216216
shards.refresh("test");
217217

218218
final IndexShard primary = shards.getPrimary();
219+
primary.waitForRemoteStoreSyncWithTimeout();
220+
219221
final Engine primaryEngine = getEngine(primary);
220222
assertNotNull(primaryEngine);
221223
final SegmentInfos latestCommit = SegmentInfos.readLatestCommit(primary.store().directory());
@@ -234,7 +236,7 @@ public void testReplicaCommitsInfosBytesOnRecovery() throws Exception {
234236
);
235237
}
236238

237-
final IndexShard replica = shards.addReplica(remotePath);
239+
IndexShard replica = shards.addReplica(remotePath);
238240
replica.store().createEmpty(Version.LATEST);
239241
assertEquals(
240242
"Replica starts at empty segment 2",
@@ -292,6 +294,7 @@ public void testPrimaryRestart_PrimaryHasExtraCommits() throws Exception {
292294
} else {
293295
primary.refresh("test");
294296
}
297+
primary.waitForRemoteStoreSyncWithTimeout();
295298
assertDocCount(primary, 10);
296299
// get a metadata map - we'll use segrep diff to ensure segments on reader are identical after restart.
297300
final Map<String, StoreFileMetadata> metadataBeforeRestart = primary.getSegmentMetadataMap();
@@ -322,6 +325,7 @@ public void testRepicaCleansUpOldCommitsWhenReceivingNew() throws Exception {
322325
final SegmentInfos initialCommit = store.readLastCommittedSegmentsInfo();
323326
shards.indexDocs(1);
324327
flushShard(primary);
328+
primary.waitForRemoteStoreSyncWithTimeout();
325329
replicateSegments(primary, shards.getReplicas());
326330

327331
assertDocCount(primary, 1);
@@ -332,13 +336,15 @@ public void testRepicaCleansUpOldCommitsWhenReceivingNew() throws Exception {
332336

333337
shards.indexDocs(1);
334338
primary.refresh("test");
339+
primary.waitForRemoteStoreSyncWithTimeout();
335340
replicateSegments(primary, shards.getReplicas());
336341
assertDocCount(replica, 2);
337342
assertSingleSegmentFile(replica);
338343
assertEquals(store.readLastCommittedSegmentsInfo().getGeneration(), secondCommit.getGeneration());
339344

340345
shards.indexDocs(1);
341346
flushShard(primary);
347+
primary.waitForRemoteStoreSyncWithTimeout();
342348
replicateSegments(primary, shards.getReplicas());
343349
assertDocCount(replica, 3);
344350
assertSingleSegmentFile(replica);
@@ -363,6 +369,7 @@ public void testPrimaryRestart() throws Exception {
363369
} else {
364370
primary.refresh("test");
365371
}
372+
primary.waitForRemoteStoreSyncWithTimeout();
366373
assertDocCount(primary, 10);
367374
// get a metadata map - we'll use segrep diff to ensure segments on reader are identical after restart.
368375
final Map<String, StoreFileMetadata> metadataBeforeRestart = primary.getSegmentMetadataMap();
@@ -392,6 +399,7 @@ public void testSegRepSucceedsOnPreviousCopiedFiles() throws Exception {
392399

393400
shards.indexDocs(10);
394401
primary.refresh("Test");
402+
primary.waitForRemoteStoreSyncWithTimeout();
395403

396404
final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
397405
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
@@ -486,6 +494,7 @@ public void testNoFailuresOnFileReads() throws Exception {
486494
final int docCount = 10;
487495
shards.indexDocs(docCount);
488496
primary.refresh("Test");
497+
primary.waitForRemoteStoreSyncWithTimeout();
489498

490499
final SegmentReplicationSourceFactory sourceFactory = mock(SegmentReplicationSourceFactory.class);
491500
final SegmentReplicationTargetService targetService = newTargetService(sourceFactory);
@@ -538,6 +547,7 @@ public void onReplicationFailure(
538547
// Ingest more data and start next round of segment replication
539548
shards.indexDocs(docCount);
540549
primary.refresh("Post corruption");
550+
primary.waitForRemoteStoreSyncWithTimeout();
541551
replicateSegments(primary, List.of(replica));
542552

543553
assertDocCount(primary, 2 * docCount);

server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ public void testRemoteDirectoryInitThrowsException() throws IOException {
251251
public void testAfterRefresh() throws IOException {
252252
setup(true, 3);
253253
assertDocs(indexShard, "1", "2", "3");
254+
indexShard.waitForRemoteStoreSyncWithTimeout();
254255

255256
try (Store remoteStore = indexShard.remoteStore()) {
256257
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
@@ -268,7 +269,7 @@ public void testAfterRefresh() throws IOException {
268269
public void testAfterCommit() throws IOException {
269270
setup(true, 3);
270271
assertDocs(indexShard, "1", "2", "3");
271-
flushShard(indexShard);
272+
indexShard.waitForRemoteStoreSyncWithTimeout();
272273

273274
try (Store remoteStore = indexShard.remoteStore()) {
274275
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
@@ -293,6 +294,7 @@ public void testRefreshAfterCommit() throws IOException {
293294

294295
indexDocs(8, 4);
295296
indexShard.refresh("test");
297+
indexShard.waitForRemoteStoreSyncWithTimeout();
296298

297299
try (Store remoteStore = indexShard.remoteStore()) {
298300
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
@@ -316,6 +318,8 @@ public void testAfterMultipleCommits() throws IOException {
316318
flushShard(indexShard);
317319
}
318320

321+
indexShard.waitForRemoteStoreSyncWithTimeout();
322+
319323
try (Store remoteStore = indexShard.remoteStore()) {
320324
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
321325
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
@@ -875,6 +879,7 @@ private void verifyUploadedSegments(RemoteSegmentStoreDirectory remoteSegmentSto
875879
public void testRemoteSegmentStoreNotInSync() throws IOException {
876880
setup(true, 3);
877881
remoteStoreRefreshListener.afterRefresh(true);
882+
indexShard.waitForRemoteStoreSyncWithTimeout();
878883
try (Store remoteStore = indexShard.remoteStore()) {
879884
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory =
880885
(RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();

server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public void testReplication() throws Exception {
141141
int numDocs = randomIntBetween(10, 20);
142142
shards.indexDocs(numDocs);
143143
primaryShard.refresh("test");
144+
primaryShard.waitForRemoteStoreSyncWithTimeout();
144145
flushShard(primaryShard);
145146
replicateSegments(primaryShard, List.of(replicaShard));
146147

@@ -205,6 +206,7 @@ public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception {
205206
int numDocs = randomIntBetween(10, 20);
206207
shards.indexDocs(numDocs);
207208
primary.refresh("test");
209+
primary.waitForRemoteStoreSyncWithTimeout();
208210
replicateSegments(primary, List.of(replica));
209211

210212
replicaTuple = replica.getLatestSegmentInfosAndCheckpoint();
@@ -305,6 +307,7 @@ public void testSegmentReplication_With_ReaderClosedConcurrently() throws Except
305307
assertEqualTranslogOperations(shards, primaryShard);
306308
primaryShard.refresh("Test");
307309
primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true));
310+
primaryShard.waitForRemoteStoreSyncWithTimeout();
308311
replicateSegments(primaryShard, shards.getReplicas());
309312

310313
IndexShard spyShard = spy(replicaShard);
@@ -318,6 +321,7 @@ public void testSegmentReplication_With_ReaderClosedConcurrently() throws Except
318321
}
319322
assertEqualTranslogOperations(shards, primaryShard);
320323
primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true));
324+
primaryShard.waitForRemoteStoreSyncWithTimeout();
321325
replicateSegments(primaryShard, shards.getReplicas());
322326

323327
// Step 3. Perform force merge down to 1 segment on primary
@@ -354,6 +358,7 @@ public void testSegmentReplication_With_EngineClosedConcurrently() throws Except
354358
assertEqualTranslogOperations(shards, primaryShard);
355359
primaryShard.refresh("Test");
356360
primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true));
361+
primaryShard.waitForRemoteStoreSyncWithTimeout();
357362
replicateSegments(primaryShard, shards.getReplicas());
358363
shards.assertAllEqual(numDocs);
359364

@@ -365,6 +370,7 @@ public void testSegmentReplication_With_EngineClosedConcurrently() throws Except
365370
assertEqualTranslogOperations(shards, primaryShard);
366371
primaryShard.flush(new FlushRequest().waitIfOngoing(true).force(true));
367372
logger.info("--> primary store after final flush {}", Arrays.toString(primaryShard.store().directory().listAll()));
373+
primaryShard.waitForRemoteStoreSyncWithTimeout();
368374

369375
// Step 3. Before replicating segments, block finalizeReplication and perform engine commit directly that
370376
// cleans up recently copied over files
@@ -398,6 +404,8 @@ public void testIgnoreShardIdle() throws Exception {
398404

399405
// invoke scheduledRefresh, returns true if refresh is immediately invoked.
400406
assertTrue(primary.scheduledRefresh());
407+
primary.waitForRemoteStoreSyncWithTimeout();
408+
401409
// replica would always return false here as there is no indexed doc to refresh on.
402410
assertFalse(replica.scheduledRefresh());
403411

@@ -699,6 +707,7 @@ public void testCloseShardDuringFinalize() throws Exception {
699707
final IndexShard replicaSpy = spy(replica);
700708

701709
primary.refresh("Test");
710+
primary.waitForRemoteStoreSyncWithTimeout();
702711

703712
doThrow(AlreadyClosedException.class).when(replicaSpy).finalizeReplication(any());
704713

0 commit comments

Comments
 (0)