Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Store] Fix relocation failure due to transport receive timeout #10761

Merged
merged 3 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -509,4 +509,27 @@ public void testRestoreSnapshotToIndexWithSameNameDifferentUUID() throws Excepti
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
});
}

/**
 * Verifies that a remote store backed index reports search idle as unsupported both with zero replicas
 * and after the replica count is raised to one, on the primary as well as on the replica shard.
 */
public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, InterruptedException {
    final int replicaCountAfterUpdate = 1;

    // Phase 1: a single data node hosting the primary of an index created with zero replicas.
    internalCluster().startClusterManagerOnlyNode();
    final String primaryNodeName = internalCluster().startDataOnlyNodes(1).get(0);
    createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
    ensureGreen(INDEX_NAME);

    final IndexShard primaryShard = getIndexShard(primaryNodeName);
    assertFalse(primaryShard.isSearchIdleSupported());

    // Phase 2: add a second data node and bump the replica count so that a replica gets allocated.
    final String replicaNodeName = internalCluster().startDataOnlyNodes(1).get(0);
    assertAcked(
        client().admin()
            .indices()
            .prepareUpdateSettings(INDEX_NAME)
            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, replicaCountAfterUpdate))
    );
    ensureGreen(INDEX_NAME);

    // The previously fetched primary shard must still report search idle as unsupported ...
    assertFalse(primaryShard.isSearchIdleSupported());

    // ... and so must the shard on the newly started node.
    final IndexShard replicaShard = getIndexShard(replicaNodeName);
    assertFalse(replicaShard.isSearchIdleSupported());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,9 @@
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
if (this.isRemoteStoreEnabled) {
logger.warn("Search idle is not supported for remote backed indices");

Check warning on line 1028 in server/src/main/java/org/opensearch/index/IndexSettings.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/IndexSettings.java#L1028

Added line #L1028 was not covered by tests
}
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
if (this.replicationType == ReplicationType.SEGMENT && this.getNumberOfReplicas() > 0) {
logger.warn("Search idle is not supported for indices with replicas using 'replication.type: SEGMENT'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4425,7 +4425,6 @@ public final boolean isSearchIdle() {
}

/**
*
* Returns true if this shard supports search idle.
* <p>
* Indices using Segment Replication will ignore search idle unless there are no replicas.
Expand All @@ -4434,6 +4433,11 @@ public final boolean isSearchIdle() {
* a new set of segments.
*/
public final boolean isSearchIdleSupported() {
    if (isRemoteTranslogEnabled() == false) {
        // Not remote store backed: search idle is only ruled out when segment replication is enabled
        // and the index has at least one replica.
        final boolean segRepWithReplicas = indexSettings.isSegRepEnabled() && indexSettings.getNumberOfReplicas() != 0;
        return segRepWithReplicas == false;
    }
    // Remote store backed index: search idle is not supported, so that the async refresh task
    // continues to upload to remote store periodically.
    return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,10 +430,10 @@ public String getTranslogUUID() {
* @return if the translog should be flushed
*/
public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long flushThreshold) {
final long translogGenerationOfLastCommit = translog.getMinGenerationForSeqNo(
localCheckpointOfLastCommit + 1
).translogFileGeneration;
if (translog.sizeInBytesByMinGen(translogGenerationOfLastCommit) < flushThreshold) {
// This is the minimum seqNo that is referred in translog and considered for calculating translog size
long minTranslogRefSeqNo = translog.getMinUnreferencedSeqNoInSegments(localCheckpointOfLastCommit + 1);
final long minReferencedTranslogGeneration = translog.getMinGenerationForSeqNo(minTranslogRefSeqNo).translogFileGeneration;
if (translog.sizeInBytesByMinGen(minReferencedTranslogGeneration) < flushThreshold) {
return false;
}
/*
Expand All @@ -454,7 +454,7 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl
final long translogGenerationOfNewCommit = translog.getMinGenerationForSeqNo(
localCheckpointTrackerSupplier.get().getProcessedCheckpoint() + 1
).translogFileGeneration;
return translogGenerationOfLastCommit < translogGenerationOfNewCommit
return minReferencedTranslogGeneration < translogGenerationOfNewCommit
|| localCheckpointTrackerSupplier.get().getProcessedCheckpoint() == localCheckpointTrackerSupplier.get().getMaxSeqNo();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,4 +544,9 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
}
}
}

/**
 * Returns the current value of {@code minSeqNoToKeep}. The supplied argument is not consulted
 * by this override.
 *
 * @param minUnrefCheckpointInLastCommit ignored by this implementation
 * @return the value of {@code minSeqNoToKeep}
 */
@Override
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minSeqNoToKeep;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2034,4 +2034,8 @@ public static String createEmptyTranslog(
writer.close();
return uuid;
}

/**
 * Returns the supplied sequence number unchanged.
 * NOTE(review): this is public and non-final, so presumably subclasses may override it to report
 * a different sequence number; confirm against the subclasses.
 *
 * @param minUnrefCheckpointInLastCommit the sequence number to return as-is
 * @return {@code minUnrefCheckpointInLastCommit}
 */
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minUnrefCheckpointInLastCommit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@
transportService,
request.targetNode(),
recoverySettings,
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime)
throttleTime -> shard.recoveryStats().addThrottleTime(throttleTime),

Check warning on line 379 in server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/recovery/PeerRecoverySourceService.java#L379

Added line #L379 was not covered by tests
shard.isRemoteTranslogEnabled()
);
handler = RecoverySourceHandlerFactory.create(shard, recoveryTarget, request, recoverySettings);
return Tuple.tuple(handler, recoveryTarget);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@
private final AtomicLong requestSeqNoGenerator = new AtomicLong(0);
private final RetryableTransportClient retryableTransportClient;
private final RemoteSegmentFileChunkWriter fileChunkWriter;
private final boolean remoteStoreEnabled;

public RemoteRecoveryTargetHandler(
long recoveryId,
ShardId shardId,
TransportService transportService,
DiscoveryNode targetNode,
RecoverySettings recoverySettings,
Consumer<Long> onSourceThrottle
Consumer<Long> onSourceThrottle,
boolean remoteStoreEnabled
) {
this.transportService = transportService;
// It is safe to pass the retry timeout value here because RemoteRecoveryTargetHandler
Expand Down Expand Up @@ -111,6 +113,7 @@
requestSeqNoGenerator,
onSourceThrottle
);
this.remoteStoreEnabled = remoteStoreEnabled;
}

public DiscoveryNode targetNode() {
Expand All @@ -129,7 +132,13 @@
);
final Writeable.Reader<TransportResponse.Empty> reader = in -> TransportResponse.Empty.INSTANCE;
final ActionListener<TransportResponse.Empty> responseListener = ActionListener.map(listener, r -> null);
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
if (remoteStoreEnabled) {
// If remote store is enabled, during the prepare_translog phase, translog is also downloaded on the
// target host along with incremental segments download.
retryableTransportClient.executeRetryableAction(action, request, translogOpsRequestOptions, responseListener, reader);

Check warning on line 138 in server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java#L138

Added line #L138 was not covered by tests
} else {
retryableTransportClient.executeRetryableAction(action, request, responseListener, reader);
}
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,15 @@ public void onReplicationFailure(
}
}

/**
 * Overrides the search-idle validation for this test class: the primary is expected to report
 * search idle as unsupported.
 *
 * @param primary the primary shard to validate
 */
@Override
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// Search idle must be reported as unsupported even though the shard itself reports being idle.
assertFalse(primary.isSearchIdleSupported());
assertTrue(primary.isSearchIdle());
// scheduledRefresh() is expected to return true and no refresh may be left pending afterwards.
// The order matters: hasRefreshPending() is checked only after scheduledRefresh() has been invoked.
assertTrue(primary.scheduledRefresh());
assertFalse(primary.hasRefreshPending());
}

private void assertSingleSegmentFile(IndexShard shard, String fileName) throws IOException {
final Set<String> segmentsFileNames = Arrays.stream(shard.store().directory().listAll())
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,17 @@ public void testShardIdleWithNoReplicas() throws Exception {
shards.startAll();
final IndexShard primary = shards.getPrimary();
shards.indexDocs(randomIntBetween(1, 10));
// ensure search idle conditions are met.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
validateShardIdleWithNoReplicas(primary);
}
}

/**
 * Validates the search-idle state of the given primary shard. Declared {@code protected} so that
 * subclasses can substitute their own expectations.
 *
 * @param primary the primary shard to validate
 */
protected void validateShardIdleWithNoReplicas(IndexShard primary) {
// ensure search idle conditions are met: the shard reports being idle, scheduledRefresh()
// returns false, and a refresh is left pending afterwards.
// The order matters: hasRefreshPending() is checked only after scheduledRefresh() has been invoked.
assertTrue(primary.isSearchIdle());
assertFalse(primary.scheduledRefresh());
assertTrue(primary.hasRefreshPending());
}

/**
* here we are starting a new primary shard in PrimaryMode and testing if the shard publishes checkpoint after refresh.
*/
Expand Down
Loading