Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.client.write;

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.memory.MemorySegment;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.MemoryLogRecords;
import com.alibaba.fluss.record.MemoryLogRecordsCompactedBuilder;
import com.alibaba.fluss.record.RowKind;
import com.alibaba.fluss.record.bytesview.BytesView;
import com.alibaba.fluss.record.bytesview.MemorySegmentBytesView;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.rpc.messages.ProduceLogRequest;
import com.alibaba.fluss.utils.Preconditions;

import javax.annotation.concurrent.NotThreadSafe;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

/**
 * A batch of log records managed in COMPACTED format that is or will be sent to server by {@link
 * ProduceLogRequest}.
 *
 * <p>This class is not thread safe and external synchronization must be used when modifying it.
 */
@NotThreadSafe
@Internal
public class CompactedLogWriteBatch extends WriteBatch {

    /** Builder that serializes appended rows directly into a memory segment in COMPACTED format. */
    private final MemoryLogRecordsCompactedBuilder recordsBuilder;

    public CompactedLogWriteBatch(
            TableBucket tableBucket,
            PhysicalTablePath physicalTablePath,
            MemoryLogRecordsCompactedBuilder recordsBuilder) {
        super(tableBucket, physicalTablePath);
        this.recordsBuilder = recordsBuilder;
    }

    /**
     * Tries to append the given record to this batch.
     *
     * @param writeRecord the log record to append; must carry a non-null row and have neither a
     *     key nor target columns (both are kv-only concepts)
     * @param callback invoked when the batch completes
     * @return true if the record was appended, false if the batch is closed or has no room left
     */
    @Override
    public boolean tryAppend(WriteRecord writeRecord, WriteCallback callback) throws Exception {
        InternalRow row = writeRecord.getRow();
        Preconditions.checkArgument(
                writeRecord.getTargetColumns() == null,
                "target columns must be null for log record");
        Preconditions.checkArgument(
                writeRecord.getKey() == null, "key must be null for log record");
        // Bug fix: the original passed the boolean expression `row != null` to checkNotNull,
        // which is never null, so a null row was silently accepted. Pass the row itself.
        Preconditions.checkNotNull(row, "row must not be null for log record");
        Preconditions.checkNotNull(callback, "write callback must be not null");
        if (!recordsBuilder.hasRoomFor(row) || isClosed()) {
            return false;
        } else {
            recordsBuilder.append(RowKind.APPEND_ONLY, row);
            recordCount++;
            callbacks.add(callback);
            return true;
        }
    }

    @Override
    public void serialize() {
        // do nothing, records are serialized into memory buffer when appending
    }

    @Override
    public boolean trySerialize() {
        // records have been serialized.
        return true;
    }

    @VisibleForTesting
    public MemoryLogRecords records() {
        try {
            return recordsBuilder.build();
        } catch (IOException e) {
            // build() only fails on serialization errors; surface them as unchecked
            throw new FlussRuntimeException("build memory log records failed", e);
        }
    }

    @Override
    public BytesView build() {
        MemoryLogRecords memoryLogRecords = records();
        return new MemorySegmentBytesView(
                memoryLogRecords.getMemorySegment(),
                memoryLogRecords.getPosition(),
                memoryLogRecords.sizeInBytes());
    }

    @Override
    public boolean isClosed() {
        return recordsBuilder.isClosed();
    }

    @Override
    public void close() throws Exception {
        recordsBuilder.close();
        reopened = false;
    }

    @Override
    public List<MemorySegment> memorySegments() {
        // the builder writes into exactly one segment
        return Collections.singletonList(recordsBuilder.getMemorySegment());
    }

    @Override
    public void setWriterState(long writerId, int batchSequence) {
        recordsBuilder.setWriterState(writerId, batchSequence);
    }

    @Override
    public long writerId() {
        return recordsBuilder.writerId();
    }

    @Override
    public int batchSequence() {
        return recordsBuilder.batchSequence();
    }

    // Consistency fix: this overrides WriteBatch#resetWriterState (it calls super), so mark it.
    @Override
    public void resetWriterState(long writerId, int batchSequence) {
        super.resetWriterState(writerId, batchSequence);
        recordsBuilder.resetWriterState(writerId, batchSequence);
    }

    @Override
    public int sizeInBytes() {
        return recordsBuilder.getSizeInBytes();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.alibaba.fluss.metrics.MetricNames;
import com.alibaba.fluss.record.DefaultKvRecordBatch;
import com.alibaba.fluss.record.LogRecordBatch;
import com.alibaba.fluss.record.MemoryLogRecordsCompactedBuilder;
import com.alibaba.fluss.record.MemoryLogRecordsIndexedBuilder;
import com.alibaba.fluss.row.arrow.ArrowWriter;
import com.alibaba.fluss.row.arrow.ArrowWriterPool;
Expand Down Expand Up @@ -394,6 +395,8 @@ private WriteBatch.WriteBatchType getWriteBatchType(
return WriteBatch.WriteBatchType.ARROW_LOG;
} else if (logFormat == LogFormat.INDEXED) {
return WriteBatch.WriteBatchType.INDEXED_LOG;
} else if (logFormat == LogFormat.COMPACTED) {
return WriteBatch.WriteBatchType.COMPACTED_LOG;
} else {
throw new IllegalArgumentException("Unsupported log format: " + logFormat);
}
Expand Down Expand Up @@ -546,13 +549,20 @@ private RecordAppendResult appendNewBatch(
arrowWriter,
segment,
memorySegmentPool);
} else {
} else if (writeBatchType == WriteBatch.WriteBatchType.INDEXED_LOG) {
batch =
new IndexedLogWriteBatch(
tb,
physicalTablePath,
MemoryLogRecordsIndexedBuilder.builder(
tableInfo.getSchemaId(), segment.size(), segment));
} else {
batch =
new CompactedLogWriteBatch(
tb,
physicalTablePath,
MemoryLogRecordsCompactedBuilder.builder(
tableInfo.getSchemaId(), segment.size(), segment));
}

batch.tryAppend(writeRecord, callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ private enum FinalState {
public enum WriteBatchType {
    // log batch in ARROW columnar format
    ARROW_LOG,
    // log batch in INDEXED row format
    INDEXED_LOG,
    // log batch in COMPACTED row format
    COMPACTED_LOG,
    // kv (key-value) batch
    KV
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
import com.alibaba.fluss.client.table.writer.AppendWriter;
import com.alibaba.fluss.client.table.writer.UpsertWriter;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.record.CompactedLogRecord;
import com.alibaba.fluss.record.DefaultKvRecord;
import com.alibaba.fluss.record.DefaultKvRecordBatch;
import com.alibaba.fluss.record.DefaultLogRecord;
import com.alibaba.fluss.record.DefaultLogRecordBatch;
import com.alibaba.fluss.record.IndexedLogRecord;
import com.alibaba.fluss.row.InternalRow;
import com.alibaba.fluss.row.MemoryAwareGetters;
import com.alibaba.fluss.row.compacted.CompactedRow;
import com.alibaba.fluss.row.indexed.IndexedRow;
import com.alibaba.fluss.utils.Preconditions;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -72,14 +77,7 @@ public WriteRecord(
this.bucketKey = bucketKey;
this.row = row;
this.targetColumns = targetColumns;
this.estimatedSizeInBytes =
key != null
? DefaultKvRecord.sizeOf(key, row)
+ DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE
// TODO: row maybe not IndexedRow, which can't be estimated size
// and the size maybe not accurate when the format is arrow.
: DefaultLogRecord.sizeOf(row)
+ DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
this.estimatedSizeInBytes = estimatedSizeInBytes();
}

public PhysicalTablePath getPhysicalTablePath() {
Expand Down Expand Up @@ -115,4 +113,23 @@ public int[] getTargetColumns() {
/** Returns the estimated serialized size (in bytes) of this record, computed at construction. */
public int getEstimatedSizeInBytes() {
    return estimatedSizeInBytes;
}

/**
 * Estimates the serialized size of this record including its batch header.
 *
 * <p>A record with a key is sized as a kv record; otherwise the row is sized according to its
 * concrete row format, falling back to the row's own reported memory size.
 */
// TODO: the size maybe not accurate when the format is arrow.
private int estimatedSizeInBytes() {
    if (key != null) {
        return DefaultKvRecord.sizeOf(key, row) + DefaultKvRecordBatch.RECORD_BATCH_HEADER_SIZE;
    }

    Preconditions.checkArgument(row != null, "row is null");
    final int recordSize;
    if (row instanceof IndexedRow) {
        recordSize = IndexedLogRecord.sizeOf((IndexedRow) row);
    } else if (row instanceof CompactedRow) {
        recordSize = CompactedLogRecord.sizeOf((CompactedRow) row);
    } else {
        recordSize = ((MemoryAwareGetters) row).getSizeInBytes();
    }
    return recordSize + DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -770,4 +770,57 @@ void testInvalidColumnProjection() throws Exception {
"Only ARROW log format supports column projection, but the log format "
+ "of table 'test_db_1.test_non_pk_table_1' is INDEXED");
}

@Test
void testWriteKvWithCompactedLogFormat() throws Exception {
    // Primary-key table configured with COMPACTED log format: upserts should be readable
    // back from the change log as INSERT records.
    Schema schema =
            Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("b", DataTypes.INT())
                    .column("c", DataTypes.STRING())
                    .primaryKey("a")
                    .build();
    TableDescriptor tableDescriptor =
            TableDescriptor.builder().schema(schema).logFormat(LogFormat.COMPACTED).build();
    TablePath tablePath = TablePath.of("test_db_1", "test_pk_table_1");
    createTable(tablePath, tableDescriptor, false);

    try (Table table = conn.getTable(tablePath)) {
        UpsertWriter writer = table.getUpsertWriter();
        int expectedSize = 30;
        for (int i = 0; i < expectedSize; i++) {
            // alternate non-null / null string values to also exercise null handling
            String value = i % 2 == 0 ? "hello, friend" + i : null;
            InternalRow row = compactedRow(schema.toRowType(), new Object[] {i, 100, value});
            writer.upsert(row);
            if (i % 10 == 0) {
                // insert 3 batches, each batch has 10 rows
                writer.flush();
            }
        }

        // fetch data.
        LogScanner logScanner = createLogScanner(table);
        subscribeFromBeginning(logScanner, table);
        int count = 0;
        while (count < expectedSize) {
            ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
            for (ScanRecord scanRecord : scanRecords) {
                // records must come back in insertion order with the original values
                assertThat(scanRecord.getRowKind()).isEqualTo(RowKind.INSERT);
                assertThat(scanRecord.getRow().getFieldCount()).isEqualTo(3);
                assertThat(scanRecord.getRow().getInt(0)).isEqualTo(count);
                assertThat(scanRecord.getRow().getInt(1)).isEqualTo(100);
                if (count % 2 == 0) {
                    assertThat(scanRecord.getRow().getString(2).toString())
                            .isEqualTo("hello, friend" + count);
                } else {
                    // check null values
                    assertThat(scanRecord.getRow().isNullAt(2)).isTrue();
                }
                count++;
            }
        }
        assertThat(count).isEqualTo(expectedSize);
        logScanner.close();
    }
}
}
Loading