Skip to content

Commit

Permalink
feat(s3stream): clean up buffer on compaction exception
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <niesc@automq.com>
  • Loading branch information
SCNieh committed Dec 5, 2023
1 parent 99c0fb7 commit 6570ebd
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 142 deletions.
29 changes: 19 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Comparator;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamDataBlock {
public static final Comparator<StreamDataBlock> STREAM_OFFSET_COMPARATOR = Comparator.comparingLong(StreamDataBlock::getStartOffset);
Expand All @@ -35,6 +36,8 @@ public class StreamDataBlock {

private final ObjectReader.DataBlockIndex dataBlockIndex;
private final CompletableFuture<ByteBuf> dataCf = new CompletableFuture<>();
private final AtomicInteger refCount = new AtomicInteger(1);
private boolean isDataTransferred = false;

public StreamDataBlock(long streamId, long startOffset, long endOffset, long objectId, ObjectReader.DataBlockIndex dataBlockIndex) {
this.streamId = streamId;
Expand Down Expand Up @@ -101,23 +104,29 @@ public CompletableFuture<ByteBuf> getDataCf() {
return this.dataCf;
}

public void free() {
this.dataCf.thenAccept(buf -> {
if (buf != null) {
buf.release();
}
});
public void release() {
if (!isDataTransferred && refCount.decrementAndGet() == 0) {
dataCf.thenAccept(buf -> {
if (buf != null) {
buf.release();
}
});
}
}

public void transferDataOwnership() {
isDataTransferred = true;
}

@Override
public String toString() {
return "StreamDataBlock{" +
"streamId=" + streamId +
"objectId=" + objectId +
", streamId=" + streamId +
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", objectId=" + objectId +
", blockPosition=" + getBlockEndPosition() +
", blockSize=" + getBlockSize() +
", dataBlockIndex=" + dataBlockIndex +
", isDataTransferred=" + isDataTransferred +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -129,8 +130,7 @@ private void scheduleNextCompaction(long delayMillis) {
.exceptionally(ex -> {
logger.error("Compaction failed, cost {} ms, ", timerUtil.elapsedAs(TimeUnit.MILLISECONDS), ex);
return null;
})
.join();
}).join();
} catch (Exception ex) {
logger.error("Error while compacting objects ", ex);
}
Expand All @@ -154,7 +154,7 @@ private CompletableFuture<Void> compact() {
}, compactThreadPool);
}

private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) {
private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectMetadataList) throws CompletionException {
logger.info("Get {} stream set objects from metadata", objectMetadataList.size());
if (objectMetadataList.isEmpty()) {
return;
Expand Down Expand Up @@ -187,15 +187,21 @@ private void compact(List<StreamMetadata> streamMetadataList, List<S3ObjectMetad
compactObjects(streamMetadataList, objectsToCompact);
}

private void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToForceSplit) {
void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToForceSplit) {
logger.info("Force split {} stream set objects", objectsToForceSplit.size());
TimerUtil timerUtil = new TimerUtil();
for (int i = 0; i < objectsToForceSplit.size(); i++) {
timerUtil.reset();
S3ObjectMetadata objectToForceSplit = objectsToForceSplit.get(i);
logger.info("Force split progress {}/{}, splitting object {}, object size {}", i + 1, objectsToForceSplit.size(),
objectToForceSplit.objectId(), objectToForceSplit.objectSize());
CommitStreamSetObjectRequest request = buildSplitRequest(streamMetadataList, objectToForceSplit);
CommitStreamSetObjectRequest request;
try {
request = buildSplitRequest(streamMetadataList, objectToForceSplit);
} catch (Exception ex) {
logger.error("Build force split request for object {} failed, ex: ", objectToForceSplit.objectId(), ex);
continue;
}
if (request == null) {
continue;
}
Expand All @@ -217,7 +223,8 @@ private void forceSplitObjects(List<StreamMetadata> streamMetadataList, List<S3O
}
}

private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact) {
private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact)
throws CompletionException {
if (objectsToCompact.isEmpty()) {
return;
}
Expand Down Expand Up @@ -311,7 +318,7 @@ private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<St
}

Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList,
Collections.singletonList(objectMetadata), s3Operator);
Collections.singletonList(objectMetadata), s3Operator, logger);
if (streamDataBlocksMap.isEmpty()) {
// object not exist, metadata is out of date
logger.warn("Object {} not exist, metadata is out of date", objectMetadata.objectId());
Expand All @@ -324,6 +331,10 @@ private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<St
return new ArrayList<>();
}

return groupAndSplitStreamDataBlocks(objectMetadata, streamDataBlocks);
}

Collection<CompletableFuture<StreamObject>> groupAndSplitStreamDataBlocks(S3ObjectMetadata objectMetadata, List<StreamDataBlock> streamDataBlocks) {
List<Pair<List<StreamDataBlock>, CompletableFuture<StreamObject>>> groupedDataBlocks = new ArrayList<>();
List<List<StreamDataBlock>> groupedStreamDataBlocks = CompactionUtils.groupStreamDataBlocks(streamDataBlocks);
for (List<StreamDataBlock> group : groupedStreamDataBlocks) {
Expand Down Expand Up @@ -363,11 +374,15 @@ private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<St
for (Pair<List<StreamDataBlock>, CompletableFuture<StreamObject>> pair : batchGroup) {
List<StreamDataBlock> blocks = pair.getLeft();
DataBlockWriter writer = new DataBlockWriter(objectId, s3Operator, config.objectPartSize());
for (StreamDataBlock block : blocks) {
writer.write(block);
}
CompletableFuture<Void> cf = CompactionUtils.chainWriteDataBlock(writer, blocks, forceSplitThreadPool);
long finalObjectId = objectId;
cfs.add(writer.close().thenAccept(v -> {
cfs.add(cf.thenAccept(nil -> writer.close()).whenComplete((ret, ex) -> {
if (ex != null) {
logger.error("write to stream object {} failed", finalObjectId, ex);
writer.release();
blocks.forEach(StreamDataBlock::release);
return;
}
StreamObject streamObject = new StreamObject();
streamObject.setObjectId(finalObjectId);
streamObject.setStreamId(blocks.get(0).getStreamId());
Expand All @@ -381,19 +396,19 @@ private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<St
return CompletableFuture.allOf(cfs.toArray(new CompletableFuture[0]));
}, forceSplitThreadPool)
.exceptionally(ex -> {
//TODO: clean up buffer
logger.error("Force split object failed", ex);
logger.error("Force split object {} failed", objectMetadata.objectId(), ex);
for (Pair<List<StreamDataBlock>, CompletableFuture<StreamObject>> pair : groupedDataBlocks) {
pair.getValue().completeExceptionally(ex);
}
return null;
throw new IllegalStateException(String.format("Force split object %d failed", objectMetadata.objectId()), ex);
}).join();
}

return groupedDataBlocks.stream().map(Pair::getValue).collect(Collectors.toList());
}

CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetadataList, S3ObjectMetadata objectToSplit) {
CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetadataList, S3ObjectMetadata objectToSplit)
throws CompletionException {
Collection<CompletableFuture<StreamObject>> cfs = splitStreamSetObject(streamMetadataList, objectToSplit);
if (cfs.isEmpty()) {
logger.error("Force split object {} failed, no stream object generated", objectToSplit.objectId());
Expand Down Expand Up @@ -421,13 +436,15 @@ CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetada
return request;
}

CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact) {
CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMetadataList, List<S3ObjectMetadata> objectsToCompact)
throws CompletionException {
CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();

Set<Long> compactedObjectIds = new HashSet<>();
logger.info("{} stream set objects as compact candidates, total compaction size: {}",
objectsToCompact.size(), objectsToCompact.stream().mapToLong(S3ObjectMetadata::objectSize).sum());
Map<Long, List<StreamDataBlock>> streamDataBlockMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList, objectsToCompact, s3Operator);
Map<Long, List<StreamDataBlock>> streamDataBlockMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList,
objectsToCompact, s3Operator, logger);
long now = System.currentTimeMillis();
Set<Long> excludedObjectIds = new HashSet<>();
List<CompactionPlan> compactionPlans = this.compactionAnalyzer.analyze(streamDataBlockMap, excludedObjectIds);
Expand Down Expand Up @@ -525,7 +542,7 @@ Map<Boolean, List<S3ObjectMetadata>> convertS3Objects(List<S3ObjectMetadata> str
}

void executeCompactionPlans(CommitStreamSetObjectRequest request, List<CompactionPlan> compactionPlans, List<S3ObjectMetadata> s3ObjectMetadata)
throws IllegalArgumentException {
throws CompletionException {
if (compactionPlans.isEmpty()) {
return;
}
Expand All @@ -545,37 +562,39 @@ void executeCompactionPlans(CommitStreamSetObjectRequest request, List<Compactio
DataBlockReader reader = new DataBlockReader(metadata, s3Operator, compactionBucket, bucketCallbackScheduledExecutor);
reader.readBlocks(streamDataBlocks, Math.min(CompactionConstants.S3_OBJECT_MAX_READ_BATCH, networkBandwidth));
}
List<CompletableFuture<StreamObject>> streamObjectCFList = new ArrayList<>();
CompletableFuture<Void> streamSetObjectCF = null;
List<CompletableFuture<StreamObject>> streamObjectCfList = new ArrayList<>();
CompletableFuture<Void> streamSetObjectChainWriteCf = CompletableFuture.completedFuture(null);
for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
if (compactedObject.type() == CompactionType.COMPACT) {
sortedStreamDataBlocks.addAll(compactedObject.streamDataBlocks());
streamSetObjectCF = uploader.chainWriteStreamSetObject(streamSetObjectCF, compactedObject);
streamSetObjectChainWriteCf = uploader.chainWriteStreamSetObject(streamSetObjectChainWriteCf, compactedObject);
} else {
streamObjectCFList.add(uploader.writeStreamObject(compactedObject));
streamObjectCfList.add(uploader.writeStreamObject(compactedObject));
}
}

List<CompletableFuture<?>> cfList = new ArrayList<>();
cfList.add(streamSetObjectChainWriteCf);
cfList.addAll(streamObjectCfList);
// wait for all stream objects and stream set object part to be uploaded
try {
if (streamSetObjectCF != null) {
// wait for all writes done
streamSetObjectCF.thenAccept(v -> uploader.forceUploadStreamSetObject()).join();
}
streamObjectCFList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);
} catch (Exception ex) {
//TODO: clean up buffer
logger.error("Error while uploading compaction objects", ex);
uploader.reset();
throw new IllegalArgumentException("Error while uploading compaction objects", ex);
}
CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0]))
.thenAccept(v -> uploader.forceUploadStreamSetObject())
.exceptionally(ex -> {
logger.error("Error while uploading compaction objects", ex);
uploader.release().thenAccept(v -> {
for (CompactedObject compactedObject : compactionPlan.compactedObjects()) {
compactedObject.streamDataBlocks().forEach(StreamDataBlock::release);
}
}).join();
throw new IllegalStateException("Error while uploading compaction objects", ex);
}).join();
streamObjectCfList.stream().map(CompletableFuture::join).forEach(request::addStreamObject);
}
List<ObjectStreamRange> objectStreamRanges = CompactionUtils.buildObjectStreamRange(sortedStreamDataBlocks);
objectStreamRanges.forEach(request::addStreamRange);
request.setObjectId(uploader.getStreamSetObjectId());
// set stream set object id to be the first object id of compacted objects
request.setOrderId(s3ObjectMetadata.get(0).objectId());
request.setObjectSize(uploader.complete());
uploader.reset();
}
}
Loading

0 comments on commit 6570ebd

Please sign in to comment.