Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Small changelog files can now be compacted into big files #4255

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions docs/content/maintenance/write-performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,3 +160,16 @@ You can use fine-grained-resource-management of Flink to increase committer heap
1. Configure Flink Configuration `cluster.fine-grained-resource-management.enabled: true`. (This is default after Flink 1.18)
2. Configure Paimon Table Options: `sink.committer-memory`, for example 300 MB, depends on your `TaskManager`.
(`sink.committer-cpu` is also supported)

## Changelog Compaction

If Flink's checkpoint interval is short (for example, 30 seconds) and the number of buckets is large,
each snapshot may produce lots of small changelog files.
Too many files may put a burden on the distributed storage cluster.

In order to compact small changelog files into large ones, you can set the table option `changelog.compact.parallelism`.
This option will add a compact operator after the writer operator, which copies changelog files into large ones.
If the parallelism becomes larger, file copying will become faster.
However, the number of resulting files will also become larger.
As file copying is fast in most storage system,
we suggest that you start experimenting with `'changelog.compact.parallelism' = '1'` and increase the value if needed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My idea is to have only one switch: changelog.precommit-compact = true.

We can add a Coordinator node to this pipeline to decide how to concatenate it into a target file size result file, which can be one or multiple files.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>changelog.compact.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Compact several changelog files from the same partition into one file, in order to decrease the number of small files. This property sets the parallelism of the compact operator. More parallelism means faster file copy, however the number of resulting files will also become larger.</td>
</tr>
<tr>
<td><h5>end-input.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public final class IOUtils {
private static final Logger LOG = LoggerFactory.getLogger(IOUtils.class);

/** The block size for byte operations in byte. */
private static final int BLOCKSIZE = 4096;
public static final int BLOCKSIZE = 4096;

// ------------------------------------------------------------------------
// Byte copy operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,17 @@ public class FlinkConnectorOptions {
.withDescription(
"Optional endInput watermark used in case of batch mode or bounded stream.");

public static final ConfigOption<Integer> CHANGELOG_COMPACT_PARALLELISM =
key("changelog.compact.parallelism")
.intType()
.noDefaultValue()
.withDescription(
"Compact several changelog files from the same partition into one file, "
+ "in order to decrease the number of small files. "
+ "This property sets the parallelism of the compact operator. "
+ "More parallelism means faster file copy, "
+ "however the number of resulting files will also become larger.");

public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact.changelog;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* Operator to compact several changelog files from the same partition into one file, in order to
* reduce the number of small files.
*/
public class ChangelogCompactOperator extends AbstractStreamOperator<Committable>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why choose "add a compact operator" instead of "compact changelog file in write operator"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can you compact changelog files from all buckets into one file inside the write operator? Please suggest your solution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry,at first, I guess the dimension of compact changelog is bucket, but after I read code, I discovered that the dimension of compact changelog is partition.
But before I submit comments, I forget delete this comment.

implements OneInputStreamOperator<Committable, Committable>, BoundedOneInput {

private final FileStoreTable table;

private transient FileStorePathFactory pathFactory;
private transient long checkpointId;
private transient Map<BinaryRow, OutputStream> outputStreams;
private transient Map<BinaryRow, List<Result>> results;

public ChangelogCompactOperator(FileStoreTable table) {
this.table = table;
}

@Override
public void open() throws Exception {
super.open();

pathFactory = table.store().pathFactory();
checkpointId = Long.MIN_VALUE;
outputStreams = new HashMap<>();
results = new HashMap<>();
}

@Override
public void processElement(StreamRecord<Committable> record) throws Exception {
Committable committable = record.getValue();
checkpointId = Math.max(checkpointId, committable.checkpointId());
if (committable.kind() != Committable.Kind.FILE) {
output.collect(record);
return;
}

CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
if (message.newFilesIncrement().changelogFiles().isEmpty()
&& message.compactIncrement().changelogFiles().isEmpty()) {
output.collect(record);
return;
}

// copy changelogs from the same partition into one file
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(message.partition(), message.bucket());
for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
copyFile(
dataFilePathFactory.toPath(meta.fileName()),
message.partition(),
message.bucket(),
false,
meta);
}
for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
copyFile(
dataFilePathFactory.toPath(meta.fileName()),
message.partition(),
message.bucket(),
true,
meta);
}

// send commit message without changelog files
CommitMessageImpl newMessage =
new CommitMessageImpl(
message.partition(),
message.bucket(),
new DataIncrement(
message.newFilesIncrement().newFiles(),
message.newFilesIncrement().deletedFiles(),
Collections.emptyList()),
new CompactIncrement(
message.compactIncrement().compactBefore(),
message.compactIncrement().compactAfter(),
Collections.emptyList()),
message.indexIncrement());
Committable newCommittable =
new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
if (record.hasTimestamp()) {
output.collect(new StreamRecord<>(newCommittable, record.getTimestamp()));
} else {
output.collect(new StreamRecord<>(newCommittable));
}
}

private void copyFile(
Path path, BinaryRow partition, int bucket, boolean isCompactResult, DataFileMeta meta)
throws Exception {
if (!outputStreams.containsKey(partition)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here could change to outputStreams.computeIfAbsent?

Path outputPath =
new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private final String xxx = "tmp-compacted-changelog-";

outputStreams.put(
partition,
new OutputStream(
outputPath, table.fileIO().newOutputStream(outputPath, false)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why overwrite is true? If file already exist, throw a exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no true in this line of code. What do you mean?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why overwrite is false? If file already exist, throw a exception?

}

OutputStream outputStream = outputStreams.get(partition);
long offset = outputStream.out.getPos();
try (SeekableInputStream in = table.fileIO().newInputStream(path)) {
IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false);
}
table.fileIO().deleteQuietly(path);
results.computeIfAbsent(partition, p -> new ArrayList<>())
.add(
new Result(
bucket,
isCompactResult,
meta,
offset,
outputStream.out.getPos() - offset));

if (outputStream.out.getPos() >= table.coreOptions().targetFileSize(false)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only pk table has changelog, so why here call targetFileSize with false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changelog files are not LSM tree data files. They're just a bunch of records and they don't care about what the primary keys are.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, you are right.

flushPartition(partition);
}
}

@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
flushAllPartitions();
}

@Override
public void endInput() throws Exception {
flushAllPartitions();
}

private void flushAllPartitions() throws Exception {
List<BinaryRow> partitions = new ArrayList<>(outputStreams.keySet());
for (BinaryRow partition : partitions) {
flushPartition(partition);
}
}

private void flushPartition(BinaryRow partition) throws Exception {
OutputStream outputStream = outputStreams.get(partition);
outputStream.out.close();

Result baseResult = results.get(partition).get(0);
Preconditions.checkArgument(baseResult.offset == 0);
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(partition, baseResult.bucket);
// see Java docs of `CompactedChangelogFormatReaderFactory`
String realName =
"compacted-changelog-"
+ UUID.randomUUID()
+ "$"
+ baseResult.bucket
+ "-"
+ baseResult.length;
table.fileIO()
.rename(
outputStream.path,
dataFilePathFactory.toPath(
realName
+ "."
+ CompactedChangelogReadOnlyFormat.getIdentifier(
baseResult.meta.fileFormat())));

Map<Integer, List<Result>> grouped = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This map can init capacity with bucketNum.

for (Result result : results.get(partition)) {
grouped.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result);
}

for (Map.Entry<Integer, List<Result>> entry : grouped.entrySet()) {
List<DataFileMeta> newFilesChangelog = new ArrayList<>();
List<DataFileMeta> compactChangelog = new ArrayList<>();
for (Result result : entry.getValue()) {
// see Java docs of `CompactedChangelogFormatReaderFactory`
String name =
(result.offset == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and line 205, can abstract a util function for get path?

? realName
: realName + "-" + result.offset + "-" + result.length)
+ "."
+ CompactedChangelogReadOnlyFormat.getIdentifier(
result.meta.fileFormat());
if (result.isCompactResult) {
compactChangelog.add(result.meta.rename(name));
} else {
newFilesChangelog.add(result.meta.rename(name));
}
}

CommitMessageImpl newMessage =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like line 113, here can add:
// send commit message only with changelog files

new CommitMessageImpl(
partition,
entry.getKey(),
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
newFilesChangelog),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
compactChangelog));
Committable newCommittable =
new Committable(checkpointId, Committable.Kind.FILE, newMessage);
output.collect(new StreamRecord<>(newCommittable));
}

outputStreams.remove(partition);
results.remove(partition);
}

@Override
public void close() throws Exception {
for (Map.Entry<BinaryRow, OutputStream> entry : outputStreams.entrySet()) {
OutputStream outputStream = entry.getValue();
try {
outputStream.out.close();
} catch (Exception e) {
LOG.warn("Failed to close output stream for file " + outputStream.path, e);
}
table.fileIO().deleteQuietly(outputStream.path);
}

outputStreams.clear();
results.clear();
}

private static class OutputStream {

private final Path path;
private final PositionOutputStream out;

private OutputStream(Path path, PositionOutputStream out) {
this.path = path;
this.out = out;
}
}

private static class Result {

private final int bucket;
private final boolean isCompactResult;
private final DataFileMeta meta;
private final long offset;
private final long length;

private Result(
int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) {
this.bucket = bucket;
this.isCompactResult = isCompactResult;
this.meta = meta;
this.offset = offset;
this.length = length;
}
}
}
Loading
Loading