Skip to content

Commit

Permalink
Fix segments upload retry regression introduced in PR #7119
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Jul 11, 2023
1 parent 5f98d67 commit 6796c7a
Showing 1 changed file with 6 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ private synchronized void syncSegments(boolean isRetry) {
long refreshTimeMs = segmentTracker.getLocalRefreshTimeMs(), refreshClockTimeMs = segmentTracker.getLocalRefreshClockTimeMs();
long refreshSeqNo = segmentTracker.getLocalRefreshSeqNo();
long bytesBeforeUpload = segmentTracker.getUploadBytesSucceeded(), startTimeInNS = System.nanoTime();
final AtomicBoolean shouldRetry = new AtomicBoolean(true);

try {

if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
this.primaryTerm = indexShard.getOperationPrimaryTerm();
this.remoteDirectory.init();
Expand Down Expand Up @@ -251,7 +251,6 @@ private synchronized void syncSegments(boolean isRetry) {
ActionListener<Void> segmentUploadsCompletedListener = new LatchedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(Void unused) {
boolean shouldRetry = true;
try {
// Start metadata file upload
uploadMetadata(localSegmentsPostRefresh, segmentInfos);
Expand All @@ -265,27 +264,18 @@ public void onResponse(Void unused) {
);
// At this point since we have uploaded new segments, segment infos and segment metadata file,
// along with marking minSeqNoToKeep, upload has succeeded completely.
shouldRetry = false;
shouldRetry.set(false);
} catch (Exception e) {
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
// in the next refresh. This should not affect durability of the indexed data after remote trans-log
// integration.
logger.warn("Exception in post new segment upload actions", e);
} finally {
doComplete(shouldRetry);
}
}

@Override
public void onFailure(Exception e) {
logger.warn("Exception while uploading new segments to the remote segment store", e);
doComplete(true);
}

private void doComplete(boolean shouldRetry) {
// Update the segment tracker with the final upload status as seen at the end
updateFinalUploadStatusInSegmentTracker(shouldRetry == false, bytesBeforeUpload, startTimeInNS);
afterSegmentsSync(isRetry, shouldRetry);
}
}, latch);

Expand All @@ -303,7 +293,11 @@ private void doComplete(boolean shouldRetry) {
}
} catch (Throwable t) {
logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t);
} finally {
// Update the segment tracker with the final upload status as seen at the end
updateFinalUploadStatusInSegmentTracker(shouldRetry.get() == false, bytesBeforeUpload, startTimeInNS);
}
afterSegmentsSync(isRetry, shouldRetry.get());
}

/**
Expand Down

0 comments on commit 6796c7a

Please sign in to comment.