diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java index 4df3eecb18e5..d09e1a1cb3ae 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IncrementalScanSource.java @@ -42,11 +42,11 @@ * source creates a single range, while the unbounded implementation continuously polls for new * snapshots at the specified interval. */ -class IncrementalScanSource extends PTransform> { +public class IncrementalScanSource extends PTransform> { private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(60); - private final IcebergScanConfig scanConfig; + protected final IcebergScanConfig scanConfig; - IncrementalScanSource(IcebergScanConfig scanConfig) { + public IncrementalScanSource(IcebergScanConfig scanConfig) { this.scanConfig = scanConfig; } @@ -74,14 +74,15 @@ public PCollection expand(PBegin input) { } /** Continuously watches for new snapshots. */ - private PCollection>> unboundedSnapshots(PBegin input) { + protected PCollection>> unboundedSnapshots(PBegin input) { Duration pollInterval = MoreObjects.firstNonNull(scanConfig.getPollInterval(), DEFAULT_POLL_INTERVAL); return input.apply("Watch for Snapshots", new WatchForSnapshots(scanConfig, pollInterval)); } /** Creates a fixed snapshot range. */ - private PCollection>> boundedSnapshots(PBegin input, Table table) { + protected PCollection>> boundedSnapshots( + PBegin input, Table table) { checkStateNotNull( table.currentSnapshot().snapshotId(), "Table %s does not have any snapshots to read from.", diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java index 4b94663c64c5..6c0230153cdf 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java @@ -25,8 +25,15 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; import org.checkerframework.checker.nullness.qual.Nullable; class PartitionUtils { @@ -90,4 +97,51 @@ static PartitionSpec toPartitionSpec( return builder.build(); } + + /** + * Copied over from Apache Iceberg's PartitionUtil. 
+ */ + public static Map constantsMap( + PartitionSpec spec, ContentFile file, BiFunction convertConstant) { + StructLike partitionData = file.partition(); + + // use java.util.HashMap because partition data may contain null values + Map idToConstant = Maps.newHashMap(); + + // add first_row_id as _row_id + if (file.firstRowId() != null) { + idToConstant.put( + MetadataColumns.ROW_ID.fieldId(), + convertConstant.apply(Types.LongType.get(), file.firstRowId())); + } + + idToConstant.put( + MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(), + convertConstant.apply(Types.LongType.get(), file.fileSequenceNumber())); + + // add _file + idToConstant.put( + MetadataColumns.FILE_PATH.fieldId(), + convertConstant.apply(Types.StringType.get(), file.location())); + + // add _spec_id + idToConstant.put( + MetadataColumns.SPEC_ID.fieldId(), + convertConstant.apply(Types.IntegerType.get(), file.specId())); + + List partitionFields = spec.partitionType().fields(); + List fields = spec.fields(); + for (int pos = 0; pos < fields.size(); pos += 1) { + PartitionField field = fields.get(pos); + if (field.transform().isIdentity()) { + Object converted = + convertConstant.apply( + partitionFields.get(pos).type(), partitionData.get(pos, Object.class)); + idToConstant.put(field.sourceId(), converted); + } + } + + return idToConstant; + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java index 528b89c203bf..fea62356e431 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadFromTasks.java @@ -75,9 +75,7 @@ public void process( } FileScanTask task = fileScanTasks.get((int) l); Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema()); - try (CloseableIterable fullIterable = - ReadUtils.createReader(task, table, scanConfig.getRequiredSchema())) { - CloseableIterable reader = ReadUtils.maybeApplyFilter(fullIterable, scanConfig); + try (CloseableIterable reader = ReadUtils.createReader(task, table, scanConfig)) { for (Record record : reader) { Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java index 4b127fcdef22..4a319663f994 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.iceberg.util.SnapshotUtil.ancestorsOf; import java.util.Collection; @@ -28,16 +29,22 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy; +import org.apache.beam.sdk.io.iceberg.cdc.DeleteReader; +import org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.FileScanTask; 
+import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.DeleteFilter; import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.data.Record; @@ -55,7 +62,6 @@ import org.apache.iceberg.parquet.ParquetReader; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.util.PartitionUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; @@ -72,16 +78,42 @@ public class ReadUtils { "parquet.read.support.class", "parquet.crypto.factory.class"); - static ParquetReader createReader(FileScanTask task, Table table, Schema schema) { - String filePath = task.file().path().toString(); + public static CloseableIterable createReader( + SerializableChangelogTask task, Table table, IcebergScanConfig scanConfig) { + return createReader( + table, + scanConfig, + checkStateNotNull(table.specs().get(task.getSpecId())), + task.getDataFile().createDataFile(table.specs()), + task.getStart(), + task.getLength(), + task.getExpression(table.schema())); + } + + public static CloseableIterable createReader( + ContentScanTask task, Table table, IcebergScanConfig scanConfig) { + return createReader( + table, scanConfig, task.spec(), task.file(), task.start(), task.length(), task.residual()); + } + + public static CloseableIterable createReader( + Table table, + IcebergScanConfig scanConfig, + PartitionSpec spec, + ContentFile file, + long start, + long length, + Expression residual) { + Schema schema = scanConfig.getRequiredSchema(); InputFile inputFile; try (FileIO io = table.io()) { EncryptedInputFile encryptedInput = - EncryptedFiles.encryptedInput(io.newInputFile(filePath), task.file().keyMetadata()); + EncryptedFiles.encryptedInput(io.newInputFile(file.location()), file.keyMetadata()); inputFile = table.encryption().decrypt(encryptedInput); } Map idToConstants = - ReadUtils.constantsMap(task, IdentityPartitionConverters::convertConstant, table.schema()); + ReadUtils.constantsMap( + spec, file, IdentityPartitionConverters::convertConstant, table.schema()); ParquetReadOptions.Builder optionsBuilder; if (inputFile instanceof HadoopInputFile) { @@ -96,37 +128,40 @@ static ParquetReader createReader(FileScanTask task, Table table, Schema } optionsBuilder = optionsBuilder - .withRange(task.start(), task.start() + task.length()) + .withRange(start, start + length) .withMaxAllocationInBytes(MAX_FILE_BUFFER_SIZE); @Nullable String nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING); NameMapping mapping = nameMapping != null ? 
NameMappingParser.fromJson(nameMapping) : NameMapping.empty(); - return new ParquetReader<>( - inputFile, - schema, - optionsBuilder.build(), - // TODO(ahmedabu98): Implement a Parquet-to-Beam Row reader, bypassing conversion to Iceberg - // Record - fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema, idToConstants), - mapping, - task.residual(), - false, - true); + ParquetReader records = + new ParquetReader<>( + inputFile, + schema, + optionsBuilder.build(), + // TODO(ahmedabu98): Implement a Parquet-to-Beam Row reader, bypassing conversion to + // Iceberg + // Record + fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema, idToConstants), + mapping, + residual, + false, + true); + return maybeApplyFilter(records, scanConfig); } static Map constantsMap( - FileScanTask task, + PartitionSpec spec, + ContentFile file, BiFunction converter, org.apache.iceberg.Schema schema) { - PartitionSpec spec = task.spec(); Set idColumns = spec.identitySourceIds(); org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns); boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty(); if (projectsIdentityPartitionColumns) { - return PartitionUtil.constantsMap(task, converter); + return PartitionUtils.constantsMap(spec, file, converter); } else { return Collections.emptyMap(); } @@ -208,4 +243,88 @@ public static CloseableIterable maybeApplyFilter( } return iterable; } + + public static DeleteFilter genericDeleteFilter( + Table table, + IcebergScanConfig scanConfig, + String dataFilePath, + List deletes) { + return new BeamDeleteFilter( + table.io(), + dataFilePath, + scanConfig.getRequiredSchema(), + scanConfig.getProjectedSchema(), + deletes.stream() + .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders())) + .collect(Collectors.toList())); + } + + public static DeleteReader genericDeleteReader( + Table table, + IcebergScanConfig scanConfig, + String dataFilePath, + List deletes) { + return new BeamDeleteReader( + table.io(), + dataFilePath, + scanConfig.getRequiredSchema(), + scanConfig.getProjectedSchema(), + deletes.stream() + .map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders())) + .collect(Collectors.toList())); + } + + public static class BeamDeleteFilter extends DeleteFilter { + private final FileIO io; + private final InternalRecordWrapper asStructLike; + + @SuppressWarnings("method.invocation") + public BeamDeleteFilter( + FileIO io, + String dataFilePath, + Schema tableSchema, + Schema projectedSchema, + List deleteFiles) { + super(dataFilePath, deleteFiles, tableSchema, projectedSchema); + this.io = io; + this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct()); + } + + @Override + protected StructLike asStructLike(Record record) { + return asStructLike.wrap(record); + } + + @Override + protected InputFile getInputFile(String location) { + return io.newInputFile(location); + } + } + + public static class BeamDeleteReader extends DeleteReader { + private final FileIO io; + private final InternalRecordWrapper asStructLike; + + @SuppressWarnings("method.invocation") + public BeamDeleteReader( + FileIO io, + String dataFilePath, + Schema tableSchema, + Schema projectedSchema, + List deleteFiles) { + super(dataFilePath, deleteFiles, tableSchema, projectedSchema); + this.io = io; + this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct()); + } + + @Override + protected StructLike asStructLike(Record record) { + return asStructLike.wrap(record); + } + + @Override + 
protected InputFile getInputFile(String location) { + return io.newInputFile(location); + } + } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java index 81ec229df70f..452012766e3c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ScanTaskReader.java @@ -122,7 +122,10 @@ public boolean advance() throws IOException { InputFile input = decryptor.getInputFile(fileTask); Map idToConstants = ReadUtils.constantsMap( - fileTask, IdentityPartitionConverters::convertConstant, requiredSchema); + fileTask.spec(), + fileTask.file(), + IdentityPartitionConverters::convertConstant, + requiredSchema); CloseableIterable iterable; switch (file.format()) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java index 5c994c3e5651..f67b9a3b91ae 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java @@ -54,13 +54,13 @@ */ @DefaultSchema(AutoValueSchema.class) @AutoValue -abstract class SerializableDataFile { +public abstract class SerializableDataFile { public static Builder builder() { return new AutoValue_SerializableDataFile.Builder(); } @SchemaFieldNumber("0") - abstract String getPath(); + public abstract String getPath(); @SchemaFieldNumber("1") abstract String getFileFormat(); @@ -69,10 +69,10 @@ public static Builder builder() { abstract long getRecordCount(); @SchemaFieldNumber("3") - abstract long getFileSizeInBytes(); + public abstract long getFileSizeInBytes(); @SchemaFieldNumber("4") - abstract String getPartitionPath(); + public abstract String getPartitionPath(); @SchemaFieldNumber("5") abstract int getPartitionSpecId(); @@ -96,13 +96,22 @@ public static Builder builder() { abstract @Nullable Map getNanValueCounts(); @SchemaFieldNumber("12") - abstract @Nullable Map getLowerBounds(); + public abstract @Nullable Map getLowerBounds(); @SchemaFieldNumber("13") - abstract @Nullable Map getUpperBounds(); + public abstract @Nullable Map getUpperBounds(); + + @SchemaFieldNumber("14") + public abstract @Nullable Long getDataSequenceNumber(); + + @SchemaFieldNumber("15") + public abstract @Nullable Long getFileSequenceNumber(); + + @SchemaFieldNumber("16") + public abstract @Nullable Long getFirstRowId(); @AutoValue.Builder - abstract static class Builder { + public abstract static class Builder { abstract Builder setPath(String path); abstract Builder setFileFormat(String fileFormat); @@ -131,31 +140,49 @@ abstract static class Builder { abstract Builder setUpperBounds(@Nullable Map upperBounds); + abstract Builder setDataSequenceNumber(@Nullable Long number); + + abstract Builder setFileSequenceNumber(@Nullable Long number); + + abstract Builder setFirstRowId(@Nullable Long id); + abstract SerializableDataFile build(); } + public static SerializableDataFile from(DataFile f, String partitionPath) { + return from(f, partitionPath, true); + } + /** * Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link * PartitionKey}. 
*/ - static SerializableDataFile from(DataFile f, String partitionPath) { - - return SerializableDataFile.builder() - .setPath(f.path().toString()) - .setFileFormat(f.format().toString()) - .setRecordCount(f.recordCount()) - .setFileSizeInBytes(f.fileSizeInBytes()) - .setPartitionPath(partitionPath) - .setPartitionSpecId(f.specId()) - .setKeyMetadata(f.keyMetadata()) - .setSplitOffsets(f.splitOffsets()) - .setColumnSizes(f.columnSizes()) - .setValueCounts(f.valueCounts()) - .setNullValueCounts(f.nullValueCounts()) - .setNanValueCounts(f.nanValueCounts()) - .setLowerBounds(toByteArrayMap(f.lowerBounds())) - .setUpperBounds(toByteArrayMap(f.upperBounds())) - .build(); + public static SerializableDataFile from( + DataFile f, String partitionPath, boolean includeMetrics) { + SerializableDataFile.Builder builder = + SerializableDataFile.builder() + .setPath(f.location()) + .setFileFormat(f.format().toString()) + .setRecordCount(f.recordCount()) + .setFileSizeInBytes(f.fileSizeInBytes()) + .setPartitionPath(partitionPath) + .setPartitionSpecId(f.specId()) + .setKeyMetadata(f.keyMetadata()) + .setSplitOffsets(f.splitOffsets()) + .setColumnSizes(f.columnSizes()) + .setValueCounts(f.valueCounts()) + .setNullValueCounts(f.nullValueCounts()) + .setNanValueCounts(f.nanValueCounts()) + .setDataSequenceNumber(f.dataSequenceNumber()) + .setFileSequenceNumber(f.fileSequenceNumber()) + .setFirstRowId(f.firstRowId()); + if (includeMetrics) { + builder = + builder + .setLowerBounds(toByteArrayMap(f.lowerBounds())) + .setUpperBounds(toByteArrayMap(f.upperBounds())); + } + return builder.build(); } /** @@ -192,14 +219,14 @@ DataFile createDataFile(Map partitionSpecs) { .withFileSizeInBytes(getFileSizeInBytes()) .withMetrics(dataFileMetrics) .withSplitOffsets(getSplitOffsets()) + .withFirstRowId(getFirstRowId()) .build(); } // ByteBuddyUtils has trouble converting Map value type ByteBuffer // to byte[] and back to ByteBuffer, so we perform these conversions manually // TODO(https://github.com/apache/beam/issues/32701) - private static @Nullable Map toByteArrayMap( - @Nullable Map input) { + static @Nullable Map toByteArrayMap(@Nullable Map input) { if (input == null) { return null; } @@ -210,8 +237,7 @@ DataFile createDataFile(Map partitionSpecs) { return output; } - private static @Nullable Map toByteBufferMap( - @Nullable Map input) { + static @Nullable Map toByteBufferMap(@Nullable Map input) { if (input == null) { return null; } @@ -244,10 +270,13 @@ && getPartitionSpecId() == that.getPartitionSpecId() && Objects.equals(getNullValueCounts(), that.getNullValueCounts()) && Objects.equals(getNanValueCounts(), that.getNanValueCounts()) && mapEquals(getLowerBounds(), that.getLowerBounds()) - && mapEquals(getUpperBounds(), that.getUpperBounds()); + && mapEquals(getUpperBounds(), that.getUpperBounds()) + && Objects.equals(getDataSequenceNumber(), that.getDataSequenceNumber()) + && Objects.equals(getFileSequenceNumber(), that.getFileSequenceNumber()) + && Objects.equals(getFirstRowId(), that.getFirstRowId()); } - private static boolean mapEquals( + static boolean mapEquals( @Nullable Map map1, @Nullable Map map2) { if (map1 == null && map2 == null) { return true; @@ -285,13 +314,16 @@ public final int hashCode() { getColumnSizes(), getValueCounts(), getNullValueCounts(), - getNanValueCounts()); + getNanValueCounts(), + getDataSequenceNumber(), + getFileSequenceNumber(), + getFirstRowId()); hashCode = 31 * hashCode + computeMapByteHashCode(getLowerBounds()); hashCode = 31 * hashCode + 
computeMapByteHashCode(getUpperBounds()); return hashCode; } - private static int computeMapByteHashCode(@Nullable Map map) { + static int computeMapByteHashCode(@Nullable Map map) { if (map == null) { return 0; } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDeleteFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDeleteFile.java new file mode 100644 index 000000000000..ec165943fabe --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDeleteFile.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.apache.beam.sdk.io.iceberg.SerializableDataFile.computeMapByteHashCode; +import static org.apache.beam.sdk.io.iceberg.SerializableDataFile.mapEquals; +import static org.apache.beam.sdk.io.iceberg.SerializableDataFile.toByteArrayMap; +import static org.apache.beam.sdk.io.iceberg.SerializableDataFile.toByteBufferMap; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import com.google.auto.value.AutoValue; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; +import org.checkerframework.checker.nullness.qual.Nullable; + +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class SerializableDeleteFile { + public static SerializableDeleteFile.Builder builder() { + return new AutoValue_SerializableDeleteFile.Builder(); + } + + @SchemaFieldNumber("0") + public abstract FileContent getContentType(); + + @SchemaFieldNumber("1") + public abstract String getLocation(); + + @SchemaFieldNumber("2") + public abstract String getFileFormat(); + + @SchemaFieldNumber("3") + public abstract long getRecordCount(); + + @SchemaFieldNumber("4") + public abstract long getFileSizeInBytes(); + + @SchemaFieldNumber("5") + public abstract String getPartitionPath(); + + @SchemaFieldNumber("6") + public abstract int getPartitionSpecId(); + + @SchemaFieldNumber("7") + public abstract @Nullable Integer getSortOrderId(); + + @SchemaFieldNumber("8") + public abstract @Nullable List getEqualityFieldIds(); + + @SchemaFieldNumber("9") + public abstract @Nullable ByteBuffer getKeyMetadata(); + + 
@SchemaFieldNumber("10") + public abstract @Nullable List getSplitOffsets(); + + @SchemaFieldNumber("11") + public abstract @Nullable Map getColumnSizes(); + + @SchemaFieldNumber("12") + public abstract @Nullable Map getValueCounts(); + + @SchemaFieldNumber("13") + public abstract @Nullable Map getNullValueCounts(); + + @SchemaFieldNumber("14") + public abstract @Nullable Map getNanValueCounts(); + + @SchemaFieldNumber("15") + public abstract @Nullable Map getLowerBounds(); + + @SchemaFieldNumber("16") + public abstract @Nullable Map getUpperBounds(); + + @SchemaFieldNumber("17") + public abstract @Nullable Long getContentOffset(); + + @SchemaFieldNumber("18") + public abstract @Nullable Long getContentSizeInBytes(); + + @SchemaFieldNumber("19") + public abstract @Nullable String getReferencedDataFile(); + + @SchemaFieldNumber("20") + public abstract @Nullable Long getDataSequenceNumber(); + + @SchemaFieldNumber("21") + public abstract @Nullable Long getFileSequenceNumber(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setContentType(FileContent content); + + abstract Builder setLocation(String path); + + abstract Builder setFileFormat(String fileFormat); + + abstract Builder setRecordCount(long recordCount); + + abstract Builder setFileSizeInBytes(long fileSizeInBytes); + + abstract Builder setPartitionPath(String partitionPath); + + abstract Builder setPartitionSpecId(int partitionSpec); + + abstract Builder setSortOrderId(int sortOrderId); + + abstract Builder setEqualityFieldIds(List equalityFieldIds); + + abstract Builder setKeyMetadata(ByteBuffer keyMetadata); + + abstract Builder setSplitOffsets(List splitOffsets); + + abstract Builder setColumnSizes(Map columnSizes); + + abstract Builder setValueCounts(Map valueCounts); + + abstract Builder setNullValueCounts(Map nullValueCounts); + + abstract Builder setNanValueCounts(Map nanValueCounts); + + abstract Builder setLowerBounds(@Nullable Map lowerBounds); + + abstract Builder setUpperBounds(@Nullable Map upperBounds); + + abstract Builder setContentOffset(@Nullable Long offset); + + abstract Builder setContentSizeInBytes(@Nullable Long sizeInBytes); + + abstract Builder setReferencedDataFile(@Nullable String dataFile); + + abstract Builder setDataSequenceNumber(@Nullable Long number); + + abstract Builder setFileSequenceNumber(@Nullable Long number); + + abstract SerializableDeleteFile build(); + } + + public static SerializableDeleteFile from( + DeleteFile deleteFile, String partitionPath, boolean includeMetrics) { + SerializableDeleteFile.Builder builder = + SerializableDeleteFile.builder() + .setLocation(deleteFile.location()) + .setFileFormat(deleteFile.format().name()) + .setFileSizeInBytes(deleteFile.fileSizeInBytes()) + .setPartitionPath(partitionPath) + .setPartitionSpecId(deleteFile.specId()) + .setRecordCount(deleteFile.recordCount()) + .setColumnSizes(deleteFile.columnSizes()) + .setValueCounts(deleteFile.valueCounts()) + .setNullValueCounts(deleteFile.nullValueCounts()) + .setNanValueCounts(deleteFile.nanValueCounts()) + .setSplitOffsets(deleteFile.splitOffsets()) + .setKeyMetadata(deleteFile.keyMetadata()) + .setEqualityFieldIds(deleteFile.equalityFieldIds()) + .setSortOrderId(deleteFile.sortOrderId()) + .setContentOffset(deleteFile.contentOffset()) + .setContentSizeInBytes(deleteFile.contentSizeInBytes()) + .setReferencedDataFile(deleteFile.referencedDataFile()) + .setContentType(deleteFile.content()) + .setDataSequenceNumber(deleteFile.dataSequenceNumber()) + 
.setFileSequenceNumber(deleteFile.fileSequenceNumber()); + + if (includeMetrics) { + builder = + builder + .setLowerBounds(toByteArrayMap(deleteFile.lowerBounds())) + .setUpperBounds(toByteArrayMap(deleteFile.upperBounds())); + } + + return builder.build(); + } + + @SuppressWarnings("nullness") + public DeleteFile createDeleteFile( + Map partitionSpecs, @Nullable Map sortOrders) { + PartitionSpec partitionSpec = + checkStateNotNull( + partitionSpecs.get(getPartitionSpecId()), + "This DeleteFile was originally created with spec id '%s', " + + "but table only has spec ids: %s.", + getPartitionSpecId(), + partitionSpecs.keySet()); + + Metrics metrics = + new Metrics( + getRecordCount(), + getColumnSizes(), + getValueCounts(), + getNullValueCounts(), + getNanValueCounts(), + toByteBufferMap(getLowerBounds()), + toByteBufferMap(getUpperBounds())); + + FileMetadata.Builder deleteFileBuilder = + FileMetadata.deleteFileBuilder(partitionSpec) + .withPath(getLocation()) + .withFormat(getFileFormat()) + .withFileSizeInBytes(getFileSizeInBytes()) + .withRecordCount(getRecordCount()) + .withMetrics(metrics) + .withSplitOffsets(getSplitOffsets()) + .withEncryptionKeyMetadata(getKeyMetadata()) + .withPartitionPath(getPartitionPath()); + + switch (getContentType()) { + case POSITION_DELETES: + deleteFileBuilder = deleteFileBuilder.ofPositionDeletes(); + break; + case EQUALITY_DELETES: + int[] equalityFieldIds = + Objects.requireNonNullElse(getEqualityFieldIds(), new ArrayList()).stream() + .mapToInt(Integer::intValue) + .toArray(); + SortOrder sortOrder = SortOrder.unsorted(); + if (sortOrders != null) { + sortOrder = + checkStateNotNull( + sortOrders.get(getSortOrderId()), + "This DeleteFile was originally created with sort order id '%s', " + + "but table only has sort order ids: %s.", + getSortOrderId(), + sortOrders.keySet()); + } + deleteFileBuilder = + deleteFileBuilder.ofEqualityDeletes(equalityFieldIds).withSortOrder(sortOrder); + break; + default: + throw new IllegalStateException( + "Unexpected content type for DeleteFile: " + getContentType()); + } + + // needed for puffin files + if (getFileFormat().equalsIgnoreCase(FileFormat.PUFFIN.name())) { + deleteFileBuilder = + deleteFileBuilder + .withContentOffset(checkStateNotNull(getContentOffset())) + .withContentSizeInBytes(checkStateNotNull(getContentSizeInBytes())) + .withReferencedDataFile(checkStateNotNull(getReferencedDataFile())); + } + return deleteFileBuilder.build(); + } + + @Override + public final boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SerializableDeleteFile that = (SerializableDeleteFile) o; + return getContentType().equals(that.getContentType()) + && getLocation().equals(that.getLocation()) + && getFileFormat().equals(that.getFileFormat()) + && getRecordCount() == that.getRecordCount() + && getFileSizeInBytes() == that.getFileSizeInBytes() + && getPartitionPath().equals(that.getPartitionPath()) + && getPartitionSpecId() == that.getPartitionSpecId() + && Objects.equals(getSortOrderId(), that.getSortOrderId()) + && Objects.equals(getEqualityFieldIds(), that.getEqualityFieldIds()) + && Objects.equals(getKeyMetadata(), that.getKeyMetadata()) + && Objects.equals(getSplitOffsets(), that.getSplitOffsets()) + && Objects.equals(getColumnSizes(), that.getColumnSizes()) + && Objects.equals(getValueCounts(), that.getValueCounts()) + && Objects.equals(getNullValueCounts(), that.getNullValueCounts()) + && Objects.equals(getNanValueCounts(), 
that.getNanValueCounts()) + && mapEquals(getLowerBounds(), that.getLowerBounds()) + && mapEquals(getUpperBounds(), that.getUpperBounds()) + && Objects.equals(getContentOffset(), that.getContentOffset()) + && Objects.equals(getContentSizeInBytes(), that.getContentSizeInBytes()) + && Objects.equals(getReferencedDataFile(), that.getReferencedDataFile()) + && Objects.equals(getDataSequenceNumber(), that.getDataSequenceNumber()) + && Objects.equals(getFileSequenceNumber(), that.getFileSequenceNumber()); + } + + @Override + public final int hashCode() { + int hashCode = + Objects.hash( + getContentType(), + getLocation(), + getFileFormat(), + getRecordCount(), + getFileSizeInBytes(), + getPartitionPath(), + getPartitionSpecId(), + getSortOrderId(), + getEqualityFieldIds(), + getKeyMetadata(), + getSplitOffsets(), + getColumnSizes(), + getValueCounts(), + getNullValueCounts(), + getNanValueCounts(), + getContentOffset(), + getContentSizeInBytes(), + getReferencedDataFile(), + getDataSequenceNumber(), + getFileSequenceNumber()); + hashCode = 31 * hashCode + computeMapByteHashCode(getLowerBounds()); + hashCode = 31 * hashCode + computeMapByteHashCode(getUpperBounds()); + return hashCode; + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java index cb00d90f7fb3..d9d8802e2b49 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java @@ -33,7 +33,7 @@ import org.apache.iceberg.catalog.TableIdentifier; /** Utility to fetch and cache Iceberg {@link Table}s. */ -class TableCache { +public class TableCache { private static final Map CATALOG_CACHE = new ConcurrentHashMap<>(); private static final LoadingCache INTERNAL_CACHE = CacheBuilder.newBuilder() @@ -55,7 +55,7 @@ public ListenableFuture reload(String unusedIdentifier, Table table) { } });; - static Table get(String identifier) { + public static Table get(String identifier) { try { return INTERNAL_CACHE.get(identifier); } catch (ExecutionException e) { @@ -65,12 +65,12 @@ static Table get(String identifier) { } /** Forces a table refresh and returns. */ - static Table getRefreshed(String identifier) { + public static Table getRefreshed(String identifier) { INTERNAL_CACHE.refresh(identifier); return get(identifier); } - static void setup(IcebergScanConfig scanConfig) { + public static void setup(IcebergScanConfig scanConfig) { String tableIdentifier = scanConfig.getTableIdentifier(); IcebergCatalogConfig catalogConfig = scanConfig.getCatalogConfig(); if (CATALOG_CACHE.containsKey(tableIdentifier)) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogDescriptor.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogDescriptor.java new file mode 100644 index 000000000000..5f26f3b4a3c7 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogDescriptor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.cdc; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; + +/** Descriptor for a set of {@link SerializableChangelogTask}s. */ +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class ChangelogDescriptor { + public static Builder builder() { + return new AutoValue_ChangelogDescriptor.Builder(); + } + + public static SchemaCoder coder() { + try { + return SchemaRegistry.createDefault().getSchemaCoder(ChangelogDescriptor.class); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + + @SchemaFieldNumber("0") + abstract String getTableIdentifierString(); + + @AutoValue.Builder + public abstract static class Builder { + abstract Builder setTableIdentifierString(String table); + + abstract ChangelogDescriptor build(); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java new file mode 100644 index 000000000000..763d19c8f9f0 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ChangelogScanner.java @@ -0,0 +1,815 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg.cdc; + +import static org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask.Type.ADDED_ROWS; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.io.iceberg.IcebergScanConfig; +import org.apache.beam.sdk.io.iceberg.SnapshotInfo; +import org.apache.beam.sdk.io.iceberg.TableCache; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.iceberg.AddedRowsScanTask; +import org.apache.iceberg.ChangelogScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataOperations; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.DeletedDataFileScanTask; +import org.apache.iceberg.DeletedRowsScanTask; +import org.apache.iceberg.IncrementalChangelogScan; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.MetricsModes; +import org.apache.iceberg.PartitionField; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.ScanTaskGroup; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SerializableTable; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Comparators; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.iceberg.util.PropertyUtil; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * DoFn that takes a list of snapshots and scans for changelogs using Iceberg's {@link + * IncrementalChangelogScan} and routes them to different downstream PCollections based on + * complexity. + * + *

+ * <p>The Iceberg scan generates groups of changelog scan tasks, where each task belongs to a
+ * specific "ordinal" (a position in the sequence of table snapshots). Task grouping depends on the
+ * table's split-size property.
+ *
+ * <p>This DoFn analyzes the nature of changes within each ordinal and routes them accordingly:
+ *
+ * <ol>
+ *   <li><b>Unidirectional (Fast Path):</b> If an ordinal contains only inserts OR only deletes, its
+ *       tasks are emitted to {@link #UNIDIRECTIONAL_CHANGES}. These records bypass the CoGBK
+ *       shuffle and are output immediately.
+ *   <li><b>Bidirectional (Slow Path):</b> If an ordinal contains a mix of inserts and deletes, its
+ *       tasks are emitted to {@link #BIDIRECTIONAL_CHANGES}. These records are grouped by Primary
+ *       Key and processed by {@link ResolveChanges} to identify potential updates.
+ * </ol>
+ *
+ * <h3>Optimizing by Shuffling Less Data</h3>
+ *
+ * <p>We take a three-layered approach to identify data that can bypass the expensive downstream
+ * CoGroupByKey shuffle:
+ *
+ * <h4>Snapshots</h4>
+ *
+ * <p>We start by analyzing the nature of changes at the snapshot level. If a snapshot's operation
+ * is not of type {@link DataOperations#OVERWRITE}, then it's a uni-directional change.
+ *
+ * <h4>Pinned Partitions</h4>
+ *
+ * <p>If the table's partition fields are derived entirely from Primary Key fields, we assume that
+ * a record will not migrate between partitions. This narrows down data locality and allows us to
+ * only check for bi-directional changes within a partition. Doing this allows partitions with
+ * uni-directional changes to bypass the expensive CoGBK shuffle.
+ *
+ * <h4>Optimization for Individual Files</h4>
+ *
+ * <p>Once we have narrowed down the group of tasks with bi-directional changes, we analyze the
+ * metadata of their underlying files. We compare the upper and lower bounds of Primary Keys
+ * relevant to each file, and consider any overlap as potentially containing an update. If a given
+ * task's bounds of inserted Primary Keys have no overlap with any other task's bounds of deleted
+ * Primary Keys, then we can safely let that task bypass the shuffle, as it would be impossible to
+ * create an (insert, delete) pair with it.
+ *
+ * <p>For example, say we have a group of tasks:
+ *
+ * <ol>
+ *   <li>Task A (adds rows): bounds [3, 8]
+ *   <li>Task B (adds rows): bounds [2, 4]
+ *   <li>Task C (deletes rows): bounds [1, 5]
+ *   <li>Task D (adds rows): bounds [6, 12]
+ * </ol>
+ *
+ * <p>Tasks A and B add rows, and they overlap with Task C, which deletes rows. We need to shuffle
+ * the rows in these 3 tasks because there may be (insert, delete) pairs that lead to an update.
+ *
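+ * <p>As an illustration only (not part of this change), the bounds check described above reduces
+ * to a closed-interval overlap test; a minimal sketch, assuming integer primary keys and a
+ * hypothetical helper named {@code boundsOverlap}:
+ *
+ * <pre>{@code
+ * // Two key ranges [lower1, upper1] and [lower2, upper2] overlap iff each range's lower
+ * // bound is <= the other range's upper bound.
+ * static boolean boundsOverlap(long lower1, long upper1, long lower2, long upper2) {
+ *   return lower1 <= upper2 && lower2 <= upper1;
+ * }
+ *
+ * boundsOverlap(3, 8, 1, 5);  // Task A vs Task C -> true:  must shuffle
+ * boundsOverlap(6, 12, 1, 5); // Task D vs Task C -> false: can bypass the shuffle
+ * }</pre>
+ *
+ * <p>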
Task D however, does not overlap with any delete rows. It will never produce an (insert, + * delete) pair, so we don't need to shuffle it's output rows. + */ +public class ChangelogScanner + extends DoFn< + KV>, KV>> { + private static final Logger LOG = LoggerFactory.getLogger(ChangelogScanner.class); + private static final Counter totalChangelogScanTasks = + Metrics.counter(ChangelogScanner.class, "totalChangelogScanTasks"); + private static final Counter numAddedRowsScanTasks = + Metrics.counter(ChangelogScanner.class, "numAddedRowsScanTasks"); + private static final Counter numDeletedRowsScanTasks = + Metrics.counter(ChangelogScanner.class, "numDeletedRowsScanTasks"); + private static final Counter numDeletedDataFileScanTasks = + Metrics.counter(ChangelogScanner.class, "numDeletedDataFileScanTasks"); + private static final Counter numUniDirectionalTasks = + Metrics.counter(ChangelogScanner.class, "numUniDirectionalTasks"); + private static final Counter numBiDirectionalTasks = + Metrics.counter(ChangelogScanner.class, "numBiDirectionalTasks"); + public static final TupleTag>> + UNIDIRECTIONAL_CHANGES = new TupleTag<>(); + public static final TupleTag>> + BIDIRECTIONAL_CHANGES = new TupleTag<>(); + public static final KvCoder> OUTPUT_CODER = + KvCoder.of(ChangelogDescriptor.coder(), ListCoder.of(SerializableChangelogTask.coder())); + private final IcebergScanConfig scanConfig; + private Map snapshotMap = new HashMap<>(); + + ChangelogScanner(IcebergScanConfig scanConfig) { + this.scanConfig = scanConfig; + TableCache.setup(scanConfig); + } + + @StartBundle + public void start() { + snapshotMap = new HashMap<>(); + } + + @ProcessElement + public void process(@Element KV> element, MultiOutputReceiver out) + throws IOException { + Table table = TableCache.getRefreshed(scanConfig.getTableIdentifier()); + + List snapshots = element.getValue(); + SnapshotInfo startSnapshot = snapshots.get(0); + SnapshotInfo endSnapshot = snapshots.get(snapshots.size() - 1); + @Nullable Long fromSnapshotId = startSnapshot.getParentId(); + long toSnapshot = endSnapshot.getSnapshotId(); + + IncrementalChangelogScan scan = + table + .newIncrementalChangelogScan() + .toSnapshot(toSnapshot) + .project(scanConfig.getProjectedSchema()); + + // configure the scan to store upper/lower bound metrics only + // if it's available for primary key fields + boolean metricsAvailable = true; + MetricsConfig metricsConfig = MetricsConfig.forTable(table); + Collection pkFields = table.schema().identifierFieldNames(); + for (String field : pkFields) { + MetricsModes.MetricsMode mode = metricsConfig.columnMode(field); + if (!(mode instanceof MetricsModes.Full) && !(mode instanceof MetricsModes.Truncate)) { + metricsAvailable = false; + break; + } + } + if (metricsAvailable) { + scan = scan.includeColumnStats(pkFields); + } + if (fromSnapshotId != null) { + scan = scan.fromSnapshotExclusive(fromSnapshotId); + } + @Nullable Expression filter = scanConfig.getFilter(); + if (filter != null) { + scan = scan.filter(filter); + } + LOG.info("Planning to scan snapshot range [{}, {}]", fromSnapshotId, toSnapshot); + + snapshots.forEach(s -> snapshotMap.put(s.getSnapshotId(), s)); + + createAndOutputReadTasks( + element.getKey(), + element.getValue(), + scan, + startSnapshot, + endSnapshot, + SerializableTable.copyOf(table), + out); + } + + private void gatherPartitionData( + IncrementalChangelogScan scan, + Map>> + changeTypesPerPartitionPerSnapshot, + Set pinnedSpecs, + Set snapshotsWithUnpinnedSpecs) + throws IOException { + try 
(CloseableIterable> groups = scan.planTasks()) { + for (ScanTaskGroup group : groups) { + for (ChangelogScanTask task : group.tasks()) { + long snapshotId = task.commitSnapshotId(); + int specId = getSpecId(task); + + if (!pinnedSpecs.contains(specId)) { + snapshotsWithUnpinnedSpecs.add(snapshotId); + continue; + } + + SerializableChangelogTask.Type type = SerializableChangelogTask.getType(task); + StructLike partition = getPartition(task); + + changeTypesPerPartitionPerSnapshot + .computeIfAbsent(snapshotId, (id) -> new HashMap<>()) + .computeIfAbsent(partition, (p) -> new HashSet<>()) + .add(type); + } + } + } + } + + private void createAndOutputReadTasks( + String tableIdentifier, + List snapshots, + IncrementalChangelogScan scan, + SnapshotInfo startSnapshot, + SnapshotInfo endSnapshot, + Table table, + MultiOutputReceiver multiOutputReceiver) + throws IOException { + int numAddedRowsTasks = 0; + int numDeletedRowsTasks = 0; + int numDeletedFileTasks = 0; + + // ******** Partition Optimization ******** + // First pass over the scan to get a full picture of the nature of changes per partition, per + // snapshot. + // if partition fields are sourced entirely from a record's PK, that record will + // always be pinned to that partition (so long as the spec doesn't change). + // we can optimize the scan by only shuffling bi-directional changes *within* a partition. + // this is safe to do because we can assume no cross-partition changes will occur + Set pinnedSpecs = + table.specs().entrySet().stream() + .filter(e -> isRowPinnedToPartition(e.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + // optimization cannot apply to snapshots that have any file using an unpinned spec + Set snapshotsWithUnpinnedSpecs = new HashSet<>(); + Map>> + changeTypesPerPartitionPerSnapshot = new HashMap<>(); + // Don't bother scanning if the table never had a spec where records are pinned to their + // partitions. + if (!pinnedSpecs.isEmpty()) { + gatherPartitionData( + scan, changeTypesPerPartitionPerSnapshot, pinnedSpecs, snapshotsWithUnpinnedSpecs); + } + + // Second pass to route uni-directional tasks downstream, and buffer bi-directional tasks for + // further processing + Schema schema = table.schema(); + Schema recIdSchema = TypeUtil.select(schema, schema.identifierFieldIds()); + Comparator idComp = Comparators.forType(recIdSchema.asStruct()); + + // Best effort maintain the same scan task groupings produced by Iceberg's binpacking, for + // better work load distribution among readers. 
+ // This allows the user to control load per worker by tuning `read.split.target-size`: + // https://iceberg.apache.org/docs/latest/configuration/#read-properties + Map>> potentiallyBidirectionalTasks = + new HashMap<>(); + long splitSize = + PropertyUtil.propertyAsLong( + table.properties(), TableProperties.SPLIT_SIZE, TableProperties.SPLIT_SIZE_DEFAULT); + + TaskBatcher batcher = + new TaskBatcher( + tableIdentifier, splitSize, multiOutputReceiver.get(UNIDIRECTIONAL_CHANGES)); + + int numUniDirTasks = 0; + int numBiDirTasks = 0; + + try (CloseableIterable> scanTaskGroups = scan.planTasks()) { + for (ScanTaskGroup scanTaskGroup : scanTaskGroups) { + for (ChangelogScanTask task : scanTaskGroup.tasks()) { + long snapshotId = task.commitSnapshotId(); + String snapshotOperation = checkStateNotNull(snapshotMap.get(snapshotId)).getOperation(); + SerializableChangelogTask.Type type = SerializableChangelogTask.getType(task); + // gather metrics + switch (type) { + case ADDED_ROWS: + numAddedRowsTasks++; + break; + case DELETED_ROWS: + numDeletedRowsTasks++; + break; + case DELETED_FILE: + numDeletedFileTasks++; + break; + } + + StructLike partition = getPartition(task); + + // potentially bi-directional if it's an overwrite snapshot AND: + // 1. we are not dealing with a spec where rows are pinned to partitions, OR + // 2. rows are pinned to partitions, but the changes in this partition are still + // bi-directional + // in such a case, we buffer the task for more thorough analysis below + if (DataOperations.OVERWRITE.equals(snapshotOperation) + && (snapshotsWithUnpinnedSpecs.contains(snapshotId) + || containsBiDirectionalChanges( + checkStateNotNull( + checkStateNotNull(changeTypesPerPartitionPerSnapshot.get(snapshotId)) + .get(partition))))) { + // TODO: remove debug printing + System.out.printf("\tUnidirectional task with partition '%s':\n", partition); + System.out.printf( + "\t\t(%s) DF: %s\n", + task.getClass().getSimpleName(), name(getDataFile(task).location())); + for (DeleteFile delf : getDeleteFiles(task)) { + System.out.println("\t\t\tAdded DelF: " + name(delf.location())); + } + potentiallyBidirectionalTasks + .computeIfAbsent(snapshotId, (id) -> new HashMap<>()) + .computeIfAbsent(partition, (p) -> new ArrayList<>()) + .add(task); + continue; + } + + // unidirectional. 
put into batches of appropriate split size and flush to downstream + // route, bypassing the shuffle + batcher.add(task); + numUniDirTasks++; + } + } + // we won't see these snapshots again, so flush what we have before processing the more + // complex case below + batcher.flush(); + } + + // pass over the buffered bi-directional files and analyze further + for (Map.Entry>> tasksInSnapshot : + potentiallyBidirectionalTasks.entrySet()) { + long snapshotId = tasksInSnapshot.getKey(); + SnapshotBuffer uniBuffer = + new SnapshotBuffer( + tableIdentifier, + snapshotId, + splitSize, + multiOutputReceiver.get(UNIDIRECTIONAL_CHANGES)); + SnapshotBuffer biBuffer = + new SnapshotBuffer( + tableIdentifier, + snapshotId, + splitSize, + multiOutputReceiver.get(BIDIRECTIONAL_CHANGES)); + + if (snapshotsWithUnpinnedSpecs.contains(snapshotId)) { + // Records are not pinned to partition + // We need to compare the underlying files in the whole snapshot + List tasks = new ArrayList<>(); + tasksInSnapshot.getValue().values().forEach(tasks::addAll); + Pair, List> uniBirTasks = + analyzeFiles(tasks, schema, idComp); + + uniBirTasks.first().forEach(uniBuffer::add); + uniBirTasks.second().forEach(biBuffer::add); + + // metrics + numUniDirTasks += uniBirTasks.first().size(); + + numBiDirTasks += uniBirTasks.second().size(); + System.out.println("\t\tUnpinned spec:"); + for (ChangelogScanTask task : tasks) { + System.out.printf( + "\t\t\t(%s) DF: %s\n", + task.getClass().getSimpleName(), name(getDataFile(task).location())); + for (DeleteFile delf : getDeleteFiles(task)) { + System.out.println("\t\t\tAdded DelF: " + name(delf.location())); + } + } + } else { + // Records are pinned to partition + // We can narrow down by only comparing the underlying files within each partition + for (Map.Entry> tasksInPartition : + tasksInSnapshot.getValue().entrySet()) { + Pair, List> uniBirTasks = + analyzeFiles(tasksInPartition.getValue(), schema, idComp); + + uniBirTasks.first().forEach(uniBuffer::add); + uniBirTasks.second().forEach(biBuffer::add); + + // metrics + numUniDirTasks += uniBirTasks.first().size(); + numBiDirTasks += uniBirTasks.second().size(); + + // TODO: remove debug printing + System.out.printf("\t\tPartition '%s' bidirectional:\n", tasksInPartition.getKey()); + for (ChangelogScanTask task : tasksInPartition.getValue()) { + System.out.printf( + "\t\t\t(%s) DF: %s\n", + task.getClass().getSimpleName(), name(getDataFile(task).location())); + for (DeleteFile delf : getDeleteFiles(task)) { + System.out.println("\t\t\tAdded DelF: " + name(delf.location())); + } + } + } + } + + // we won't see this snapshot again, so flush what we have and continue processing the next + // snapshot's metadata + uniBuffer.flush(); + biBuffer.flush(); + } + + int totalTasks = numAddedRowsTasks + numDeletedRowsTasks + numDeletedFileTasks; + totalChangelogScanTasks.inc(totalTasks); + numAddedRowsScanTasks.inc(numAddedRowsTasks); + numDeletedRowsScanTasks.inc(numDeletedRowsTasks); + numDeletedDataFileScanTasks.inc(numDeletedFileTasks); + numUniDirectionalTasks.inc(numUniDirTasks); + numBiDirectionalTasks.inc(numBiDirTasks); + + LOG.info( + "Snapshots [{}, {}] produced {} tasks:\n\t{} AddedRowsScanTasks\n\t{} DeletedRowsScanTasks\n\t{} DeletedDataFileScanTasks\n" + + "Observed {} uni-directional tasks and {} bi-directional tasks.", + startSnapshot.getSnapshotId(), + endSnapshot.getSnapshotId(), + totalTasks, + numAddedRowsTasks, + numDeletedRowsTasks, + numDeletedFileTasks, + numUniDirTasks, + numBiDirTasks); + } + + /** Checks if a set 
of change types include both inserts and deletes. */ + private static boolean containsBiDirectionalChanges( + Set changeTypes) { + return changeTypes.contains(ADDED_ROWS) && changeTypes.size() > 1; + } + + /** + * Analyzes all tasks in the given list by comparing the bounds of each task's underlying files. + * If a task's partition key bounds overlap with an opposing task's partition key bounds, they are + * both considered bi-directional changes. If a task's bounds do not overlap with any opposing + * task's bounds, it is considered a uni-directional change. + * + *

Note: "opposing" refers to a change that happens in the opposite direction (e.g. insert is + * "positive", delete is "negative") + */ + private Pair, List> analyzeFiles( + List tasks, Schema schema, Comparator idComp) { + // separate insert and delete tasks + List insertTasks = new ArrayList<>(); + List deleteTasks = new ArrayList<>(); + try { + for (ChangelogScanTask task : tasks) { + if (task instanceof AddedRowsScanTask) { + insertTasks.add(TaskAndBounds.of(task, schema, idComp)); + } else { + deleteTasks.add(TaskAndBounds.of(task, schema, idComp)); + } + } + } catch (TaskAndBounds.NoBoundMetricsException e) { + // if metrics are not fully available, we need to play it safe and shuffle all the tasks. + return Pair.of(Collections.emptyList(), tasks); + } + + // check for any overlapping delete and insert tasks + for (TaskAndBounds insertTask : insertTasks) { + for (TaskAndBounds deleteTask : deleteTasks) { + deleteTask.checkOverlapWith(insertTask, idComp); + } + } + + // collect results and return. + // overlapping tasks are bidirectional. + // otherwise they are unidirectional. + List unidirectional = new ArrayList<>(); + List bidirectional = new ArrayList<>(); + + for (List boundsList : Arrays.asList(deleteTasks, insertTasks)) { + for (TaskAndBounds taskAndBounds : boundsList) { + String msg = ""; + if (taskAndBounds.overlaps) { + msg += + String.format( + "overlapping task: (%s, %s)", + taskAndBounds.task.commitSnapshotId(), + taskAndBounds.task.getClass().getSimpleName()); + bidirectional.add(taskAndBounds.task); + } else { + unidirectional.add(taskAndBounds.task); + msg += + String.format( + "NON-overlapping task: (%s, %s)", + taskAndBounds.task.commitSnapshotId(), + taskAndBounds.task.getClass().getSimpleName()); + } + msg += "\n\tDF: " + name(getDataFile(taskAndBounds.task).location()); + msg += "\n\t\tlower: " + taskAndBounds.lowerId + ", upper: " + taskAndBounds.upperId; + if (!getDeleteFiles(taskAndBounds.task).isEmpty()) { + for (DeleteFile df : getDeleteFiles(taskAndBounds.task)) { + msg += "\n\tAdded DelF: " + name(df.location()); + msg += "\n\t\tlower: " + taskAndBounds.lowerId + ", upper: " + taskAndBounds.upperId; + } + } + System.out.println(msg); + } + } + return Pair.of(unidirectional, bidirectional); + } + + /** + * Wraps the {@link ChangelogScanTask}, and stores its lower and upper Primary Keys. Identifies + * overlaps with other tasks by comparing lower and upper keys using Iceberg libraries. 
+ */ + static class TaskAndBounds { + ChangelogScanTask task; + StructLike lowerId; + StructLike upperId; + boolean overlaps = false; + + private TaskAndBounds(ChangelogScanTask task, StructLike lowerId, StructLike upperId) { + this.task = task; + this.lowerId = lowerId; + this.upperId = upperId; + } + + static TaskAndBounds of(ChangelogScanTask task, Schema schema, Comparator idComp) + throws NoBoundMetricsException { + Schema recIdSchema = TypeUtil.select(schema, schema.identifierFieldIds()); + GenericRecord wrapper = GenericRecord.create(recIdSchema); + @MonotonicNonNull GenericRecord lowerId = null; + @MonotonicNonNull GenericRecord upperId = null; + + if (task instanceof AddedRowsScanTask || task instanceof DeletedDataFileScanTask) { + // just store the bounds of the DataFile + DataFile df = getDataFile(task); + @Nullable Map lowerBounds = df.lowerBounds(); + @Nullable Map upperBounds = df.upperBounds(); + if (lowerBounds == null || upperBounds == null) { + throw new NoBoundMetricsException( + String.format( + "Upper and/or lower bounds are missing for %s with DataFile: %s.", + task.getClass().getSimpleName(), name(df.location()))); + } + + lowerId = fillValues(wrapper, schema, lowerBounds); + upperId = fillValues(wrapper, schema, upperBounds); + } else if (task instanceof DeletedRowsScanTask) { + // iterate over all added DeleteFiles and keep track of only the + // minimum and maximum bounds over the list + for (DeleteFile deleteFile : getDeleteFiles(task)) { + @Nullable Map lowerDelBounds = deleteFile.lowerBounds(); + @Nullable Map upperDelBounds = deleteFile.upperBounds(); + if (lowerDelBounds == null || upperDelBounds == null) { + throw new NoBoundMetricsException( + String.format( + "Upper and/or lower bounds are missing for %s with " + + "DataFile '%s' and DeleteFile '%s'", + task.getClass().getSimpleName(), + name(getDataFile(task).location()), + name(deleteFile.location()))); + } + + GenericRecord delFileLower = fillValues(wrapper, schema, lowerDelBounds); + GenericRecord delFileUpper = fillValues(wrapper, schema, upperDelBounds); + + if (lowerId == null || idComp.compare(delFileLower, lowerId) < 0) { + lowerId = delFileLower; + } + if (upperId == null || idComp.compare(delFileUpper, upperId) > 0) { + upperId = delFileUpper; + } + } + } else { + throw new UnsupportedOperationException( + "Unsupported task type: " + task.getClass().getSimpleName()); + } + + if (lowerId == null || upperId == null) { + throw new NoBoundMetricsException( + String.format( + "Could not compute min and/or max bounds for %s with DataFile: %s", + task.getClass().getSimpleName(), getDataFile(task).location())); + } + return new TaskAndBounds(task, lowerId, upperId); + } + + /** + * Compares itself with another task. If the bounds overlap, sets {@link #overlaps} to true for + * both tasks. 
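// Illustrative sketch, not part of this change: the closed-interval test that checkOverlapWith
// below applies to identifier-key bounds, shown with plain long values instead of StructLike
// keys. Two ranges overlap exactly when each one starts at or before the point where the other ends.
class OverlapSketch {
  static boolean overlaps(long lowerA, long upperA, long lowerB, long upperB) {
    // mirrors: idComp.compare(lowerId, other.upperId) <= 0 && idComp.compare(other.lowerId, upperId) <= 0
    return lowerA <= upperB && lowerB <= upperA;
  }

  public static void main(String[] args) {
    System.out.println(overlaps(5, 10, 8, 20)); // true: this insert/delete pair would be marked bi-directional
    System.out.println(overlaps(5, 10, 11, 20)); // false: no overlap between this particular pair
  }
}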
+ */ + private void checkOverlapWith(TaskAndBounds other, Comparator idComp) { + if (overlaps && other.overlaps) { + return; + } + + int left = idComp.compare(lowerId, other.upperId); + int right = idComp.compare(other.lowerId, upperId); + + if (left <= 0 && right <= 0) { + overlaps = true; + other.overlaps = true; + } + } + + private static GenericRecord fillValues( + GenericRecord wrapper, Schema schema, Map bounds) + throws NoBoundMetricsException { + for (Types.NestedField field : schema.columns()) { + int idx = field.fieldId(); + Type type = field.type(); + String name = field.name(); + if (!schema.identifierFieldIds().contains(idx)) { + continue; + } + @Nullable ByteBuffer value = bounds.get(idx); + if (value == null) { + throw new NoBoundMetricsException("Could not fetch metric value for column: " + name); + } + Object data = checkStateNotNull(Conversions.fromByteBuffer(type, value)); + wrapper.setField(name, data); + } + return wrapper.copy(); + } + + static class NoBoundMetricsException extends NullPointerException { + public NoBoundMetricsException(String msg) { + super(msg); + } + } + } + + /** Checks if all partition fields are derived from record identifier fields. */ + private static boolean isRowPinnedToPartition(PartitionSpec spec) { + Set identifierFieldsIds = spec.schema().identifierFieldIds(); + if (spec.isUnpartitioned() || identifierFieldsIds.isEmpty()) { + return false; + } + + for (PartitionField field : spec.fields()) { + if (!identifierFieldsIds.contains(field.sourceId())) { + return false; + } + } + + return true; + } + + class TaskBatcher { + Map taskBuffers = new HashMap<>(); + final long maxSplitSize; + final String tableIdentifier; + final OutputReceiver>> output; + + TaskBatcher( + String tableIdentifier, + long maxSplitSize, + OutputReceiver>> output) { + this.tableIdentifier = tableIdentifier; + this.maxSplitSize = maxSplitSize; + this.output = output; + } + + void add(ChangelogScanTask task) { + taskBuffers + .computeIfAbsent( + task.commitSnapshotId(), + (id) -> new SnapshotBuffer(tableIdentifier, id, maxSplitSize, output)) + .add(task); + } + + void flush() { + taskBuffers.values().forEach(SnapshotBuffer::flush); + } + } + + class SnapshotBuffer { + List tasks = new ArrayList<>(); + long byteSize = 0L; + final long maxSplitSize; + final String tableIdentifier; + final long snapshotId; + final OutputReceiver>> output; + + SnapshotBuffer( + String tableIdentifier, + long snapshotId, + long maxSplitSize, + OutputReceiver>> output) { + this.tableIdentifier = tableIdentifier; + this.snapshotId = snapshotId; + this.maxSplitSize = maxSplitSize; + this.output = output; + } + + boolean canTake(ChangelogScanTask task) { + return byteSize + task.sizeBytes() <= maxSplitSize; + } + + void add(ChangelogScanTask task) { + if (!canTake(task)) { + flush(); + } + byteSize += task.sizeBytes(); + tasks.add(task); + } + + void flush() { + ChangelogDescriptor descriptor = + ChangelogDescriptor.builder().setTableIdentifierString(tableIdentifier).build(); + Instant timestamp = + Instant.ofEpochMilli(checkStateNotNull(snapshotMap.get(snapshotId)).getTimestampMillis()); + + List serializableTasks = + tasks.stream().map(SerializableChangelogTask::from).collect(Collectors.toList()); + + output.outputWithTimestamp(KV.of(descriptor, serializableTasks), timestamp); + + byteSize = 0; + tasks = new ArrayList<>(); + } + } + + // TODO: remove + private static DataFile getDataFile(ChangelogScanTask task) { + if (task instanceof AddedRowsScanTask) { + return ((AddedRowsScanTask) 
task).file(); + } else if (task instanceof DeletedRowsScanTask) { + return ((DeletedRowsScanTask) task).file(); + } else if (task instanceof DeletedDataFileScanTask) { + return ((DeletedDataFileScanTask) task).file(); + } + throw new IllegalStateException("Unknown task type: " + task.getClass()); + } + + private static List getDeleteFiles(ChangelogScanTask task) { + if (task instanceof AddedRowsScanTask) { + return Collections.emptyList(); + } else if (task instanceof DeletedRowsScanTask) { + return ((DeletedRowsScanTask) task).addedDeletes(); + } else if (task instanceof DeletedDataFileScanTask) { + return Collections.emptyList(); + } + throw new IllegalStateException("Unknown task type: " + task.getClass()); + } + + private static StructLike getPartition(ChangelogScanTask task) { + if (task instanceof AddedRowsScanTask) { + return ((AddedRowsScanTask) task).partition(); + } else if (task instanceof DeletedRowsScanTask) { + return ((DeletedRowsScanTask) task).partition(); + } else if (task instanceof DeletedDataFileScanTask) { + return ((DeletedDataFileScanTask) task).partition(); + } + throw new IllegalStateException("Unknown task type: " + task.getClass()); + } + + private static int getSpecId(ChangelogScanTask task) { + if (task instanceof AddedRowsScanTask) { + return ((AddedRowsScanTask) task).spec().specId(); + } else if (task instanceof DeletedRowsScanTask) { + return ((DeletedRowsScanTask) task).spec().specId(); + } else if (task instanceof DeletedDataFileScanTask) { + return ((DeletedDataFileScanTask) task).spec().specId(); + } + throw new IllegalStateException("Unknown task type: " + task.getClass()); + } + + static String name(String path) { + return Iterables.getLast(Splitter.on("-").split(path)); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/DeleteReader.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/DeleteReader.java new file mode 100644 index 000000000000..e85bac6136a3 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/DeleteReader.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg.cdc; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimaps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.apache.iceberg.Accessor; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.BaseDeleteLoader; +import org.apache.iceberg.data.DeleteLoader; +import org.apache.iceberg.deletes.PositionDeleteIndex; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructLikeSet; +import org.apache.iceberg.util.StructProjection; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reads a {@link org.apache.iceberg.DataFile} and returns records marked deleted by the given + * {@link DeleteFile}s. + * + *

This is mostly a copy of {@link org.apache.iceberg.data.DeleteFilter}, but flipping the logic + * to output deleted records instead of filtering them out. + */ +public abstract class DeleteReader { + private static final Logger LOG = LoggerFactory.getLogger(DeleteReader.class); + + private final String filePath; + private final List posDeletes; + private final List eqDeletes; + private final Schema requiredSchema; + private final Accessor posAccessor; + private volatile @Nullable DeleteLoader deleteLoader = null; + private @Nullable PositionDeleteIndex deleteRowPositions = null; + private @Nullable List> isInDeleteSets = null; + + protected DeleteReader( + String filePath, + List deletes, + Schema tableSchema, + Schema expectedSchema, + boolean needRowPosCol) { + this.filePath = filePath; + + ImmutableList.Builder posDeleteBuilder = ImmutableList.builder(); + ImmutableList.Builder eqDeleteBuilder = ImmutableList.builder(); + for (DeleteFile delete : deletes) { + switch (delete.content()) { + case POSITION_DELETES: + LOG.debug("Adding position delete file {} to reader", delete.location()); + posDeleteBuilder.add(delete); + break; + case EQUALITY_DELETES: + LOG.debug("Adding equality delete file {} to reader", delete.location()); + eqDeleteBuilder.add(delete); + break; + default: + throw new UnsupportedOperationException( + "Unknown delete file content: " + delete.content()); + } + } + + this.posDeletes = posDeleteBuilder.build(); + this.eqDeletes = eqDeleteBuilder.build(); + this.requiredSchema = + fileProjection(tableSchema, expectedSchema, posDeletes, eqDeletes, needRowPosCol); + this.posAccessor = requiredSchema.accessorForField(MetadataColumns.ROW_POSITION.fieldId()); + } + + protected DeleteReader( + String filePath, List deletes, Schema tableSchema, Schema requestedSchema) { + this(filePath, deletes, tableSchema, requestedSchema, true); + } + + public Schema requiredSchema() { + return requiredSchema; + } + + protected abstract StructLike asStructLike(T record); + + protected abstract InputFile getInputFile(String location); + + protected InputFile loadInputFile(DeleteFile deleteFile) { + return getInputFile(deleteFile.location()); + } + + protected long pos(T record) { + return (Long) posAccessor.get(asStructLike(record)); + } + + protected DeleteLoader newDeleteLoader() { + return new BaseDeleteLoader(this::loadInputFile); + } + + private DeleteLoader deleteLoader() { + if (deleteLoader == null) { + synchronized (this) { + if (deleteLoader == null) { + this.deleteLoader = newDeleteLoader(); + } + } + } + + return deleteLoader; + } + + public CloseableIterable read(CloseableIterable records) { + return applyEqDeletes(applyPosDeletes(records)); + } + + private List> applyEqDeletes() { + if (isInDeleteSets != null) { + return isInDeleteSets; + } + + isInDeleteSets = Lists.newArrayList(); + if (eqDeletes.isEmpty()) { + return isInDeleteSets; + } + + Multimap, DeleteFile> filesByDeleteIds = + Multimaps.newMultimap(Maps.newHashMap(), Lists::newArrayList); + for (DeleteFile delete : eqDeletes) { + filesByDeleteIds.put(Sets.newHashSet(delete.equalityFieldIds()), delete); + } + + for (Map.Entry, Collection> entry : + filesByDeleteIds.asMap().entrySet()) { + Set ids = entry.getKey(); + Iterable deletes = entry.getValue(); + + Schema deleteSchema = TypeUtil.select(requiredSchema, ids); + + // a projection to select and reorder fields of the file schema to match the delete rows + StructProjection projectRow = StructProjection.create(requiredSchema, deleteSchema); + + StructLikeSet deleteSet = 
deleteLoader().loadEqualityDeletes(deletes, deleteSchema); + Predicate isInDeleteSet = + record -> deleteSet.contains(projectRow.wrap(asStructLike(record))); + checkStateNotNull(isInDeleteSets).add(isInDeleteSet); + } + + return checkStateNotNull(isInDeleteSets); + } + + private CloseableIterable applyEqDeletes(CloseableIterable records) { + Predicate isEqDeleted = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false); + + return CloseableIterable.filter(records, isEqDeleted); + } + + public PositionDeleteIndex deletedRowPositions() { + if (deleteRowPositions == null && !posDeletes.isEmpty()) { + deleteRowPositions = deleteLoader().loadPositionDeletes(posDeletes, filePath); + } + + return checkStateNotNull(deleteRowPositions); + } + + private CloseableIterable applyPosDeletes(CloseableIterable records) { + if (posDeletes.isEmpty()) { + return records; + } + + PositionDeleteIndex positionIndex = deletedRowPositions(); + Predicate isDeleted = record -> positionIndex.isDeleted(pos(record)); + return CloseableIterable.filter(records, isDeleted); + } + + private static Schema fileProjection( + Schema tableSchema, + Schema requestedSchema, + List posDeletes, + List eqDeletes, + boolean needRowPosCol) { + if (posDeletes.isEmpty() && eqDeletes.isEmpty()) { + return requestedSchema; + } + + Set requiredIds = Sets.newLinkedHashSet(); + if (needRowPosCol && !posDeletes.isEmpty()) { + requiredIds.add(MetadataColumns.ROW_POSITION.fieldId()); + } + + for (DeleteFile eqDelete : eqDeletes) { + requiredIds.addAll(eqDelete.equalityFieldIds()); + } + + Set missingIds = + Sets.newLinkedHashSet( + Sets.difference(requiredIds, TypeUtil.getProjectedIds(requestedSchema))); + + if (missingIds.isEmpty()) { + return requestedSchema; + } + + // TODO: support adding nested columns. this will currently fail when finding nested columns to + // add + List columns = Lists.newArrayList(requestedSchema.columns()); + for (int fieldId : missingIds) { + if (fieldId == MetadataColumns.ROW_POSITION.fieldId() + || fieldId == MetadataColumns.IS_DELETED.fieldId()) { + continue; // add _pos and _deleted at the end + } + + Types.NestedField field = tableSchema.asStruct().field(fieldId); + Preconditions.checkArgument(field != null, "Cannot find required field for ID %s", fieldId); + + columns.add(field); + } + + if (missingIds.contains(MetadataColumns.ROW_POSITION.fieldId())) { + columns.add(MetadataColumns.ROW_POSITION); + } + + if (missingIds.contains(MetadataColumns.IS_DELETED.fieldId())) { + columns.add(MetadataColumns.IS_DELETED); + } + + return new Schema(columns); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/IncrementalChangelogSource.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/IncrementalChangelogSource.java new file mode 100644 index 000000000000..32e4c601a46d --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/IncrementalChangelogSource.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.cdc; + +import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.BIDIRECTIONAL_CHANGES; +import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.UNIDIRECTIONAL_CHANGES; +import static org.apache.beam.sdk.io.iceberg.cdc.ResolveChanges.DELETES; +import static org.apache.beam.sdk.io.iceberg.cdc.ResolveChanges.INSERTS; + +import java.util.List; +import org.apache.beam.sdk.io.iceberg.IcebergScanConfig; +import org.apache.beam.sdk.io.iceberg.IcebergUtils; +import org.apache.beam.sdk.io.iceberg.IncrementalScanSource; +import org.apache.beam.sdk.io.iceberg.SnapshotInfo; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.transforms.join.CoGroupByKey; +import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionList; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; + +/** + * An Iceberg source that incrementally reads a table's changelogs using range(s) of table + * snapshots. The bounded source creates a single range, while the unbounded implementation + * continuously polls for new snapshots at the specified interval. + */ +public class IncrementalChangelogSource extends IncrementalScanSource { + public IncrementalChangelogSource(IcebergScanConfig scanConfig) { + super(scanConfig); + } + + @Override + public PCollection expand(PBegin input) { + Table table = + scanConfig + .getCatalogConfig() + .catalog() + .loadTable(TableIdentifier.parse(scanConfig.getTableIdentifier())); + + PCollection>> snapshots = + MoreObjects.firstNonNull(scanConfig.getStreaming(), false) + ? 
unboundedSnapshots(input) + : boundedSnapshots(input, table); + + // scan each interval of snapshots and create groups of changelog tasks + PCollectionTuple changelogTasks = + snapshots + .apply(Redistribute.byKey()) + .apply( + "Create Changelog Tasks", + ParDo.of(new ChangelogScanner(scanConfig)) + .withOutputTags( + UNIDIRECTIONAL_CHANGES, TupleTagList.of(BIDIRECTIONAL_CHANGES))); + changelogTasks.get(UNIDIRECTIONAL_CHANGES).setCoder(ChangelogScanner.OUTPUT_CODER); + changelogTasks.get(BIDIRECTIONAL_CHANGES).setCoder(ChangelogScanner.OUTPUT_CODER); + + // process changelog tasks and output rows + ReadFromChangelogs.CdcOutput outputRows = + changelogTasks.apply(new ReadFromChangelogs(scanConfig)); + + // compare bi-directional rows to identify potential updates + PCollection biDirectionalCdcRows = + KeyedPCollectionTuple.of(INSERTS, outputRows.keyedInserts()) + .and(DELETES, outputRows.keyedDeletes()) + .apply("CoGroupBy Primary Key", CoGroupByKey.create()) + .apply("Resolve Delete-Insert Pairs", ParDo.of(new ResolveChanges())) + .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema())); + + // Merge uni-directional and bi-directional outputs + return PCollectionList.of(outputRows.uniDirectionalRows()) + .and(biDirectionalCdcRows) + .apply(Flatten.pCollections()); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReadFromChangelogs.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReadFromChangelogs.java new file mode 100644 index 000000000000..68557bf21605 --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ReadFromChangelogs.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.iceberg.cdc; + +import static org.apache.beam.sdk.io.iceberg.IcebergUtils.icebergSchemaToBeamSchema; +import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.BIDIRECTIONAL_CHANGES; +import static org.apache.beam.sdk.io.iceberg.cdc.ChangelogScanner.UNIDIRECTIONAL_CHANGES; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.io.iceberg.IcebergScanConfig; +import org.apache.beam.sdk.io.iceberg.IcebergUtils; +import org.apache.beam.sdk.io.iceberg.ReadUtils; +import org.apache.beam.sdk.io.iceberg.SerializableDeleteFile; +import org.apache.beam.sdk.io.iceberg.TableCache; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.transforms.Reify; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.DeleteFilter; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.StructProjection; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +/** + * A {@link PTransform} that processed {@link org.apache.iceberg.ChangelogScanTask}s. They come in + * three types: + * + *

    + *
  1. AddedRowsScanTask: Indicates records have been inserted via a new DataFile. + *
  2. DeletedRowsScanTask: Indicates records have been deleted via a Position DeleteFile + * or Equality DeleteFile. + *
  3. DeletedDataFileScanTask: Indicates a whole DataFile has been deleted. + *
+ * + * Each of these ChangelogScanTasks needs to be processed differently. More details are in the + * corresponding methods: + *
    + *
  1. {@link ReadDoFn#processAddedRowsTask(SerializableChangelogTask, Table, + * DoFn.MultiOutputReceiver)} + *
  2. {@link ReadDoFn#processDeletedRowsTask(SerializableChangelogTask, Table, + * DoFn.MultiOutputReceiver)} + *
  3. {@link ReadDoFn#processDeletedFileTask(SerializableChangelogTask, Table, + * DoFn.MultiOutputReceiver)} + *
+ */ +class ReadFromChangelogs extends PTransform { + private static final Counter numAddedRowsScanTasksCompleted = + Metrics.counter(ReadFromChangelogs.class, "numAddedRowsScanTasksCompleted"); + private static final Counter numDeletedRowsScanTasksCompleted = + Metrics.counter(ReadFromChangelogs.class, "numDeletedRowsScanTasksCompleted"); + private static final Counter numDeletedDataFileScanTasksCompleted = + Metrics.counter(ReadFromChangelogs.class, "numDeletedDataFileScanTasksCompleted"); + + private static final TupleTag UNIDIRECTIONAL_ROWS = new TupleTag<>(); + private static final TupleTag> KEYED_INSERTS = new TupleTag<>(); + private static final TupleTag> KEYED_DELETES = new TupleTag<>(); + + private final IcebergScanConfig scanConfig; + private final Schema rowAndSnapshotIDBeamSchema; + // TODO: Any better way of doing this? + private static final String SNAPSHOT_FIELD = "__beam__changelog__snapshot__id__"; + + ReadFromChangelogs(IcebergScanConfig scanConfig) { + this.scanConfig = scanConfig; + this.rowAndSnapshotIDBeamSchema = rowAndSnapshotIDBeamSchema(scanConfig); + } + + /** Computes the keyed output coder, which depends on the table's primary key spec. */ + private KvCoder keyedOutputCoder(IcebergScanConfig scanConfig) { + org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema(); + return KvCoder.of( + SchemaCoder.of(rowAndSnapshotIDBeamSchema), + SchemaCoder.of(icebergSchemaToBeamSchema(recordSchema))); + } + + private static Schema rowAndSnapshotIDBeamSchema(IcebergScanConfig scanConfig) { + org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema(); + org.apache.iceberg.Schema recordIdSchema = + recordSchema.select(recordSchema.identifierFieldNames()); + Schema rowIdBeamSchema = icebergSchemaToBeamSchema(recordIdSchema); + List fields = + ImmutableList.builder() + .add(Schema.Field.of(SNAPSHOT_FIELD, Schema.FieldType.INT64)) + .addAll(rowIdBeamSchema.getFields()) + .build(); + return new Schema(fields); + } + + @Override + public CdcOutput expand(PCollectionTuple input) { + PCollection>> uniDirectionalChanges = + input.get(UNIDIRECTIONAL_CHANGES); + PCollection>> biDirectionalChanges = + input.get(BIDIRECTIONAL_CHANGES); + + // === UNIDIRECTIONAL changes === + // (i.e. only deletes, or only inserts) + // take the fast approach of just reading and emitting CDC records. + PCollection uniDirectionalCdcRows = + uniDirectionalChanges + .apply(Redistribute.arbitrarily()) + .apply( + "Read Uni-Directional Changes", + ParDo.of(ReadDoFn.unidirectional(scanConfig)) + .withOutputTags(UNIDIRECTIONAL_ROWS, TupleTagList.empty())) + .get(UNIDIRECTIONAL_ROWS) + .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema())); + + // === BIDIRECTIONAL changes === + // (i.e. a mix of deletes and inserts) + // will need to be prepared for a downstream CoGBK shuffle to identify potential updates + PCollectionTuple biDirectionalKeyedCdcRows = + biDirectionalChanges + .apply(Redistribute.arbitrarily()) + .apply( + "Read Bi-Directional Changes", + ParDo.of(ReadDoFn.bidirectional(scanConfig)) + .withOutputTags(KEYED_INSERTS, TupleTagList.of(KEYED_DELETES))); + + // set a windowing strategy to maintain the earliest timestamp + // this allows us to emit records afterward that may have larger reified timestamps + Window>> windowingStrategy = + Window.>>into(new GlobalWindows()) + .withTimestampCombiner(TimestampCombiner.EARLIEST); + + // Reify to preserve the element's timestamp. 
This is currently a no-op because we are + // setting the ordinal's commit timestamp for all records. + // But this will matter if user configures a watermark column to derive + // timestamps from (not supported yet) + KvCoder keyedOutputCoder = keyedOutputCoder(scanConfig); + PCollection>> keyedInsertsWithTimestamps = + biDirectionalKeyedCdcRows + .get(KEYED_INSERTS) + .setCoder(keyedOutputCoder) + .apply("Reify INSERT Timestamps", Reify.timestampsInValue()) + .apply("Re-window INSERTs", windowingStrategy); + PCollection>> keyedDeletesWithTimestamps = + biDirectionalKeyedCdcRows + .get(KEYED_DELETES) + .setCoder(keyedOutputCoder) + .apply("Reify DELETE Timestamps", Reify.timestampsInValue()) + .apply("Re-window DELETEs", windowingStrategy); + + return new CdcOutput( + input.getPipeline(), + uniDirectionalCdcRows, + keyedInsertsWithTimestamps, + keyedDeletesWithTimestamps); + } + + public static class CdcOutput implements POutput { + private final Pipeline pipeline; + private final PCollection uniDirectionalRows; + private final PCollection>> keyedInserts; + private final PCollection>> keyedDeletes; + + CdcOutput( + Pipeline p, + PCollection uniDirectionalRows, + PCollection>> keyedInserts, + PCollection>> keyedDeletes) { + this.pipeline = p; + this.uniDirectionalRows = uniDirectionalRows; + this.keyedInserts = keyedInserts; + this.keyedDeletes = keyedDeletes; + } + + PCollection uniDirectionalRows() { + return uniDirectionalRows; + } + + PCollection>> keyedInserts() { + return keyedInserts; + } + + PCollection>> keyedDeletes() { + return keyedDeletes; + } + + @Override + public Pipeline getPipeline() { + return pipeline; + } + + @Override + public Map, PValue> expand() { + return ImmutableMap.of( + UNIDIRECTIONAL_ROWS, + uniDirectionalRows, + KEYED_INSERTS, + keyedInserts, + KEYED_DELETES, + keyedDeletes); + } + + @Override + public void finishSpecifyingOutput( + String transformName, PInput input, PTransform transform) {} + } + + @DoFn.BoundedPerElement + private static class ReadDoFn + extends DoFn>, OutT> { + private final IcebergScanConfig scanConfig; + private final boolean keyedOutput; + private transient @MonotonicNonNull StructProjection recordIdProjection; + private transient org.apache.iceberg.@MonotonicNonNull Schema recordIdSchema; + private final Schema beamRowSchema; + private final Schema rowAndSnapshotIDBeamSchema; + + /** Used for uni-directional changes. Records are output immediately as-is. */ + static ReadDoFn unidirectional(IcebergScanConfig scanConfig) { + return new ReadDoFn<>(scanConfig, false); + } + + /** + * Used for bi-directional changes. Records are keyed by (primary key, snapshot ID) and sent to + * a CoGBK. 
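// Rough sketch, not part of this change: why the commit snapshot ID is folded into the CoGBK key
// alongside the primary key. A delete and an insert of the same primary key only become an
// update-pair candidate when they come from the same snapshot; across snapshots they stay
// independent DELETE and INSERT events. The string key is purely for illustration.
class ChangeKeySketch {
  static String key(long commitSnapshotId, String primaryKey) {
    return commitSnapshotId + "/" + primaryKey;
  }

  public static void main(String[] args) {
    System.out.println(key(100L, "user-1").equals(key(100L, "user-1"))); // true: grouped together
    System.out.println(key(100L, "user-1").equals(key(101L, "user-1"))); // false: kept apart
  }
}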
+ */ + static ReadDoFn> bidirectional(IcebergScanConfig scanConfig) { + return new ReadDoFn<>(scanConfig, true); + } + + private ReadDoFn(IcebergScanConfig scanConfig, boolean keyedOutput) { + this.scanConfig = scanConfig; + this.keyedOutput = keyedOutput; + + this.beamRowSchema = icebergSchemaToBeamSchema(scanConfig.getProjectedSchema()); + this.rowAndSnapshotIDBeamSchema = rowAndSnapshotIDBeamSchema(scanConfig); + } + + @Setup + public void setup() { + // StructProjection is not serializable, so we need to recompute it when the DoFn gets + // deserialized + org.apache.iceberg.Schema recordSchema = scanConfig.getProjectedSchema(); + this.recordIdSchema = recordSchema.select(recordSchema.identifierFieldNames()); + this.recordIdProjection = StructProjection.create(recordSchema, recordIdSchema); + TableCache.setup(scanConfig); + } + + @ProcessElement + public void process( + @Element KV> element, + RestrictionTracker tracker, + MultiOutputReceiver out) + throws IOException { + Table table = TableCache.get(scanConfig.getTableIdentifier()); + + List tasks = element.getValue(); + + for (long l = tracker.currentRestriction().getFrom(); + l < tracker.currentRestriction().getTo(); + l++) { + if (!tracker.tryClaim(l)) { + return; + } + + SerializableChangelogTask task = tasks.get((int) l); + switch (task.getType()) { + case ADDED_ROWS: + processAddedRowsTask(task, table, out); + break; + case DELETED_ROWS: + processDeletedRowsTask(task, table, out); + break; + case DELETED_FILE: + processDeletedFileTask(task, table, out); + break; + } + } + } + + /** + * 1. Reads the added DataFile. 2. Filters out any matching deletes. This may happen if a + * matching position delete file is committed in the same snapshot or if changes for multiple + * snapshots are squashed together. 3. Outputs record. + */ + private void processAddedRowsTask( + SerializableChangelogTask task, Table table, MultiOutputReceiver outputReceiver) + throws IOException { + try (CloseableIterable fullIterable = + ReadUtils.createReader(task, table, scanConfig)) { + + // TODO: AddedRowsScanTask comes with a datafile and potential deletes on that new datafile + // (that happened in the same commit). + // Should we: + // 1. Only output the (non-deleted) inserted records? + // 2. Or output all inserted records and also all deleted records? + // Currently we do 1 (only output what is actually 'inserted' in this commit). + DeleteFilter deleteFilter = + ReadUtils.genericDeleteFilter( + table, scanConfig, task.getDataFile().getPath(), task.getAddedDeletes()); + CloseableIterable filtered = deleteFilter.filter(fullIterable); + + for (Record rec : filtered) { + outputRecord("INSERT", rec, outputReceiver, task.getCommitSnapshotId(), KEYED_INSERTS); + } + } + numAddedRowsScanTasksCompleted.inc(); + } + + /** + * + * + *
    + *
  1. Fetches the referenced DataFile (that deletes will be applied to) and iterates over + * records. + *
  2. Applies a filter to ignore any existing deletes. + *
  3. Applies a filter to read only the new deletes. + *
  4. Outputs records with delete row kind. + *
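// Rough sketch, not part of this change: the shape of the filter chain described above, using
// plain predicates in place of the Iceberg DeleteFilter / DeleteReader pair. A row is emitted
// as a DELETE only if it was still live before this snapshot and is removed by a newly added
// delete file.
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class DeletedRowsSketch {
  static List<String> newlyDeleted(
      List<String> allRows, Predicate<String> existingDeletes, Predicate<String> addedDeletes) {
    return allRows.stream()
        .filter(existingDeletes.negate()) // drop rows already deleted before this snapshot
        .filter(addedDeletes) // keep only rows removed by the newly added delete files
        .collect(Collectors.toList());
  }
}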
+ */ + private void processDeletedRowsTask( + SerializableChangelogTask task, Table table, MultiOutputReceiver outputReceiver) + throws IOException { + DeleteFilter existingDeletesFilter = + ReadUtils.genericDeleteFilter( + table, scanConfig, task.getDataFile().getPath(), task.getExistingDeletes()); + DeleteReader newDeletesReader = + ReadUtils.genericDeleteReader( + table, scanConfig, task.getDataFile().getPath(), task.getAddedDeletes()); + + try (CloseableIterable allRecords = ReadUtils.createReader(task, table, scanConfig)) { + CloseableIterable liveRecords = existingDeletesFilter.filter(allRecords); + CloseableIterable newlyDeletedRecords = newDeletesReader.read(liveRecords); + + for (Record rec : newlyDeletedRecords) { + // TODO: output with DELETE kind + outputRecord("DELETE", rec, outputReceiver, task.getCommitSnapshotId(), KEYED_DELETES); + } + } + numDeletedRowsScanTasksCompleted.inc(); + } + + /** + * + * + *
    + *
  1. Fetches the referenced DataFile (that deletes will be applied to) and iterates over + * records. + *
  2. Applies a filter to ignore any existing deletes. + *
  3. Outputs records with delete row kind. + *
+ */ + private void processDeletedFileTask( + SerializableChangelogTask task, Table table, MultiOutputReceiver outputReceiver) + throws IOException { + try (CloseableIterable fullIterable = + ReadUtils.createReader(task, table, scanConfig)) { + DeleteFilter deleteFilter = + ReadUtils.genericDeleteFilter( + table, scanConfig, task.getDataFile().getPath(), task.getExistingDeletes()); + CloseableIterable filtered = deleteFilter.filter(fullIterable); + for (Record rec : filtered) { + // TODO: output with DELETE kind + outputRecord("DELETE-DF", rec, outputReceiver, task.getCommitSnapshotId(), KEYED_DELETES); + } + } + numDeletedDataFileScanTasksCompleted.inc(); + } + + /** + * Outputs records to the appropriate downstream collection. + * + *

If this DoFn is configured for uni-directional changes, records are output directly to the + * {@link ReadFromChangelogs#UNIDIRECTIONAL_ROWS} tag. + * + *

If this DoFn is configured for bi-directional changes, records will be keyed by their + * Primary Key and commit snapshot ID, then output to either {@link + * ReadFromChangelogs#KEYED_INSERTS} or {@link ReadFromChangelogs#KEYED_DELETES}. + */ + private void outputRecord( + String type, + Record rec, + MultiOutputReceiver outputReceiver, + long snapshotId, + TupleTag> keyedTag) { + Row row = IcebergUtils.icebergRecordToBeamRow(beamRowSchema, rec); + if (keyedOutput) { // slow path + StructProjection recId = checkStateNotNull(recordIdProjection).wrap(rec); + // Create a Row ID consisting of: + // 1. the task's commit snapshot ID + // 2. the record ID column values + // This is needed to sufficiently distinguish a record change + Row id = + structToBeamRow( + snapshotId, recId, checkStateNotNull(recordIdSchema), rowAndSnapshotIDBeamSchema); + outputReceiver.get(keyedTag).output(KV.of(id, row)); + } else { // fast path + System.out.printf("[UNIDIRECTIONAL] -- %s(%s)%n%s%n", type, snapshotId, row); + outputReceiver.get(UNIDIRECTIONAL_ROWS).output(row); + } + } + + public static Row structToBeamRow( + long snapshotId, StructLike struct, org.apache.iceberg.Schema schema, Schema beamSchema) { + ImmutableMap.Builder values = ImmutableMap.builder(); + List columns = schema.columns(); + for (Types.NestedField column : columns) { + String name = column.name(); + Object value = schema.accessorForField(column.fieldId()).get(struct); + values.put(name, value); + } + // Include snapshot ID as part of the row ID. + // This is essential to ensure that the downstream ReconcileChanges compares rows + // within the same operation. + values.put(SNAPSHOT_FIELD, snapshotId); + return Row.withSchema(beamSchema).withFieldValues(values.build()).build(); + } + + @GetSize + public double getSize( + @Element KV> element, + @Restriction OffsetRange restriction) { + // TODO(ahmedabu98): this is just the compressed byte size. find a way to make a better byte + // size estimate + long size = 0; + + for (long l = restriction.getFrom(); l < restriction.getTo(); l++) { + SerializableChangelogTask task = element.getValue().get((int) l); + size += task.getDataFile().getFileSizeInBytes(); + size += + task.getAddedDeletes().stream() + .mapToLong(SerializableDeleteFile::getFileSizeInBytes) + .sum(); + size += + task.getExistingDeletes().stream() + .mapToLong(SerializableDeleteFile::getFileSizeInBytes) + .sum(); + } + + return size; + } + + @GetInitialRestriction + public OffsetRange getInitialRange( + @Element KV> element) { + return new OffsetRange(0, element.getValue().size()); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ResolveChanges.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ResolveChanges.java new file mode 100644 index 000000000000..07fad1b336eb --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/ResolveChanges.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.cdc; + +import java.util.Iterator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.join.CoGbkResult; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * Receives inserts and deletes, keyed by snapshot ID and Primary Key, and determines if any updates + * have occurred. + * + *

If the element has a mix of inserts and deletes, it is considered an update. DELETE becomes + * UPDATE_BEFORE and INSERT becomes UPDATE_AFTER. + * + *

Otherwise, records are output as-is: INSERT as INSERT, and DELETE as DELETE. + * + *
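// Rough sketch, not part of this change: how a group holding both deletes and inserts is zipped
// into update pairs, mirroring the paired iteration in processElement. Change kinds are plain
// strings here; the DoFn itself still carries TODOs for setting real Beam row kinds.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class ResolvePairingSketch {
  static List<String> resolve(List<String> deletes, List<String> inserts) {
    List<String> out = new ArrayList<>();
    Iterator<String> del = deletes.iterator();
    Iterator<String> ins = inserts.iterator();
    while (del.hasNext() && ins.hasNext()) {
      out.add("UPDATE_BEFORE:" + del.next()); // the paired delete is the pre-image
      out.add("UPDATE_AFTER:" + ins.next()); // the paired insert is the post-image
    }
    while (ins.hasNext()) {
      out.add("UPDATE_AFTER:" + ins.next()); // extra inserts are still emitted
    }
    // extra deletes are dropped, matching the current behavior (and its TODO) in processElement
    return out;
  }
}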

Input elements have their timestamp reified. This is because CoGroupByKey assigns all elements + * in a window with the same timestamp, erasing individual record timestamps. This DoFn preserves it + * by outputting records with their reified timestamps. + */ +public class ResolveChanges extends DoFn, Row> { + public static final TupleTag> DELETES = new TupleTag<>() {}; + public static final TupleTag> INSERTS = new TupleTag<>() {}; + + @DoFn.ProcessElement + public void processElement(@Element KV element, OutputReceiver out) { + CoGbkResult result = element.getValue(); + + // iterables are lazy-loaded from the shuffle service + Iterable> deletes = result.getAll(DELETES); + Iterable> inserts = result.getAll(INSERTS); + + boolean hasDeletes = deletes.iterator().hasNext(); + boolean hasInserts = inserts.iterator().hasNext(); + + if (hasInserts && hasDeletes) { + // UPDATE: row ID exists in both streams + // - emit all deletes as 'UPDATE_BEFORE', and all inserts as 'UPDATE_AFTER' + // - emit extra inserts as 'UPDATE_AFTER' + // - ignore extra deletes (TODO: double check if this is a good decision) + Iterator> deletesIterator = deletes.iterator(); + Iterator> insertsIterator = inserts.iterator(); + while (deletesIterator.hasNext() && insertsIterator.hasNext()) { + // TODO: output as UPDATE_BEFORE kind + TimestampedValue updateBefore = deletesIterator.next(); + out.outputWithTimestamp(updateBefore.getValue(), updateBefore.getTimestamp()); + System.out.printf("[BIDIRECTIONAL] -- UpdateBefore%n%s%n", updateBefore); + + // TODO: output as UPDATE_AFTER kind + TimestampedValue updateAfter = insertsIterator.next(); + out.outputWithTimestamp(updateAfter.getValue(), updateAfter.getTimestamp()); + System.out.printf("[BIDIRECTIONAL] -- UpdateAfter%n%s%n", updateAfter); + } + while (insertsIterator.hasNext()) { + // TODO: output as UPDATE_AFTER kind + TimestampedValue insert = insertsIterator.next(); + out.outputWithTimestamp(insert.getValue(), insert.getTimestamp()); + System.out.printf("[BIDIRECTIONAL] -- Added(extra)%n%s%n", insert); + } + } else if (hasInserts) { + // INSERT only + for (TimestampedValue rec : inserts) { + System.out.printf("[UNIDIRECTIONAL] -- Added%n%s%n", rec); + out.outputWithTimestamp(rec.getValue(), rec.getTimestamp()); + } + } else if (hasDeletes) { + // DELETE only + for (TimestampedValue rec : deletes) { + // TODO: output as DELETE kind + System.out.printf("[UNIDIRECTIONAL] -- Deleted%n%s%n", rec); + out.outputWithTimestamp(rec.getValue(), rec.getTimestamp()); + } + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/SerializableChangelogTask.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/SerializableChangelogTask.java new file mode 100644 index 000000000000..1a5bf023bd0e --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/SerializableChangelogTask.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg.cdc; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import com.google.auto.value.AutoValue; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.iceberg.SerializableDataFile; +import org.apache.beam.sdk.io.iceberg.SerializableDeleteFile; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.NoSuchSchemaException; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldNumber; +import org.apache.beam.sdk.schemas.annotations.SchemaIgnore; +import org.apache.iceberg.AddedRowsScanTask; +import org.apache.iceberg.ChangelogOperation; +import org.apache.iceberg.ChangelogScanTask; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.DeletedDataFileScanTask; +import org.apache.iceberg.DeletedRowsScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.ExpressionParser; + +@DefaultSchema(AutoValueSchema.class) +@AutoValue +public abstract class SerializableChangelogTask { + public enum Type { + ADDED_ROWS, + DELETED_ROWS, + DELETED_FILE + } + + public static SchemaCoder coder() { + try { + return SchemaRegistry.createDefault().getSchemaCoder(SerializableChangelogTask.class); + } catch (NoSuchSchemaException e) { + throw new RuntimeException(e); + } + } + + public static SerializableChangelogTask.Builder builder() { + return new AutoValue_SerializableChangelogTask.Builder() + .setExistingDeletes(Collections.emptyList()) + .setAddedDeletes(Collections.emptyList()); + } + + @SchemaFieldNumber("0") + public abstract Type getType(); + + @SchemaFieldNumber("1") + public abstract SerializableDataFile getDataFile(); + + @SchemaFieldNumber("2") + public abstract List getExistingDeletes(); + + @SchemaFieldNumber("3") + public abstract List getAddedDeletes(); + + @SchemaFieldNumber("4") + public abstract int getSpecId(); + + @SchemaFieldNumber("5") + public abstract ChangelogOperation getOperation(); + + @SchemaFieldNumber("6") + public abstract int getOrdinal(); + + @SchemaFieldNumber("7") + public abstract long getCommitSnapshotId(); + + @SchemaFieldNumber("8") + public abstract long getStart(); + + @SchemaFieldNumber("9") + public abstract long getLength(); + + @SchemaFieldNumber("10") + public abstract String getJsonExpression(); + + @SchemaIgnore + public Expression getExpression(Schema schema) { + return ExpressionParser.fromJson(getJsonExpression(), schema); + } + + @AutoValue.Builder + public abstract static class Builder { + abstract Builder setType(Type type); + + abstract Builder setDataFile(SerializableDataFile dataFile); + + @SchemaIgnore + public Builder 
setDataFile(DataFile df, String partitionPath, boolean includeMetrics) { + return setDataFile(SerializableDataFile.from(df, partitionPath, includeMetrics)); + } + + abstract Builder setExistingDeletes(List existingDeletes); + + abstract Builder setAddedDeletes(List addedDeletes); + + abstract Builder setSpecId(int specId); + + abstract Builder setOperation(ChangelogOperation operation); + + abstract Builder setOrdinal(int ordinal); + + abstract Builder setCommitSnapshotId(long commitSnapshotId); + + abstract Builder setStart(long start); + + abstract Builder setLength(long length); + + abstract Builder setJsonExpression(String expression); + + abstract SerializableChangelogTask build(); + } + + public static SerializableChangelogTask from(ChangelogScanTask task) { + return from(task, false); + } + + public static SerializableChangelogTask from(ChangelogScanTask task, boolean includeMetrics) { + checkState( + task instanceof ContentScanTask, "Expected ChangelogScanTask to also be a ContentScanTask"); + ContentScanTask contentScanTask = (ContentScanTask) task; + PartitionSpec spec = contentScanTask.spec(); + SerializableChangelogTask.Builder builder = + SerializableChangelogTask.builder() + .setOperation(task.operation()) + .setOrdinal(task.changeOrdinal()) + .setCommitSnapshotId(task.commitSnapshotId()) + .setDataFile( + contentScanTask.file(), + spec.partitionToPath(contentScanTask.partition()), + includeMetrics) + .setSpecId(spec.specId()) + .setStart(contentScanTask.start()) + .setLength(contentScanTask.length()) + .setJsonExpression(ExpressionParser.toJson(contentScanTask.residual())); + + if (task instanceof AddedRowsScanTask) { + AddedRowsScanTask addedRowsTask = (AddedRowsScanTask) task; + builder = + builder + .setType(Type.ADDED_ROWS) + .setAddedDeletes( + toSerializableDeletes(addedRowsTask.deletes(), spec, includeMetrics)); + } else if (task instanceof DeletedRowsScanTask) { + DeletedRowsScanTask deletedRowsTask = (DeletedRowsScanTask) task; + builder = + builder + .setType(Type.DELETED_ROWS) + .setAddedDeletes( + toSerializableDeletes(deletedRowsTask.addedDeletes(), spec, includeMetrics)) + .setExistingDeletes( + toSerializableDeletes(deletedRowsTask.existingDeletes(), spec, includeMetrics)); + } else if (task instanceof DeletedDataFileScanTask) { + DeletedDataFileScanTask deletedFileTask = (DeletedDataFileScanTask) task; + builder = + builder + .setType(Type.DELETED_FILE) + .setExistingDeletes( + toSerializableDeletes(deletedFileTask.existingDeletes(), spec, includeMetrics)); + } else { + throw new IllegalStateException("Unknown ChangelogScanTask type: " + task.getClass()); + } + return builder.build(); + } + + public static Type getType(ChangelogScanTask task) { + if (task instanceof AddedRowsScanTask) { + return Type.ADDED_ROWS; + } else if (task instanceof DeletedRowsScanTask) { + return Type.DELETED_ROWS; + } else if (task instanceof DeletedDataFileScanTask) { + return Type.DELETED_FILE; + } else { + throw new IllegalStateException("Unknown ChangelogScanTask type: " + task.getClass()); + } + } + + private static List toSerializableDeletes( + List dfs, PartitionSpec spec, boolean includeMetrics) { + return dfs.stream() + .map( + df -> + SerializableDeleteFile.from( + df, spec.partitionToPath(df.partition()), includeMetrics)) + .collect(Collectors.toList()); + } + + public static Comparator comparator() { + return (task1, task2) -> { + int ordinalCompare = Integer.compare(task1.getOrdinal(), task2.getOrdinal()); + if (ordinalCompare != 0) { + return ordinalCompare; + } + + 
int op1Weight = getOperationWeight(task1.getOperation()); + int op2Weight = getOperationWeight(task2.getOperation()); + + return Integer.compare(op1Weight, op2Weight); + }; + } + + private static int getOperationWeight(ChangelogOperation op) { + switch (op) { + case DELETE: + case UPDATE_BEFORE: + return 0; + case INSERT: + case UPDATE_AFTER: + return 1; + default: + throw new UnsupportedOperationException("Unknown ChangelogOperation: " + op); + } + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/package-info.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/package-info.java new file mode 100644 index 000000000000..8285d91689be --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/cdc/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Iceberg CDC connectors. */ +package org.apache.beam.sdk.io.iceberg.cdc; diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ReadUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ReadUtilsTest.java index 73a0fd19e893..6287a6e06197 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ReadUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/ReadUtilsTest.java @@ -40,7 +40,6 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.parquet.ParquetReader; import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.ClassRule; import org.junit.Rule; @@ -75,14 +74,25 @@ public void testCreateReader() throws IOException { .commit(); } + IcebergScanConfig scanConfig = + IcebergScanConfig.builder() + .setCatalogConfig( + IcebergCatalogConfig.builder() + .setCatalogProperties( + ImmutableMap.of("type", "hadoop", "warehouse", warehouse.location)) + .build()) + .setTableIdentifier(tableId) + .setSchema(IcebergUtils.icebergSchemaToBeamSchema(simpleTable.schema())) + .build(); + int numFiles = 0; try (CloseableIterable iterable = simpleTable.newScan().planTasks()) { for (CombinedScanTask combinedScanTask : iterable) { for (FileScanTask fileScanTask : combinedScanTask.tasks()) { String fileName = Iterables.getLast(Splitter.on("/").split(fileScanTask.file().path())); List recordsRead = new ArrayList<>(); - try (ParquetReader reader = - ReadUtils.createReader(fileScanTask, simpleTable, simpleTable.schema())) { + try (CloseableIterable reader = + ReadUtils.createReader(fileScanTask, simpleTable, scanConfig)) { reader.forEach(recordsRead::add); } diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java index 983f021fd7ce..58bc55744ec6 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/SerializableDataFileTest.java @@ -47,6 +47,9 @@ public class SerializableDataFileTest { .add("nanValueCounts") .add("lowerBounds") .add("upperBounds") + .add("dataSequenceNumber") + .add("fileSequenceNumber") + .add("firstRowId") .build(); @Test