Skip to content

Commit

Permalink
feat(kafka_issues618): stream object compact support drop expired data (#881)
Browse files Browse the repository at this point in the history

Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
  • Loading branch information
superhx authored Jan 4, 2024
1 parent 74c6698 commit d3d9a52
Show file tree
Hide file tree
Showing 14 changed files with 529 additions and 1,410 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.13.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.14.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
Expand Down
2 changes: 1 addition & 1 deletion s3stream/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.13.0-SNAPSHOT</version>
<version>0.14.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
Expand Down
10 changes: 0 additions & 10 deletions s3stream/src/main/java/com/automq/stream/s3/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class Config {
private long blockCacheSize = 100 * 1024 * 1024;
private int streamObjectCompactionIntervalMinutes = 60;
private long streamObjectCompactionMaxSizeBytes = 10737418240L;
private int streamObjectCompactionLivingTimeMinutes = 60;
private int controllerRequestRetryMaxCount = Integer.MAX_VALUE;
private long controllerRequestRetryBaseDelayMs = 500;
private long nodeEpoch = 0L;
Expand Down Expand Up @@ -156,10 +155,6 @@ public long streamObjectCompactionMaxSizeBytes() {
return streamObjectCompactionMaxSizeBytes;
}

public int streamObjectCompactionLivingTimeMinutes() {
return streamObjectCompactionLivingTimeMinutes;
}

public int controllerRequestRetryMaxCount() {
return controllerRequestRetryMaxCount;
}
Expand Down Expand Up @@ -343,11 +338,6 @@ public Config streamObjectCompactionMaxSizeBytes(long s3StreamObjectCompactionMa
return this;
}

public Config streamObjectCompactionLivingTimeMinutes(int s3StreamObjectCompactionLivingTimeMinutes) {
this.streamObjectCompactionLivingTimeMinutes = s3StreamObjectCompactionLivingTimeMinutes;
return this;
}

public Config controllerRequestRetryMaxCount(int s3ControllerRequestRetryMaxCount) {
this.controllerRequestRetryMaxCount = s3ControllerRequestRetryMaxCount;
return this;
Expand Down
26 changes: 26 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.automq.stream.utils.CloseableIterator;
import com.automq.stream.utils.biniarysearch.IndexBlockOrderedBytes;
import io.netty.buffer.ByteBuf;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -129,6 +130,7 @@ public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blo

public static BasicObjectInfo parse(ByteBuf objectTailBuf,
S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException {
objectTailBuf = objectTailBuf.slice();
long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - FOOTER_SIZE);
int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
if (indexBlockPosition + objectTailBuf.readableBytes() < s3ObjectMetadata.objectSize()) {
Expand Down Expand Up @@ -175,6 +177,30 @@ public IndexBlock(S3ObjectMetadata s3ObjectMetadata, ByteBuf blocks, ByteBuf str
this.size = blocks.readableBytes() + streamRanges.readableBytes();
}

public Iterator<StreamDataBlock> iterator() {
ByteBuf blocks = this.blocks.slice();
ByteBuf ranges = this.streamRanges.slice();
return new Iterator<>() {
@Override
public boolean hasNext() {
return ranges.readableBytes() != 0;
}

@Override
public StreamDataBlock next() {
long streamId = ranges.readLong();
long startOffset = ranges.readLong();
long endOffset = startOffset + ranges.readInt();
int rangeBlockId = ranges.readInt();
long blockPosition = blocks.getLong(rangeBlockId * 16);
int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
return new StreamDataBlock(streamId, startOffset, endOffset, rangeBlockId, s3ObjectMetadata.objectId(),
blockPosition, blockSize, recordCount);
}
};
}

public ByteBuf blocks() {
return blocks.slice();
}
Expand Down
27 changes: 7 additions & 20 deletions s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,12 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -74,8 +73,7 @@ public class S3Stream implements Stream {
private final Storage storage;
private final StreamManager streamManager;
private final Status status;
private final Function<Long, Void> closeHook;
private final StreamObjectsCompactionTask streamObjectsCompactionTask;
private final Consumer<Long> closeHook;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
private final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
Expand All @@ -87,14 +85,12 @@ public class S3Stream implements Stream {
private CompletableFuture<Void> lastPendingTrim = CompletableFuture.completedFuture(null);

public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
StreamManager streamManager,
StreamObjectsCompactionTask.Builder compactionTaskBuilder, Function<Long, Void> closeHook) {
this(streamId, epoch, startOffset, nextOffset, storage, streamManager, compactionTaskBuilder, closeHook, null, null);
StreamManager streamManager, Consumer<Long> closeHook) {
this(streamId, epoch, startOffset, nextOffset, storage, streamManager, closeHook, null, null);
}

public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, Storage storage,
StreamManager streamManager,
StreamObjectsCompactionTask.Builder compactionTaskBuilder, Function<Long, Void> closeHook,
StreamManager streamManager, Consumer<Long> closeHook,
AsyncNetworkBandwidthLimiter networkInboundLimiter, AsyncNetworkBandwidthLimiter networkOutboundLimiter) {
this.streamId = streamId;
this.epoch = epoch;
Expand All @@ -105,18 +101,11 @@ public S3Stream(long streamId, long epoch, long startOffset, long nextOffset, St
this.status = new Status();
this.storage = storage;
this.streamManager = streamManager;
this.streamObjectsCompactionTask = compactionTaskBuilder.withStream(this).build();
this.closeHook = closeHook;
this.networkInboundLimiter = networkInboundLimiter;
this.networkOutboundLimiter = networkOutboundLimiter;
}

public StreamObjectsCompactionTask.CompactionSummary triggerCompactionTask() throws ExecutionException, InterruptedException {
streamObjectsCompactionTask.prepare();
streamObjectsCompactionTask.doCompactions().get();
return streamObjectsCompactionTask.getCompactionsSummary();
}

public boolean isClosed() {
return status.isClosed();
}
Expand Down Expand Up @@ -328,10 +317,9 @@ public CompletableFuture<Void> close() {
}

private CompletableFuture<Void> close0() {
streamObjectsCompactionTask.close();
return storage.forceUpload(streamId)
.thenCompose(nil -> streamManager.closeStream(streamId, epoch))
.whenComplete((nil, ex) -> closeHook.apply(streamId));
.whenComplete((nil, ex) -> closeHook.accept(streamId));
}

@Override
Expand All @@ -354,8 +342,7 @@ public CompletableFuture<Void> destroy() {

private CompletableFuture<Void> destroy0() {
status.markDestroy();
streamObjectsCompactionTask.close();
closeHook.apply(streamId);
closeHook.accept(streamId);
startOffset = this.confirmOffset.get();
return streamManager.deleteStream(streamId, epoch);
}
Expand Down
110 changes: 6 additions & 104 deletions s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -50,8 +48,6 @@ public class S3StreamClient implements StreamClient {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamClient.class);
private final ScheduledThreadPoolExecutor streamObjectCompactionScheduler = Threads.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("stream-object-compaction-scheduler", true), LOGGER, true);
private final ExecutorService streamCompactionExecutor = Threads.newFixedThreadPool(1,
ThreadUtils.createThreadFactory("stream-object-compaction-background", true), LOGGER);
private final Map<Long, S3Stream> openedStreams;
private final StreamManager streamManager;
private final Storage storage;
Expand Down Expand Up @@ -106,33 +102,11 @@ public Optional<Stream> getStream(long streamId) {
private void startStreamObjectsCompactions() {
scheduledCompactionTaskFuture = streamObjectCompactionScheduler.scheduleWithFixedDelay(() -> {
List<S3Stream> operationStreams = new LinkedList<>(openedStreams.values());
CompactionTasksSummary.Builder totalSummaryBuilder = CompactionTasksSummary.builder();
final long startTime = System.currentTimeMillis();

operationStreams.forEach(stream -> {
if (stream.isClosed()) {
return;
}
try {
StreamObjectsCompactionTask.CompactionSummary summary = stream.triggerCompactionTask();
if (summary == null) {
LOGGER.debug("[stream {}] stream objects compaction finished, no compaction happened", stream.streamId());
} else {
LOGGER.debug("[stream {}] stream objects compaction finished, compaction summary: {}", stream.streamId(), summary);
totalSummaryBuilder.withItem(summary);
}
} catch (InterruptedException | ExecutionException e) {
LOGGER.error("get exception when do stream objects compaction: {}", e.getMessage());
if (e.getCause() instanceof StreamObjectsCompactionTask.HaltException) {
LOGGER.error("halt stream objects compaction for stream {}", stream.streamId());
}
} catch (Throwable e) {
LOGGER.error("get exception when do stream objects compaction: {}", e.getMessage());
}
StreamObjectCompactor task = StreamObjectCompactor.builder().objectManager(objectManager)
.s3Operator(s3Operator).maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes()).build();
task.compact();
});

final long totalTimeCostInMs = System.currentTimeMillis() - startTime;
LOGGER.info("stream objects compaction finished, summary: {}", totalSummaryBuilder.withTimeCostInMs(totalTimeCostInMs).build());
}, config.streamObjectCompactionIntervalMinutes(), config.streamObjectCompactionIntervalMinutes(), TimeUnit.MINUTES);
}

Expand All @@ -141,17 +115,12 @@ private CompletableFuture<Stream> openStream0(long streamId, long epoch) {
return streamManager.openStream(streamId, epoch).
thenApply(metadata -> {
S3StreamMetricsManager.recordOperationLatency(MetricsLevel.INFO, timerUtil.elapsedAs(TimeUnit.NANOSECONDS), S3Operation.OPEN_STREAM);
StreamObjectsCompactionTask.Builder builder = new StreamObjectsCompactionTask.Builder(objectManager, s3Operator)
.compactedStreamObjectMaxSizeInBytes(config.streamObjectCompactionMaxSizeBytes())
.eligibleStreamObjectLivingTimeInMs(config.streamObjectCompactionLivingTimeMinutes() * 60L * 1000)
.s3ObjectLogEnabled(config.objectLogEnable()).executor(streamCompactionExecutor);
StreamObjectCompactor.Builder builder = StreamObjectCompactor.builder().objectManager(objectManager).s3Operator(s3Operator)
.maxStreamObjectSize(config.streamObjectCompactionMaxSizeBytes());
S3Stream stream = new S3Stream(
metadata.streamId(), metadata.epoch(),
metadata.startOffset(), metadata.endOffset(),
storage, streamManager, builder, id -> {
openedStreams.remove(id);
return null;
}, networkInboundBucket, networkOutboundBucket);
storage, streamManager, openedStreams::remove, networkInboundBucket, networkOutboundBucket);
openedStreams.put(streamId, stream);
return stream;
});
Expand Down Expand Up @@ -186,71 +155,4 @@ public void shutdown() {
}
LOGGER.info("wait streams[{}] closed cost {}ms", streamCloseFutures.keySet(), timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
}

private static class CompactionTasksSummary {
private final long involvedStreamCount;
private final long sourceObjectsTotalSize;
private final long sourceObjectsCount;
private final long targetObjectsCount;
private final long smallSizeCopyWriteCount;
private final long timeCostInMs;

private CompactionTasksSummary(long involvedStreamCount, long sourceObjectsTotalSize, long sourceObjectsCount,
long targetObjectsCount, long smallSizeCopyWriteCount,
long timeCostInMs) {
this.involvedStreamCount = involvedStreamCount;
this.sourceObjectsTotalSize = sourceObjectsTotalSize;
this.sourceObjectsCount = sourceObjectsCount;
this.targetObjectsCount = targetObjectsCount;
this.smallSizeCopyWriteCount = smallSizeCopyWriteCount;
this.timeCostInMs = timeCostInMs;
}

public static Builder builder() {
return new Builder();
}

@Override
public String toString() {
return "CompactionTasksSummary{" +
"involvedStreamCount=" + involvedStreamCount +
", sourceObjectsTotalSize=" + sourceObjectsTotalSize +
", sourceObjectsCount=" + sourceObjectsCount +
", targetObjectsCount=" + targetObjectsCount +
", smallSizeCopyWriteCount=" + smallSizeCopyWriteCount +
", timeCostInMs=" + timeCostInMs +
'}';
}

public static class Builder {
private long involvedStreamCount;
private long sourceObjectsTotalSize;
private long sourceObjectsCount;
private long targetObjectsCount;
private long smallSizeCopyWriteCount;
private long timeCostInMs;

public Builder withItem(StreamObjectsCompactionTask.CompactionSummary compactionSummary) {
if (compactionSummary == null) {
return this;
}
this.involvedStreamCount++;
this.sourceObjectsTotalSize += compactionSummary.totalObjectSize();
this.sourceObjectsCount += compactionSummary.sourceObjectsCount();
this.targetObjectsCount += compactionSummary.targetObjectCount();
this.smallSizeCopyWriteCount += compactionSummary.smallSizeCopyWriteCount();
return this;
}

public Builder withTimeCostInMs(long timeCostInMs) {
this.timeCostInMs = timeCostInMs;
return this;
}

public CompactionTasksSummary build() {
return new CompactionTasksSummary(involvedStreamCount, sourceObjectsTotalSize, sourceObjectsCount, targetObjectsCount, smallSizeCopyWriteCount, timeCostInMs);
}
}

}
}
Loading

0 comments on commit d3d9a52

Please sign in to comment.