Merged — changes from all commits
@@ -29,7 +29,7 @@
 import java.util.TreeSet;
 import java.util.function.Consumer;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 
 /** Entry to store the idempotence information of each table-bucket. */
 @Internal
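
Across the PR, the writer-state sentinels (NO_WRITER_ID, NO_BATCH_SEQUENCE) and the magic-version constants move from LogRecordBatch into a new LogRecordBatchFormat class, and the fixed RECORD_BATCH_HEADER_SIZE constant becomes a per-magic recordBatchHeaderSize(byte) lookup. LogRecordBatchFormat itself is not part of this diff; the following is a minimal sketch of the surface the changed call sites rely on — the sentinel values and header byte counts are assumptions, not confirmed by the diff:

    public final class LogRecordBatchFormat {

        // Sentinel values; -1 mirrors the Kafka convention this code is
        // based on (assumed, not shown in this diff).
        public static final long NO_WRITER_ID = -1L;
        public static final int NO_BATCH_SEQUENCE = -1;

        // Record-batch format versions exercised by the tests below.
        public static final byte LOG_MAGIC_VALUE_V0 = (byte) 0;
        public static final byte LOG_MAGIC_VALUE_V1 = (byte) 1;

        // Header size now varies with the batch magic; the byte counts
        // here are placeholders, not Fluss's actual on-disk layout.
        public static int recordBatchHeaderSize(byte magic) {
            switch (magic) {
                case LOG_MAGIC_VALUE_V0:
                    return 48; // placeholder
                case LOG_MAGIC_VALUE_V1:
                    return 56; // placeholder
                default:
                    throw new IllegalArgumentException("Unsupported magic: " + magic);
            }
        }

        private LogRecordBatchFormat() {}
    }
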
========================================
@@ -25,7 +25,6 @@
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.InitWriterRequest;
 import org.apache.fluss.rpc.protocol.Errors;
@@ -40,7 +39,8 @@
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -182,10 +182,10 @@ synchronized void removeInFlightBatch(ReadyWriteBatch batch) {
      */
     synchronized int firstInFlightBatchSequence(TableBucket tableBucket) {
         if (!hasInflightBatches(tableBucket)) {
-            return LogRecordBatch.NO_BATCH_SEQUENCE;
+            return NO_BATCH_SEQUENCE;
         }
         WriteBatch batch = nextBatchBySequence(tableBucket);
-        return batch == null ? LogRecordBatch.NO_BATCH_SEQUENCE : batch.batchSequence();
+        return batch == null ? NO_BATCH_SEQUENCE : batch.batchSequence();
     }
 
     synchronized void handleCompletedBatch(ReadyWriteBatch readyWriteBatch) {
========================================
@@ -32,7 +32,6 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.row.arrow.ArrowWriter;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
@@ -61,7 +60,8 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.fluss.record.LogRecordBatch.NO_WRITER_ID;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
@@ -757,7 +757,7 @@ private boolean shouldStopDrainBatchesForBucket(WriteBatch first, TableBucket ta
         // flight request count to 1.
         int firstInFlightSequence = idempotenceManager.firstInFlightBatchSequence(tableBucket);
         boolean isFirstInFlightBatch =
-                firstInFlightSequence == LogRecordBatch.NO_BATCH_SEQUENCE
+                firstInFlightSequence == NO_BATCH_SEQUENCE
                         || (first.hasBatchSequence()
                                 && first.batchSequence() == firstInFlightSequence);
 
@@ -824,7 +824,7 @@ private void insertInSequenceOrder(
             Deque<WriteBatch> deque, WriteBatch batch, TableBucket tableBucket) {
         // When we are re-enqueue and have enabled idempotence, the re-enqueued batch must always
         // have a batch sequence.
-        if (batch.batchSequence() == LogRecordBatch.NO_BATCH_SEQUENCE) {
+        if (batch.batchSequence() == NO_BATCH_SEQUENCE) {
             throw new IllegalStateException(
                     "Trying to re-enqueue a batch which doesn't have a sequence even "
                             + "though idempotence is enabled.");
========================================
@@ -21,7 +21,6 @@
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.memory.MemorySegmentPool;
 import org.apache.fluss.metadata.PhysicalTablePath;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.bytesview.BytesView;
 
 import org.slf4j.Logger;
@@ -35,6 +34,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** The abstract write batch contains write callback object to wait write request feedback. */
@@ -113,7 +113,7 @@ public abstract boolean tryAppend(WriteRecord writeRecord, WriteCallback callbac
     public abstract void abortRecordAppends();
 
     public boolean hasBatchSequence() {
-        return batchSequence() != LogRecordBatch.NO_BATCH_SEQUENCE;
+        return batchSequence() != NO_BATCH_SEQUENCE;
     }
 
     public void resetWriterState(long writerId, int batchSequence) {
========================================
@@ -21,14 +21,15 @@
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.record.DefaultKvRecord;
 import org.apache.fluss.record.DefaultKvRecordBatch;
-import org.apache.fluss.record.DefaultLogRecordBatch;
 import org.apache.fluss.record.IndexedLogRecord;
 import org.apache.fluss.row.BinaryRow;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.indexed.IndexedRow;
 
 import javax.annotation.Nullable;
 
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /**
@@ -85,7 +86,7 @@ public static WriteRecord forIndexedAppend(
             PhysicalTablePath tablePath, IndexedRow row, @Nullable byte[] bucketKey) {
         checkNotNull(row);
         int estimatedSizeInBytes =
-                IndexedLogRecord.sizeOf(row) + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+                IndexedLogRecord.sizeOf(row) + recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
         return new WriteRecord(
                 tablePath,
                 null,
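
The size estimate in WriteRecord.forIndexedAppend now depends on which magic version the writer emits, since V0 and V1 headers may differ in length. A hedged sketch of the same arithmetic the write-limit test below exercises — the 10 KiB limit and the IndexedRow argument are illustrative assumptions:

    // Sketch: how many indexed records fit under a write limit once the
    // version-dependent batch header is paid. writeLimit is an assumption.
    static int recordsThatFit(IndexedRow row) {
        int writeLimit = 10 * 1024;
        int headerSize = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
        int perRecord = IndexedLogRecord.sizeOf(row);
        // The header is paid once per batch, then records pack until the limit.
        return (writeLimit - headerSize) / perRecord;
    }
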
========================================
@@ -39,16 +39,22 @@
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import static org.apache.fluss.compression.ArrowCompressionInfo.DEFAULT_COMPRESSION;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V0;
+import static org.apache.fluss.record.LogRecordBatchFormat.LOG_MAGIC_VALUE_V1;
 import static org.apache.fluss.record.TestData.DATA2;
 import static org.apache.fluss.record.TestData.DATA2_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA2_TABLE_ID;
@@ -78,14 +84,15 @@ void beforeEach() {
         logScannerStatus.assignScanBuckets(scanBuckets);
     }
 
-    @Test
-    void testSimple() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testSimple(byte recordBatchMagic) throws Exception {
         long fetchOffset = 0L;
         int bucketId = 0; // records for 0-10.
         TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
         FetchLogResultForBucket resultForBucket0 =
                 new FetchLogResultForBucket(
-                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L);
+                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L);
         DefaultCompletedFetch defaultCompletedFetch =
                 makeCompletedFetch(tb, resultForBucket0, fetchOffset);
         List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(8);
@@ -100,14 +107,15 @@ void testSimple() throws Exception {
         assertThat(scanRecords.size()).isEqualTo(0);
     }
 
-    @Test
-    void testNegativeFetchCount() throws Exception {
+    @ParameterizedTest
+    @ValueSource(bytes = {LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1})
+    void testNegativeFetchCount(byte recordBatchMagic) throws Exception {
         long fetchOffset = 0L;
         int bucketId = 0; // records for 0-10.
         TableBucket tb = new TableBucket(DATA2_TABLE_ID, bucketId);
         FetchLogResultForBucket resultForBucket0 =
                 new FetchLogResultForBucket(
-                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW), 10L);
+                        tb, createMemoryLogRecords(DATA2, LogFormat.ARROW, recordBatchMagic), 10L);
         DefaultCompletedFetch defaultCompletedFetch =
                 makeCompletedFetch(tb, resultForBucket0, fetchOffset);
         List<ScanRecord> scanRecords = defaultCompletedFetch.fetchRecords(-10);
@@ -128,9 +136,8 @@ void testNoRecordsInFetch() {
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"INDEXED", "ARROW"})
-    void testProjection(String format) throws Exception {
-        LogFormat logFormat = LogFormat.fromString(format);
+    @MethodSource("typeAndMagic")
+    void testProjection(LogFormat logFormat, byte magic) throws Exception {
         Schema schema =
                 Schema.newBuilder()
                         .column("a", DataTypes.INT())
@@ -158,9 +165,9 @@ void testProjection(String format) throws Exception {
         Projection projection = Projection.of(new int[] {0, 2});
         MemoryLogRecords memoryLogRecords;
         if (logFormat == LogFormat.ARROW) {
-            memoryLogRecords = genRecordsWithProjection(DATA2, projection);
+            memoryLogRecords = genRecordsWithProjection(DATA2, projection, magic);
         } else {
-            memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED);
+            memoryLogRecords = createMemoryLogRecords(DATA2, LogFormat.INDEXED, magic);
         }
         FetchLogResultForBucket resultForBucket0 =
                 new FetchLogResultForBucket(tb, memoryLogRecords, 10L);
@@ -224,19 +231,28 @@ private DefaultCompletedFetch makeCompletedFetch(
                 offset);
     }
 
-    private MemoryLogRecords createMemoryLogRecords(List<Object[]> objects, LogFormat logFormat)
-            throws Exception {
+    private static Collection<Arguments> typeAndMagic() {
+        List<Arguments> params = new ArrayList<>();
+        params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V1));
+        params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V1));
+        params.add(Arguments.arguments(LogFormat.ARROW, LOG_MAGIC_VALUE_V0));
+        params.add(Arguments.arguments(LogFormat.INDEXED, LOG_MAGIC_VALUE_V0));
+        return params;
+    }
+
+    private MemoryLogRecords createMemoryLogRecords(
+            List<Object[]> objects, LogFormat logFormat, byte magic) throws Exception {
         return createRecordsWithoutBaseLogOffset(
-                rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, logFormat);
+                rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, logFormat);
     }
 
-    private MemoryLogRecords genRecordsWithProjection(List<Object[]> objects, Projection projection)
-            throws Exception {
+    private MemoryLogRecords genRecordsWithProjection(
+            List<Object[]> objects, Projection projection, byte magic) throws Exception {
         File logFile = FlussPaths.logFile(tempDir, 0L);
         FileLogRecords fileLogRecords = FileLogRecords.open(logFile, false, 1024 * 1024, false);
         fileLogRecords.append(
                 createRecordsWithoutBaseLogOffset(
-                        rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, objects, LogFormat.ARROW));
+                        rowType, DEFAULT_SCHEMA_ID, 0L, 1000L, magic, objects, LogFormat.ARROW));
         fileLogRecords.flush();
 
         FileLogProjection fileLogProjection = new FileLogProjection();
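
The typeAndMagic() provider enumerates the LogFormat x magic matrix by hand, which keeps the four cases explicit. An equivalent stream-based provider would be the following sketch (not part of the PR; it additionally assumes an import of java.util.stream.Stream):

    // Cartesian product of log formats and record-batch magic versions;
    // JUnit 5 @MethodSource accepts a Stream<Arguments> just as it does a
    // Collection<Arguments>.
    private static Stream<Arguments> typeAndMagic() {
        return Stream.of(LogFormat.ARROW, LogFormat.INDEXED)
                .flatMap(
                        format ->
                                Stream.of(LOG_MAGIC_VALUE_V0, LOG_MAGIC_VALUE_V1)
                                        .map(magic -> Arguments.arguments(format, magic)));
    }
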
========================================
@@ -21,7 +21,6 @@
 import org.apache.fluss.memory.PreAllocatedPagedOutputView;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.ChangeType;
-import org.apache.fluss.record.DefaultLogRecordBatch;
 import org.apache.fluss.record.IndexedLogRecord;
 import org.apache.fluss.record.LogRecord;
 import org.apache.fluss.record.LogRecordBatch;
@@ -40,6 +39,8 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
@@ -72,7 +73,7 @@ void testTryAppendWithWriteLimit() throws Exception {
 
         for (int i = 0;
                 i
-                        < (writeLimit - DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE)
+                        < (writeLimit - recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE))
                                 / estimatedSizeInBytes;
                 i++) {
             boolean appendResult =
========================================
@@ -66,7 +66,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+import static org.apache.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
+import static org.apache.fluss.record.LogRecordBatchFormat.recordBatchHeaderSize;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
@@ -614,7 +615,7 @@ private void verifyTableBucketInBatches(
 
     /** Return the offset delta. */
     private int expectedNumAppends(IndexedRow row, int batchSize) {
-        int size = RECORD_BATCH_HEADER_SIZE;
+        int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);
         int offsetDelta = 0;
         while (true) {
             int recordSize = IndexedLogRecord.sizeOf(row);
@@ -652,7 +653,8 @@ private RecordAccumulator createTestRecordAccumulator(
     }
 
     private long getTestBatchSize(BinaryRow row) {
-        return RECORD_BATCH_HEADER_SIZE + DefaultKvRecord.sizeOf(new byte[4], row);
+        return recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE)
+                + DefaultKvRecord.sizeOf(new byte[4], row);
     }
 
     private int getBatchNumInAccum(RecordAccumulator accum) {
========================================
@@ -26,7 +26,6 @@
 import org.apache.fluss.config.MemorySize;
 import org.apache.fluss.exception.TimeoutException;
 import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.row.GenericRow;
 import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
@@ -51,6 +50,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
 import static org.apache.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO;
@@ -797,7 +797,7 @@ private IdempotenceManager createIdempotenceManager(boolean idempotenceEnabled)
 
     private static boolean hasIdempotentRecords(TableBucket tb, ProduceLogRequest request) {
         MemoryLogRecords memoryLogRecords = getProduceLogData(request).get(tb);
-        return memoryLogRecords.batchIterator().next().writerId() != LogRecordBatch.NO_WRITER_ID;
+        return memoryLogRecords.batchIterator().next().writerId() != NO_WRITER_ID;
     }
 
     private static void assertBatchSequenceEquals(