diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java index f48377aa55..c2426b20ea 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceBucketEntry.java @@ -29,7 +29,7 @@ import java.util.TreeSet; import java.util.function.Consumer; -import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; /** Entry to store the idempotence information of each table-bucket. */ @Internal diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java index 7b5605263b..5593c19999 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java @@ -25,7 +25,6 @@ import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.record.LogRecordBatch; import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.protocol.Errors; @@ -40,7 +39,8 @@ import java.util.Set; import java.util.stream.Collectors; -import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for @@ -182,10 +182,10 @@ synchronized void removeInFlightBatch(ReadyWriteBatch batch) { */ synchronized int firstInFlightBatchSequence(TableBucket tableBucket) { if (!hasInflightBatches(tableBucket)) { - return LogRecordBatch.NO_BATCH_SEQUENCE; + return NO_BATCH_SEQUENCE; } WriteBatch batch = nextBatchBySequence(tableBucket); - return batch == null ? LogRecordBatch.NO_BATCH_SEQUENCE : batch.batchSequence(); + return batch == null ? 
NO_BATCH_SEQUENCE : batch.batchSequence(); } synchronized void handleCompletedBatch(ReadyWriteBatch readyWriteBatch) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 6164d6ecfc..2cfd74da34 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -32,7 +32,6 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metrics.MetricNames; -import org.apache.fluss.record.LogRecordBatch; import org.apache.fluss.row.arrow.ArrowWriter; import org.apache.fluss.row.arrow.ArrowWriterPool; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; @@ -61,7 +60,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; import static org.apache.fluss.utils.Preconditions.checkNotNull; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache @@ -757,7 +757,7 @@ private boolean shouldStopDrainBatchesForBucket(WriteBatch first, TableBucket ta // flight request count to 1. int firstInFlightSequence = idempotenceManager.firstInFlightBatchSequence(tableBucket); boolean isFirstInFlightBatch = - firstInFlightSequence == LogRecordBatch.NO_BATCH_SEQUENCE + firstInFlightSequence == NO_BATCH_SEQUENCE || (first.hasBatchSequence() && first.batchSequence() == firstInFlightSequence); @@ -824,7 +824,7 @@ private void insertInSequenceOrder( Deque deque, WriteBatch batch, TableBucket tableBucket) { // When we are re-enqueue and have enabled idempotence, the re-enqueued batch must always // have a batch sequence. - if (batch.batchSequence() == LogRecordBatch.NO_BATCH_SEQUENCE) { + if (batch.batchSequence() == NO_BATCH_SEQUENCE) { throw new IllegalStateException( "Trying to re-enqueue a batch which doesn't have a sequence even " + "though idempotence is enabled."); diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java index fe89508453..49766612ce 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteBatch.java @@ -21,7 +21,6 @@ import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.memory.MemorySegmentPool; import org.apache.fluss.metadata.PhysicalTablePath; -import org.apache.fluss.record.LogRecordBatch; import org.apache.fluss.record.bytesview.BytesView; import org.slf4j.Logger; @@ -35,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; import static org.apache.fluss.utils.Preconditions.checkNotNull; /** The abstract write batch contains write callback object to wait write request feedback. 
*/ @@ -113,7 +113,7 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac public abstract void abortRecordAppends(); public boolean hasBatchSequence() { - return batchSequence() != LogRecordBatch.NO_BATCH_SEQUENCE; + return batchSequence() != NO_BATCH_SEQUENCE; } public void resetWriterState(long writerId, int batchSequence) { diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java index b2b73778c7..0c1427e6fa 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java @@ -21,7 +21,6 @@ import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.record.DefaultKvRecord; import org.apache.fluss.record.DefaultKvRecordBatch; -import org.apache.fluss.record.DefaultLogRecordBatch; import org.apache.fluss.record.IndexedLogRecord; import org.apache.fluss.row.BinaryRow; import org.apache.fluss.row.InternalRow; @@ -29,6 +28,8 @@ import javax.annotation.Nullable; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; import static org.apache.fluss.utils.Preconditions.checkNotNull; /** @@ -85,7 +86,7 @@ public static WriteRecord forIndexedAppend( PhysicalTablePath tablePath, IndexedRow row, @Nullable byte[] bucketKey) { checkNotNull(row); int estimatedSizeInBytes = - IndexedLogRecord.sizeOf(row) + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; + IndexedLogRecord.sizeOf(row) + recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE); return new WriteRecord( tablePath, null, diff --git a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java index 7a3d8845ae..3502dcc3cd 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java @@ -39,16 +39,22 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; import static org.apache.fluss.record.TestData.DATA2; import static org.apache.fluss.record.TestData.DATA2_ROW_TYPE; import static org.apache.fluss.record.TestData.DATA2_TABLE_ID; @@ -78,14 +84,15 @@ void beforeEach() { logScannerStatus.assignScanBuckets(scanBuckets); } - @Test - void testSimple() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testSimple(byte recordBatchMagic) throws Exception { long fetchOffset = 0L; int bucketId = 0; // records for 0-10. 
TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); FetchLogResultForBucket resultForBucket0 = new FetchLogResultForBucket( - tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L); + tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L); DefaultCompletedFetch defaultCompletedFetch = makeCompletedFetch(tb, resultForBucket0, fetchOffset); List scanRecords = defaultCompletedFetch.fetchRecords(8); @@ -100,14 +107,15 @@ void testSimple() throws Exception { assertThat(scanRecords.size()).isEqualTo(0); } - @Test - void testNegativeFetchCount() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testNegativeFetchCount(byte recordBatchMagic) throws Exception { long fetchOffset = 0L; int bucketId = 0; // records for 0-10. TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId); FetchLogResultForBucket resultForBucket0 = new FetchLogResultForBucket( - tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L); + tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L); DefaultCompletedFetch defaultCompletedFetch = makeCompletedFetch(tb, resultForBucket0, fetchOffset); List scanRecords = defaultCompletedFetch.fetchRecords(-10); @@ -128,9 +136,8 @@ void testNoRecordsInFetch() { } @ParameterizedTest - @ValueSource(strings = {"INDEXED", "ARROW"}) - void testProjection(String format) throws Exception { - LogFormat logFormat = LogFormat.fromString(format); + @MethodSource("typeAndMagic") + void testProjection(LogFormat logFormat, byte magic) throws Exception { Schema schema = Schema.newBuilder() .column("a", DataTypes.INT()) @@ -158,9 +165,9 @@ void testProjection(String format) throws Exception { Projection projection = Projection.of(new int[] {0, 2}); MemoryLogRecords memoryLogRecords; if (logFormat == LogFormat.ARROW) { - memoryLogRecords = genRecordsWithProjection(DATA2, projection); + memoryLogRecords = genRecordsWithProjection(DATA2, projection, magic); } else { - memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED); + memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED, magic); } FetchLogResultForBucket resultForBucket0 = new FetchLogResultForBucket(tb, memoryLogRecords, 10L); @@ -224,19 +231,28 @@ private DefaultCompletedFetch makeCompletedFetch( offset); } - private MemoryLogRecords createMemoryLogRecords(List objects, LogFormat logFormat) - throws Exception { + private static Collection typeAndMagic() { + List params = new ArrayList<>(); + params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V1)); + params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V1)); + params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V0)); + params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V0)); + return params; + } + + private MemoryLogRecords createMemoryLogRecords( + List objects, LogFormat logFormat, byte magic) throws Exception { return createRecordsWithoutBaseLogOffset( - rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, logFormat); + rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, logFormat); } - private MemoryLogRecords genRecordsWithProjection(List objects, Projection projection) - throws Exception { + private MemoryLogRecords genRecordsWithProjection( + List objects, Projection projection, byte magic) throws Exception { File logFile = FlussPaths.logFile(tempDir, 0L); FileLogRecords fileLogRecords = FileLogRecords.open(logFile, false, 1024 * 1024, false); fileLogRecords.append( createRecordsWithoutBaseLogOffset( - 
rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, LogFormat.ARROW)); + rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, LogFormat.ARROW)); fileLogRecords.flush(); FileLogProjection fileLogProjection = new FileLogProjection(); diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java index 53bf730e1a..07544e8daf 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/IndexedLogWriteBatchTest.java @@ -21,7 +21,6 @@ import org.apache.fluss.memory.PreAllocatedPagedOutputView; import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.record.ChangeType; -import org.apache.fluss.record.DefaultLogRecordBatch; import org.apache.fluss.record.IndexedLogRecord; import org.apache.fluss.record.LogRecord; import org.apache.fluss.record.LogRecordBatch; @@ -40,6 +39,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; @@ -72,7 +73,7 @@ void testTryAppendWithWriteLimit() throws Exception { for (int i = 0; i - < (writeLimit - DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE) + < (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE)) / estimatedSizeInBytes; i++) { boolean appendResult = diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java index 444b93a5bf..e5d71a927f 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java @@ -66,7 +66,8 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import static org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; import static org.apache.fluss.record.TestData.DATA1_SCHEMA; @@ -614,7 +615,7 @@ private void verifyTableBucketInBatches( /** Return the offset delta. 
*/ private int expectedNumAppends(IndexedRow row, int batchSize) { - int size = RECORD_BATCH_HEADER_SIZE; + int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE); int offsetDelta = 0; while (true) { int recordSize = IndexedLogRecord.sizeOf(row); @@ -652,7 +653,8 @@ private RecordAccumulator createTestRecordAccumulator( } private long getTestBatchSize(BinaryRow row) { - return RECORD_BATCH_HEADER_SIZE + DefaultKvRecord.sizeOf(new byte[4], row); + return recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE) + + DefaultKvRecord.sizeOf(new byte[4], row); } private int getBatchNumInAccum(RecordAccumulator accum) { diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java index cc0b0b21d3..7209a315df 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java @@ -26,7 +26,6 @@ import org.apache.fluss.config.MemorySize; import org.apache.fluss.exception.TimeoutException; import org.apache.fluss.metadata.TableBucket; -import org.apache.fluss.record.LogRecordBatch; import org.apache.fluss.record.MemoryLogRecords; import org.apache.fluss.row.GenericRow; import org.apache.fluss.rpc.entity.ProduceLogResultForBucket; @@ -51,6 +50,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_TABLE_ID; import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO; @@ -797,7 +797,7 @@ private IdempotenceManager createIdempotenceManager(boolean idempotenceEnabled) private static boolean hasIdempotentRecords(TableBucket tb, ProduceLogRequest request) { MemoryLogRecords memoryLogRecords = getProduceLogData(request).get(tb); - return memoryLogRecords.batchIterator().next().writerId() != LogRecordBatch.NO_WRITER_ID; + return memoryLogRecords.batchIterator().next().writerId() != NO_WRITER_ID; } private static void assertBatchSequenceEquals( diff --git a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java index c2c7b5fb1b..eaacc48a6d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatch.java @@ -19,7 +19,6 @@ import org.apache.fluss.annotation.PublicEvolving; import org.apache.fluss.exception.CorruptMessageException; -import org.apache.fluss.exception.OutOfOrderSequenceException; import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.row.arrow.ArrowReader; @@ -35,95 +34,54 @@ import java.nio.ByteBuffer; import java.util.NoSuchElementException; +import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.COMMIT_TIMESTAMP_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD; +import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH; +import static 
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.attributeOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.batchSequenceOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.leaderEpochOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; +import static org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.writeClientIdOffset; + /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ /**
- * LogRecordBatch implementation for magic 0 and above. The schema of {@link LogRecordBatch} is
- * given below:
+ * LogRecordBatch implementation for different magic versions.
+ *
+ * <p>To learn more about the recordBatch format, see {@link LogRecordBatchFormat}. Supported
+ * recordBatch formats:
 *
- * <pre>
- * RecordBatch =>
- *   BaseOffset => Int64
- *   Length => Int32
- *   Magic => Int8
- *   CommitTimestamp => Int64
- *   CRC => Uint32
- *   SchemaId => Int16
- *   Attributes => Int8
- *   LastOffsetDelta => Int32
- *   WriterID => Int64
- *   SequenceID => Int32
- *   RecordCount => Int32
- *   Records => [Record]
- * </pre>
+ * <ul>
+ *   <li>V0 => {@link LogRecordBatchFormat#LOG_MAGIC_VALUE_V0}
+ *   <li>V1 => {@link LogRecordBatchFormat#LOG_MAGIC_VALUE_V1}
+ * </ul>
 *
- * <p>The CRC covers the data from the schemaId to the end of the batch (i.e. all the bytes that
- * follow the CRC). It is located after the magic byte, which means that clients must parse the
- * magic byte before deciding how to interpret the bytes between the batch length and the magic
- * byte. The CRC-32C (Castagnoli) polynomial is used for the computation. CommitTimestamp is also
- * located before the CRC, because it is determined in server side.
- *
- * <p>The field 'lastOffsetDelta is used to calculate the lastOffset of the current batch as:
- * [lastOffset = baseOffset + LastOffsetDelta] instead of [lastOffset = baseOffset + recordCount -
- * 1]. The reason for introducing this field is that there might be cases where the offset delta in
- * batch does not match the recordCount. For example, when generating CDC logs for a kv table and
- * sending a batch that only contains the deletion of non-existent kvs, no CDC logs would be
- * generated. However, we need to increment the batchSequence for the corresponding writerId to make
- * sure no {@link OutOfOrderSequenceException} will be thrown. In such a case, we would generate a
- * logRecordBatch with a LastOffsetDelta of 0 but a recordCount of 0.
- *
- * <p>The current attributes are given below:
- *
- * <pre>
- * ------------------------------------------
- * |  Unused (1-7)   |  AppendOnly Flag (0) |
- * ------------------------------------------
- * </pre>
- * * @since 0.1 */ // TODO rename to MemoryLogRecordBatch @PublicEvolving public class DefaultLogRecordBatch implements LogRecordBatch { - protected static final int BASE_OFFSET_LENGTH = 8; - public static final int LENGTH_LENGTH = 4; - static final int MAGIC_LENGTH = 1; - static final int COMMIT_TIMESTAMP_LENGTH = 8; - static final int CRC_LENGTH = 4; - static final int SCHEMA_ID_LENGTH = 2; - static final int ATTRIBUTE_LENGTH = 1; - static final int LAST_OFFSET_DELTA_LENGTH = 4; - static final int WRITE_CLIENT_ID_LENGTH = 8; - static final int BATCH_SEQUENCE_LENGTH = 4; - static final int RECORDS_COUNT_LENGTH = 4; - - static final int BASE_OFFSET_OFFSET = 0; - public static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH; - static final int MAGIC_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH; - static final int COMMIT_TIMESTAMP_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH; - public static final int CRC_OFFSET = COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH; - protected static final int SCHEMA_ID_OFFSET = CRC_OFFSET + CRC_LENGTH; - public static final int ATTRIBUTES_OFFSET = SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH; - static final int LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; - static final int WRITE_CLIENT_ID_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH; - static final int BATCH_SEQUENCE_OFFSET = WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH; - public static final int RECORDS_COUNT_OFFSET = BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH; - static final int RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH; - - public static final int RECORD_BATCH_HEADER_SIZE = RECORDS_OFFSET; - public static final int ARROW_CHANGETYPE_OFFSET = RECORD_BATCH_HEADER_SIZE; - public static final int LOG_OVERHEAD = LENGTH_OFFSET + LENGTH_LENGTH; - public static final byte APPEND_ONLY_FLAG_MASK = 0x01; private MemorySegment segment; private int position; + private byte magic; public void pointTo(MemorySegment segment, int position) { this.segment = segment; this.position = position; + this.magic = segment.get(position + MAGIC_OFFSET); } public void setBaseLogOffset(long baseLogOffset) { @@ -132,7 +90,7 @@ public void setBaseLogOffset(long baseLogOffset) { @Override public byte magic() { - return segment.get(position + MAGIC_OFFSET); + return magic; } @Override @@ -144,25 +102,43 @@ public void setCommitTimestamp(long timestamp) { segment.putLong(position + COMMIT_TIMESTAMP_OFFSET, timestamp); } + public void setLeaderEpoch(int leaderEpoch) { + if (magic >= LOG_MAGIC_VALUE_V1) { + segment.putInt(position + leaderEpochOffset(magic), leaderEpoch); + } else { + throw new UnsupportedOperationException( + "Set leader epoch is not supported for magic v" + magic + " record batch"); + } + } + @Override public long writerId() { - return segment.getLong(position + WRITE_CLIENT_ID_OFFSET); + return segment.getLong(position + writeClientIdOffset(magic)); } @Override public int batchSequence() { - return segment.getInt(position + BATCH_SEQUENCE_OFFSET); + return segment.getInt(position + batchSequenceOffset(magic)); + } + + @Override + public int leaderEpoch() { + if (magic >= LOG_MAGIC_VALUE_V1) { + return segment.getInt(position + leaderEpochOffset(magic)); + } else { + return NO_LEADER_EPOCH; + } } @Override public void ensureValid() { int sizeInBytes = sizeInBytes(); - if (sizeInBytes < RECORD_BATCH_HEADER_SIZE) { + if (sizeInBytes < recordBatchHeaderSize(magic)) { throw new CorruptMessageException( "Record batch is corrupt (the size " + sizeInBytes + " is smaller 
than the minimum allowed overhead " - + RECORD_BATCH_HEADER_SIZE + + recordBatchHeaderSize(magic) + ")"); } @@ -178,17 +154,18 @@ public void ensureValid() { @Override public boolean isValid() { - return sizeInBytes() >= RECORD_BATCH_HEADER_SIZE && checksum() == computeChecksum(); + return sizeInBytes() >= recordBatchHeaderSize(magic) && checksum() == computeChecksum(); } private long computeChecksum() { ByteBuffer buffer = segment.wrap(position, sizeInBytes()); - return Crc32C.compute(buffer, SCHEMA_ID_OFFSET, sizeInBytes() - SCHEMA_ID_OFFSET); + int schemaIdOffset = schemaIdOffset(magic); + return Crc32C.compute(buffer, schemaIdOffset, sizeInBytes() - schemaIdOffset); } private byte attributes() { // note we're not using the byte of attributes now. - return segment.get(ATTRIBUTES_OFFSET + position); + return segment.get(attributeOffset(magic) + position); } @Override @@ -198,12 +175,12 @@ public long nextLogOffset() { @Override public long checksum() { - return segment.getUnsignedInt(CRC_OFFSET + position); + return segment.getUnsignedInt(crcOffset(magic) + position); } @Override public short schemaId() { - return segment.getShort(SCHEMA_ID_OFFSET + position); + return segment.getShort(schemaIdOffset(magic) + position); } @Override @@ -217,7 +194,7 @@ public long lastLogOffset() { } private int lastOffsetDelta() { - return segment.getInt(LAST_OFFSET_DELTA_OFFSET + position); + return segment.getInt(lastOffsetDeltaOffset(magic) + position); } @Override @@ -227,7 +204,7 @@ public int sizeInBytes() { @Override public int getRecordCount() { - return segment.getInt(RECORDS_COUNT_OFFSET + position); + return segment.getInt(position + recordsCountOffset(magic)); } @Override @@ -278,7 +255,7 @@ public int hashCode() { private CloseableIterator rowRecordIterator(RowType rowType, long timestamp) { DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]); return new LogRecordIterator() { - int position = DefaultLogRecordBatch.this.position + RECORD_BATCH_HEADER_SIZE; + int position = DefaultLogRecordBatch.this.position + recordBatchHeaderSize(magic); int rowId = 0; @Override @@ -307,8 +284,9 @@ private CloseableIterator columnRecordIterator( if (isAppendOnly) { // append only batch, no change type vector, // the start of the arrow data is the beginning of the batch records - int arrowOffset = position + RECORD_BATCH_HEADER_SIZE; - int arrowLength = sizeInBytes() - RECORD_BATCH_HEADER_SIZE; + int recordBatchHeaderSize = recordBatchHeaderSize(magic); + int arrowOffset = position + recordBatchHeaderSize; + int arrowLength = sizeInBytes() - recordBatchHeaderSize; ArrowReader reader = ArrowUtils.createArrowReader( segment, arrowOffset, arrowLength, root, allocator, rowType); @@ -321,12 +299,12 @@ protected ChangeType getChangeType(int rowId) { } else { // with change type, decode the change type vector first, // the arrow data starts after the change type vector - int changeTypeOffset = position + ARROW_CHANGETYPE_OFFSET; + int changeTypeOffset = position + arrowChangeTypeOffset(magic); ChangeTypeVector changeTypeVector = new ChangeTypeVector(segment, changeTypeOffset, getRecordCount()); int arrowOffset = changeTypeOffset + changeTypeVector.sizeInBytes(); int arrowLength = - sizeInBytes() - ARROW_CHANGETYPE_OFFSET - changeTypeVector.sizeInBytes(); + sizeInBytes() - arrowChangeTypeOffset(magic) - changeTypeVector.sizeInBytes(); ArrowReader reader = ArrowUtils.createArrowReader( segment, arrowOffset, arrowLength, root, allocator, rowType); @@ -394,7 +372,7 @@ public LogRecordIterator() { 
"Found invalid record count " + numRecords + " in magic v" - + magic() + + magic + " batch"); } this.numRecords = numRecords; diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java b/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java index 3f314ce468..9be7e459ad 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogInputStream.java @@ -28,12 +28,12 @@ import java.nio.channels.FileChannel; import java.util.Objects; -import static org.apache.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD; -import static org.apache.fluss.record.DefaultLogRecordBatch.MAGIC_LENGTH; -import static org.apache.fluss.record.DefaultLogRecordBatch.MAGIC_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; +import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.HEADER_SIZE_UP_TO_MAGIC; +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD; +import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for @@ -42,9 +42,6 @@ /** A log input stream which is backed by a {@link FileChannel}. 
*/ public class FileLogInputStream implements LogInputStream { - - private static final int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH; - private int position; private final int end; private final FileLogRecords fileRecords; @@ -152,6 +149,11 @@ public int batchSequence() { return loadBatchHeader().batchSequence(); } + @Override + public int leaderEpoch() { + return loadBatchHeader().leaderEpoch(); + } + @Override public long lastLogOffset() { return loadBatchHeader().lastLogOffset(); @@ -189,7 +191,7 @@ private LogRecordBatch toMemoryRecordBatch(ByteBuffer buffer) { } private int headerSize() { - return RECORD_BATCH_HEADER_SIZE; + return recordBatchHeaderSize(magic); } protected LogRecordBatch loadFullBatch() { diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java index 736ae63e86..1f889fff18 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java @@ -41,6 +41,7 @@ import org.apache.fluss.utils.types.Tuple2; import java.io.ByteArrayOutputStream; +import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -54,12 +55,18 @@ import java.util.Map; import static org.apache.fluss.record.DefaultLogRecordBatch.APPEND_ONLY_FLAG_MASK; -import static org.apache.fluss.record.DefaultLogRecordBatch.ARROW_CHANGETYPE_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.ATTRIBUTES_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD; -import static org.apache.fluss.record.DefaultLogRecordBatch.RECORDS_COUNT_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD; +import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE; +import static org.apache.fluss.record.LogRecordBatchFormat.V1_RECORD_BATCH_HEADER_SIZE; +import static org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.attributeOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; +import static org.apache.fluss.record.LogRecordBatchFormat.recordsCountOffset; +import static org.apache.fluss.utils.FileUtils.readFully; import static org.apache.fluss.utils.FileUtils.readFullyOrFail; import static org.apache.fluss.utils.Preconditions.checkNotNull; import static org.apache.fluss.utils.Preconditions.checkState; @@ -81,7 +88,13 @@ public class FileLogProjection { // shared resources for multiple projections private final ByteArrayOutputStream outputStream; private final WriteChannel writeChannel; - private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(RECORD_BATCH_HEADER_SIZE); + + /** + * Buffer used to read a log record batch header. The V1 header is larger than the V0 header, + * so a V1-sized buffer can also hold a full V0 header even when the log file does not contain + * enough bytes for a V1 header.
+ */ + private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(V1_RECORD_BATCH_HEADER_SIZE); + private final ByteBuffer arrowHeaderBuffer = ByteBuffer.allocate(ARROW_HEADER_SIZE); private ByteBuffer arrowMetadataBuffer; @@ -163,17 +176,23 @@ public BytesViewLogRecords project(FileChannel channel, int start, int end, int checkNotNull(currentProjection, "There is no projection registered yet."); MultiBytesView.Builder builder = MultiBytesView.builder(); int position = start; - while (maxBytes > RECORD_BATCH_HEADER_SIZE) { - if (position >= end - RECORD_BATCH_HEADER_SIZE) { - // the remaining bytes in the file are not enough to read a batch header + + // The while condition is an optimization to avoid reading the log header when there are + // not enough bytes left; the V0 header size is used here as a conservative bound. In the + // end, the condition (position >= end - recordBatchHeaderSize) ensures the final correctness. + while (maxBytes > V0_RECORD_BATCH_HEADER_SIZE) { + if (position >= end - V0_RECORD_BATCH_HEADER_SIZE) { + // the remaining bytes in the file are not enough to read a batch header up to + // magic. return new BytesViewLogRecords(builder.build()); } - // read log header logHeaderBuffer.rewind(); - readFullyOrFail(channel, logHeaderBuffer, position, "log header"); + readLogHeaderFullyOrFail(channel, logHeaderBuffer, position); logHeaderBuffer.rewind(); + byte magic = logHeaderBuffer.get(MAGIC_OFFSET); + int recordBatchHeaderSize = recordBatchHeaderSize(magic); int batchSizeInBytes = LOG_OVERHEAD + logHeaderBuffer.getInt(LENGTH_OFFSET); if (position > end - batchSizeInBytes) { // the remaining bytes in the file are not enough to read a full batch @@ -183,22 +202,22 @@ public BytesViewLogRecords project(FileChannel channel, int start, int end, int // Skip empty batch. The empty batch was generated when build cdc log batch when there // is no cdc log generated for this kv batch. See the comments about the field // 'lastOffsetDelta' in DefaultLogRecordBatch.
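The projection hunks above and below replace every fixed RECORD_BATCH_HEADER_SIZE read with a size derived from the batch's magic byte. A minimal standalone sketch of that dispatch, for orientation only: HeaderSizeSketch is a hypothetical name, not Fluss API, and the constants come from summing the field widths listed in LogRecordBatchFormat later in this patch (a 48-byte V0 header, a 52-byte V1 header, magic byte at offset 12).

    import java.nio.ByteBuffer;

    // Hypothetical sketch: derive the batch header size from the magic byte,
    // mirroring what LogRecordBatchFormat.recordBatchHeaderSize(byte) does.
    final class HeaderSizeSketch {
        static final int MAGIC_OFFSET = 12;   // 8 (baseOffset) + 4 (length)
        static final int V0_HEADER_SIZE = 48; // sum of the V0 field widths
        static final int V1_HEADER_SIZE = 52; // V0 plus 4 bytes of leaderEpoch

        static int headerSize(ByteBuffer batch) {
            switch (batch.get(MAGIC_OFFSET)) {
                case 0:
                    return V0_HEADER_SIZE;
                case 1:
                    return V1_HEADER_SIZE;
                default:
                    throw new IllegalArgumentException("Unsupported magic value");
            }
        }
    }

Because the baseOffset, length, and magic fields sit at the same offsets in every version, a reader can always size and skip a batch before deciding whether it knows how to parse the rest of the header.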
- if (batchSizeInBytes == RECORD_BATCH_HEADER_SIZE) { + if (batchSizeInBytes == recordBatchHeaderSize) { position += batchSizeInBytes; continue; } boolean isAppendOnly = - (logHeaderBuffer.get(ATTRIBUTES_OFFSET) & APPEND_ONLY_FLAG_MASK) > 0; + (logHeaderBuffer.get(attributeOffset(magic)) & APPEND_ONLY_FLAG_MASK) > 0; final int changeTypeBytes; final long arrowHeaderOffset; if (isAppendOnly) { changeTypeBytes = 0; - arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE; + arrowHeaderOffset = position + recordBatchHeaderSize; } else { - changeTypeBytes = logHeaderBuffer.getInt(RECORDS_COUNT_OFFSET); - arrowHeaderOffset = position + RECORD_BATCH_HEADER_SIZE + changeTypeBytes; + changeTypeBytes = logHeaderBuffer.getInt(recordsCountOffset(magic)); + arrowHeaderOffset = position + recordBatchHeaderSize + changeTypeBytes; } // read arrow header @@ -226,7 +245,7 @@ public BytesViewLogRecords project(FileChannel channel, int start, int end, int long arrowBodyLength = projectedArrowBatch.bodyLength(); int newBatchSizeInBytes = - RECORD_BATCH_HEADER_SIZE + recordBatchHeaderSize + changeTypeBytes + currentProjection.arrowMetadataLength + (int) arrowBodyLength; // safe to cast to int @@ -250,13 +269,13 @@ public BytesViewLogRecords project(FileChannel channel, int start, int end, int logHeaderBuffer.putInt(newBatchSizeInBytes - LOG_OVERHEAD); logHeaderBuffer.rewind(); // the logHeader can't be reused, as it will be sent to network - byte[] logHeader = new byte[RECORD_BATCH_HEADER_SIZE]; + byte[] logHeader = new byte[recordBatchHeaderSize]; logHeaderBuffer.get(logHeader); // 5. build log records builder.addBytes(logHeader); if (!isAppendOnly) { - builder.addBytes(channel, position + ARROW_CHANGETYPE_OFFSET, changeTypeBytes); + builder.addBytes(channel, position + arrowChangeTypeOffset(magic), changeTypeBytes); } builder.addBytes(headerMetadata); final long bufferOffset = arrowHeaderOffset + ARROW_HEADER_SIZE + arrowMetadataSize; @@ -380,6 +399,36 @@ private static BitSet fillBitSet(int length, boolean value) { return bitset; } + /** + * Read a log header fully, or fail with an EOFException if there are not enough bytes to read + * a full log header. This handles the different log header sizes of magic V0 and V1. + */ + static void readLogHeaderFullyOrFail(FileChannel channel, ByteBuffer buffer, int position) + throws IOException { + if (position < 0) { + throw new IllegalArgumentException( + "The file channel position cannot be negative, but it is " + position); + } + readFully(channel, buffer, position); + if (buffer.hasRemaining()) { + int size = buffer.position(); + byte magic = buffer.get(MAGIC_OFFSET); + if (magic == LOG_MAGIC_VALUE_V0 && size < V0_RECORD_BATCH_HEADER_SIZE) { + throw new EOFException( + String.format( + "Failed to read v0 log header from file channel `%s`. Expected to read %d bytes, " + "but reached end of file after reading %d bytes. 
Started read from position %d.", + channel, V1_RECORD_BATCH_HEADER_SIZE, size, position)); + } + } + } + @VisibleForTesting ByteBuffer getLogHeaderBuffer() { return logHeaderBuffer; diff --git a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java index e14b6ce140..385e457b8e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/IndexedLogRecord.java @@ -30,7 +30,7 @@ import java.io.IOException; -import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH; +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for diff --git a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java index c66ecb3836..12c3217101 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/KvRecordBatchBuilder.java @@ -37,6 +37,8 @@ import static org.apache.fluss.record.DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE; import static org.apache.fluss.record.DefaultKvRecordBatch.SCHEMA_ID_OFFSET; import static org.apache.fluss.record.KvRecordBatch.CURRENT_KV_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; import static org.apache.fluss.utils.Preconditions.checkArgument; /** Builder for {@link DefaultKvRecordBatch} memory bytes. */ @@ -72,8 +74,8 @@ private KvRecordBatchBuilder( this.writeLimit = writeLimit; this.pagedOutputView = pagedOutputView; this.firstSegment = pagedOutputView.getCurrentSegment(); - this.writerId = LogRecordBatch.NO_WRITER_ID; - this.batchSequence = LogRecordBatch.NO_BATCH_SEQUENCE; + this.writerId = NO_WRITER_ID; + this.batchSequence = NO_BATCH_SEQUENCE; this.currentRecordNumber = 0; this.isClosed = false; // We don't need to write header information while the builder creating, diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java index 8dcab1be23..d28c28ad38 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatch.java @@ -26,6 +26,9 @@ import java.util.Iterator; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; + /** * A record batch is a container for {@link LogRecord LogRecords}. * @@ -33,17 +36,8 @@ */ @PublicEvolving public interface LogRecordBatch { - - /** The "magic" values. */ - byte LOG_MAGIC_VALUE_V0 = 0; - /** The current "magic" value. */ - byte CURRENT_LOG_MAGIC_VALUE = LOG_MAGIC_VALUE_V0; - - /** Value used if non-idempotent. */ - long NO_WRITER_ID = -1L; - - int NO_BATCH_SEQUENCE = -1; + byte CURRENT_LOG_MAGIC_VALUE = LOG_MAGIC_VALUE_V1; /** * Check whether the checksum of this batch is correct. @@ -129,6 +123,13 @@ default boolean hasWriterId() { */ int batchSequence(); + /** + * Get leader epoch of this bucket for this log record batch. 
+ * + * @return leader epoch + */ + int leaderEpoch(); + /** * Get the size in bytes of this batch, including the size of the record and the batch overhead. * diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java new file mode 100644 index 0000000000..d0f42c052c --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordBatchFormat.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.exception.OutOfOrderSequenceException; + +/** The definition of how Fluss organizes and stores a {@link LogRecordBatch}. */ +public class LogRecordBatchFormat { + + // ---------------------------------------------------------------------------------------- + // Common Variables + // ---------------------------------------------------------------------------------------- + + /** Value used if non-idempotent. */ + public static final long NO_WRITER_ID = -1L; + + public static final int NO_BATCH_SEQUENCE = -1; + + /** + * Used to indicate an unknown leaderEpoch, which will be the case when the record set is first + * created by the writer, or when the magic is lower than V1. + */ + public static final int NO_LEADER_EPOCH = -1; + + public static final int BASE_OFFSET_LENGTH = 8; + public static final int LENGTH_LENGTH = 4; + public static final int MAGIC_LENGTH = 1; + private static final int COMMIT_TIMESTAMP_LENGTH = 8; + private static final int CRC_LENGTH = 4; + private static final int SCHEMA_ID_LENGTH = 2; + private static final int LEADER_EPOCH_LENGTH = 4; + private static final int ATTRIBUTE_LENGTH = 1; + private static final int LAST_OFFSET_DELTA_LENGTH = 4; + private static final int WRITE_CLIENT_ID_LENGTH = 8; + private static final int BATCH_SEQUENCE_LENGTH = 4; + private static final int RECORDS_COUNT_LENGTH = 4; + + public static final int BASE_OFFSET_OFFSET = 0; + public static final int LENGTH_OFFSET = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH; + public static final int MAGIC_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH; + public static final int COMMIT_TIMESTAMP_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH; + public static final int LOG_OVERHEAD = LENGTH_OFFSET + LENGTH_LENGTH; + public static final int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH; + + // ---------------------------------------------------------------------------------------- + // Format of Magic Version: V1 + // ---------------------------------------------------------------------------------------- + + /** + * LogRecordBatch implementation for magic 1 (V1). The schema of {@link LogRecordBatch} is given + * below: + * + * <pre>
+     * RecordBatch =>
+     *   BaseOffset => Int64
+     *   Length => Int32
+     *   Magic => Int8
+     *   CommitTimestamp => Int64
+     *   LeaderEpoch => Int32
+     *   CRC => Uint32
+     *   SchemaId => Int16
+     *   Attributes => Int8
+     *   LastOffsetDelta => Int32
+     *   WriterID => Int64
+     *   SequenceID => Int32
+     *   RecordCount => Int32
+     *   Records => [Record]
+     * </pre>
+     *
+     * <p>The newly added field in the LogRecordBatch header of magic V1 is LeaderEpoch, which is
+     * used to build a consistent leaderEpoch cache across different tabletServers.
+     *
+     * <p>The CRC covers the data from the schemaId to the end of the batch (i.e. all the bytes that
+     * follow the CRC). It is located after the magic byte, which means that clients must parse the
+     * magic byte before deciding how to interpret the bytes between the batch length and the magic
+     * byte. The CRC-32C (Castagnoli) polynomial is used for the computation. CommitTimestamp is
+     * also located before the CRC, because it is determined on the server side.
+     *
+     * <p>The field 'lastOffsetDelta' is used to calculate the lastOffset of the current batch as:
+     * [lastOffset = baseOffset + LastOffsetDelta] instead of [lastOffset = baseOffset + recordCount
+     * - 1]. The reason for introducing this field is that there might be cases where the offset
+     * delta in batch does not match the recordCount. For example, when generating CDC logs for a kv
+     * table and sending a batch that only contains the deletion of non-existent kvs, no CDC logs
+     * would be generated. However, we need to increment the batchSequence for the corresponding
+     * writerId to make sure no {@link OutOfOrderSequenceException} will be thrown. In such a case,
+     * we would generate a logRecordBatch with a LastOffsetDelta of 0 but a recordCount of 0.
+     *
+     * <p>The current attributes are given below:
+     *
+     * <pre>
+     * ------------------------------------------
+     * |  Unused (1-7)   |  AppendOnly Flag (0) |
+     * ------------------------------------------
+     * </pre>
+     *
+     * @since 0.7
+     */
+    public static final byte LOG_MAGIC_VALUE_V1 = 1;
+
+    private static final int V1_LEADER_EPOCH_OFFSET =
+            COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH;
+    private static final int V1_CRC_OFFSET = V1_LEADER_EPOCH_OFFSET + LEADER_EPOCH_LENGTH;
+    private static final int V1_SCHEMA_ID_OFFSET = V1_CRC_OFFSET + CRC_LENGTH;
+    private static final int V1_ATTRIBUTES_OFFSET = V1_SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH;
+    private static final int V1_LAST_OFFSET_DELTA_OFFSET = V1_ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+    private static final int V1_WRITE_CLIENT_ID_OFFSET =
+            V1_LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH;
+    private static final int V1_BATCH_SEQUENCE_OFFSET =
+            V1_WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH;
+    private static final int V1_RECORDS_COUNT_OFFSET =
+            V1_BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH;
+    private static final int V1_RECORDS_OFFSET = V1_RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH;
+
+    public static final int V1_RECORD_BATCH_HEADER_SIZE = V1_RECORDS_OFFSET;
+    private static final int V1_ARROW_CHANGETYPE_OFFSET = V1_RECORD_BATCH_HEADER_SIZE;
+
+    // ----------------------------------------------------------------------------------------
+    // Format of Magic Version: V0
+    // ----------------------------------------------------------------------------------------
+
+    /**
+     * LogRecordBatch implementation for magic 0 (V0). The schema of {@link LogRecordBatch} is given
+     * below:
+     *
+     * <pre>
+     * RecordBatch =>
+     *   BaseOffset => Int64
+     *   Length => Int32
+     *   Magic => Int8
+     *   CommitTimestamp => Int64
+     *   CRC => Uint32
+     *   SchemaId => Int16
+     *   Attributes => Int8
+     *   LastOffsetDelta => Int32
+     *   WriterID => Int64
+     *   SequenceID => Int32
+     *   RecordCount => Int32
+     *   Records => [Record]
+     * </pre>
+     *
+     * <p>The current attributes are given below:
+     *
+     * <pre>
+     * ------------------------------------------
+     * |  Unused (1-7)   |  AppendOnly Flag (0) |
+     * ------------------------------------------
+     * </pre>
+ * + * @since 0.1 + */ + public static final byte LOG_MAGIC_VALUE_V0 = 0; + + private static final int V0_CRC_OFFSET = COMMIT_TIMESTAMP_OFFSET + COMMIT_TIMESTAMP_LENGTH; + private static final int V0_SCHEMA_ID_OFFSET = V0_CRC_OFFSET + CRC_LENGTH; + private static final int V0_ATTRIBUTES_OFFSET = V0_SCHEMA_ID_OFFSET + SCHEMA_ID_LENGTH; + private static final int V0_LAST_OFFSET_DELTA_OFFSET = V0_ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH; + private static final int V0_WRITE_CLIENT_ID_OFFSET = + V0_LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH; + private static final int V0_BATCH_SEQUENCE_OFFSET = + V0_WRITE_CLIENT_ID_OFFSET + WRITE_CLIENT_ID_LENGTH; + private static final int V0_RECORDS_COUNT_OFFSET = + V0_BATCH_SEQUENCE_OFFSET + BATCH_SEQUENCE_LENGTH; + private static final int V0_RECORDS_OFFSET = V0_RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH; + + public static final int V0_RECORD_BATCH_HEADER_SIZE = V0_RECORDS_OFFSET; + private static final int V0_ARROW_CHANGETYPE_OFFSET = V0_RECORD_BATCH_HEADER_SIZE; + + // ---------------------------------------------------------------------------------------- + // Static Methods + // ---------------------------------------------------------------------------------------- + + public static int leaderEpochOffset(byte magic) { + if (magic == LOG_MAGIC_VALUE_V1) { + return V1_LEADER_EPOCH_OFFSET; + } + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + + public static int crcOffset(byte magic) { + switch (magic) { + case LOG_MAGIC_VALUE_V1: + return V1_CRC_OFFSET; + case LOG_MAGIC_VALUE_V0: + return V0_CRC_OFFSET; + default: + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + } + + public static int schemaIdOffset(byte magic) { + switch (magic) { + case LOG_MAGIC_VALUE_V1: + return V1_SCHEMA_ID_OFFSET; + case LOG_MAGIC_VALUE_V0: + return V0_SCHEMA_ID_OFFSET; + default: + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + } + + public static int attributeOffset(byte magic) { + switch (magic) { + case LOG_MAGIC_VALUE_V1: + return V1_ATTRIBUTES_OFFSET; + case LOG_MAGIC_VALUE_V0: + return V0_ATTRIBUTES_OFFSET; + default: + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + } + + public static int lastOffsetDeltaOffset(byte magic) { + switch (magic) { + case LOG_MAGIC_VALUE_V1: + return V1_LAST_OFFSET_DELTA_OFFSET; + case LOG_MAGIC_VALUE_V0: + return V0_LAST_OFFSET_DELTA_OFFSET; + default: + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + } + + public static int writeClientIdOffset(byte magic) { + switch (magic) { + case LOG_MAGIC_VALUE_V1: + return V1_WRITE_CLIENT_ID_OFFSET; + case LOG_MAGIC_VALUE_V0: + return V0_WRITE_CLIENT_ID_OFFSET; + default: + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + } + + public static int batchSequenceOffset(byte magic) { + switch (magic) { + case LOG_MAGIC_VALUE_V1: + return V1_BATCH_SEQUENCE_OFFSET; + case LOG_MAGIC_VALUE_V0: + return V0_BATCH_SEQUENCE_OFFSET; + default: + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + } + + public static int recordsCountOffset(byte magic) { + switch (magic) { + case LOG_MAGIC_VALUE_V1: + return V1_RECORDS_COUNT_OFFSET; + case LOG_MAGIC_VALUE_V0: + return V0_RECORDS_COUNT_OFFSET; + default: + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + } + + public static int recordBatchHeaderSize(byte magic) { + switch (magic) { + case LOG_MAGIC_VALUE_V1: + return 
V1_RECORD_BATCH_HEADER_SIZE; + case LOG_MAGIC_VALUE_V0: + return V0_RECORD_BATCH_HEADER_SIZE; + default: + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + } + + public static int arrowChangeTypeOffset(byte magic) { + switch (magic) { + case LOG_MAGIC_VALUE_V1: + return V1_ARROW_CHANGETYPE_OFFSET; + case LOG_MAGIC_VALUE_V0: + return V0_ARROW_CHANGETYPE_OFFSET; + default: + throw new IllegalArgumentException("Unsupported magic value " + magic); + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java index ab8128f06c..8ff8997e58 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecords.java @@ -27,6 +27,8 @@ import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; + /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for * additional information regarding copyright ownership. */ @@ -72,13 +74,14 @@ public void clear() { sizeInBytes = 0; } - public void ensureValid() { - if (sizeInBytes < DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE) { + public void ensureValid(byte recordBatchMagic) { + int recordBatchHeaderSize = recordBatchHeaderSize(recordBatchMagic); + if (sizeInBytes < recordBatchHeaderSize) { throw new RuntimeException( "Record batch is corrupt (the size " + sizeInBytes + " is smaller than the minimum allowed overhead " - + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE + + recordBatchHeaderSize + ")"); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java index 225e948db7..94b2fe59f7 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilder.java @@ -29,15 +29,17 @@ import java.io.IOException; -import static org.apache.fluss.record.DefaultLogRecordBatch.ARROW_CHANGETYPE_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH; -import static org.apache.fluss.record.DefaultLogRecordBatch.CRC_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH; -import static org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; -import static org.apache.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET; import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; -import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE; -import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH; +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; +import static 
org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; +import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset; import static org.apache.fluss.utils.Preconditions.checkArgument; import static org.apache.fluss.utils.Preconditions.checkNotNull; @@ -89,24 +91,26 @@ private MemoryLogRecordsArrowBuilder( this.pagedOutputView = pagedOutputView; this.firstSegment = pagedOutputView.getCurrentSegment(); + int arrowChangeTypeOffset = arrowChangeTypeOffset(magic); checkArgument( - firstSegment.size() >= ARROW_CHANGETYPE_OFFSET, + firstSegment.size() >= arrowChangeTypeOffset, "The size of first segment of pagedOutputView is too small, need at least " - + ARROW_CHANGETYPE_OFFSET + + arrowChangeTypeOffset + " bytes."); - this.changeTypeWriter = new ChangeTypeVectorWriter(firstSegment, ARROW_CHANGETYPE_OFFSET); - this.estimatedSizeInBytes = RECORD_BATCH_HEADER_SIZE; + this.changeTypeWriter = new ChangeTypeVectorWriter(firstSegment, arrowChangeTypeOffset); + this.estimatedSizeInBytes = recordBatchHeaderSize(magic); this.recordCount = 0; } @VisibleForTesting public static MemoryLogRecordsArrowBuilder builder( long baseLogOffset, + byte magic, int schemaId, ArrowWriter arrowWriter, AbstractPagedOutputView outputView) { return new MemoryLogRecordsArrowBuilder( - baseLogOffset, schemaId, CURRENT_LOG_MAGIC_VALUE, arrowWriter, outputView, false); + baseLogOffset, schemaId, magic, arrowWriter, outputView, false); } /** Builder with limited write size and the memory segment used to serialize records. */ @@ -139,7 +143,7 @@ public MultiBytesView build() throws IOException { // serialize the arrow batch to dynamically allocated memory segments arrowWriter.serializeToOutputView( - pagedOutputView, ARROW_CHANGETYPE_OFFSET + changeTypeWriter.sizeInBytes()); + pagedOutputView, arrowChangeTypeOffset(magic) + changeTypeWriter.sizeInBytes()); recordCount = arrowWriter.getRecordsCount(); bytesView = MultiBytesView.builder() @@ -237,7 +241,7 @@ public int estimatedSizeInBytes() { if (reCalculateSizeInBytes) { // make size in bytes up-to-date estimatedSizeInBytes = - ARROW_CHANGETYPE_OFFSET + arrowChangeTypeOffset(magic) + changeTypeWriter.sizeInBytes() + arrowWriter.estimatedSizeInBytes(); } @@ -259,6 +263,12 @@ private void writeBatchHeader() throws IOException { // write empty timestamp which will be overridden on server side outputView.writeLong(0); + + // write empty leaderEpoch which will be overridden on server side + if (magic >= LOG_MAGIC_VALUE_V1) { + outputView.writeInt(NO_LEADER_EPOCH); + } + // write empty crc first. outputView.writeUnsignedInt(0); // write schema id @@ -278,8 +288,8 @@ private void writeBatchHeader() throws IOException { outputView.writeInt(recordCount); // Update crc. 
- long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), SCHEMA_ID_OFFSET); - outputView.setPosition(CRC_OFFSET); + long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), schemaIdOffset(magic)); + outputView.setPosition(crcOffset(magic)); outputView.writeUnsignedInt(crc); } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java index b62c2cef73..8488c3351b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/MemoryLogRecordsIndexedBuilder.java @@ -29,15 +29,17 @@ import java.io.IOException; -import static org.apache.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH; -import static org.apache.fluss.record.DefaultLogRecordBatch.CRC_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.LAST_OFFSET_DELTA_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH; -import static org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; -import static org.apache.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET; import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; -import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE; -import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatchFormat.BASE_OFFSET_LENGTH; +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_LENGTH; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_LEADER_EPOCH; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatchFormat.crcOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.lastOffsetDeltaOffset; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; +import static org.apache.fluss.record.LogRecordBatchFormat.schemaIdOffset; import static org.apache.fluss.utils.Preconditions.checkArgument; /** @@ -87,8 +89,8 @@ private MemoryLogRecordsIndexedBuilder( // We don't need to write header information while the builder creating, // we'll skip it first. - this.pagedOutputView.setPosition(RECORD_BATCH_HEADER_SIZE); - this.sizeInBytes = RECORD_BATCH_HEADER_SIZE; + this.pagedOutputView.setPosition(recordBatchHeaderSize(magic)); + this.sizeInBytes = recordBatchHeaderSize(magic); } public static MemoryLogRecordsIndexedBuilder builder( @@ -219,6 +221,12 @@ private void writeBatchHeader() throws IOException { // write empty timestamp which will be overridden on server side outputView.writeLong(0); + + // write empty leaderEpoch which will be overridden on server side + if (magic >= LOG_MAGIC_VALUE_V1) { + outputView.writeInt(NO_LEADER_EPOCH); + } + // write empty crc first. outputView.writeUnsignedInt(0); @@ -226,7 +234,7 @@ private void writeBatchHeader() throws IOException { // write attributes (currently only appendOnly flag) outputView.writeBoolean(appendOnly); // skip write attribute byte for now. 
- outputView.setPosition(LAST_OFFSET_DELTA_OFFSET); + outputView.setPosition(lastOffsetDeltaOffset(magic)); if (currentRecordNumber > 0) { outputView.writeInt(currentRecordNumber - 1); } else { @@ -239,8 +247,8 @@ private void writeBatchHeader() throws IOException { outputView.writeInt(currentRecordNumber); // Update crc. - long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), SCHEMA_ID_OFFSET); - outputView.setPosition(CRC_OFFSET); + long crc = Crc32C.compute(pagedOutputView.getWrittenSegments(), schemaIdOffset(magic)); + outputView.setPosition(crcOffset(magic)); outputView.writeUnsignedInt(crc); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java b/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java index feaf430ff0..5e2bdebe51 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/MemorySegmentLogInputStream.java @@ -19,8 +19,9 @@ import org.apache.fluss.memory.MemorySegment; -import static org.apache.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET; -import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD; +import static org.apache.fluss.record.LogRecordBatchFormat.LENGTH_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD; +import static org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE; /** * A byte buffer backed log input stream. This class avoids the need to copy records by returning @@ -40,7 +41,8 @@ class MemorySegmentLogInputStream implements LogInputStream { public LogRecordBatch nextBatch() { Integer batchSize = nextBatchSize(); - if (batchSize == null || remaining < batchSize) { + // remaining must be at least the V0 header size, because the V1 header is larger than V0's. + if (batchSize == null || remaining < batchSize || remaining < V0_RECORD_BATCH_HEADER_SIZE) { return null; } diff --git a/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java b/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java index bfb23da947..70f881e062 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/DefaultLogRecordBatchTest.java @@ -24,21 +24,26 @@ import org.apache.fluss.types.RowType; import org.apache.fluss.utils.CloseableIterator; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; +import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link DefaultLogRecordBatch}. 
*/ public class DefaultLogRecordBatchTest extends LogTestBase { - @Test - void testRecordBatchSize() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testRecordBatchSize(byte magic) throws Exception { MemoryLogRecords memoryLogRecords = - DataTestUtils.genMemoryLogRecordsByObject(TestData.DATA1); + DataTestUtils.genMemoryLogRecordsByObject(magic, TestData.DATA1); int totalSize = 0; for (LogRecordBatch logRecordBatch : memoryLogRecords.batches()) { totalSize += logRecordBatch.sizeInBytes(); @@ -46,8 +51,9 @@ void testRecordBatchSize() throws Exception { assertThat(totalSize).isEqualTo(memoryLogRecords.sizeInBytes()); } - @Test - void testIndexedRowWriteAndReadBatch() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testIndexedRowWriteAndReadBatch(byte magic) throws Exception { int recordNumber = 50; RowType allRowType = TestInternalRowGenerator.createAllRowType(); MemoryLogRecordsIndexedBuilder builder = @@ -98,8 +104,9 @@ void testIndexedRowWriteAndReadBatch() throws Exception { builder.close(); } - @Test - void testNoRecordAppend() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testNoRecordAppend(byte magic) throws Exception { // 1. no record append with baseOffset as 0. MemoryLogRecordsIndexedBuilder builder = MemoryLogRecordsIndexedBuilder.builder( @@ -107,7 +114,7 @@ void testNoRecordAppend() throws Exception { MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build()); Iterator iterator = memoryLogRecords.batches().iterator(); // only contains batch header. - assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48); + assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(recordBatchHeaderSize(magic)); assertThat(iterator.hasNext()).isTrue(); LogRecordBatch logRecordBatch = iterator.next(); @@ -135,7 +142,7 @@ void testNoRecordAppend() throws Exception { memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build()); iterator = memoryLogRecords.batches().iterator(); // only contains batch header. 
- assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48); + assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(recordBatchHeaderSize(magic)); assertThat(iterator.hasNext()).isTrue(); logRecordBatch = iterator.next(); diff --git a/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java b/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java index 9c8e1dab26..7de046ec74 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/FileLogInputStreamTest.java @@ -17,28 +17,40 @@ package org.apache.fluss.record; +import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.utils.CloseableIterator; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.util.Collections; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; -import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithBaseOffset; +import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID; +import static org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link FileLogInputStream}. */ public class FileLogInputStreamTest extends LogTestBase { private @TempDir File tempDir; - @Test - void testWriteTo() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testWriteTo(byte recordBatchMagic) throws Exception { try (FileLogRecords fileLogRecords = FileLogRecords.open(new File(tempDir, "test.tmp"))) { fileLogRecords.append( - genMemoryLogRecordsWithBaseOffset( - 0L, Collections.singletonList(new Object[] {0, "abc"}))); + createRecordsWithoutBaseLogOffset( + DATA1_ROW_TYPE, + DEFAULT_SCHEMA_ID, + 0L, + -1L, + recordBatchMagic, + Collections.singletonList(new Object[] {0, "abc"}), + LogFormat.ARROW)); fileLogRecords.flush(); FileLogInputStream logInputStream = @@ -46,7 +58,7 @@ void testWriteTo() throws Exception { FileLogInputStream.FileChannelLogRecordBatch batch = logInputStream.nextBatch(); assertThat(batch).isNotNull(); - assertThat(batch.magic()).isEqualTo(magic); + assertThat(batch.magic()).isEqualTo(recordBatchMagic); LogRecordBatch recordBatch = batch.loadFullBatch(); diff --git a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java index 515289cb42..c19ecc8828 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java @@ -28,15 +28,23 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.EOFException; import java.io.File; +import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.stream.Stream; import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; +import static 
org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; +import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET; +import static org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE; +import static org.apache.fluss.record.LogRecordBatchFormat.V1_RECORD_BATCH_HEADER_SIZE; import static org.apache.fluss.record.LogRecordReadContext.createArrowReadContext; import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static org.apache.fluss.testutils.DataTestUtils.createRecordsWithoutBaseLogOffset; @@ -126,17 +134,23 @@ void testIllegalSetCurrentProjection() { static Stream projectedFieldsArgs() { return Stream.of( - Arguments.of((Object) new int[] {0}), - Arguments.arguments((Object) new int[] {1}), - Arguments.arguments((Object) new int[] {0, 1})); + Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V0), + Arguments.arguments((Object) new int[] {1}, LOG_MAGIC_VALUE_V0), + Arguments.arguments((Object) new int[] {0, 1}, LOG_MAGIC_VALUE_V0), + Arguments.of((Object) new int[] {0}, LOG_MAGIC_VALUE_V1), + Arguments.arguments((Object) new int[] {1}, LOG_MAGIC_VALUE_V1), + Arguments.arguments((Object) new int[] {0, 1}, LOG_MAGIC_VALUE_V1)); } @ParameterizedTest @MethodSource("projectedFieldsArgs") - void testProject(int[] projectedFields) throws Exception { + void testProject(int[] projectedFields, byte recordBatchMagic) throws Exception { FileLogRecords fileLogRecords = createFileLogRecords( - TestData.DATA1_ROW_TYPE, TestData.DATA1, TestData.ANOTHER_DATA1); + recordBatchMagic, + TestData.DATA1_ROW_TYPE, + TestData.DATA1, + TestData.ANOTHER_DATA1); List results = doProjection( new FileLogProjection(), @@ -159,11 +173,15 @@ void testProject(int[] projectedFields) throws Exception { assertEquals(results, expected); } - @Test - void testIllegalByteOrder() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testIllegalByteOrder(byte recordBatchMagic) throws Exception { FileLogRecords fileLogRecords = createFileLogRecords( - TestData.DATA1_ROW_TYPE, TestData.DATA1, TestData.ANOTHER_DATA1); + recordBatchMagic, + TestData.DATA1_ROW_TYPE, + TestData.DATA1, + TestData.ANOTHER_DATA1); FileLogProjection projection = new FileLogProjection(); // overwrite the wrong decoding byte order endian projection.getLogHeaderBuffer().order(ByteOrder.BIG_ENDIAN); @@ -180,14 +198,18 @@ void testIllegalByteOrder() throws Exception { .hasMessageContaining("Failed to read `arrow header` from file channel"); } - @Test - void testProjectSizeLimited() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testProjectSizeLimited(byte recordBatchMagic) throws Exception { List allData = new ArrayList<>(); allData.addAll(TestData.DATA1); allData.addAll(TestData.ANOTHER_DATA1); FileLogRecords fileLogRecords = createFileLogRecords( - TestData.DATA1_ROW_TYPE, TestData.DATA1, TestData.ANOTHER_DATA1); + recordBatchMagic, + TestData.DATA1_ROW_TYPE, + TestData.DATA1, + TestData.ANOTHER_DATA1); int totalSize = fileLogRecords.sizeInBytes(); boolean hasEmpty = false; boolean hasHalf = false; @@ -218,9 +240,89 @@ void testProjectSizeLimited() throws Exception { assertThat(hasFull).isTrue(); } + @Test + void testReadLogHeaderFullyOrFail() throws Exception { + ByteBuffer logHeaderBuffer = ByteBuffer.allocate(V1_RECORD_BATCH_HEADER_SIZE); + + // only V1 log header, should read fully + try 
(FileLogRecords fileLogRecords = + createFileWithLogHeader(LOG_MAGIC_VALUE_V1, V1_RECORD_BATCH_HEADER_SIZE)) { + FileLogProjection.readLogHeaderFullyOrFail( + fileLogRecords.channel(), logHeaderBuffer, 0); + assertThat(logHeaderBuffer.hasRemaining()).isFalse(); + } + + // V1 log header with data, should read fully + try (FileLogRecords fileLogRecords = createFileWithLogHeader(LOG_MAGIC_VALUE_V1, 100)) { + logHeaderBuffer.rewind(); + FileLogProjection.readLogHeaderFullyOrFail( + fileLogRecords.channel(), logHeaderBuffer, 0); + assertThat(logHeaderBuffer.hasRemaining()).isFalse(); + } + + // only V0 log header, should only read 48 bytes + try (FileLogRecords fileLogRecords = + createFileWithLogHeader(LOG_MAGIC_VALUE_V0, V0_RECORD_BATCH_HEADER_SIZE)) { + logHeaderBuffer.rewind(); + FileLogProjection.readLogHeaderFullyOrFail( + fileLogRecords.channel(), logHeaderBuffer, 0); + assertThat(logHeaderBuffer.hasRemaining()).isTrue(); + assertThat(logHeaderBuffer.position()).isEqualTo(V0_RECORD_BATCH_HEADER_SIZE); + } + + // V0 log header with data, should read fully + try (FileLogRecords fileLogRecords = createFileWithLogHeader(LOG_MAGIC_VALUE_V0, 100)) { + logHeaderBuffer.rewind(); + FileLogProjection.readLogHeaderFullyOrFail( + fileLogRecords.channel(), logHeaderBuffer, 0); + assertThat(logHeaderBuffer.hasRemaining()).isFalse(); + } + + // V1 log header incomplete, should throw exception + try (FileLogRecords fileLogRecords = + createFileWithLogHeader(LOG_MAGIC_VALUE_V1, V0_RECORD_BATCH_HEADER_SIZE)) { + logHeaderBuffer.rewind(); + assertThatThrownBy( + () -> + FileLogProjection.readLogHeaderFullyOrFail( + fileLogRecords.channel(), logHeaderBuffer, 0), + "Should throw exception if the log header is incomplete") + .isInstanceOf(EOFException.class) + .hasMessageContaining( + "Expected to read 52 bytes, but reached end of file after reading 48 bytes."); + } + + // V0 log header incomplete, should throw exception + try (FileLogRecords fileLogRecords = + createFileWithLogHeader(LOG_MAGIC_VALUE_V0, V0_RECORD_BATCH_HEADER_SIZE - 1)) { + logHeaderBuffer.rewind(); + assertThatThrownBy( + () -> + FileLogProjection.readLogHeaderFullyOrFail( + fileLogRecords.channel(), logHeaderBuffer, 0), + "Should throw exception if the log header is incomplete") + .isInstanceOf(EOFException.class) + .hasMessageContaining( + "Expected to read 48 bytes, but reached end of file after reading 47 bytes."); + } + } + + private FileLogRecords createFileWithLogHeader(byte magic, int length) throws Exception { + ByteBuffer buffer = ByteBuffer.allocate(length).order(ByteOrder.LITTLE_ENDIAN); + buffer.position(MAGIC_OFFSET); + buffer.put(magic); + buffer.position(length); + buffer.flip(); + File file = new File(tempDir, UUID.randomUUID() + ".log"); + FileLogRecords fileLogRecords = FileLogRecords.open(file); + fileLogRecords.channel().write(buffer); + fileLogRecords.flush(); + return fileLogRecords; + } + @SafeVarargs - private final FileLogRecords createFileLogRecords(RowType rowType, List... inputs) - throws Exception { + final FileLogRecords createFileLogRecords( + byte recordBatchMagic, RowType rowType, List... 
inputs) throws Exception { FileLogRecords fileLogRecords = FileLogRecords.open(new File(tempDir, "test.tmp")); long offsetBase = 0L; for (List input : inputs) { @@ -230,6 +332,7 @@ private final FileLogRecords createFileLogRecords(RowType rowType, List... inputs) + recordBatchMagic, diff --git a/fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java b/fluss-common/src/test/java/org/apache/fluss/record/LogRecordBatchFormatTest.java new file mode 100644 + /** Test for {@link LogRecordBatchFormat}. */ + public class LogRecordBatchFormatTest { + + @Test + void testLogRecordBatchFormatForMagicV0() { + byte magic = (byte) 0; + assertThatThrownBy(() -> leaderEpochOffset(magic)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Unsupported magic value 0"); + assertThat(crcOffset(magic)).isEqualTo(21); + assertThat(schemaIdOffset(magic)).isEqualTo(25); + assertThat(attributeOffset(magic)).isEqualTo(27); + assertThat(lastOffsetDeltaOffset(magic)).isEqualTo(28); + assertThat(writeClientIdOffset(magic)).isEqualTo(32); + assertThat(batchSequenceOffset(magic)).isEqualTo(40); + assertThat(recordsCountOffset(magic)).isEqualTo(44); + assertThat(recordBatchHeaderSize(magic)).isEqualTo(48); + assertThat(arrowChangeTypeOffset(magic)).isEqualTo(48); + } + + @Test + void testLogRecordBatchFormatForMagicV1() { + byte magic = (byte) 1; + assertThat(leaderEpochOffset(magic)).isEqualTo(21); + assertThat(crcOffset(magic)).isEqualTo(25); + assertThat(schemaIdOffset(magic)).isEqualTo(29); + assertThat(attributeOffset(magic)).isEqualTo(31); + assertThat(lastOffsetDeltaOffset(magic)).isEqualTo(32); + assertThat(writeClientIdOffset(magic)).isEqualTo(36); + assertThat(batchSequenceOffset(magic)).isEqualTo(44); + assertThat(recordsCountOffset(magic)).isEqualTo(48); + assertThat(recordBatchHeaderSize(magic)).isEqualTo(52); + assertThat(arrowChangeTypeOffset(magic)).isEqualTo(52); + } +} diff --git a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java index 7759e711e2..5c0df6e363 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/MemoryLogRecordsArrowBuilderTest.java @@ -38,17 +38,23 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; import static org.apache.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; import static org.apache.fluss.record.TestData.DATA1; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID; @@ -77,14 +83,15 @@ void tearDown() { allocator.close(); } - @Test - void testAppendWithEmptyRecord() throws Exception { + @ParameterizedTest + @MethodSource("magicAndExpectedBatchSize") + void testAppendWithEmptyRecord(byte recordBatchMagic, int expectedBatchSize) throws Exception { int maxSizeInBytes = 1024; ArrowWriter writer = provider.getOrCreateWriter( 1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, DEFAULT_COMPRESSION); MemoryLogRecordsArrowBuilder builder = - createMemoryLogRecordsArrowBuilder(0, writer, 10, 100); 
+ createMemoryLogRecordsArrowBuilder(0, writer, 10, 100, recordBatchMagic); assertThat(builder.isFull()).isFalse(); assertThat(builder.getWriteLimitInBytes()) .isEqualTo((int) (maxSizeInBytes * BUFFER_USAGE_RATIO)); @@ -95,18 +102,19 @@ void testAppendWithEmptyRecord() throws Exception { assertThat(iterator.hasNext()).isTrue(); LogRecordBatch batch = iterator.next(); assertThat(batch.getRecordCount()).isEqualTo(0); - assertThat(batch.sizeInBytes()).isEqualTo(48); + assertThat(batch.sizeInBytes()).isEqualTo(expectedBatchSize); assertThat(iterator.hasNext()).isFalse(); } - @Test - void testAppend() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testAppend(byte recordBatchMagic) throws Exception { int maxSizeInBytes = 1024; ArrowWriter writer = provider.getOrCreateWriter( 1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, DEFAULT_COMPRESSION); MemoryLogRecordsArrowBuilder builder = - createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024); + createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024, recordBatchMagic); List changeTypes = DATA1.stream().map(row -> ChangeType.APPEND_ONLY).collect(Collectors.toList()); List rows = @@ -157,7 +165,7 @@ void testCompression(ArrowCompressionInfo compressionInfo) throws Exception { provider.getOrCreateWriter( 1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, NO_COMPRESSION); MemoryLogRecordsArrowBuilder builder = - createMemoryLogRecordsArrowBuilder(0, writer1, 10, 1024); + createMemoryLogRecordsArrowBuilder(0, writer1, 10, 1024, CURRENT_LOG_MAGIC_VALUE); for (Object[] data : dataSet) { builder.append(ChangeType.APPEND_ONLY, row(data)); } @@ -171,7 +179,7 @@ void testCompression(ArrowCompressionInfo compressionInfo) throws Exception { provider.getOrCreateWriter( 1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, compressionInfo); MemoryLogRecordsArrowBuilder builder2 = - createMemoryLogRecordsArrowBuilder(0, writer2, 10, 1024); + createMemoryLogRecordsArrowBuilder(0, writer2, 10, 1024, CURRENT_LOG_MAGIC_VALUE); for (Object[] data : dataSet) { builder2.append(ChangeType.APPEND_ONLY, row(data)); } @@ -184,8 +192,9 @@ void testCompression(ArrowCompressionInfo compressionInfo) throws Exception { assertThat(sizeInBytes1).isGreaterThan(sizeInBytes2); } - @Test - void testIllegalArgument() { + @ParameterizedTest + @MethodSource("magicAndExpectedBatchSize") + void testIllegalArgument(byte recordBatchMagic, int expectedBatchSize) { int maxSizeInBytes = 1024; assertThatThrownBy( () -> { @@ -196,12 +205,15 @@ void testIllegalArgument() { maxSizeInBytes, DATA1_ROW_TYPE, DEFAULT_COMPRESSION)) { - createMemoryLogRecordsArrowBuilder(0, writer, 10, 30); + createMemoryLogRecordsArrowBuilder( + 0, writer, 10, 30, recordBatchMagic); } }) .isInstanceOf(IllegalArgumentException.class) .hasMessage( - "The size of first segment of pagedOutputView is too small, need at least 48 bytes."); + "The size of first segment of pagedOutputView is too small, need at least " + + expectedBatchSize + + " bytes."); } @Test @@ -211,7 +223,7 @@ void testClose() throws Exception { provider.getOrCreateWriter( 1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE, NO_COMPRESSION); MemoryLogRecordsArrowBuilder builder = - createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024); + createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024, CURRENT_LOG_MAGIC_VALUE); List changeTypes = DATA1.stream().map(row -> ChangeType.APPEND_ONLY).collect(Collectors.toList()); List rows = @@ -244,17 +256,18 @@ void testClose() throws Exception { 
writer1.close(); } - @Test - void testNoRecordAppend() throws Exception { + @ParameterizedTest + @MethodSource("magicAndExpectedBatchSize") + void testNoRecordAppend(byte recordBatchMagic, int expectedBatchSize) throws Exception { // 1. no record append with base offset as 0. ArrowWriter writer = provider.getOrCreateWriter( 1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE, DEFAULT_COMPRESSION); MemoryLogRecordsArrowBuilder builder = - createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10); + createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024 * 10, recordBatchMagic); MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build()); // only contains batch header. - assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48); + assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(expectedBatchSize); Iterator iterator = memoryLogRecords.batches().iterator(); assertThat(iterator.hasNext()).isTrue(); LogRecordBatch logRecordBatch = iterator.next(); @@ -276,10 +289,10 @@ void testNoRecordAppend() throws Exception { ArrowWriter writer2 = provider.getOrCreateWriter( 1L, DEFAULT_SCHEMA_ID, 1024 * 10, DATA1_ROW_TYPE, DEFAULT_COMPRESSION); - builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 * 10); + builder = createMemoryLogRecordsArrowBuilder(100, writer2, 10, 1024 * 10, recordBatchMagic); memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build()); // only contains batch header. - assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(48); + assertThat(memoryLogRecords.sizeInBytes()).isEqualTo(expectedBatchSize); iterator = memoryLogRecords.batches().iterator(); assertThat(iterator.hasNext()).isTrue(); logRecordBatch = iterator.next(); @@ -305,7 +318,7 @@ void testResetWriterState() throws Exception { provider.getOrCreateWriter( 1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE, DEFAULT_COMPRESSION); MemoryLogRecordsArrowBuilder builder = - createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024); + createMemoryLogRecordsArrowBuilder(0, writer, 10, 1024, CURRENT_LOG_MAGIC_VALUE); List changeTypes = DATA1.stream().map(row -> ChangeType.APPEND_ONLY).collect(Collectors.toList()); List rows = @@ -343,8 +356,19 @@ private static List compressionInfos() { new ArrowCompressionInfo(ArrowCompressionType.ZSTD, 9)); } + private static Collection magicAndExpectedBatchSize() { + List params = new ArrayList<>(); + params.add(Arguments.arguments(LOG_MAGIC_VALUE_V0, 48)); + params.add(Arguments.arguments(LOG_MAGIC_VALUE_V1, 52)); + return params; + } + private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder( - int baseOffset, ArrowWriter writer, int maxPages, int pageSizeInBytes) + int baseOffset, + ArrowWriter writer, + int maxPages, + int pageSizeInBytes, + byte recordBatchMagic) throws IOException { conf.set( ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, @@ -353,6 +377,7 @@ private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder( conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(pageSizeInBytes)); return MemoryLogRecordsArrowBuilder.builder( baseOffset, + recordBatchMagic, DEFAULT_SCHEMA_ID, writer, new ManagedPagedOutputView(new TestingMemorySegmentPool(pageSizeInBytes))); diff --git a/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java b/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java index 3fe720853f..576e90131c 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java +++ 
b/fluss-common/src/test/java/org/apache/fluss/record/MemorySegmentLogInputStreamTest.java @@ -17,13 +17,16 @@ package org.apache.fluss.record; +import org.apache.fluss.memory.MemorySegment; import org.apache.fluss.testutils.DataTestUtils; import org.junit.jupiter.api.Test; import java.util.Iterator; -import static org.apache.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_OVERHEAD; +import static org.apache.fluss.record.LogRecordBatchFormat.MAGIC_OFFSET; import static org.apache.fluss.record.TestData.DATA1; import static org.assertj.core.api.Assertions.assertThat; @@ -52,13 +55,15 @@ void testNextBatch() throws Exception { iterator = getIterator(memoryLogRecords); assertThat(iterator.hasNext()).isFalse(); - // gen batch with enough size. + // gen batch with not enough size. memoryLogRecords = MemoryLogRecords.pointToBytes(new byte[LOG_OVERHEAD]); iterator = getIterator(memoryLogRecords); - assertThat(iterator.hasNext()).isTrue(); + assertThat(iterator.hasNext()).isFalse(); // gen batch with enough size. - memoryLogRecords = MemoryLogRecords.pointToBytes(new byte[12]); + MemorySegment memory = MemorySegment.allocateHeapMemory(100); + memory.put(MAGIC_OFFSET, CURRENT_LOG_MAGIC_VALUE); + memoryLogRecords = MemoryLogRecords.pointToBytes(memory.getHeapMemory()); iterator = getIterator(memoryLogRecords); assertThat(iterator.hasNext()).isTrue(); } diff --git a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java index a920efa941..7752b15b06 100644 --- a/fluss-common/src/test/java/org/apache/fluss/record/TestData.java +++ b/fluss-common/src/test/java/org/apache/fluss/record/TestData.java @@ -32,11 +32,13 @@ import java.util.Arrays; import java.util.List; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; + /** utils to create test data. 
*/ public final class TestData { public static final short DEFAULT_SCHEMA_ID = 1; public static final long BASE_OFFSET = 0L; - public static final byte DEFAULT_MAGIC = (byte) 0; + public static final byte DEFAULT_MAGIC = CURRENT_LOG_MAGIC_VALUE; // ---------------------------- data1 and related table info begin --------------------------- public static final List DATA1 = Arrays.asList( diff --git a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java index cedaadcbd0..ce9336c5e2 100644 --- a/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/row/arrow/ArrowReaderWriterTest.java @@ -44,7 +44,8 @@ import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; import static org.apache.fluss.compression.ArrowCompressionInfo.NO_COMPRESSION; -import static org.apache.fluss.record.DefaultLogRecordBatch.ARROW_CHANGETYPE_OFFSET; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.arrowChangeTypeOffset; import static org.apache.fluss.record.TestData.DATA1; import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE; import static org.apache.fluss.testutils.DataTestUtils.row; @@ -147,12 +148,14 @@ void testReaderWriter() throws IOException { new ManagedPagedOutputView(new TestingMemorySegmentPool(10 * 1024)); // skip arrow batch header. - int size = writer.serializeToOutputView(pagedOutputView, ARROW_CHANGETYPE_OFFSET); + int size = + writer.serializeToOutputView( + pagedOutputView, arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE)); MemorySegment segment = MemorySegment.allocateHeapMemory(writer.estimatedSizeInBytes()); assertThat(pagedOutputView.getWrittenSegments().size()).isEqualTo(1); MemorySegment firstSegment = pagedOutputView.getCurrentSegment(); - firstSegment.copyTo(ARROW_CHANGETYPE_OFFSET, segment, 0, size); + firstSegment.copyTo(arrowChangeTypeOffset(CURRENT_LOG_MAGIC_VALUE), segment, 0, size); ArrowReader reader = ArrowUtils.createArrowReader(segment, 0, size, root, allocator, rowType); diff --git a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java index 4f00fc14bf..2236ebe216 100644 --- a/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java +++ b/fluss-common/src/test/java/org/apache/fluss/testutils/DataTestUtils.java @@ -70,8 +70,9 @@ import java.util.stream.Collectors; import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; -import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE; -import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; import static org.apache.fluss.record.LogRecordReadContext.createArrowReadContext; import static org.apache.fluss.record.TestData.BASE_OFFSET; import static org.apache.fluss.record.TestData.DATA1; @@ -150,17 +151,23 @@ private static CompactedRow genCompacted(RowType rowType, Object[] data) { return (CompactedRow) rowEncoder.finishRow(); } - public static MemoryLogRecords genMemoryLogRecordsByObject(List objects) + public static MemoryLogRecords 
genMemoryLogRecordsByObject(byte magic, List objects) throws Exception { return createRecordsWithoutBaseLogOffset( DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, 0, System.currentTimeMillis(), + magic, objects, LogFormat.ARROW); } + public static MemoryLogRecords genMemoryLogRecordsByObject(List objects) + throws Exception { + return genMemoryLogRecordsByObject(CURRENT_LOG_MAGIC_VALUE, objects); + } + public static MemoryLogRecords genMemoryLogRecordsWithWriterId( List objects, long writerId, int batchSequence, long baseOffset) throws Exception { @@ -171,6 +178,7 @@ public static MemoryLogRecords genMemoryLogRecordsWithWriterId( DEFAULT_SCHEMA_ID, baseOffset, System.currentTimeMillis(), + CURRENT_LOG_MAGIC_VALUE, writerId, batchSequence, changeTypes, @@ -196,7 +204,13 @@ public static MemoryLogRecords genIndexedMemoryLogRecords(List rows) public static MemoryLogRecords genMemoryLogRecordsWithBaseOffset( long offsetBase, List objects) throws Exception { return createRecordsWithoutBaseLogOffset( - DATA1_ROW_TYPE, DEFAULT_SCHEMA_ID, offsetBase, -1L, objects, LogFormat.ARROW); + DATA1_ROW_TYPE, + DEFAULT_SCHEMA_ID, + offsetBase, + -1L, + CURRENT_LOG_MAGIC_VALUE, + objects, + LogFormat.ARROW); } public static MemoryLogRecords genLogRecordsWithBaseOffsetAndTimestamp( @@ -206,6 +220,7 @@ public static MemoryLogRecords genLogRecordsWithBaseOffsetAndTimestamp( DEFAULT_SCHEMA_ID, offsetBase, maxTimestamp, + CURRENT_LOG_MAGIC_VALUE, objects, LogFormat.ARROW); } @@ -316,6 +331,7 @@ public static File genLogFile( DEFAULT_SCHEMA_ID, baseOffset, System.currentTimeMillis(), + CURRENT_LOG_MAGIC_VALUE, objects, logFormat)); fileLogRecords.flush(); @@ -358,6 +374,7 @@ public static MemoryLogRecords createRecordsWithoutBaseLogOffset( int schemaId, long offsetBase, long maxTimestamp, + byte magic, List objects, LogFormat logFormat) throws Exception { @@ -368,6 +385,7 @@ public static MemoryLogRecords createRecordsWithoutBaseLogOffset( schemaId, offsetBase, maxTimestamp, + magic, NO_WRITER_ID, NO_BATCH_SEQUENCE, changeTypes, @@ -381,6 +399,7 @@ public static MemoryLogRecords createBasicMemoryLogRecords( int schemaId, long offsetBase, long maxTimestamp, + byte magic, long writerId, int batchSequence, List changeTypes, @@ -393,6 +412,7 @@ public static MemoryLogRecords createBasicMemoryLogRecords( schemaId, offsetBase, maxTimestamp, + magic, writerId, batchSequence, changeTypes, @@ -406,6 +426,7 @@ public static MemoryLogRecords createMemoryLogRecords( int schemaId, long offsetBase, long maxTimestamp, + byte magic, long writerId, int batchSequence, List changeTypes, @@ -420,6 +441,7 @@ public static MemoryLogRecords createMemoryLogRecords( rowType, offsetBase, maxTimestamp, + magic, schemaId, writerId, batchSequence, @@ -458,7 +480,7 @@ private static MemoryLogRecords createIndexedMemoryLogRecords( } builder.setWriterState(writerId, batchSequence); MemoryLogRecords memoryLogRecords = MemoryLogRecords.pointToBytesView(builder.build()); - memoryLogRecords.ensureValid(); + memoryLogRecords.ensureValid(DEFAULT_MAGIC); ((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next()) .setCommitTimestamp(maxTimestamp); @@ -470,6 +492,7 @@ private static MemoryLogRecords createArrowMemoryLogRecords( RowType rowType, long baseLogOffset, long maxTimestamp, + byte magic, int schemaId, long writerId, int batchSequence, @@ -485,6 +508,7 @@ private static MemoryLogRecords createArrowMemoryLogRecords( MemoryLogRecordsArrowBuilder builder = MemoryLogRecordsArrowBuilder.builder( baseLogOffset, + magic, schemaId, writer, new 
ManagedPagedOutputView(new TestingMemorySegmentPool(10 * 1024))); @@ -497,7 +521,7 @@ private static MemoryLogRecords createArrowMemoryLogRecords( ((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next()) .setCommitTimestamp(maxTimestamp); - memoryLogRecords.ensureValid(); + memoryLogRecords.ensureValid(magic); return memoryLogRecords; } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java index 105fc47e73..0a30d5f5f4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java @@ -46,7 +46,7 @@ import java.io.IOException; import java.util.Optional; -import static org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE; +import static org.apache.fluss.record.LogRecordBatchFormat.V0_RECORD_BATCH_HEADER_SIZE; import static org.apache.fluss.utils.IOUtils.closeQuietly; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache @@ -495,7 +495,8 @@ public FetchDataInfo read( new LogOffsetMetadata(startOffset, this.baseOffset, startPosition); int adjustedMaxSize = minOneMessage ? Math.max(maxSize, startOffsetAndSize.getSize()) : maxSize; - if (adjustedMaxSize <= RECORD_BATCH_HEADER_SIZE) { + // use V0 size as the lower bound, since the V1 header size is larger than V0's + if (adjustedMaxSize <= V0_RECORD_BATCH_HEADER_SIZE) { return new FetchDataInfo(offsetMetadata, MemoryLogRecords.EMPTY); } if (projection == null) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java index 9ed95e35b2..cd625b155f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterAppendInfo.java @@ -21,7 +21,7 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.record.LogRecordBatch; -import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; /** * This class is used to validate the records appended by a given writer before they are written to diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java index c0b4349910..cf237a73e4 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateEntry.java @@ -25,7 +25,7 @@ import java.util.Deque; import java.util.Optional; -import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; /** * This class represents the state of a specific writer id. 
The batch sequence number is ordered diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java index e8712964c7..4fa0baced1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java @@ -56,6 +56,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; import static org.apache.fluss.utils.FlussPaths.WRITER_SNAPSHOT_FILE_SUFFIX; import static org.apache.fluss.utils.FlussPaths.writerSnapshotFile; @@ -232,7 +233,7 @@ public WriterAppendInfo prepareUpdate(long writerId) { /** Update the mapping with the given append information. */ public void update(WriterAppendInfo appendInfo) { long writerId = appendInfo.writerId(); - if (writerId == LogRecordBatch.NO_WRITER_ID) { + if (writerId == NO_WRITER_ID) { throw new IllegalArgumentException( "Invalid writer id " + writerId diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java index 1f32715620..6734a32d51 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java @@ -77,8 +77,9 @@ import java.util.concurrent.Future; import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; -import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE; -import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK; import static org.apache.fluss.record.TestData.DATA2_SCHEMA; import static org.apache.fluss.record.TestData.DATA3_SCHEMA_PK; @@ -893,6 +894,7 @@ private MemoryLogRecords logRecords( DEFAULT_SCHEMA_ID, baseOffset, -1L, + CURRENT_LOG_MAGIC_VALUE, NO_WRITER_ID, NO_BATCH_SEQUENCE, changeTypes, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index 526b1b350d..045bd35259 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java @@ -57,8 +57,9 @@ import java.util.concurrent.TimeUnit; import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION; -import static org.apache.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE; -import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID; +import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; +import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; import static org.apache.fluss.record.TestData.DATA1; import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH_PK; @@ -671,6 +672,7 @@ private static MemoryLogRecords logRecords( DEFAULT_SCHEMA_ID, baseOffset, -1L, + CURRENT_LOG_MAGIC_VALUE, NO_WRITER_ID, 
NO_BATCH_SEQUENCE, changeTypes, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 04f8f3ce38..afd10c9432 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -62,6 +62,8 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import javax.annotation.Nullable; @@ -77,6 +79,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0; +import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1; import static org.apache.fluss.record.TestData.ANOTHER_DATA1; import static org.apache.fluss.record.TestData.DATA1; import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; @@ -118,8 +122,9 @@ public class TabletServiceITCase { public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION = FlussClusterExtension.builder().setNumOfTabletServers(3).build(); - @Test - void testProduceLog() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testProduceLog(byte recordBatchMagic) throws Exception { long tableId = createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR); TableBucket tb = new TableBucket(tableId, 0); @@ -134,7 +139,10 @@ void testProduceLog() throws Exception { leaderGateWay .produceLog( newProduceLogRequest( - tableId, 0, 1, genMemoryLogRecordsByObject(DATA1))) + tableId, + 0, + 1, + genMemoryLogRecordsByObject(recordBatchMagic, DATA1))) .get(), 0, 0L); @@ -205,8 +213,9 @@ void testProduceLogResponseReturnInOrder() throws Exception { } } - @Test - void testFetchLog() throws Exception { + @ParameterizedTest + @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1}) + void testFetchLog(byte recordBatchMagic) throws Exception { long tableId = createTable(FLUSS_CLUSTER_EXTENSION, DATA1_TABLE_PATH, DATA1_TABLE_DESCRIPTOR); TableBucket tb = new TableBucket(tableId, 0); @@ -222,7 +231,10 @@ void testFetchLog() throws Exception { leaderGateWay .produceLog( newProduceLogRequest( - tableId, 0, 1, genMemoryLogRecordsByObject(DATA1))) + tableId, + 0, + 1, + genMemoryLogRecordsByObject(recordBatchMagic, DATA1))) .get(), 0, 0L); @@ -248,7 +260,11 @@ void testFetchLog() throws Exception { leaderGateWay .produceLog( newProduceLogRequest( - tableId, 0, 1, genMemoryLogRecordsByObject(ANOTHER_DATA1))) + tableId, + 0, + 1, + genMemoryLogRecordsByObject( + recordBatchMagic, ANOTHER_DATA1))) .get(), 0, 10L);