Skip to content

Commit

Permalink
fix(s3stream): delete out-dated object directly during compaction (#896)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <niesc@automq.com>
  • Loading branch information
SCNieh authored Jan 15, 2024
1 parent b9662a2 commit 315ec49
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -312,29 +312,30 @@ public CompletableFuture<Void> forceSplitAll() {
*
* @param streamMetadataList metadata of opened streams
* @param objectMetadata stream set object to split
* @return List of CompletableFuture of StreamObject
* @param cfs List of CompletableFuture of StreamObject
* @return true if split succeed, false otherwise
*/
private Collection<CompletableFuture<StreamObject>> splitStreamSetObject(List<StreamMetadata> streamMetadataList,
S3ObjectMetadata objectMetadata) {
private boolean splitStreamSetObject(List<StreamMetadata> streamMetadataList,
S3ObjectMetadata objectMetadata, Collection<CompletableFuture<StreamObject>> cfs) {
if (objectMetadata == null) {
return new ArrayList<>();
return false;
}

Map<Long, List<StreamDataBlock>> streamDataBlocksMap = CompactionUtils.blockWaitObjectIndices(streamMetadataList,
Collections.singletonList(objectMetadata), s3Operator, logger);
if (streamDataBlocksMap.isEmpty()) {
// object not exist, metadata is out of date
logger.warn("Object {} not exist, metadata is out of date", objectMetadata.objectId());
return new ArrayList<>();
logger.warn("Read index for object {} failed", objectMetadata.objectId());
return false;
}
List<StreamDataBlock> streamDataBlocks = streamDataBlocksMap.get(objectMetadata.objectId());
if (streamDataBlocks.isEmpty()) {
// object is empty, metadata is out of date
logger.warn("Object {} is empty, metadata is out of date", objectMetadata.objectId());
return new ArrayList<>();
logger.info("Object {} is out of date, will be deleted after compaction", objectMetadata.objectId());
return true;
}

return groupAndSplitStreamDataBlocks(objectMetadata, streamDataBlocks);
cfs.addAll(groupAndSplitStreamDataBlocks(objectMetadata, streamDataBlocks));
return true;
}

Collection<CompletableFuture<StreamObject>> groupAndSplitStreamDataBlocks(S3ObjectMetadata objectMetadata,
Expand Down Expand Up @@ -414,8 +415,9 @@ Collection<CompletableFuture<StreamObject>> groupAndSplitStreamDataBlocks(S3Obje
CommitStreamSetObjectRequest buildSplitRequest(List<StreamMetadata> streamMetadataList,
S3ObjectMetadata objectToSplit)
throws CompletionException {
Collection<CompletableFuture<StreamObject>> cfs = splitStreamSetObject(streamMetadataList, objectToSplit);
if (cfs.isEmpty()) {
List<CompletableFuture<StreamObject>> cfs = new ArrayList<>();
boolean status = splitStreamSetObject(streamMetadataList, objectToSplit, cfs);
if (!status) {
logger.error("Force split object {} failed, no stream object generated", objectToSplit.objectId());
return null;
}
Expand Down Expand Up @@ -468,6 +470,12 @@ CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMeta
executeCompactionPlans(request, compactionPlans, objectsToCompact);
compactionPlans.forEach(c -> c.streamDataBlocksMap().values().forEach(v -> v.forEach(b -> compactedObjectIds.add(b.getObjectId()))));

// compact out-dated objects directly
streamDataBlockMap.entrySet().stream().filter(e -> e.getValue().isEmpty()).forEach(e -> {
logger.info("Object {} is out of date, will be deleted after compaction", e.getKey());
compactedObjectIds.add(e.getKey());
});

request.setCompactedObjectIds(new ArrayList<>(compactedObjectIds));
List<S3ObjectMetadata> compactedObjectMetadata = objectsToCompact.stream()
.filter(e -> compactedObjectIds.contains(e.objectId())).toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,37 @@ public void testForceSplit() {
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(2)), request));
}

@Test
public void testForceSplitWithOutDatedObject() {
when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
List.of(new StreamMetadata(STREAM_0, 0, 999, 9999, StreamState.OPENED),
new StreamMetadata(STREAM_1, 0, 999, 9999, StreamState.OPENED),
new StreamMetadata(STREAM_2, 0, 999, 9999, StreamState.OPENED))));

List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
when(config.streamSetObjectCompactionForceSplitPeriod()).thenReturn(0);
compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);

CommitStreamSetObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());

request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());

request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());
}

@Test
public void testForceSplitWithException() {
S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
Expand Down Expand Up @@ -320,6 +351,26 @@ public void testCompactionWithDataTrimmed4() {
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
}

@Test
public void testCompactWithOutdatedObject() {
when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
List.of(new StreamMetadata(STREAM_0, 0, 15, 20, StreamState.OPENED),
new StreamMetadata(STREAM_1, 0, 60, 500, StreamState.OPENED),
new StreamMetadata(STREAM_2, 0, 60, 270, StreamState.OPENED))));
compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
CommitStreamSetObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST);

assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds());
assertEquals(OBJECT_0, request.getOrderId());
assertTrue(request.getObjectId() > OBJECT_2);
request.getStreamObjects().forEach(s -> assertTrue(s.getObjectId() > OBJECT_2));
assertEquals(2, request.getStreamObjects().size());
assertEquals(2, request.getStreamRanges().size());

Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
}

@Test
public void testCompactWithNonExistStream() {
when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
Expand Down

0 comments on commit 315ec49

Please sign in to comment.