Skip to content

Commit

Permalink
fix(s3stream): wait force upload complete before return
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <niesc@automq.com>
  • Loading branch information
SCNieh committed Nov 3, 2024
1 parent c51cc02 commit 0e5fd0a
Showing 1 changed file with 28 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@ Map<Boolean, List<S3ObjectMetadata>> convertS3Objects(List<S3ObjectMetadata> str
>= TimeUnit.MINUTES.toMillis(this.forceSplitObjectPeriod))));
}

@SuppressWarnings("checkstyle:CyclomaticComplexity")
void executeCompactionPlans(CommitStreamSetObjectRequest request, List<CompactionPlan> compactionPlans,
List<S3ObjectMetadata> s3ObjectMetadata)
throws CompletionException {
Expand Down Expand Up @@ -743,17 +744,34 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio
return;
}
// wait for all stream objects and stream set object part to be uploaded
compactionCf = CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
.whenComplete((v, ex) -> {
uploader.forceUploadStreamSetObject();
if (ex != null) {
logger.error("Error while uploading compaction objects", ex);
uploader.release().thenAccept(vv -> {
for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
}
}).join();
compactionCf = new CompletableFuture<>();
CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
.whenComplete((v, uploadException) -> {
if (uploadException != null) {
logger.error("Error while uploading compaction objects", uploadException);
}
uploader.forceUploadStreamSetObject().whenComplete((vv, forceUploadException) -> {
if (forceUploadException != null) {
logger.error("Error while force uploading stream set object", uploadException);
}
if (uploadException != null || forceUploadException != null) {
uploader.release().whenComplete((vvv, releaseException) -> {
if (releaseException != null) {
logger.error("Unexpected exception while release uploader");
}
for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
}
if (uploadException != null) {
compactionCf.completeExceptionally(new CompletionException("Uploading failed", uploadException));
} else {
compactionCf.completeExceptionally(new CompletionException("Force uploading sso failed", forceUploadException));
}
});
} else {
compactionCf.complete(null);
}
});
});
}
try {
Expand Down

0 comments on commit 0e5fd0a

Please sign in to comment.