7 changes: 7 additions & 0 deletions docs/docs/flink-maintenance.md
@@ -219,6 +219,7 @@ env.execute("Table Maintenance Job");
| `maxRewriteBytes(long)` | Maximum bytes to rewrite per execution | Long.MAX_VALUE | long |
| `filter(Expression)` | Filter expression for selecting files to rewrite | Expressions.alwaysTrue() | Expression |
| `maxFileGroupInputFiles(long)` | Maximum allowed number of input files within a file group | Long.MAX_VALUE | long |
| `openParquetMerge(boolean)` | For Parquet tables, use an optimized row-group level merge that copies row groups directly, without deserialization and re-serialization, instead of the standard read-rewrite approach | false | boolean |

#### DeleteOrphanFiles Configuration

@@ -398,6 +399,12 @@ These keys are used in SQL (SET or table WITH options) and are applicable when w
- Enable `partialProgressEnabled` for large rewrite operations
- Set reasonable `maxRewriteBytes` limits
- Set an appropriate `maxFileGroupSizeBytes` to break large file groups into smaller ones, which speeds up parallel processing
- For Parquet tables, enable `openParquetMerge` so that `rewriteDataFiles` uses an optimized row-group level merge strategy that is significantly faster than the standard read-rewrite approach (see the example below). This optimization is applied only when all of the following requirements are met:
  * All files are in Parquet format
  * Files have compatible schemas
  * Files are not encrypted
  * Files do not have associated delete files or delete vectors
  * Table does not have a sort order (including z-ordered tables)
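
A minimal sketch of enabling the merge on the `RewriteDataFiles` maintenance task; the job wiring is the same as shown earlier on this page, and the values below are illustrative placeholders rather than recommendations:

```java
RewriteDataFiles.Builder rewriteBuilder =
    RewriteDataFiles.builder()
        .openParquetMerge(true)        // copy Parquet row groups instead of read-rewrite
        .partialProgressEnabled(true)  // commit rewritten file groups incrementally
        .partialProgressMaxCommits(10)
        .maxRewriteBytes(100L * 1024 * 1024 * 1024)  // cap the bytes rewritten per run
        .minInputFiles(2);
```

File groups that do not meet all of the requirements above fall back to the standard read-rewrite path.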

### Troubleshooting

@@ -59,6 +59,7 @@ public static class Builder extends MaintenanceTaskBuilder<RewriteDataFiles.Buil
private final Map<String, String> rewriteOptions = Maps.newHashMapWithExpectedSize(6);
private long maxRewriteBytes = Long.MAX_VALUE;
private Expression filter = Expressions.alwaysTrue();
private boolean openParquetMerge = false;

@Override
String maintenanceTaskName() {
@@ -218,6 +219,16 @@ public Builder filter(Expression newFilter) {
return this;
}

/**
* Configures whether to use the optimized row-group level Parquet merge when rewriting eligible
* Parquet file groups.
*
* @param newOpenParquetMerge whether to enable the Parquet row-group merge
* @return this for method chaining
*/
public Builder openParquetMerge(boolean newOpenParquetMerge) {
this.openParquetMerge = newOpenParquetMerge;
return this;
}

/**
* Configures the properties for the rewriter.
*
@@ -271,7 +282,8 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
SingleOutputStreamOperator<DataFileRewriteRunner.ExecutedGroup> rewritten =
planned
.rebalance()
.process(new DataFileRewriteRunner(tableName(), taskName(), index()))
.process(
new DataFileRewriteRunner(tableName(), taskName(), index(), openParquetMerge))
.name(operatorName(REWRITE_TASK_NAME))
.uid(REWRITE_TASK_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
@@ -20,6 +20,7 @@

import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;

import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import org.apache.flink.annotation.Internal;
@@ -33,8 +34,11 @@
import org.apache.iceberg.BaseCombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.actions.RewriteFileGroup;
@@ -45,10 +49,15 @@
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.FileScanTaskReader;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.parquet.ParquetFileMerger;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -65,17 +74,20 @@ public class DataFileRewriteRunner
private final String tableName;
private final String taskName;
private final int taskIndex;
private final boolean openParquetMerge;

private transient int subTaskId;
private transient int attemptId;
private transient Counter errorCounter;

public DataFileRewriteRunner(String tableName, String taskName, int taskIndex) {
public DataFileRewriteRunner(
String tableName, String taskName, int taskIndex, boolean openParquetMerge) {
Preconditions.checkNotNull(tableName, "Table name should not be null");
Preconditions.checkNotNull(taskName, "Task name should not be null");
this.tableName = tableName;
this.taskName = taskName;
this.taskIndex = taskIndex;
this.openParquetMerge = openParquetMerge;
}

@Override
@@ -112,6 +124,54 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGr
value.group().rewrittenFiles().size());
}

// Fast path: when enabled, check whether this group is eligible for a row-group level merge
MessageType messageType = null;
if (openParquetMerge) {
try {
messageType = canMergeAndGetSchema(value.group(), value.table());
} catch (Exception ex) {
LOG.warn(
DataFileRewritePlanner.MESSAGE_PREFIX
+ "Exception checking if Parquet merge can be used for group {}",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group(),
ex);
}
}

if (messageType != null) {
try {
// The schema is lazily initialized; materialize it here, otherwise Kryo serialization fails later
for (FileScanTask fileScanTask : value.group().fileScanTasks()) {
fileScanTask.schema();
}

DataFile resultFile =
rewriteDataFilesUseParquetMerge(value.group(), value.table(), messageType);
value.group().setOutputFiles(Sets.newHashSet(resultFile));
ExecutedGroup executedGroup =
new ExecutedGroup(
value.table().currentSnapshot().snapshotId(),
value.groupsPerCommit(),
value.group());
out.collect(executedGroup);
} catch (Exception ex) {
LOG.info(
DataFileRewritePlanner.MESSAGE_PREFIX
+ "Exception merging Parquet files for group {}",
tableName,
taskName,
taskIndex,
ctx.timestamp(),
value.group(),
ex);
ctx.output(TaskResultAggregator.ERROR_STREAM, ex);
errorCounter.inc();
}

return;
}

boolean preserveRowId = TableUtil.supportsRowLineage(value.table());

try (TaskWriter<RowData> writer = writerFor(value, preserveRowId)) {
@@ -175,6 +235,65 @@ public void processElement(PlannedGroup value, Context ctx, Collector<ExecutedGr
}
}

private DataFile rewriteDataFilesUseParquetMerge(
RewriteFileGroup group, Table table, MessageType messageType) throws IOException {

PartitionSpec spec = table.specs().get(group.outputSpecId());

long rowGroupSize =
PropertyUtil.propertyAsLong(
table.properties(),
TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);

OutputFileFactory outputFileFactory =
OutputFileFactory.builderFor(table, taskIndex, attemptId)
.format(FileFormat.PARQUET)
.ioSupplier(table::io)
.defaultSpec(spec)
.build();

OutputFile outputFile =
outputFileFactory.newOutputFile(group.info().partition()).encryptingOutputFile();

return ParquetFileMerger.mergeFiles(
Lists.newArrayList(group.rewrittenFiles()),
table.io(),
outputFile,
messageType,
rowGroupSize,
spec,
group.info().partition());
}

private MessageType canMergeAndGetSchema(RewriteFileGroup group, Table table) {
// Check if group expects exactly one output file
if (group.expectedOutputFiles() != 1) {
return null;
}

// Check if table has a sort order
if (table.sortOrder().isSorted()) {
return null;
}

// Check for delete files
boolean hasDeletes = group.fileScanTasks().stream().anyMatch(task -> !task.deletes().isEmpty());
if (hasDeletes) {
return null;
}

// Check that every input file was written with the output partition spec
boolean specMismatch =
group.rewrittenFiles().stream().anyMatch(file -> file.specId() != group.outputSpecId());
if (specMismatch) {
return null;
}

// Validate Parquet-specific requirements and get schema
return ParquetFileMerger.canMergeAndGetSchema(
Lists.newArrayList(group.rewrittenFiles()), table.io(), group.maxOutputFileSize());
}

private TaskWriter<RowData> writerFor(PlannedGroup value, boolean preserveRowId) {
String formatString =
PropertyUtil.propertyAsString(
@@ -34,6 +34,9 @@
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
@@ -43,9 +46,21 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(ParameterizedTestExtension.class)
class TestRewriteDataFiles extends MaintenanceTaskTestBase {
@Test

@Parameter(index = 0)
private boolean openParquetMerge;

@Parameters(name = "openParquetMerge = {0}")
private static Object[][] parameters() {
return new Object[][] {{true}, {false}};
}

@TestTemplate
void testRewriteUnpartitioned() throws Exception {
Table table = createTable();
insert(table, 1, "a");
@@ -64,6 +79,7 @@ void testRewriteUnpartitioned() throws Exception {
.maxFileSizeBytes(2_000_000L)
.minFileSizeBytes(500_000L)
.minInputFiles(2)
.openParquetMerge(openParquetMerge)
.partialProgressEnabled(true)
.partialProgressMaxCommits(1)
.maxRewriteBytes(100_000L)
@@ -82,7 +98,7 @@ void testRewriteUnpartitioned() throws Exception {
createRecord(4, "d")));
}

@Test
@TestTemplate
void testRewriteUnpartitionedPreserveLineage() throws Exception {
Table table = createTable(3);
insert(table, 1, "a");
@@ -101,6 +117,7 @@ void testRewriteUnpartitionedPreserveLineage() throws Exception {
.maxFileSizeBytes(2_000_000L)
.minFileSizeBytes(500_000L)
.minInputFiles(2)
.openParquetMerge(openParquetMerge)
.partialProgressEnabled(true)
.partialProgressMaxCommits(1)
.maxRewriteBytes(100_000L)
@@ -122,7 +139,7 @@ void testRewriteUnpartitionedPreserveLineage() throws Exception {
schema);
}

@Test
@TestTemplate
void testRewriteTheSameFilePreserveLineage() throws Exception {
Table table = createTable(3);
insert(table, 1, "a");
@@ -143,6 +160,7 @@ void testRewriteTheSameFilePreserveLineage() throws Exception {
.maxFileSizeBytes(2_000_000L)
.minFileSizeBytes(500_000L)
.minInputFiles(2)
.openParquetMerge(openParquetMerge)
.partialProgressEnabled(true)
.partialProgressMaxCommits(1)
.maxRewriteBytes(100_000L)
@@ -166,7 +184,7 @@ void testRewriteTheSameFilePreserveLineage() throws Exception {
schema);
}

@Test
@TestTemplate
void testRewritePartitionedPreserveLineage() throws Exception {
Table table = createPartitionedTable(3);
insertPartitioned(table, 1, "p1");
@@ -194,7 +212,7 @@ void testRewritePartitionedPreserveLineage() throws Exception {
schema);
}

@Test
@TestTemplate
void testRewritePartitioned() throws Exception {
Table table = createPartitionedTable();
insertPartitioned(table, 1, "p1");
@@ -294,7 +312,7 @@ void testPlannerFailure() throws Exception {
.build());
}

@Test
@TestTemplate
void testUidAndSlotSharingGroup() {
createTable();

@@ -316,7 +334,7 @@ void testUidAndSlotSharingGroup() {
checkSlotSharingGroupsAreSet(infra.env(), SLOT_SHARING_GROUP);
}

@Test
@TestTemplate
void testUidAndSlotSharingGroupUnset() {
createTable();

@@ -336,7 +354,7 @@ void testUidAndSlotSharingGroupUnset() {
checkSlotSharingGroupsAreSet(infra.env(), null);
}

@Test
@TestTemplate
void testMetrics() throws Exception {
Table table = createTable();
insert(table, 1, "a");
@@ -410,7 +428,7 @@ void testMetrics() throws Exception {
.build());
}

@Test
@TestTemplate
void testV2Table() throws Exception {
Table table = createTableWithDelete();
update(table, 1, null, "a", "b");
@@ -489,7 +507,7 @@ void testV2Table() throws Exception {
.build());
}

@Test
@TestTemplate
void testRewriteWithFilter() throws Exception {
Table table = createTable();
insert(table, 1, "a");
@@ -530,7 +548,8 @@ void testRewriteWithFilter() throws Exception {
}

private void appendRewriteDataFiles() {
appendRewriteDataFiles(RewriteDataFiles.builder().rewriteAll(true));
appendRewriteDataFiles(
RewriteDataFiles.builder().openParquetMerge(openParquetMerge).rewriteAll(true));
}

private void appendRewriteDataFiles(RewriteDataFiles.Builder builder) {