Skip to content

Commit

Permalink
[core] Generate changelog by copying data when records are insert-only
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub committed Jun 27, 2024
1 parent 1812496 commit 989b717
Show file tree
Hide file tree
Showing 14 changed files with 325 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ public void sync() throws Exception {
trySyncLatestCompaction(true);
}

/** Intentionally empty: this implementation takes no action when the insert-only flag is set. */
@Override
public void withInsertOnly(boolean insertOnly) {}

@Override
public void close() throws Exception {
// cancel compaction so that it does not block job cancelling
Expand Down
20 changes: 20 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,26 @@ public DataFileMeta upgrade(int newLevel) {
fileSource);
}

/**
 * Creates a new {@link DataFileMeta} that carries the given file name and otherwise reuses every
 * field value of this instance. This instance is not modified.
 *
 * @param newFileName the file name to store in the returned meta
 * @return a new meta built from {@code newFileName} and this instance's remaining fields
 */
public DataFileMeta rename(String newFileName) {
    return new DataFileMeta(
            newFileName,
            this.fileSize,
            this.rowCount,
            this.minKey,
            this.maxKey,
            this.keyStats,
            this.valueStats,
            this.minSequenceNumber,
            this.maxSequenceNumber,
            this.schemaId,
            this.level,
            this.extraFiles,
            this.creationTime,
            this.deleteRowCount,
            this.embeddedIndex,
            this.fileSource);
}

public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(fileName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -123,6 +124,21 @@ public void deleteFile(String filename, int level) {
fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename));
}

/**
 * Copies one file to another name. Both names are resolved to paths through the path factory
 * of the given level before the copy is delegated to {@code fileIO}.
 *
 * @param sourceFileName name of the file to copy from
 * @param targetFileName name of the file to copy to
 * @param level level whose path factory resolves both file names
 * @throws IOException propagated from the underlying file copy
 */
public void copyFile(String sourceFileName, String targetFileName, int level)
        throws IOException {
    // Source is resolved before target, matching the argument order of the copy call.
    fileIO.copyFile(
            formatContext.pathFactory(level).toPath(sourceFileName),
            formatContext.pathFactory(level).toPath(targetFileName));
}

/** Returns the {@link FileIO} instance this object uses for its file operations. */
public FileIO getFileIO() {
return fileIO;
}

/**
 * Asks the path factory of the given level for a new changelog path.
 *
 * @param level level whose path factory is used
 * @return the path returned by that factory's {@code newChangelogPath()}
 */
public Path newChangelogPath(int level) {
return formatContext.pathFactory(level).newChangelogPath();
}

public static Builder builder(
FileIO fileIO,
long schemaId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {

private long newSequenceNumber;
private WriteBuffer writeBuffer;
private boolean isInsertOnly;
private boolean areAllRecordsInsertOnlySinceLastFlush;

public MergeTreeWriter(
boolean writeBufferSpillable,
Expand Down Expand Up @@ -213,7 +215,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
}

final RollingFileWriter<KeyValue, DataFileMeta> changelogWriter =
changelogProducer == ChangelogProducer.INPUT
changelogProducer == ChangelogProducer.INPUT & !canChangelogOptimizedToCopy()
? writerFactory.createRollingChangelogFileWriter(0)
: null;
final RollingFileWriter<KeyValue, DataFileMeta> dataWriter =
Expand All @@ -232,16 +234,25 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
dataWriter.close();
}

List<DataFileMeta> fileMetas = dataWriter.result();
if (changelogWriter != null) {
newFilesChangelog.addAll(changelogWriter.result());
} else if (canChangelogOptimizedToCopy()) {
List<DataFileMeta> changelogMetas = getChangelogFileMetaFromDataFile(fileMetas);
for (int i = 0; i < changelogMetas.size(); i++) {
writerFactory.copyFile(
fileMetas.get(i).fileName(), changelogMetas.get(i).fileName(), 0);
}
newFilesChangelog.addAll(changelogMetas);
}

for (DataFileMeta fileMeta : dataWriter.result()) {
for (DataFileMeta fileMeta : fileMetas) {
newFiles.add(fileMeta);
compactManager.addNewFile(fileMeta);
}

writeBuffer.clear();
areAllRecordsInsertOnlySinceLastFlush = isInsertOnly;
}

trySyncLatestCompaction(waitForLatestCompaction);
Expand Down Expand Up @@ -276,6 +287,32 @@ public void sync() throws Exception {
trySyncLatestCompaction(true);
}

/**
 * Updates the insert-only flag of this writer and keeps {@code
 * areAllRecordsInsertOnlySinceLastFlush} consistent with it.
 *
 * <p>While the write buffer is absent or empty, the "since last flush" flag simply follows the
 * new value. Once the buffer holds records, the flag can only be lowered to {@code false}; it
 * is never raised back to {@code true} here.
 *
 * @param insertOnly the new insert-only status of the following records
 */
@Override
public void withInsertOnly(boolean insertOnly) {
    this.isInsertOnly = insertOnly;

    boolean nothingBuffered = writeBuffer == null || writeBuffer.size() == 0;
    if (nothingBuffered) {
        areAllRecordsInsertOnlySinceLastFlush = insertOnly;
        return;
    }

    // Records are already buffered: a switch to non-insert-only taints the pending flush.
    if (!insertOnly) {
        areAllRecordsInsertOnlySinceLastFlush = false;
    }
}

/**
 * Derives one changelog file meta per data file meta, in the same order. Each derived meta is
 * the data file meta renamed to the file name of a newly requested level-0 changelog path.
 *
 * @param dataFileMetaList metas of the data files to derive changelog metas from
 * @return a new list holding the renamed metas
 */
private List<DataFileMeta> getChangelogFileMetaFromDataFile(
        List<DataFileMeta> dataFileMetaList) {
    List<DataFileMeta> renamedMetas = new ArrayList<>(dataFileMetaList.size());
    for (DataFileMeta dataMeta : dataFileMetaList) {
        String changelogFileName = writerFactory.newChangelogPath(0).getName();
        renamedMetas.add(dataMeta.rename(changelogFileName));
    }
    return renamedMetas;
}

/**
 * Tells whether both conditions checked here hold: the changelog producer is {@link
 * ChangelogProducer#INPUT} and {@code areAllRecordsInsertOnlySinceLastFlush} is set.
 *
 * @return {@code true} only when both conditions hold
 */
private boolean canChangelogOptimizedToCopy() {
    if (changelogProducer != ChangelogProducer.INPUT) {
        return false;
    }
    return areAllRecordsInsertOnlySinceLastFlush;
}

private CommitIncrement drainIncrement() {
DataIncrement dataIncrement =
new DataIncrement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {

protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
private boolean isInsertOnly;

protected AbstractFileStoreWrite(
String commitUser,
Expand Down Expand Up @@ -123,6 +124,16 @@ public void withCompactExecutor(ExecutorService compactExecutor) {
this.closeCompactExecutorWhenLeaving = false;
}

/**
 * Stores the insert-only flag on this instance and forwards it to every writer currently held
 * in {@code writers}.
 *
 * @param insertOnly the new insert-only status of the following records
 */
@Override
public void withInsertOnly(boolean insertOnly) {
    this.isInsertOnly = insertOnly;
    writers.values()
            .forEach(
                    containerMap ->
                            containerMap
                                    .values()
                                    .forEach(
                                            container ->
                                                    container.writer.withInsertOnly(
                                                            insertOnly)));
}

@Override
public void write(BinaryRow partition, int bucket, T data) throws Exception {
WriterContainer<T> container = getWriterWrapper(partition, bucket);
Expand Down Expand Up @@ -404,6 +415,7 @@ public WriterContainer<T> createWriterContainer(
null,
compactExecutor(),
deletionVectorsMaintainer);
writer.withInsertOnly(isInsertOnly);
notifyNewWriter(writer);
return new WriterContainer<>(
writer, indexMaintainer, deletionVectorsMaintainer, latestSnapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ default FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool) {

void withCompactExecutor(ExecutorService compactExecutor);

/**
 * Sets the insert-only status of the records written from now on. This method is called
 * whenever that status changes.
 *
 * @param insertOnly If true, all the following records would be of {@link
 *     org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary
 *     key.
 */
void withInsertOnly(boolean insertOnly);

/**
* Write the data to the store according to the partition and bucket.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ public interface TableWrite extends AutoCloseable {
/** With {@link MemorySegmentPool} for the current table write. */
TableWrite withMemoryPool(MemorySegmentPool memoryPool);

/**
 * Sets the insert-only status of the records written from now on. This method is called
 * whenever that status changes.
 *
 * @param insertOnly If true, all the following records would be of {@link
 *     org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary
 *     key.
 */
void withInsertOnly(boolean insertOnly);

/** Calculate which partition {@code row} belongs to. */
BinaryRow getPartition(InternalRow row);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ public TableWriteImpl<T> withBucketMode(BucketMode bucketMode) {
return this;
}

/** Forwards the insert-only flag unchanged to {@code write}. */
@Override
public void withInsertOnly(boolean insertOnly) {
write.withInsertOnly(insertOnly);
}

@Override
public BinaryRow getPartition(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ public interface RecordWriter<T> {
*/
void sync() throws Exception;

/**
 * Sets the insert-only status of the records written from now on. This method is called
 * whenever that status changes.
 *
 * @param insertOnly If true, all the following records would be of {@link
 *     org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary
 *     key.
 */
void withInsertOnly(boolean insertOnly);

/** Close this writer, the call will delete newly generated but not committed files. */
void close() throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,29 @@ public void testWriteMany() throws Exception {
doTestWriteRead(3, 20_000);
}

/**
 * Runs the changelog write/read check on a writer created with {@link ChangelogProducer#INPUT}
 * and without setting the insert-only flag.
 */
@Test
public void testChangelog() throws Exception {
    MergeTreeCompactManager compactManager =
            createCompactManager(service, Collections.emptyList());
    writer =
            createMergeTreeWriter(
                    Collections.emptyList(), compactManager, ChangelogProducer.INPUT);

    doTestWriteReadWithChangelog(8, 200, false);
}

/**
 * Runs the changelog write/read check on a writer created with {@link ChangelogProducer#INPUT}
 * after marking it insert-only, expecting the changelog files to match the data files.
 */
@Test
public void testChangelogFromCopyingData() throws Exception {
    MergeTreeCompactManager compactManager =
            createCompactManager(service, Collections.emptyList());
    writer =
            createMergeTreeWriter(
                    Collections.emptyList(), compactManager, ChangelogProducer.INPUT);
    writer.withInsertOnly(true);

    doTestWriteReadWithChangelog(8, 200, true);
}

/** Runs the write/read check with the given number of batches and 200 records per batch. */
private void doTestWriteRead(int batchNumber) throws Exception {
    int defaultPerBatch = 200;
    doTestWriteRead(batchNumber, defaultPerBatch);
}
Expand Down Expand Up @@ -413,12 +436,77 @@ private void doTestWriteRead(int batchNumber, int perBatch) throws Exception {
assertThat(files).isEqualTo(Collections.emptySet());
}

/**
 * Writes {@code batchNumber} batches of {@code perBatch} records, preparing a commit after each
 * batch and once more after a final {@code sync()}, then verifies the records seen through the
 * writer, the new data files, the changelog files and the compacted files. Finally closes the
 * writer and checks that the bucket directory holds no file other than the collected ones.
 *
 * @param batchNumber number of batches to write
 * @param perBatch number of records per batch
 * @param isChangelogEqualToData if true, the changelog files are checked with the same
 *     assertions as the data files; otherwise every changelog file is read on its own and the
 *     combined records are compared with the expected ones in any order
 */
private void doTestWriteReadWithChangelog(
        int batchNumber, int perBatch, boolean isChangelogEqualToData) throws Exception {
    List<TestRecord> expected = new ArrayList<>();
    List<DataFileMeta> newFiles = new ArrayList<>();
    List<DataFileMeta> changelogFiles = new ArrayList<>();
    Set<String> newFileNames = new HashSet<>();
    List<DataFileMeta> compactedFiles = new ArrayList<>();

    // One commit per written batch, plus a last round that only syncs and commits.
    for (int round = 0; round <= batchNumber; round++) {
        boolean isFinalRound = round == batchNumber;
        if (isFinalRound) {
            writer.sync();
        } else {
            expected.addAll(writeBatch(perBatch));
        }

        CommitIncrement committed = writer.prepareCommit(true);
        newFiles.addAll(committed.newFilesIncrement().newFiles());
        changelogFiles.addAll(committed.newFilesIncrement().changelogFiles());
        mergeCompacted(newFileNames, compactedFiles, committed);
    }

    // Records visible through the writer.
    assertRecords(expected);

    // Records stored in the new data files.
    assertRecords(expected, newFiles, false);
    assertRecords(expected, newFiles, true);

    // Records stored in the changelog files.
    if (!isChangelogEqualToData) {
        List<TestRecord> changelogRecords = new ArrayList<>();
        for (DataFileMeta changelogFile : changelogFiles) {
            changelogRecords.addAll(readAll(Collections.singletonList(changelogFile), false));
        }
        assertThat(changelogRecords)
                .containsExactlyInAnyOrder(expected.toArray(new TestRecord[0]));
    } else {
        assertRecords(expected, changelogFiles, false);
        assertRecords(expected, changelogFiles, true);
    }

    // Records stored in the compacted files.
    assertRecords(expected, compactedFiles, true);

    writer.close();

    // After close, only the collected files may remain in the bucket directory.
    Path bucketDir = writerFactory.pathFactory(0).toPath("ignore").getParent();
    Set<String> leftovers =
            Arrays.stream(LocalFileIO.create().listStatus(bucketDir))
                    .map(FileStatus::getPath)
                    .map(Path::getName)
                    .collect(Collectors.toSet());
    for (List<DataFileMeta> tracked : Arrays.asList(newFiles, changelogFiles, compactedFiles)) {
        for (DataFileMeta meta : tracked) {
            leftovers.remove(meta.fileName());
        }
    }
    assertThat(leftovers).isEqualTo(Collections.emptySet());
}

/** Creates a writer over the given files, using a compact manager built from those files. */
private MergeTreeWriter createMergeTreeWriter(List<DataFileMeta> files) {
    MergeTreeCompactManager compactManager = createCompactManager(service, files);
    return createMergeTreeWriter(files, compactManager);
}

/**
 * Creates a writer over the given files and compact manager, with {@link
 * ChangelogProducer#NONE} as the changelog producer.
 */
private MergeTreeWriter createMergeTreeWriter(
        List<DataFileMeta> files, MergeTreeCompactManager compactManager) {
    ChangelogProducer defaultProducer = ChangelogProducer.NONE;
    return createMergeTreeWriter(files, compactManager, defaultProducer);
}

private MergeTreeWriter createMergeTreeWriter(
List<DataFileMeta> files,
MergeTreeCompactManager compactManager,
ChangelogProducer changelogProducer) {
long maxSequenceNumber =
files.stream().map(DataFileMeta::maxSequenceNumber).max(Long::compare).orElse(-1L);
MergeTreeWriter writer =
Expand All @@ -434,7 +522,7 @@ private MergeTreeWriter createMergeTreeWriter(
DeduplicateMergeFunction.factory().create(),
writerFactory,
options.commitForceCompact(),
ChangelogProducer.NONE,
changelogProducer,
null,
null);
writer.setMemoryPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@
/** Helper class of {@link PrepareCommitOperator} for different types of paimon sinks. */
public interface StoreSinkWrite {

/**
 * Sets the insert-only status of the records written from now on. This method is called
 * whenever that status changes.
 *
 * @param insertOnly If true, all the following records would be of {@link
 *     org.apache.paimon.types.RowKind#INSERT}, and no two records would have the same primary
 *     key.
 */
void withInsertOnly(boolean insertOnly);

@Nullable
SinkRecord write(InternalRow rowData) throws Exception;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public void withCompactExecutor(ExecutorService compactExecutor) {
write.withCompactExecutor(compactExecutor);
}

/** Forwards the insert-only flag unchanged to {@code write}. */
@Override
public void withInsertOnly(boolean insertOnly) {
write.withInsertOnly(insertOnly);
}

@Override
@Nullable
public SinkRecord write(InternalRow rowData) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ public CompactRememberStoreWrite(boolean streamingMode) {
this.streamingMode = streamingMode;
}

/** Intentionally empty: this implementation takes no action when the insert-only flag is set. */
@Override
public void withInsertOnly(boolean insertOnly) {}

@Override
public SinkRecord write(InternalRow rowData) {
return null;
Expand Down
Loading

0 comments on commit 989b717

Please sign in to comment.