[flink] Small changelog files can now be compacted into big files
tsreaper committed Sep 25, 2024
1 parent 36c2756 commit 69a04c8
Showing 10 changed files with 841 additions and 12 deletions.
13 changes: 13 additions & 0 deletions docs/content/maintenance/write-performance.md
@@ -160,3 +160,16 @@ You can use fine-grained-resource-management of Flink to increase committer heap
1. Configure Flink Configuration `cluster.fine-grained-resource-management.enabled: true`. (This is the default since Flink 1.18.)
2. Configure Paimon Table Options: `sink.committer-memory`, for example 300 MB, depending on your `TaskManager`.
   (`sink.committer-cpu` is also supported)

## Changelog Compaction

If Flink's checkpoint interval is short (for example, 30 seconds) and the number of buckets is large,
each snapshot may produce many small changelog files.
Too many small files put pressure on the distributed storage cluster.

To compact small changelog files into larger ones, set the table option `changelog.compact.parallelism`.
This option adds a compact operator after the writer operator, which copies small changelog files into larger ones.
Higher parallelism makes file copying faster, but it also increases the number of resulting files.
Because file copying is fast on most storage systems,
we suggest starting with `'changelog.compact.parallelism' = '1'` and increasing the value only if needed.
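
For example, the option can be set on an existing table through Flink's Table API. This is a minimal sketch, assuming a Paimon catalog is already registered and selected; the class name and `my_paimon_table` are illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableChangelogCompaction {
    public static void main(String[] args) {
        // assumes the current catalog/database resolve to a Paimon table
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // start with parallelism 1; increase only if file copying lags behind
        tEnv.executeSql(
                "ALTER TABLE my_paimon_table SET ('changelog.compact.parallelism' = '1')");
    }
}
```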
@@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>changelog.compact.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Compact several changelog files from the same partition into one file, in order to decrease the number of small files. This property sets the parallelism of the compact operator. More parallelism means faster file copying, but the number of resulting files will also be larger.</td>
</tr>
<tr>
<td><h5>end-input.watermark</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -38,7 +38,7 @@ public final class IOUtils {
private static final Logger LOG = LoggerFactory.getLogger(IOUtils.class);

/** The block size for byte operations, in bytes. */
-private static final int BLOCKSIZE = 4096;
+public static final int BLOCKSIZE = 4096;

// ------------------------------------------------------------------------
// Byte copy operations
@@ -404,6 +404,17 @@ public class FlinkConnectorOptions {
.withDescription(
"Optional endInput watermark used in case of batch mode or bounded stream.");

public static final ConfigOption<Integer> CHANGELOG_COMPACT_PARALLELISM =
key("changelog.compact.parallelism")
.intType()
.noDefaultValue()
.withDescription(
"Compact several changelog files from the same partition into one file, "
+ "in order to decrease the number of small files. "
+ "This property sets the parallelism of the compact operator. "
+ "More parallelism means faster file copy, "
+ "however the number of resulting files will also become larger.");

public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
@@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact.changelog;

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOnlyFormat;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
* Operator to compact several changelog files from the same partition into one file, in order to
* reduce the number of small files.
*/
public class ChangelogCompactOperator extends AbstractStreamOperator<Committable>
implements OneInputStreamOperator<Committable, Committable>, BoundedOneInput {

private final FileStoreTable table;

private transient FileStorePathFactory pathFactory;
private transient long checkpointId;
private transient Map<BinaryRow, OutputStream> outputStreams;
private transient Map<BinaryRow, List<Result>> results;

public ChangelogCompactOperator(FileStoreTable table) {
this.table = table;
}

@Override
public void open() throws Exception {
super.open();

pathFactory = table.store().pathFactory();
checkpointId = Long.MIN_VALUE;
outputStreams = new HashMap<>();
results = new HashMap<>();
}

@Override
public void processElement(StreamRecord<Committable> record) throws Exception {
Committable committable = record.getValue();
checkpointId = Math.max(checkpointId, committable.checkpointId());
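// committables that do not carry data files are forwarded unchanged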
if (committable.kind() != Committable.Kind.FILE) {
output.collect(record);
return;
}

CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
if (message.newFilesIncrement().changelogFiles().isEmpty()
&& message.compactIncrement().changelogFiles().isEmpty()) {
output.collect(record);
return;
}

// copy changelogs from the same partition into one file
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(message.partition(), message.bucket());
for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
copyFile(
dataFilePathFactory.toPath(meta.fileName()),
message.partition(),
message.bucket(),
false,
meta);
}
for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
copyFile(
dataFilePathFactory.toPath(meta.fileName()),
message.partition(),
message.bucket(),
true,
meta);
}

// send commit message without changelog files
CommitMessageImpl newMessage =
new CommitMessageImpl(
message.partition(),
message.bucket(),
new DataIncrement(
message.newFilesIncrement().newFiles(),
message.newFilesIncrement().deletedFiles(),
Collections.emptyList()),
new CompactIncrement(
message.compactIncrement().compactBefore(),
message.compactIncrement().compactAfter(),
Collections.emptyList()),
message.indexIncrement());
Committable newCommittable =
new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
if (record.hasTimestamp()) {
output.collect(new StreamRecord<>(newCommittable, record.getTimestamp()));
} else {
output.collect(new StreamRecord<>(newCommittable));
}
}

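/**
* Appends the changelog file at the given path to the current compacted output file of its
* partition, deletes the original file, and records the copied slice (bucket, offset, length)
* for the rewritten commit messages.
*/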
private void copyFile(
Path path, BinaryRow partition, int bucket, boolean isCompactResult, DataFileMeta meta)
throws Exception {
if (!outputStreams.containsKey(partition)) {
Path outputPath =
new Path(path.getParent(), "tmp-compacted-changelog-" + UUID.randomUUID());
outputStreams.put(
partition,
new OutputStream(
outputPath, table.fileIO().newOutputStream(outputPath, false)));
}

OutputStream outputStream = outputStreams.get(partition);
long offset = outputStream.out.getPos();
try (SeekableInputStream in = table.fileIO().newInputStream(path)) {
IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false);
}
table.fileIO().deleteQuietly(path);
results.computeIfAbsent(partition, p -> new ArrayList<>())
.add(
new Result(
bucket,
isCompactResult,
meta,
offset,
outputStream.out.getPos() - offset));

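// roll over to a new compacted file once it reaches the target file size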
if (outputStream.out.getPos() >= table.coreOptions().targetFileSize(false)) {
flushPartition(partition);
}
}

@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
flushAllPartitions();
}

@Override
public void endInput() throws Exception {
flushAllPartitions();
}

private void flushAllPartitions() throws Exception {
List<BinaryRow> partitions = new ArrayList<>(outputStreams.keySet());
for (BinaryRow partition : partitions) {
flushPartition(partition);
}
}

private void flushPartition(BinaryRow partition) throws Exception {
OutputStream outputStream = outputStreams.get(partition);
outputStream.out.close();

Result baseResult = results.get(partition).get(0);
Preconditions.checkArgument(baseResult.offset == 0);
DataFilePathFactory dataFilePathFactory =
pathFactory.createDataFilePathFactory(partition, baseResult.bucket);
// see Java docs of `CompactedChangelogFormatReaderFactory`
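// base file name layout: compacted-changelog-<uuid>$<bucket>-<length>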
String realName =
"compacted-changelog-"
+ UUID.randomUUID()
+ "$"
+ baseResult.bucket
+ "-"
+ baseResult.length;
table.fileIO()
.rename(
outputStream.path,
dataFilePathFactory.toPath(
realName
+ "."
+ CompactedChangelogReadOnlyFormat.getIdentifier(
baseResult.meta.fileFormat())));

Map<Integer, List<Result>> grouped = new HashMap<>();
for (Result result : results.get(partition)) {
grouped.computeIfAbsent(result.bucket, b -> new ArrayList<>()).add(result);
}

for (Map.Entry<Integer, List<Result>> entry : grouped.entrySet()) {
List<DataFileMeta> newFilesChangelog = new ArrayList<>();
List<DataFileMeta> compactChangelog = new ArrayList<>();
for (Result result : entry.getValue()) {
// see Java docs of `CompactedChangelogFormatReaderFactory`
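// slices at a non-zero offset append "-<offset>-<length>" to the base name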
String name =
(result.offset == 0
? realName
: realName + "-" + result.offset + "-" + result.length)
+ "."
+ CompactedChangelogReadOnlyFormat.getIdentifier(
result.meta.fileFormat());
if (result.isCompactResult) {
compactChangelog.add(result.meta.rename(name));
} else {
newFilesChangelog.add(result.meta.rename(name));
}
}

CommitMessageImpl newMessage =
new CommitMessageImpl(
partition,
entry.getKey(),
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
newFilesChangelog),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
compactChangelog));
Committable newCommittable =
new Committable(checkpointId, Committable.Kind.FILE, newMessage);
output.collect(new StreamRecord<>(newCommittable));
}

outputStreams.remove(partition);
results.remove(partition);
}

@Override
public void close() throws Exception {
for (Map.Entry<BinaryRow, OutputStream> entry : outputStreams.entrySet()) {
OutputStream outputStream = entry.getValue();
try {
outputStream.out.close();
} catch (Exception e) {
LOG.warn("Failed to close output stream for file " + outputStream.path, e);
}
table.fileIO().deleteQuietly(outputStream.path);
}

outputStreams.clear();
results.clear();
}

private static class OutputStream {

private final Path path;
private final PositionOutputStream out;

private OutputStream(Path path, PositionOutputStream out) {
this.path = path;
this.out = out;
}
}

private static class Result {

private final int bucket;
private final boolean isCompactResult;
private final DataFileMeta meta;
private final long offset;
private final long length;

private Result(
int bucket, boolean isCompactResult, DataFileMeta meta, long offset, long length) {
this.bucket = bucket;
this.isCompactResult = isCompactResult;
this.meta = meta;
this.offset = offset;
this.length = length;
}
}
}
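
For context, this operator is intended to sit between the writer and the committer operators. The following is a minimal wiring sketch, not the actual integration: the helper class, its method, the `written` stream, and the `options` parameter are assumptions for illustration; only `FlinkConnectorOptions.CHANGELOG_COMPACT_PARALLELISM` and `ChangelogCompactOperator` come from this commit.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactOperator;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.table.FileStoreTable;

public class ChangelogCompactWiring {

    /** Hypothetical helper: inserts the compact operator when the option is set. */
    public static DataStream<Committable> maybeCompactChangelog(
            DataStream<Committable> written, FileStoreTable table, Configuration options) {
        Integer parallelism = options.get(FlinkConnectorOptions.CHANGELOG_COMPACT_PARALLELISM);
        if (parallelism == null) {
            // option not set: leave the writer-to-committer pipeline unchanged
            return written;
        }
        return written.transform(
                        "Changelog Compact",
                        written.getType(),
                        new ChangelogCompactOperator(table))
                .setParallelism(parallelism);
    }
}
```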