@@ -42,11 +42,11 @@
* source creates a single range, while the unbounded implementation continuously polls for new
* snapshots at the specified interval.
*/
class IncrementalScanSource extends PTransform<PBegin, PCollection<Row>> {
public class IncrementalScanSource extends PTransform<PBegin, PCollection<Row>> {
private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(60);
private final IcebergScanConfig scanConfig;
protected final IcebergScanConfig scanConfig;

IncrementalScanSource(IcebergScanConfig scanConfig) {
public IncrementalScanSource(IcebergScanConfig scanConfig) {
this.scanConfig = scanConfig;
}

@@ -74,14 +74,15 @@ public PCollection<Row> expand(PBegin input) {
}

/** Continuously watches for new snapshots. */
private PCollection<KV<String, List<SnapshotInfo>>> unboundedSnapshots(PBegin input) {
protected PCollection<KV<String, List<SnapshotInfo>>> unboundedSnapshots(PBegin input) {
Duration pollInterval =
MoreObjects.firstNonNull(scanConfig.getPollInterval(), DEFAULT_POLL_INTERVAL);
return input.apply("Watch for Snapshots", new WatchForSnapshots(scanConfig, pollInterval));
}

/** Creates a fixed snapshot range. */
private PCollection<KV<String, List<SnapshotInfo>>> boundedSnapshots(PBegin input, Table table) {
protected PCollection<KV<String, List<SnapshotInfo>>> boundedSnapshots(
PBegin input, Table table) {
checkStateNotNull(
table.currentSnapshot().snapshotId(),
"Table %s does not have any snapshots to read from.",
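The IncrementalScanSource changes above are purely about visibility: the class, the scanConfig field, the constructor, and the unboundedSnapshots/boundedSnapshots helpers move from package-private/private to public/protected, while the behavior described in the javadoc (a single bounded snapshot range vs. an unbounded watch that polls at the configured interval, defaulting to 60 seconds) is untouched. Presumably this is so the new CDC code under org.apache.beam.sdk.io.iceberg.cdc, imported further down in this PR, can reuse or extend the transform. A minimal sketch of what such a subclass could look like; the class name and package are hypothetical, and it assumes IcebergScanConfig is visible from that package:

```java
// Hypothetical sketch, not part of this PR: reusing the widened members from a subclass.
package org.apache.beam.sdk.io.iceberg.cdc; // assumed location, mirroring the new cdc imports below

import org.apache.beam.sdk.io.iceberg.IcebergScanConfig; // assumes this type is reachable here
import org.apache.beam.sdk.io.iceberg.IncrementalScanSource;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class ChangelogScanSource extends IncrementalScanSource { // hypothetical name

  public ChangelogScanSource(IcebergScanConfig scanConfig) {
    super(scanConfig); // possible now that the constructor is public
  }

  @Override
  public PCollection<Row> expand(PBegin input) {
    // unboundedSnapshots(input) and boundedSnapshots(input, table) are now protected, so a
    // subclass can reuse the snapshot discovery and attach its own changelog read stage.
    return super.expand(input);
  }
}
```

The next file, PartitionUtils, gains the constants-map helper that the reworked read path below leans on.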
@@ -25,8 +25,15 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.checkerframework.checker.nullness.qual.Nullable;

class PartitionUtils {
@@ -90,4 +97,51 @@ static PartitionSpec toPartitionSpec(

return builder.build();
}

/**
* Copied over from Apache Iceberg's <a
* href="https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java">PartitionUtil</a>.
*/
public static Map<Integer, ?> constantsMap(
PartitionSpec spec, ContentFile<?> file, BiFunction<Type, Object, Object> convertConstant) {
StructLike partitionData = file.partition();

// use java.util.HashMap because partition data may contain null values
Map<Integer, Object> idToConstant = Maps.newHashMap();

// add first_row_id as _row_id
if (file.firstRowId() != null) {
idToConstant.put(
MetadataColumns.ROW_ID.fieldId(),
convertConstant.apply(Types.LongType.get(), file.firstRowId()));
}

idToConstant.put(
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(),
convertConstant.apply(Types.LongType.get(), file.fileSequenceNumber()));

// add _file
idToConstant.put(
MetadataColumns.FILE_PATH.fieldId(),
convertConstant.apply(Types.StringType.get(), file.location()));

// add _spec_id
idToConstant.put(
MetadataColumns.SPEC_ID.fieldId(),
convertConstant.apply(Types.IntegerType.get(), file.specId()));

List<Types.NestedField> partitionFields = spec.partitionType().fields();
List<PartitionField> fields = spec.fields();
for (int pos = 0; pos < fields.size(); pos += 1) {
PartitionField field = fields.get(pos);
if (field.transform().isIdentity()) {
Object converted =
convertConstant.apply(
partitionFields.get(pos).type(), partitionData.get(pos, Object.class));
idToConstant.put(field.sourceId(), converted);
}
}

return idToConstant;
}
}
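The new PartitionUtils.constantsMap is adapted from Iceberg's PartitionUtil, but it takes the PartitionSpec and ContentFile directly instead of a scan task, and it seeds the Iceberg metadata columns _row_id (only when the file carries a first-row id), _last_updated_sequence_number, _file, and _spec_id alongside the identity-partition values. A rough illustration of what the map ends up holding; the spec and data file are assumed inputs, the category=electronics partition is made up, and the sketch assumes it lives in the same org.apache.beam.sdk.io.iceberg package, since PartitionUtils itself stays package-private:

```java
import java.util.Map;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.data.IdentityPartitionConverters;

// Sketch: a spec with identity("category") and a data file in partition category=electronics.
static void describeConstants(PartitionSpec spec, DataFile dataFile) {
  Map<Integer, ?> constants =
      PartitionUtils.constantsMap(spec, dataFile, IdentityPartitionConverters::convertConstant);

  // Keyed by field id; per the code above it now contains roughly:
  //   MetadataColumns.FILE_PATH.fieldId() -> dataFile.location()           (_file)
  //   MetadataColumns.SPEC_ID.fieldId()   -> dataFile.specId()             (_spec_id)
  //   MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId()
  //                                       -> dataFile.fileSequenceNumber() (_last_updated_sequence_number)
  //   MetadataColumns.ROW_ID.fieldId()    -> dataFile.firstRowId()         (_row_id, only if set)
  //   source id of "category"             -> "electronics"
  // Readers use these to fill columns that are constant for the whole file without reading them
  // from the file's data pages.
  System.out.println(constants.get(MetadataColumns.FILE_PATH.fieldId()));
}
```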
@@ -75,9 +75,7 @@ public void process(
}
FileScanTask task = fileScanTasks.get((int) l);
Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema());
try (CloseableIterable<Record> fullIterable =
ReadUtils.createReader(task, table, scanConfig.getRequiredSchema())) {
CloseableIterable<Record> reader = ReadUtils.maybeApplyFilter(fullIterable, scanConfig);
try (CloseableIterable<Record> reader = ReadUtils.createReader(task, table, scanConfig)) {

for (Record record : reader) {
Row row = IcebergUtils.icebergRecordToBeamRow(beamSchema, record);
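In this bounded read path, the previous two-step pattern (create the reader, then explicitly wrap it with maybeApplyFilter) collapses into a single ReadUtils.createReader(task, table, scanConfig) call; as the ReadUtils changes below show, the filter is now applied inside createReader. Assuming maybeApplyFilter wraps the reader with a closing iterable (so that closing the wrapper still closes the underlying Parquet reader), the try-with-resources keeps its old semantics. The resulting per-task loop, reshaped as a self-contained helper with the surrounding DoFn machinery elided and an assumed output callback:

```java
import java.io.IOException;
import java.util.function.Consumer;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

// Sketch of the simplified read loop; same calls as the diff above, assumed same-package access.
static void readTask(
    FileScanTask task, Table table, IcebergScanConfig scanConfig, Consumer<Row> output)
    throws IOException {
  Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(scanConfig.getProjectedSchema());
  try (CloseableIterable<Record> reader = ReadUtils.createReader(task, table, scanConfig)) {
    for (Record record : reader) {
      output.accept(IcebergUtils.icebergRecordToBeamRow(beamSchema, record));
    }
  }
}
```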
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
import static org.apache.iceberg.util.SnapshotUtil.ancestorsOf;

import java.util.Collection;
@@ -28,16 +29,22 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
import org.apache.beam.sdk.io.iceberg.cdc.DeleteReader;
import org.apache.beam.sdk.io.iceberg.cdc.SerializableChangelogTask;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
@@ -55,7 +62,6 @@
import org.apache.iceberg.parquet.ParquetReader;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
@@ -72,16 +78,42 @@ public class ReadUtils {
"parquet.read.support.class",
"parquet.crypto.factory.class");

static ParquetReader<Record> createReader(FileScanTask task, Table table, Schema schema) {
String filePath = task.file().path().toString();
public static CloseableIterable<Record> createReader(
SerializableChangelogTask task, Table table, IcebergScanConfig scanConfig) {
return createReader(
table,
scanConfig,
checkStateNotNull(table.specs().get(task.getSpecId())),
task.getDataFile().createDataFile(table.specs()),
task.getStart(),
task.getLength(),
task.getExpression(table.schema()));
}

public static CloseableIterable<Record> createReader(
ContentScanTask<?> task, Table table, IcebergScanConfig scanConfig) {
return createReader(
table, scanConfig, task.spec(), task.file(), task.start(), task.length(), task.residual());
}

public static CloseableIterable<Record> createReader(
Table table,
IcebergScanConfig scanConfig,
PartitionSpec spec,
ContentFile<?> file,
long start,
long length,
Expression residual) {
Schema schema = scanConfig.getRequiredSchema();
InputFile inputFile;
try (FileIO io = table.io()) {
EncryptedInputFile encryptedInput =
EncryptedFiles.encryptedInput(io.newInputFile(filePath), task.file().keyMetadata());
EncryptedFiles.encryptedInput(io.newInputFile(file.location()), file.keyMetadata());
inputFile = table.encryption().decrypt(encryptedInput);
}
Map<Integer, ?> idToConstants =
ReadUtils.constantsMap(task, IdentityPartitionConverters::convertConstant, table.schema());
ReadUtils.constantsMap(
spec, file, IdentityPartitionConverters::convertConstant, table.schema());

ParquetReadOptions.Builder optionsBuilder;
if (inputFile instanceof HadoopInputFile) {
@@ -96,37 +128,40 @@ static ParquetReader<Record> createReader(FileScanTask task, Table table, Schema
}
optionsBuilder =
optionsBuilder
.withRange(task.start(), task.start() + task.length())
.withRange(start, start + length)
.withMaxAllocationInBytes(MAX_FILE_BUFFER_SIZE);

@Nullable String nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
NameMapping mapping =
nameMapping != null ? NameMappingParser.fromJson(nameMapping) : NameMapping.empty();

return new ParquetReader<>(
inputFile,
schema,
optionsBuilder.build(),
// TODO(ahmedabu98): Implement a Parquet-to-Beam Row reader, bypassing conversion to Iceberg
// Record
fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema, idToConstants),
mapping,
task.residual(),
false,
true);
ParquetReader<Record> records =
new ParquetReader<>(
inputFile,
schema,
optionsBuilder.build(),
// TODO(ahmedabu98): Implement a Parquet-to-Beam Row reader, bypassing conversion to
// Iceberg
// Record
fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema, idToConstants),
mapping,
residual,
false,
true);
return maybeApplyFilter(records, scanConfig);
}

static Map<Integer, ?> constantsMap(
FileScanTask task,
PartitionSpec spec,
ContentFile<?> file,
BiFunction<Type, Object, Object> converter,
org.apache.iceberg.Schema schema) {
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();
org.apache.iceberg.Schema partitionSchema = TypeUtil.select(schema, idColumns);
boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();

if (projectsIdentityPartitionColumns) {
return PartitionUtil.constantsMap(task, converter);
return PartitionUtils.constantsMap(spec, file, converter);
} else {
return Collections.emptyMap();
}
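createReader is now decoupled from FileScanTask (whose import is removed above): the full overload takes the table, scan config, partition spec, content file, byte range, and residual expression explicitly, and the two thin overloads adapt either a regular ContentScanTask — FileScanTask extends ContentScanTask<DataFile>, so the bounded path shown earlier still compiles — or the new SerializableChangelogTask from the cdc package, which rebuilds its DataFile and residual from the table metadata. constantsMap follows the same move, taking the spec and file instead of a task and delegating to the PartitionUtils helper added earlier rather than Iceberg's PartitionUtil. Spelled out, the ContentScanTask overload is just this adapter (a sketch, assuming same-package access):

```java
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

// Sketch: what the ContentScanTask overload expands to, with the meaning of each argument.
static CloseableIterable<Record> openSplit(
    ContentScanTask<DataFile> task, Table table, IcebergScanConfig scanConfig) {
  return ReadUtils.createReader(
      table,
      scanConfig,
      task.spec(),      // partition spec, used to derive the identity-partition constants
      task.file(),      // the content file backing this split
      task.start(),     // byte offset of the split within the file
      task.length(),    // split length; becomes ParquetReadOptions.withRange(start, start + length)
      task.residual()); // residual filter expression pushed down into the Parquet reader
}
```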
@@ -208,4 +243,88 @@ public static CloseableIterable<Record> maybeApplyFilter(
}
return iterable;
}

public static DeleteFilter<Record> genericDeleteFilter(
Table table,
IcebergScanConfig scanConfig,
String dataFilePath,
List<SerializableDeleteFile> deletes) {
return new BeamDeleteFilter(
table.io(),
dataFilePath,
scanConfig.getRequiredSchema(),
scanConfig.getProjectedSchema(),
deletes.stream()
.map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
.collect(Collectors.toList()));
}

public static DeleteReader<Record> genericDeleteReader(
Table table,
IcebergScanConfig scanConfig,
String dataFilePath,
List<SerializableDeleteFile> deletes) {
return new BeamDeleteReader(
table.io(),
dataFilePath,
scanConfig.getRequiredSchema(),
scanConfig.getProjectedSchema(),
deletes.stream()
.map(sdf -> sdf.createDeleteFile(table.specs(), table.sortOrders()))
.collect(Collectors.toList()));
}

public static class BeamDeleteFilter extends DeleteFilter<Record> {
private final FileIO io;
private final InternalRecordWrapper asStructLike;

@SuppressWarnings("method.invocation")
public BeamDeleteFilter(
FileIO io,
String dataFilePath,
Schema tableSchema,
Schema projectedSchema,
List<DeleteFile> deleteFiles) {
super(dataFilePath, deleteFiles, tableSchema, projectedSchema);
this.io = io;
this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
}

@Override
protected StructLike asStructLike(Record record) {
return asStructLike.wrap(record);
}

@Override
protected InputFile getInputFile(String location) {
return io.newInputFile(location);
}
}

public static class BeamDeleteReader extends DeleteReader<Record> {
private final FileIO io;
private final InternalRecordWrapper asStructLike;

@SuppressWarnings("method.invocation")
public BeamDeleteReader(
FileIO io,
String dataFilePath,
Schema tableSchema,
Schema projectedSchema,
List<DeleteFile> deleteFiles) {
super(dataFilePath, deleteFiles, tableSchema, projectedSchema);
this.io = io;
this.asStructLike = new InternalRecordWrapper(requiredSchema().asStruct());
}

@Override
protected StructLike asStructLike(Record record) {
return asStructLike.wrap(record);
}

@Override
protected InputFile getInputFile(String location) {
return io.newInputFile(location);
}
}
}
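The remaining additions wire Iceberg's delete-file handling into this package: genericDeleteFilter builds a DeleteFilter<Record>, which drops rows matched by the positional/equality delete files, while genericDeleteReader builds the new cdc DeleteReader, presumably to surface the matching (deleted) rows for changelog output. The two Beam* subclasses only supply what the abstract base needs — a FileIO-backed getInputFile and an InternalRecordWrapper so delete predicates can be evaluated against generic Records. A hedged sketch of applying the filter side to a raw file iterable; the wiring of the inputs is assumed, and it relies on DeleteFilter#filter from Iceberg's data module:

```java
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

// Sketch (assumed same-package caller): apply a data file's delete files to its raw records.
static CloseableIterable<Record> withoutDeletedRows(
    Table table,
    IcebergScanConfig scanConfig,
    String dataFilePath,
    List<SerializableDeleteFile> deletes,
    CloseableIterable<Record> rawRecords) {
  DeleteFilter<Record> deleteFilter =
      ReadUtils.genericDeleteFilter(table, scanConfig, dataFilePath, deletes);
  // filter(...) keeps only the rows that survive the positional and equality deletes.
  return deleteFilter.filter(rawRecords);
}
```

The DeleteReader variant mirrors this shape but, per its name, would hand back the deleted records instead of discarding them; that logic lives in the new cdc.DeleteReader class, which is not part of this excerpt.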
@@ -122,7 +122,10 @@ public boolean advance() throws IOException {
InputFile input = decryptor.getInputFile(fileTask);
Map<Integer, ?> idToConstants =
ReadUtils.constantsMap(
fileTask, IdentityPartitionConverters::convertConstant, requiredSchema);
fileTask.spec(),
fileTask.file(),
IdentityPartitionConverters::convertConstant,
requiredSchema);

CloseableIterable<Record> iterable;
switch (file.format()) {