Skip to content

Commit

Permalink
Fix remote segments sync retry regression introduced in PR #7119 (#8632)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Ashish Singh <ssashish@amazon.com>
Signed-off-by: Sachin Kale <kalsac@amazon.com>
Co-authored-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
ashking94 and Sachin Kale authored Jul 12, 2023
1 parent 32e7261 commit 7dddb31
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,9 @@ private synchronized void syncSegments(boolean isRetry) {
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime();
final AtomicBoolean shouldRetry = new AtomicBoolean(true);

try {

if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
Expand Down Expand Up @@ -252,7 +252,6 @@ private synchronized void syncSegments(boolean isRetry) {
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
boolean shouldRetry = true;
try {
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
Expand All @@ -266,27 +265,18 @@ public void onResponse(Void unused) {
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
shouldRetry = false;
shouldRetry.set(false);
} catch (Exception e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log
// integration.
logger.warn("Exception in post new segment upload actions", e);
} finally {
doComplete(shouldRetry);
}
}

@Override
public void onFailure(Exception e) {
logger.warn("Exception while uploading new segments to the remote segment store", e);
doComplete(true);
}

private void doComplete(boolean shouldRetry) {
// Update the segment tracker with the final upload status as seen at the end
updateFinalUploadStatusInSegmentTracker(shouldRetry == false, bytesBeforeUpload, startTimeInNS);
afterSegmentsSync(isRetry, shouldRetry);
}
}, latch);

Expand All @@ -305,6 +295,8 @@ private void doComplete(boolean shouldRetry) {
} catch (Throwable t) {
logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t);
}
updateFinalStatusInSegmentTracker(shouldRetry.get() == false, bytesBeforeUpload, startTimeInNS);
afterSegmentsSync(isRetry, shouldRetry.get());
}

/**
Expand Down Expand Up @@ -517,7 +509,7 @@ private void updateLocalSizeMapAndTracker(Collection<String> segmentFiles) {
segmentTracker.setLatestLocalFileNameLengthMap(latestFileNameSizeOnLocalMap);
}

private void updateFinalUploadStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) {
private void updateFinalStatusInSegmentTracker(boolean uploadStatus, long bytesBeforeUpload, long startTimeInNS) {
if (uploadStatus) {
long bytesUploaded = segmentTracker.getUploadBytesSucceeded() - bytesBeforeUpload;
long timeTakenInMS = (System.nanoTime() - startTimeInNS) / 1_000_000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,30 +581,32 @@ public void uploadMetadata(
RemoteSegmentMetadata.CURRENT_VERSION
);
try {
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
} else {
throw new NoSuchFileException(file);
try (IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT)) {
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
} else {
throw new NoSuchFileException(file);
}
}
}

ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
segmentInfosSnapshot.write(new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos"));
byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy();

metadataStreamWrapper.writeStream(
indexOutput,
new RemoteSegmentMetadata(
RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
segmentInfoSnapshotByteArray,
primaryTerm,
segmentInfosSnapshot.getGeneration()
)
);
indexOutput.close();
ByteBuffersDataOutput byteBuffersIndexOutput = new ByteBuffersDataOutput();
segmentInfosSnapshot.write(
new ByteBuffersIndexOutput(byteBuffersIndexOutput, "Snapshot of SegmentInfos", "SegmentInfos")
);
byte[] segmentInfoSnapshotByteArray = byteBuffersIndexOutput.toArrayCopy();

metadataStreamWrapper.writeStream(
indexOutput,
new RemoteSegmentMetadata(
RemoteSegmentMetadata.fromMapOfStrings(uploadedSegments),
segmentInfoSnapshotByteArray,
primaryTerm,
segmentInfosSnapshot.getGeneration()
)
);
}
storeDirectory.sync(Collections.singleton(metadataFilename));
remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.junit.After;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -48,7 +47,6 @@
import static org.mockito.Mockito.when;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;

@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/8549")
public class RemoteStoreRefreshListenerTests extends IndexShardTestCase {
private IndexShard indexShard;
private ClusterService clusterService;
Expand Down

0 comments on commit 7dddb31

Please sign in to comment.