diff --git a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
index 0c2687755..93848209a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -53,7 +53,6 @@ public ObjectReader(S3ObjectMetadata metadata, S3Operator s3Operator) {
         this.objectKey = metadata.key();
         this.s3Operator = s3Operator;
         this.basicObjectInfoCf = new CompletableFuture<>();
-        // TODO: close object reader to release resources such as index block.
        asyncGetBasicObjectInfo();
     }
 
@@ -78,23 +77,27 @@ public CompletableFuture<DataBlock> read(DataBlockIndex block) {
         return rangeReadCf.thenApply(buf -> new DataBlock(buf, block.recordCount()));
     }
 
-    private void asyncGetBasicObjectInfo() {
-        asyncGetBasicObjectInfo0(Math.max(0, metadata.objectSize() - 1024 * 1024));
+    void asyncGetBasicObjectInfo() {
+        asyncGetBasicObjectInfo0(Math.max(0, metadata.objectSize() - 1024 * 1024), true);
     }
 
-    private void asyncGetBasicObjectInfo0(long startPosition) {
+    private void asyncGetBasicObjectInfo0(long startPosition, boolean firstAttempt) {
         CompletableFuture<ByteBuf> cf = s3Operator.rangeRead(objectKey, startPosition, metadata.objectSize());
         cf.thenAccept(buf -> {
             try {
                 BasicObjectInfo basicObjectInfo = BasicObjectInfo.parse(buf, metadata.objectSize());
                 basicObjectInfoCf.complete(basicObjectInfo);
             } catch (IndexBlockParseException ex) {
-                asyncGetBasicObjectInfo0(ex.indexBlockPosition);
+                asyncGetBasicObjectInfo0(ex.indexBlockPosition, false);
             }
         }).exceptionally(ex -> {
             LOGGER.warn("s3 range read from {} [{}, {}) failed", objectKey, startPosition, metadata.objectSize(), ex);
             // TODO: delay retry.
-            asyncGetBasicObjectInfo0(startPosition);
+            if (firstAttempt) {
+                asyncGetBasicObjectInfo0(startPosition, false);
+            } else {
+                basicObjectInfoCf.completeExceptionally(ex);
+            }
             return null;
         });
     }
@@ -148,7 +151,7 @@ public BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount
         public static BasicObjectInfo parse(ByteBuf objectTailBuf, long objectSize) throws IndexBlockParseException {
             long indexBlockPosition = objectTailBuf.getLong(objectTailBuf.readableBytes() - FOOTER_SIZE);
             int indexBlockSize = objectTailBuf.getInt(objectTailBuf.readableBytes() - 40);
-            if (indexBlockPosition + indexBlockSize + FOOTER_SIZE < objectSize) {
+            if (indexBlockPosition + objectTailBuf.readableBytes() < objectSize) {
                 objectTailBuf.release();
                 throw new IndexBlockParseException(indexBlockPosition);
             } else {
@@ -253,7 +256,7 @@ void close() {
         }
     }
 
-    static class IndexBlockParseException extends Exception {
+    public static class IndexBlockParseException extends Exception {
         long indexBlockPosition;
 
         public IndexBlockParseException(long indexBlockPosition) {
diff --git a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
index e11781bfa..5c60bc083 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/ObjectReaderTest.java
@@ -17,12 +17,18 @@
 package com.automq.stream.s3;
 
+import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metadata.S3ObjectType;
+import com.automq.stream.s3.model.StreamRecordBatch;
+import com.automq.stream.s3.operator.MemoryS3Operator;
+import com.automq.stream.s3.operator.S3Operator;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -92,4 +98,22 @@ public void testIndexBlock() {
         assertEquals(2, rst.size());
     }
 
+    @Test
+    public void testGetBasicObjectInfo() throws ExecutionException, InterruptedException {
+        S3Operator s3Operator = new MemoryS3Operator();
+        ObjectWriter objectWriter = ObjectWriter.writer(233L, s3Operator, 1024, 1024);
+        // make the index block bigger than 1 MiB
+        int streamCount = 2 * 1024 * 1024 / 40;
+        for (int i = 0; i < streamCount; i++) {
+            StreamRecordBatch r = new StreamRecordBatch(i, 0, i, 1, TestUtils.random(1));
+            objectWriter.write(i, List.of(r));
+        }
+        objectWriter.close().get();
+        S3ObjectMetadata metadata = new S3ObjectMetadata(233L, objectWriter.size(), S3ObjectType.WAL);
+        try (ObjectReader objectReader = new ObjectReader(metadata, s3Operator)) {
+            ObjectReader.BasicObjectInfo info = objectReader.basicObjectInfo().get();
+            assertEquals(streamCount, info.blockCount());
+        }
+    }
+
 }
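
Note (not part of the patch): the main behavioral change above is that a failed tail read is now retried exactly once before the failure is surfaced through basicObjectInfoCf, instead of being retried forever. Below is a minimal self-contained sketch of that retry-once pattern; flakyRead, RetryOnceSketch, and the string payload are hypothetical stand-ins for s3Operator.rangeRead and the parsed object info.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryOnceSketch {
    static final AtomicInteger attempts = new AtomicInteger();

    // Stand-in for s3Operator.rangeRead: fails on the first call, succeeds on the second.
    static CompletableFuture<String> flakyRead() {
        if (attempts.incrementAndGet() == 1) {
            return CompletableFuture.failedFuture(new RuntimeException("transient read failure"));
        }
        return CompletableFuture.completedFuture("object tail bytes");
    }

    // Mirrors asyncGetBasicObjectInfo0's control flow: one retry, then fail the result future.
    static void read(CompletableFuture<String> result, boolean firstAttempt) {
        flakyRead().thenAccept(result::complete).exceptionally(ex -> {
            if (firstAttempt) {
                read(result, false);              // first failure: retry once
            } else {
                result.completeExceptionally(ex); // second failure: give up, propagate
            }
            return null;
        });
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> result = new CompletableFuture<>();
        read(result, true);
        System.out.println(result.get()); // succeeds after one retry
    }
}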
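Note (not part of the patch): the parse fix is easiest to see with concrete numbers. The first attempt reads only the last 1 MiB, so the tail buffer covers [objectSize - readableBytes, objectSize); the index block lies outside that range exactly when indexBlockPosition + readableBytes < objectSize, which is the new condition. Assuming the index block is immediately followed by the footer, the old condition (indexBlockPosition + indexBlockSize + FOOTER_SIZE < objectSize) appears never to hold for a well-formed object, so an index block larger than the initial 1 MiB read was never re-fetched; the added testGetBasicObjectInfo exercises exactly that case. A small worked check with hypothetical sizes:

public class TailCoverageSketch {
    public static void main(String[] args) {
        long objectSize = 10L * 1024 * 1024;        // hypothetical 10 MiB object
        long readableBytes = 1024 * 1024;           // 1 MiB tail read, as in the first attempt
        long indexBlockPosition = 7L * 1024 * 1024; // index block starts 3 MiB before the end

        long tailStart = objectSize - readableBytes; // first byte the tail buffer covers
        boolean needReRead = indexBlockPosition + readableBytes < objectSize; // patched condition
        // Equivalent reading: the index block starts before the buffered range does.
        System.out.println(needReRead == (indexBlockPosition < tailStart)); // true
        System.out.println("re-read from " + indexBlockPosition + ": " + needReRead); // true
    }
}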