Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): composite object reader&writer #1432

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3;

import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.operator.Writer;

/**
* CompositeObject is a logic object which soft links multiple objects together.
* <p>
* v0 format:
* objects
* object_count u32
* objects (
* object_id u64
* block_start_index u32
* bucket_index u16
* )*
* indexes
* index_count u32
* (
* stream_id u64
* start_offset u64
* end_offset_delta u32
* record_count u32
* block_start_position u64
* block_size u32
* )*
* index_handle
* position u64
* length u32
* padding 40byte - 8 - 8 - 4
* magic u64
*
*
*/
public class CompositeObject {
public static final byte OBJECTS_BLOCK_MAGIC = 0x52;
public static final int OBJECT_BLOCK_HEADER_SIZE = 1 /* magic */ + 4 /* objects count */;
public static final int OBJECT_UNIT_SIZE = 8 /* objectId */ + 4 /* blockStartIndex */ + 2 /* bucketId */;

public static final int FOOTER_SIZE = 48;
public static final long FOOTER_MAGIC = 0x88e241b785f4cff8L;

public static ObjectReader reader(S3ObjectMetadata objectMetadata, ObjectReader.RangeReader rangeReader) {
return new CompositeObjectReader(objectMetadata, rangeReader);
}

public static ObjectWriter writer(Writer writer) {
return new CompositeObjectWriter(writer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3;

import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection;
import com.automq.stream.utils.biniarysearch.ComparableItem;
import io.netty.buffer.ByteBuf;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static com.automq.stream.s3.ByteBufAlloc.BLOCK_CACHE;
import static com.automq.stream.s3.CompositeObject.FOOTER_MAGIC;
import static com.automq.stream.s3.CompositeObject.OBJECT_BLOCK_HEADER_SIZE;
import static com.automq.stream.s3.CompositeObject.OBJECT_UNIT_SIZE;
import static com.automq.stream.s3.ObjectWriter.Footer.FOOTER_SIZE;

public class CompositeObjectReader implements ObjectReader {
private final S3ObjectMetadata objectMetadata;
private final RangeReader rangeReader;
private CompletableFuture<BasicObjectInfo> basicObjectInfoCf;
private final AtomicInteger refCount = new AtomicInteger(1);

public CompositeObjectReader(S3ObjectMetadata objectMetadata, RangeReader rangeReader) {
this.objectMetadata = objectMetadata;
this.rangeReader = rangeReader;
}

@Override
public S3ObjectMetadata metadata() {
return objectMetadata;
}

@Override
public String objectKey() {
return ObjectUtils.genKey(0, objectMetadata.objectId());
}

@Override
public synchronized CompletableFuture<BasicObjectInfo> basicObjectInfo() {
if (basicObjectInfoCf == null) {
this.basicObjectInfoCf = new CompletableFuture<>();
asyncGetBasicObjectInfo(this.basicObjectInfoCf);
}
return basicObjectInfoCf;
}

@Override
public CompletableFuture<DataBlockGroup> read(DataBlockIndex block) {
return basicObjectInfo().thenCompose(info -> read0(info, block));
}

public ObjectReader retain() {
refCount.incrementAndGet();
return this;
}

public ObjectReader release() {
if (refCount.decrementAndGet() == 0) {
close0();
}
return this;
}

@Override
public void close() {
release();
}

public synchronized void close0() {
if (basicObjectInfoCf != null) {
basicObjectInfoCf.thenAccept(BasicObjectInfo::close);
}
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
CompositeObjectReader reader = (CompositeObjectReader) o;
return Objects.equals(objectMetadata.objectId(), reader.objectMetadata.objectId());
}

@Override
public int hashCode() {
return Objects.hashCode(objectMetadata.objectId());
}

private void asyncGetBasicObjectInfo(CompletableFuture<BasicObjectInfo> basicObjectInfoCf) {
CompletableFuture<ByteBuf> cf = rangeReader.rangeRead(objectMetadata, 0, objectMetadata.objectSize());
cf.thenAccept(buf -> {
try {
buf = buf.slice();
int readableBytes = buf.readableBytes();
long footerMagic = buf.getLong(readableBytes - 8);
if (footerMagic != FOOTER_MAGIC) {
throw new ObjectParseException("Invalid footer magic: " + footerMagic);
}
long indexBlockPosition = buf.getLong(readableBytes - FOOTER_SIZE);
int indexBlockSize = buf.getInt(readableBytes - 40);
ByteBuf objectsBlockBuf = buf.retainedSlice(0, (int) indexBlockPosition);
ByteBuf indexesBlockBuf = buf.retainedSlice((int) indexBlockPosition, indexBlockSize);
buf.release();
basicObjectInfoCf.complete(new BasicObjectInfoExt(objectsBlockBuf, new IndexBlock(indexesBlockBuf)));
} catch (Throwable e) {
buf.release();
basicObjectInfoCf.completeExceptionally(e);
}
}).exceptionally(ex -> {
basicObjectInfoCf.completeExceptionally(ex);
return null;
});
}

private CompletableFuture<DataBlockGroup> read0(BasicObjectInfo info, DataBlockIndex block) {
S3ObjectMetadata linkObjectMetadata = ((BasicObjectInfoExt) info).getLinkObjectMetadata(block.id());
return rangeReader.rangeRead(linkObjectMetadata, block.startPosition(), block.endPosition()).thenApply(buf -> {
ByteBuf pooled = ByteBufAlloc.byteBuffer(buf.readableBytes(), BLOCK_CACHE);
pooled.writeBytes(buf);
buf.release();
return new DataBlockGroup(pooled);
});
}

class BasicObjectInfoExt extends BasicObjectInfo {
private final ByteBuf objectsBlockBuf;
private final ObjectsSearcher objectsSearcher;

public BasicObjectInfoExt(ByteBuf objectsBlockBuf, IndexBlock indexBlock) {
super(-1L, indexBlock);
this.objectsBlockBuf = objectsBlockBuf;
this.objectsSearcher = new ObjectsSearcher(objectsBlockBuf);
}

@Override
void close() {
super.close();
objectsBlockBuf.release();
}

public S3ObjectMetadata getLinkObjectMetadata(int blockIndex) {
// TODO: optimize for next continuous search
int index = objectsSearcher.search(new SearchTarget(blockIndex));
int base = OBJECT_BLOCK_HEADER_SIZE + index * OBJECT_UNIT_SIZE;
long objectId = objectsBlockBuf.getLong(base);
short bucketId = objectsBlockBuf.getShort(base + 12);
return new S3ObjectMetadata(objectId, bucketId);
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
if (!super.equals(o))
return false;
BasicObjectInfoExt ext = (BasicObjectInfoExt) o;
return objectId() == ext.objectId();
}

@Override
public int hashCode() {
return Objects.hash(objectId());
}

long objectId() {
return objectMetadata.objectId();
}
}

static class ObjectsSearcher extends AbstractOrderedCollection<SearchTarget> {
private final ByteBuf objectsBlockBuf;
private final int size;

public ObjectsSearcher(ByteBuf objectsBlockBuf) {
this.objectsBlockBuf = objectsBlockBuf;
this.size = objectsBlockBuf.getInt(1);
}

@Override
protected int size() {
return size;
}

@Override
protected ComparableItem<SearchTarget> get(int index) {
int base = OBJECT_BLOCK_HEADER_SIZE + index * OBJECT_UNIT_SIZE;
int blockStartIndex = objectsBlockBuf.getInt(base + 8);
int endOffset = Integer.MAX_VALUE;
if (index < size - 1) {
endOffset = objectsBlockBuf.getInt(base + OBJECT_UNIT_SIZE + 8);
}

return new ObjectCompareItem(blockStartIndex, endOffset);
}
}

static class SearchTarget {
final int blockStartIndex;

SearchTarget(int blockIndex) {
this.blockStartIndex = blockIndex;
}
}

static class ObjectCompareItem extends SearchTarget implements ComparableItem<SearchTarget> {
final long blockEndIndex;

ObjectCompareItem(int blockStartIndex, int blockEndIndex) {
super(blockStartIndex);
this.blockEndIndex = blockEndIndex;
}

@Override
public boolean isLessThan(SearchTarget value) {
return blockEndIndex <= value.blockStartIndex;
}

@Override
public boolean isGreaterThan(SearchTarget value) {
return blockStartIndex > value.blockStartIndex;
}
}

}
Loading
Loading