From e0fce7f3d8087cd4c527ea1b37163ab24b9a7d78 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Mon, 17 Aug 2020 13:51:07 +0800 Subject: [PATCH 1/8] Flink: Introduce Flink InputFormat --- .../apache/iceberg/flink/FlinkFixupTypes.java | 51 +++ .../apache/iceberg/flink/FlinkSchemaUtil.java | 23 + .../iceberg/flink/data/FlinkOrcReader.java | 8 +- .../iceberg/flink/data/FlinkOrcReaders.java | 12 +- .../flink/data/FlinkParquetReaders.java | 2 +- .../iceberg/flink/source/DataIterator.java | 144 +++++++ .../flink/source/FlinkInputFormat.java | 243 +++++++++++ .../iceberg/flink/source/FlinkInputSplit.java | 64 +++ .../flink/source/FlinkSplitGenerator.java | 103 +++++ .../iceberg/flink/source/RowDataIterator.java | 141 ++++++ .../iceberg/flink/source/ScanOptions.java | 221 ++++++++++ .../flink/data/TestFlinkOrcReaderWriter.java | 2 +- .../flink/source/TestFlinkInputFormat.java | 115 +++++ .../iceberg/flink/source/TestFlinkScan.java | 408 ++++++++++++++++++ 14 files changed, 1528 insertions(+), 9 deletions(-) create mode 100644 flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java create mode 100644 flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java create mode 100644 flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java new file mode 100644 index 000000000000..6501c0226e44 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.FixupTypes; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +/** + * The uuid and fixed are converted to the same Flink type. Conversion back can produce only one, + * which may not be correct. 
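+ * For example, a uuid column and a fixed(16) column both convert to the same Flink binary type, and converting back would always yield fixed(16); the reference schema is used to restore uuid fields where the lengths match.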
+ */ +class FlinkFixupTypes extends FixupTypes { + + private FlinkFixupTypes(Schema referenceSchema) { + super(referenceSchema); + } + + static Schema fixup(Schema schema, Schema referenceSchema) { + return new Schema(TypeUtil.visit(schema, + new FlinkFixupTypes(referenceSchema)).asStructType().fields()); + } + + @Override + protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) { + if (type instanceof Types.FixedType) { + int length = ((Types.FixedType) type).length(); + return source.typeId() == Type.TypeID.UUID && length == 16; + } + return false; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java index 90534d714630..fa871e505129 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java @@ -27,6 +27,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; /** * Converter between Flink types and Iceberg type. @@ -63,6 +64,28 @@ public static Schema convert(TableSchema schema) { return new Schema(converted.asStructType().fields()); } + /** + * Convert a Flink {@link TableSchema} to a {@link Schema} based on the given schema. + *
<p>
+ * This conversion does not assign new ids; it uses ids from the base schema. + *
<p>
+ * Data types, field order, and nullability will match the Flink type. This conversion may return + * a schema that is not compatible with base schema. + * + * @param baseSchema a Schema on which conversion is based + * @param flinkSchema a Flink TableSchema + * @return the equivalent Schema + * @throws IllegalArgumentException if the type cannot be converted or there are missing ids + */ + public static Schema convert(Schema baseSchema, TableSchema flinkSchema) { + // convert to a type with fresh ids + Types.StructType struct = convert(flinkSchema).asStruct(); + // reassign ids to match the base schema + Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema); + // fix types that can't be represented in Flink (UUID) + return FlinkFixupTypes.fixup(schema, baseSchema); + } + /** * Convert a {@link Schema} to a {@link RowType Flink type}. * diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java index 2f5db1967ef2..4c4e2050263b 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java @@ -39,18 +39,14 @@ public class FlinkOrcReader implements OrcRowReader { private final OrcValueReader reader; - private FlinkOrcReader(Schema iSchema, TypeDescription readSchema) { + public FlinkOrcReader(Schema iSchema, TypeDescription readSchema) { this(iSchema, readSchema, ImmutableMap.of()); } - private FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map idToConstant) { + public FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map idToConstant) { this.reader = OrcSchemaWithTypeVisitor.visit(iSchema, readSchema, new ReadBuilder(idToConstant)); } - public static OrcRowReader buildReader(Schema schema, TypeDescription readSchema) { - return new FlinkOrcReader(schema, readSchema); - } - @Override public RowData read(VectorizedRowBatch batch, int row) { return (RowData) reader.read(new StructColumnVector(batch.size, batch.cols), row); diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java index a434bddfe265..744a05eb2d21 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java @@ -36,6 +36,7 @@ import org.apache.flink.table.data.TimestampData; import org.apache.iceberg.orc.OrcValueReader; import org.apache.iceberg.orc.OrcValueReaders; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -127,6 +128,11 @@ private static class Decimal18Reader implements OrcValueReader { @Override public DecimalData nonNullRead(ColumnVector vector, int row) { HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row]; + + // The hive ORC writer may will adjust the scale of decimal data. 
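+ // serialize64(scale) below rescales the stored value to the declared scale; values whose precision exceeds the declared precision are rejected here instead of silently overflowing the unscaled long.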
+ Preconditions.checkArgument(value.precision() <= precision, + "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value); + return DecimalData.fromUnscaledLong(value.serialize64(scale), precision, scale); } } @@ -143,6 +149,10 @@ private static class Decimal38Reader implements OrcValueReader { @Override public DecimalData nonNullRead(ColumnVector vector, int row) { BigDecimal value = ((DecimalColumnVector) vector).vector[row].getHiveDecimal().bigDecimalValue(); + + Preconditions.checkArgument(value.precision() <= precision, + "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value); + return DecimalData.fromBigDecimal(value, precision, scale); } } @@ -246,7 +256,7 @@ private static class StructReader extends OrcValueReaders.StructReader StructReader(List> readers, Types.StructType struct, Map idToConstant) { super(readers, struct, idToConstant); - this.numFields = readers.size(); + this.numFields = struct.fields().size(); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 3012544cba83..720a842e32a8 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -52,7 +52,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -class FlinkParquetReaders { +public class FlinkParquetReaders { private FlinkParquetReaders() { } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java new file mode 100644 index 000000000000..4dfc94fab4c4 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Iterator; +import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.DateTimeUtil; + +/** + * Base class of Flink iterators. + * + * @param is the Java class returned by this iterator whose objects contain one or more rows. + */ +abstract class DataIterator implements CloseableIterator { + + private final Iterator tasks; + private final FileIO fileIo; + private final EncryptionManager encryption; + + private CloseableIterator currentIterator; + + DataIterator(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption) { + this.tasks = task.files().iterator(); + this.fileIo = fileIo; + this.encryption = encryption; + this.currentIterator = CloseableIterator.empty(); + } + + InputFile getInputFile(FileScanTask task) { + Preconditions.checkArgument(!task.isDataTask(), "Invalid task type"); + return encryption.decrypt(EncryptedFiles.encryptedInput( + fileIo.newInputFile(task.file().path().toString()), + task.file().keyMetadata())); + } + + @Override + public boolean hasNext() { + updateCurrentIterator(); + return currentIterator.hasNext(); + } + + @Override + public T next() { + updateCurrentIterator(); + return currentIterator.next(); + } + + /** + * Updates the current iterator field to ensure that the current Iterator + * is not exhausted. 
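+ * When the current iterator is exhausted it is closed and the next file task, if any, is opened lazily.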
+ */ + private void updateCurrentIterator() { + try { + while (!currentIterator.hasNext() && tasks.hasNext()) { + currentIterator.close(); + currentIterator = openTaskIterator(tasks.next()); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + abstract CloseableIterator openTaskIterator(FileScanTask scanTask) throws IOException; + + @Override + public void close() throws IOException { + // close the current iterator + this.currentIterator.close(); + + // exhaust the task iterator + while (tasks.hasNext()) { + tasks.next(); + } + } + + static Object convertConstant(Type type, Object value) { + if (value == null) { + return null; + } + + switch (type.typeId()) { + case DECIMAL: // DecimalData + Types.DecimalType decimal = (Types.DecimalType) type; + return DecimalData.fromBigDecimal((BigDecimal) value, decimal.precision(), decimal.scale()); + case STRING: // StringData + if (value instanceof Utf8) { + Utf8 utf8 = (Utf8) value; + return StringData.fromBytes(utf8.getBytes(), 0, utf8.getByteLength()); + } + return StringData.fromString(value.toString()); + case FIXED: // byte[] + if (value instanceof byte[]) { + return value; + } else if (value instanceof GenericData.Fixed) { + return ((GenericData.Fixed) value).bytes(); + } + return ByteBuffers.toByteArray((ByteBuffer) value); + case BINARY: // byte[] + return ByteBuffers.toByteArray((ByteBuffer) value); + case TIME: // int mills instead of long + return (int) ((Long) value / 1000); + case TIMESTAMP: // TimestampData + return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value)); + default: + } + return value; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java new file mode 100644 index 000000000000..ebd7da4e72c4 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.FlinkCatalogFactory; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.hadoop.SerializableConfiguration; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * Flink {@link InputFormat} for Iceberg. + */ +public class FlinkInputFormat extends RichInputFormat { + + private static final long serialVersionUID = 1L; + + private final TableLoader tableLoader; + private final Schema projectedSchema; + private final ScanOptions options; + private final List filterExpressions; + private final FileIO io; + private final EncryptionManager encryption; + private final SerializableConfiguration serializableConf; + + private transient RowDataIterator iterator; + + private FlinkInputFormat( + TableLoader tableLoader, Schema projectedSchema, FileIO io, EncryptionManager encryption, + List filterExpressions, ScanOptions options, SerializableConfiguration serializableConf) { + this.tableLoader = tableLoader; + this.projectedSchema = projectedSchema; + this.options = options; + this.filterExpressions = filterExpressions; + this.io = io; + this.encryption = encryption; + this.serializableConf = serializableConf; + } + + @VisibleForTesting + Schema projectedSchema() { + return projectedSchema; + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + // Legacy method, not be used. + return null; + } + + @Override + public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException { + // Called in Job manager, so it is OK to load table from catalog. 
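+ // Split planning therefore runs once per job; task managers only receive the serialized CombinedScanTask carried by each FlinkInputSplit.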
+ tableLoader.open(serializableConf.get()); + try (TableLoader loader = tableLoader) { + Table table = loader.loadTable(); + FlinkSplitGenerator generator = new FlinkSplitGenerator(table, projectedSchema, options, filterExpressions); + return generator.createInputSplits(); + } + } + + @Override + public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + @Override + public void configure(Configuration parameters) { + } + + @Override + public void open(FlinkInputSplit split) { + this.iterator = new RowDataIterator(split.getTask(), io, encryption, projectedSchema, + options.getNameMapping(), options.isCaseSensitive()); + } + + @Override + public boolean reachedEnd() { + return !iterator.hasNext(); + } + + @Override + public RowData nextRecord(RowData reuse) { + return iterator.next(); + } + + @Override + public void close() throws IOException { + if (iterator != null) { + iterator.close(); + } + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private Table table; + private TableLoader tableLoader; + private List selectedFields; + private TableSchema projectedSchema; + private ScanOptions options = ScanOptions.builder().build(); + private List filterExpressions; + private org.apache.hadoop.conf.Configuration hadoopConf; + + private Builder() { + } + + // -------------------------- Required options ------------------------------- + + public Builder tableLoader(TableLoader newLoader) { + this.tableLoader = newLoader; + return this; + } + + // -------------------------- Optional options ------------------------------- + + public Builder table(Table newTable) { + this.table = newTable; + return this; + } + + public Builder filters(List newFilters) { + this.filterExpressions = newFilters; + return this; + } + + public Builder project(TableSchema schema) { + this.projectedSchema = schema; + return this; + } + + public Builder select(String... fields) { + this.selectedFields = Lists.newArrayList(fields); + return this; + } + + public Builder select(List fields) { + this.selectedFields = fields; + return this; + } + + public Builder options(ScanOptions newOptions) { + this.options = newOptions; + return this; + } + + public Builder hadoopConf(org.apache.hadoop.conf.Configuration newConf) { + this.hadoopConf = newConf; + return this; + } + + public FlinkInputFormat build() { + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null."); + + hadoopConf = hadoopConf == null ? FlinkCatalogFactory.clusterHadoopConf() : hadoopConf; + + Schema icebergSchema; + FileIO io; + EncryptionManager encryption; + if (table == null) { + // load required fields by table loader. 
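+ // Only the schema, FileIO and EncryptionManager are captured from the loaded table; the Table instance itself is not kept by the input format.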
+ tableLoader.open(hadoopConf); + try (TableLoader loader = tableLoader) { + Table loadedTable = loader.loadTable(); + icebergSchema = loadedTable.schema(); + io = loadedTable.io(); + encryption = loadedTable.encryption(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else { + icebergSchema = table.schema(); + io = table.io(); + encryption = table.encryption(); + } + + if (projectedSchema != null && selectedFields != null) { + throw new IllegalArgumentException( + "Cannot using both requestedSchema and projectedFields to project."); + } + + TableSchema flinkProjectedSchema = projectedSchema; + + if (selectedFields != null) { + TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)); + TableSchema.Builder builder = TableSchema.builder(); + for (String field : selectedFields) { + TableColumn column = tableSchema.getTableColumn(field).orElseThrow( + () -> new IllegalArgumentException(String.format("The field(%s) can not be found in the table schema: %s", + field, tableSchema))); + builder.field(column.getName(), column.getType()); + } + flinkProjectedSchema = builder.build(); + } + + Schema icebergProjectedSchema = icebergSchema; + if (flinkProjectedSchema != null) { + icebergProjectedSchema = FlinkSchemaUtil.convert(icebergSchema, flinkProjectedSchema); + } + + return new FlinkInputFormat(tableLoader, icebergProjectedSchema, io, encryption, filterExpressions, options, + new SerializableConfiguration(hadoopConf)); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java new file mode 100644 index 000000000000..fa61e987fbe8 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.LocatableInputSplit; +import org.apache.iceberg.CombinedScanTask; + +/** + * TODO Implement {@link LocatableInputSplit}. 
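+ * Until then, splits expose no host information and are handed out by the DefaultInputSplitAssigner without any locality preference.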
+ */ +public class FlinkInputSplit implements InputSplit { + + private final int splitNumber; + private final CombinedScanTask task; + + FlinkInputSplit(int splitNumber, CombinedScanTask task) { + this.splitNumber = splitNumber; + this.task = task; + } + + @Override + public int getSplitNumber() { + return splitNumber; + } + + CombinedScanTask getTask() { + return task; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FlinkInputSplit that = (FlinkInputSplit) o; + return splitNumber == that.splitNumber; + } + + @Override + public int hashCode() { + return splitNumber; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java new file mode 100644 index 000000000000..3502055d39fc --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +class FlinkSplitGenerator { + + private final Table table; + private final Schema projectedSchema; + private final ScanOptions options; + private final List filterExpressions; + + FlinkSplitGenerator(Table table, Schema projectedSchema, ScanOptions options, List filterExpressions) { + this.table = table; + this.projectedSchema = projectedSchema; + this.options = options; + this.filterExpressions = filterExpressions; + } + + FlinkInputSplit[] createInputSplits() { + List tasks = tasks(); + FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; + for (int i = 0; i < tasks.size(); i++) { + splits[i] = new FlinkInputSplit(i, tasks.get(i)); + } + return splits; + } + + private List tasks() { + TableScan scan = table + .newScan() + .caseSensitive(options.isCaseSensitive()) + .project(projectedSchema); + + if (options.getSnapshotId() != null) { + scan = scan.useSnapshot(options.getSnapshotId()); + } + + if (options.getAsOfTimestamp() != null) { + scan = scan.asOfTime(options.getAsOfTimestamp()); + } + + if (options.getStartSnapshotId() != null) { + if (options.getEndSnapshotId() != null) { + scan = scan.appendsBetween(options.getStartSnapshotId(), options.getEndSnapshotId()); + } else { + scan = scan.appendsAfter(options.getStartSnapshotId()); + } + } + + if (options.getSplitSize() != null) { + scan = scan.option(TableProperties.SPLIT_SIZE, options.getSplitSize().toString()); + } + + if (options.getSplitLookback() != null) { + scan = scan.option(TableProperties.SPLIT_LOOKBACK, options.getSplitLookback().toString()); + } + + if (options.getSplitOpenFileCost() != null) { + scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, options.getSplitOpenFileCost().toString()); + } + + if (filterExpressions != null) { + for (Expression filter : filterExpressions) { + scan = scan.filter(filter); + } + } + + try (CloseableIterable tasksIterable = scan.planTasks()) { + return Lists.newArrayList(tasksIterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table scan: " + scan, e); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java new file mode 100644 index 000000000000..26c6b32502c5 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.util.Map; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.flink.data.FlinkAvroReader; +import org.apache.iceberg.flink.data.FlinkOrcReader; +import org.apache.iceberg.flink.data.FlinkParquetReaders; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PartitionUtil; + +class RowDataIterator extends DataIterator { + + private final Schema projectedSchema; + private final String nameMapping; + private final boolean caseSensitive; + + RowDataIterator(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption, Schema projectedSchema, + String nameMapping, boolean caseSensitive) { + super(task, fileIo, encryption); + this.projectedSchema = projectedSchema; + this.nameMapping = nameMapping; + this.caseSensitive = caseSensitive; + } + + @Override + protected CloseableIterator openTaskIterator(FileScanTask task) { + Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds()); + + Map idToConstant = partitionSchema.columns().isEmpty() ? 
ImmutableMap.of() : + PartitionUtil.constantsMap(task, RowDataIterator::convertConstant); + CloseableIterable iterable = newIterable(task, idToConstant); + return iterable.iterator(); + } + + private CloseableIterable newIterable(FileScanTask task, Map idToConstant) { + CloseableIterable iter; + if (task.isDataTask()) { + throw new UnsupportedOperationException("Cannot read data task."); + } else { + switch (task.file().format()) { + case PARQUET: + iter = newParquetIterable(task, idToConstant); + break; + + case AVRO: + iter = newAvroIterable(task, idToConstant); + break; + + case ORC: + iter = newOrcIterable(task, idToConstant); + break; + + default: + throw new UnsupportedOperationException( + "Cannot read unknown format: " + task.file().format()); + } + } + + return iter; + } + + private CloseableIterable newAvroIterable(FileScanTask task, Map idToConstant) { + Avro.ReadBuilder builder = Avro.read(getInputFile(task)) + .reuseContainers() + .project(projectedSchema) + .split(task.start(), task.length()) + .createReaderFunc(readSchema -> new FlinkAvroReader(projectedSchema, readSchema, idToConstant)); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newParquetIterable(FileScanTask task, Map idToConstant) { + Parquet.ReadBuilder builder = Parquet.read(getInputFile(task)) + .split(task.start(), task.length()) + .project(projectedSchema) + .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(projectedSchema, fileSchema, idToConstant)) + .filter(task.residual()) + .caseSensitive(caseSensitive); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newOrcIterable(FileScanTask task, Map idToConstant) { + Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(projectedSchema, + Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); + + ORC.ReadBuilder builder = ORC.read(getInputFile(task)) + .project(readSchemaWithoutConstantAndMetadataFields) + .split(task.start(), task.length()) + .createReaderFunc(readOrcSchema -> new FlinkOrcReader(projectedSchema, readOrcSchema, idToConstant)) + .filter(task.residual()) + .caseSensitive(caseSensitive); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java new file mode 100644 index 000000000000..c13150232356 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.Serializable; +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; + +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; + +public class ScanOptions implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final ConfigOption SNAPSHOT_ID = + ConfigOptions.key("snapshot-id").longType().defaultValue(null); + + public static final ConfigOption CASE_SENSITIVE = + ConfigOptions.key("case-sensitive").booleanType().defaultValue(false); + + public static final ConfigOption AS_OF_TIMESTAMP = + ConfigOptions.key("as-of-timestamp").longType().defaultValue(null); + + public static final ConfigOption START_SNAPSHOT_ID = + ConfigOptions.key("start-snapshot-id").longType().defaultValue(null); + + public static final ConfigOption END_SNAPSHOT_ID = + ConfigOptions.key("end-snapshot-id").longType().defaultValue(null); + + public static final ConfigOption SPLIT_SIZE = + ConfigOptions.key("split-size").longType().defaultValue(null); + + public static final ConfigOption SPLIT_LOOKBACK = + ConfigOptions.key("split-lookback").intType().defaultValue(null); + + public static final ConfigOption SPLIT_FILE_OPEN_COST = + ConfigOptions.key("split-file-open-cost").longType().defaultValue(null); + + private final boolean caseSensitive; + private final Long snapshotId; + private final Long startSnapshotId; + private final Long endSnapshotId; + private final Long asOfTimestamp; + private final Long splitSize; + private final Integer splitLookback; + private final Long splitOpenFileCost; + private final String nameMapping; + + public ScanOptions(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, + Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, + String nameMapping) { + this.caseSensitive = caseSensitive; + this.snapshotId = snapshotId; + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + this.asOfTimestamp = asOfTimestamp; + this.splitSize = splitSize; + this.splitLookback = splitLookback; + this.splitOpenFileCost = splitOpenFileCost; + this.nameMapping = nameMapping; + } + + public boolean isCaseSensitive() { + return caseSensitive; + } + + public Long getSnapshotId() { + return snapshotId; + } + + public Long getStartSnapshotId() { + return startSnapshotId; + } + + public Long getEndSnapshotId() { + return endSnapshotId; + } + + public Long getAsOfTimestamp() { + return asOfTimestamp; + } + + public Long getSplitSize() { + return splitSize; + } + + public Integer getSplitLookback() { + return splitLookback; + } + + public Long getSplitOpenFileCost() { + return splitOpenFileCost; + } + + public String getNameMapping() { + return nameMapping; + } + + public static Builder builder() { + return new Builder(); + } + + public static ScanOptions of(Map options) { + return builder().options(options).build(); + } + + public static final class Builder { + private boolean caseSensitive = CASE_SENSITIVE.defaultValue(); + private Long snapshotId = SNAPSHOT_ID.defaultValue(); + private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue(); + private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue(); + private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue(); + private Long 
splitSize = SPLIT_SIZE.defaultValue(); + private Integer splitLookback = SPLIT_LOOKBACK.defaultValue(); + private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue(); + private String nameMapping; + + private Builder() { + } + + public Builder options(Map options) { + Configuration config = new Configuration(); + options.forEach(config::setString); + this.caseSensitive = config.get(CASE_SENSITIVE); + this.snapshotId = config.get(SNAPSHOT_ID); + this.asOfTimestamp = config.get(AS_OF_TIMESTAMP); + this.startSnapshotId = config.get(START_SNAPSHOT_ID); + this.endSnapshotId = config.get(END_SNAPSHOT_ID); + this.splitSize = config.get(SPLIT_SIZE); + this.splitLookback = config.get(SPLIT_LOOKBACK); + this.splitOpenFileCost = config.get(SPLIT_FILE_OPEN_COST); + this.nameMapping = options.get(DEFAULT_NAME_MAPPING); + return this; + } + + public Builder caseSensitive(boolean newCaseSensitive) { + this.caseSensitive = newCaseSensitive; + return this; + } + + public Builder snapshotId(Long newSnapshotId) { + this.snapshotId = newSnapshotId; + return this; + } + + public Builder startSnapshotId(Long newStartSnapshotId) { + this.startSnapshotId = newStartSnapshotId; + return this; + } + + public Builder endSnapshotId(Long newEndSnapshotId) { + this.endSnapshotId = newEndSnapshotId; + return this; + } + + public Builder asOfTimestamp(Long newAsOfTimestamp) { + this.asOfTimestamp = newAsOfTimestamp; + return this; + } + + public Builder splitSize(Long newSplitSize) { + this.splitSize = newSplitSize; + return this; + } + + public Builder splitLookback(Integer newSplitLookback) { + this.splitLookback = newSplitLookback; + return this; + } + + public Builder splitOpenFileCost(Long newSplitOpenFileCost) { + this.splitOpenFileCost = newSplitOpenFileCost; + return this; + } + + public Builder nameMapping(String newNameMapping) { + this.nameMapping = newNameMapping; + return this; + } + + public ScanOptions build() { + if (snapshotId != null && asOfTimestamp != null) { + throw new IllegalArgumentException( + "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot"); + } + + if (snapshotId != null || asOfTimestamp != null) { + if (startSnapshotId != null || endSnapshotId != null) { + throw new IllegalArgumentException( + "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id" + + " or as-of-timestamp is specified"); + } + } else { + if (startSnapshotId == null && endSnapshotId != null) { + throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan"); + } + } + return new ScanOptions(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, + splitLookback, splitOpenFileCost, nameMapping); + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java index d86469a9115a..987e070f78b6 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java @@ -61,7 +61,7 @@ protected void writeAndValidate(Schema schema) throws IOException { try (CloseableIterable reader = ORC.read(Files.localInput(recordsFile)) .project(schema) - .createReaderFunc(type -> FlinkOrcReader.buildReader(schema, type)) + .createReaderFunc(type -> new FlinkOrcReader(schema, type)) .build()) { Iterator expected = expectedRecords.iterator(); Iterator rows = 
reader.iterator(); diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java new file mode 100644 index 000000000000..a145c5033eaf --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.util.List; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * Test {@link FlinkInputFormat}. + */ +public class TestFlinkInputFormat extends TestFlinkScan { + + private FlinkInputFormat.Builder builder; + + public TestFlinkInputFormat(String fileFormat) { + super(fileFormat); + } + + @Override + public void before() throws IOException { + super.before(); + builder = FlinkInputFormat.builder().tableLoader(TableLoader.fromHadoopTable(warehouse + "/default/t")); + } + + @Override + protected List executeWithOptions( + Table table, List projectFields, CatalogLoader loader, Long snapshotId, Long startSnapshotId, + Long endSnapshotId, Long asOfTimestamp, List filters, String sqlFilter) throws IOException { + ScanOptions options = ScanOptions.builder().snapshotId(snapshotId).startSnapshotId(startSnapshotId) + .endSnapshotId(endSnapshotId).asOfTimestamp(asOfTimestamp).build(); + if (loader != null) { + builder.tableLoader(TableLoader.fromCatalog(loader, TableIdentifier.of("default", "t"))); + } + + return run(builder.select(projectFields).filters(filters).options(options).build()); + } + + @Override + protected void assertResiduals( + Schema shcema, List results, List writeRecords, List filteredRecords) { + // can not filter the data. 
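+ // FlinkInputFormat only pushes filters down to split planning and file-level residuals, so rows the filter would exclude can still be returned; assert against the full write set.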
+ assertRecords(results, writeRecords, shcema); + } + + @Override + protected void assertNestedProjection(Table table, List records) throws IOException { + TableSchema projectedSchema = TableSchema.builder() + .field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING()))) + .field("data", DataTypes.STRING()) + .build(); + List result = run(builder.project(projectedSchema).build()); + + List expected = Lists.newArrayList(); + for (Record record : records) { + Row nested = Row.of(((Record) record.get(1)).get(1)); + expected.add(Row.of(nested, record.get(0))); + } + + assertRows(result, expected); + } + + private List run(FlinkInputFormat inputFormat) throws IOException { + FlinkInputSplit[] splits = inputFormat.createInputSplits(0); + List results = Lists.newArrayList(); + + RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema()); + + DataStructureConverter converter = DataStructureConverters.getConverter( + TypeConversions.fromLogicalToDataType(rowType)); + + for (FlinkInputSplit s : splits) { + inputFormat.open(s); + while (!inputFormat.reachedEnd()) { + RowData row = inputFormat.nextRecord(null); + results.add((Row) converter.toExternal(row)); + } + } + inputFormat.close(); + return results; + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java new file mode 100644 index 000000000000..6032339713f4 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.File; +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.required; + +@RunWith(Parameterized.class) +public abstract class TestFlinkScan extends AbstractTestBase { + + private static final Schema SCHEMA = new Schema( + required(1, "data", Types.StringType.get()), + required(2, "id", Types.LongType.get()), + required(3, "dt", Types.StringType.get())); + + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + .identity("dt") + .bucket("id", 1) + .build(); + + // before variables + private Configuration conf; + String warehouse; + private HadoopCatalog catalog; + + // parametrized variables + private final FileFormat fileFormat; + + @Parameterized.Parameters(name = "format={0}") + public static Object[] parameters() { + return new Object[] {"avro", "parquet", "orc"}; + } + + TestFlinkScan(String fileFormat) { + this.fileFormat = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + } + + @Before + public void before() throws IOException { + File warehouseFile = TEMPORARY_FOLDER.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + conf = new Configuration(); + warehouse = "file:" + warehouseFile; + catalog = new HadoopCatalog(conf, warehouse); + } + + private List execute(Table table) throws IOException { + return executeWithOptions(table, null, null, null, null, null, null, null, null); + } + + private List execute(Table table, List projectFields) throws IOException { + return executeWithOptions(table, projectFields, null, null, null, null, null, null, null); + } + + protected abstract List executeWithOptions( + Table table, List projectFields, CatalogLoader loader, Long snapshotId, + Long startSnapshotId, Long 
endSnapshotId, Long asOfTimestamp, List filters, String sqlFilter) + throws IOException; + + /** + * The Flink SQL has no residuals, because there will be operator to filter all the data that should be filtered. + * But the FlinkInputFormat can't. + */ + protected abstract void assertResiduals(Schema schema, List results, List writeRecords, + List filteredRecords) throws IOException; + + /** + * Schema: [data, nested[f1, f2, f3], id] + * Projection: [nested.f2, data] + * The Flink SQL output: [f2, data] + * The FlinkInputFormat output: [nested[f2], data]. + */ + protected abstract void assertNestedProjection(Table table, List records) throws IOException; + + @Test + public void testUnpartitionedTable() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + List expectedRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords); + assertRecords(execute(table), expectedRecords, SCHEMA); + } + + @Test + public void testPartitionedTable() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + List expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( + org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + assertRecords(execute(table), expectedRecords, SCHEMA); + } + + @Test + public void testProjection() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + List inputRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( + org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), inputRecords); + assertRows(execute(table, Collections.singletonList("data")), Row.of(inputRecords.get(0).get(0))); + } + + @Test + public void testIdentityPartitionProjections() throws Exception { + Schema logSchema = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "level", Types.StringType.get()), + Types.NestedField.optional(4, "message", Types.StringType.get()) + ); + PartitionSpec spec = + PartitionSpec.builderFor(logSchema).identity("dt").identity("level").build(); + + Table table = catalog.createTable(TableIdentifier.of("default", "t"), logSchema, spec); + List inputRecords = RandomGenericData.generate(logSchema, 10, 0L); + + int idx = 0; + AppendFiles append = table.newAppend(); + for (Record record : inputRecords) { + record.set(1, "2020-03-2" + idx); + record.set(2, Integer.toString(idx)); + append.appendFile(new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).writeFile( + org.apache.iceberg.TestHelpers.Row.of("2020-03-2" + idx, Integer.toString(idx)), ImmutableList.of(record))); + idx += 1; + } + append.commit(); + + // individual fields + validateIdentityPartitionProjections(table, Collections.singletonList("dt"), inputRecords); + validateIdentityPartitionProjections(table, Collections.singletonList("level"), inputRecords); + validateIdentityPartitionProjections(table, Collections.singletonList("message"), inputRecords); + validateIdentityPartitionProjections(table, Collections.singletonList("id"), inputRecords); + // field pairs + validateIdentityPartitionProjections(table, Arrays.asList("dt", "message"), 
inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("level", "message"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("dt", "level"), inputRecords); + // out-of-order pairs + validateIdentityPartitionProjections(table, Arrays.asList("message", "dt"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("message", "level"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("level", "dt"), inputRecords); + // out-of-order triplets + validateIdentityPartitionProjections(table, Arrays.asList("dt", "level", "message"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("level", "dt", "message"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("dt", "message", "level"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("level", "message", "dt"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("message", "dt", "level"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("message", "level", "dt"), inputRecords); + } + + private void validateIdentityPartitionProjections(Table table, List projectedFields, + List inputRecords) throws IOException { + List rows = execute(table, projectedFields); + + for (int pos = 0; pos < inputRecords.size(); pos++) { + Record inputRecord = inputRecords.get(pos); + Row actualRecord = rows.get(pos); + + for (int i = 0; i < projectedFields.size(); i++) { + String name = projectedFields.get(i); + Assert.assertEquals( + "Projected field " + name + " should match", inputRecord.getField(name), actualRecord.getField(i)); + } + } + } + + @Test + public void testSnapshotReads() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(expectedRecords); + long snapshotId = table.currentSnapshot().snapshotId(); + + long timestampMillis = table.currentSnapshot().timestampMillis(); + + // produce another timestamp + Thread.sleep(10); + helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); + + assertRecords(executeWithOptions(table, null, null, snapshotId, null, null, null, null, null), expectedRecords, + SCHEMA); + assertRecords(executeWithOptions(table, null, null, null, null, null, timestampMillis, null, null), + expectedRecords, SCHEMA); + } + + @Test + public void testIncrementalRead() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List records1 = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(records1); + long snapshotId1 = table.currentSnapshot().snapshotId(); + + // snapshot 2 + List records2 = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(records2); + + List records3 = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(records3); + long snapshotId3 = table.currentSnapshot().snapshotId(); + + // snapshot 4 + List records4 = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); + + List expected1 = Lists.newArrayList(); + expected1.addAll(records2); + expected1.addAll(records3); + expected1.addAll(records4); + assertRecords(executeWithOptions(table, 
null, null, null, snapshotId1, null, null, null, null), expected1, SCHEMA); + + List expected2 = Lists.newArrayList(); + expected2.addAll(records2); + expected2.addAll(records3); + assertRecords(executeWithOptions(table, null, null, null, snapshotId1, snapshotId3, null, null, null), expected2, + SCHEMA); + } + + @Test + public void testCustomCatalog() throws IOException { + String newWarehouse = TEMPORARY_FOLDER.newFolder().toURI().toString(); + Table table = new HadoopCatalog(conf, newWarehouse).createTable(TableIdentifier.of("default", "t"), SCHEMA); + + List expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( + org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + + List rows = executeWithOptions(table, null, CatalogLoader.hadoop("new_catalog", newWarehouse), null, null, + null, null, null, null); + assertRecords(rows, expectedRecords, SCHEMA); + } + + @Test + public void testFilterExp() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + + List expectedRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + expectedRecords.get(1).set(2, "2020-03-20"); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), + RandomGenericData.generate(SCHEMA, 2, 0L)); + helper.appendToTable(dataFile1, dataFile2); + List filters = Collections.singletonList(Expressions.equal("dt", "2020-03-20")); + assertRecords(executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20'"), + expectedRecords, SCHEMA); + } + + @Test + public void testResiduals() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + + List writeRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + writeRecords.get(0).set(1, 123L); + writeRecords.get(0).set(2, "2020-03-20"); + writeRecords.get(1).set(1, 456L); + writeRecords.get(1).set(2, "2020-03-20"); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.add(writeRecords.get(0)); + + DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), writeRecords); + DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), + RandomGenericData.generate(SCHEMA, 2, 0L)); + helper.appendToTable(dataFile1, dataFile2); + + List filters = Arrays.asList(Expressions.equal("dt", "2020-03-20"), Expressions.equal("id", 123)); + assertResiduals( + SCHEMA, executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20' and id=123"), + writeRecords, expectedRecords); + } + + @Test + public void testPartitionTypes() throws Exception { + Schema typesSchema = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "decimal", Types.DecimalType.of(38, 18)), + Types.NestedField.optional(3, "str", Types.StringType.get()), + Types.NestedField.optional(4, "binary", Types.BinaryType.get()), + Types.NestedField.optional(5, "date", Types.DateType.get()), + Types.NestedField.optional(6, "time", Types.TimeType.get()), + Types.NestedField.optional(7, 
"timestamp", Types.TimestampType.withoutZone()) + ); + PartitionSpec spec = PartitionSpec.builderFor(typesSchema).identity("decimal").identity("str").identity("binary") + .identity("date").identity("time").identity("timestamp").build(); + + Table table = catalog.createTable(TableIdentifier.of("default", "t"), typesSchema, spec); + List records = RandomGenericData.generate(typesSchema, 10, 0L); + GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + for (Record record : records) { + TestHelpers.Row partition = TestHelpers.Row.of( + record.get(1), + record.get(2), + record.get(3), + record.get(4) == null ? null : DateTimeUtil.daysFromDate((LocalDate) record.get(4)), + record.get(5) == null ? null : DateTimeUtil.microsFromTime((LocalTime) record.get(5)), + record.get(6) == null ? null : DateTimeUtil.microsFromTimestamp((LocalDateTime) record.get(6))); + appender.appendToTable(partition, Collections.singletonList(record)); + } + + assertRecords(execute(table), records, typesSchema); + } + + @Test + public void testNestedProjection() throws Exception { + Schema schema = new Schema( + required(1, "data", Types.StringType.get()), + required(2, "nested", Types.StructType.of( + Types.NestedField.required(3, "f1", Types.StringType.get()), + Types.NestedField.required(4, "f2", Types.StringType.get()), + Types.NestedField.required(5, "f3", Types.LongType.get()))), + required(6, "id", Types.LongType.get())); + + Table table = catalog.createTable(TableIdentifier.of("default", "t"), schema); + + List writeRecords = RandomGenericData.generate(schema, 2, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords); + + assertNestedProjection(table, writeRecords); + } + + static void assertRecords(List results, List expectedRecords, Schema schema) { + List expected = Lists.newArrayList(); + @SuppressWarnings("unchecked") + DataStructureConverter converter = (DataStructureConverter) DataStructureConverters.getConverter( + TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema))); + expectedRecords.forEach(r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r)))); + assertRows(results, expected); + } + + private static void assertRows(List results, Row... expected) { + assertRows(results, Arrays.asList(expected)); + } + + static void assertRows(List results, List expected) { + expected.sort(Comparator.comparing(Row::toString)); + results.sort(Comparator.comparing(Row::toString)); + Assert.assertEquals(expected, results); + } +} From b6e9ecd42ed8831efd95459bf21e502aba09d807 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 1 Sep 2020 15:11:09 +0800 Subject: [PATCH 2/8] Address comment --- .../org/apache/iceberg/flink/source/TestFlinkInputFormat.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java index a145c5033eaf..c78021282424 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -71,9 +71,9 @@ protected List executeWithOptions( @Override protected void assertResiduals( - Schema shcema, List results, List writeRecords, List filteredRecords) { + Schema schema, List results, List writeRecords, List filteredRecords) { // can not filter the data. 
- assertRecords(results, writeRecords, shcema); + assertRecords(results, writeRecords, schema); } @Override From b13ed9b5183f3a4ad3656a3bd8a5bbbc4a5f4745 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 1 Sep 2020 16:11:25 +0800 Subject: [PATCH 3/8] checkstyles --- .../flink/source/FlinkInputFormat.java | 11 +++++----- .../iceberg/flink/source/ScanOptions.java | 2 +- .../flink/source/TestFlinkInputFormat.java | 5 ++--- .../iceberg/flink/source/TestFlinkScan.java | 21 ++++++++++--------- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index ebd7da4e72c4..72b2e054c49f 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -106,8 +106,8 @@ public void configure(Configuration parameters) { @Override public void open(FlinkInputSplit split) { - this.iterator = new RowDataIterator(split.getTask(), io, encryption, projectedSchema, - options.getNameMapping(), options.isCaseSensitive()); + this.iterator = new RowDataIterator( + split.getTask(), io, encryption, projectedSchema, options.getNameMapping(), options.isCaseSensitive()); } @Override @@ -223,9 +223,8 @@ public FlinkInputFormat build() { TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)); TableSchema.Builder builder = TableSchema.builder(); for (String field : selectedFields) { - TableColumn column = tableSchema.getTableColumn(field).orElseThrow( - () -> new IllegalArgumentException(String.format("The field(%s) can not be found in the table schema: %s", - field, tableSchema))); + TableColumn column = tableSchema.getTableColumn(field).orElseThrow(() -> new IllegalArgumentException( + String.format("The field(%s) can not be found in the table schema: %s", field, tableSchema))); builder.field(column.getName(), column.getType()); } flinkProjectedSchema = builder.build(); @@ -237,7 +236,7 @@ public FlinkInputFormat build() { } return new FlinkInputFormat(tableLoader, icebergProjectedSchema, io, encryption, filterExpressions, options, - new SerializableConfiguration(hadoopConf)); + new SerializableConfiguration(hadoopConf)); } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java index c13150232356..6da2368292b2 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java @@ -215,7 +215,7 @@ public ScanOptions build() { } } return new ScanOptions(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, splitOpenFileCost, nameMapping); + splitLookback, splitOpenFileCost, nameMapping); } } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java index c78021282424..5824825308dc 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -79,9 +79,8 @@ protected void assertResiduals( @Override protected void assertNestedProjection(Table table, List records) throws IOException { TableSchema projectedSchema = TableSchema.builder() - .field("nested", 
DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING()))) - .field("data", DataTypes.STRING()) - .build(); + .field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING()))) + .field("data", DataTypes.STRING()).build(); List result = run(builder.project(projectedSchema).build()); List expected = Lists.newArrayList(); diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index 6032339713f4..0812e8b4472f 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -238,10 +238,10 @@ public void testSnapshotReads() throws Exception { Thread.sleep(10); helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); - assertRecords(executeWithOptions(table, null, null, snapshotId, null, null, null, null, null), expectedRecords, - SCHEMA); - assertRecords(executeWithOptions(table, null, null, null, null, null, timestampMillis, null, null), - expectedRecords, SCHEMA); + assertRecords( + executeWithOptions(table, null, null, snapshotId, null, null, null, null, null), expectedRecords, SCHEMA); + assertRecords( + executeWithOptions(table, null, null, null, null, null, timestampMillis, null, null), expectedRecords, SCHEMA); } @Test @@ -275,8 +275,8 @@ public void testIncrementalRead() throws Exception { List expected2 = Lists.newArrayList(); expected2.addAll(records2); expected2.addAll(records3); - assertRecords(executeWithOptions(table, null, null, null, snapshotId1, snapshotId3, null, null, null), expected2, - SCHEMA); + assertRecords( + executeWithOptions(table, null, null, null, snapshotId1, snapshotId3, null, null, null), expected2, SCHEMA); } @Test @@ -289,8 +289,8 @@ public void testCustomCatalog() throws IOException { new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); - List rows = executeWithOptions(table, null, CatalogLoader.hadoop("new_catalog", newWarehouse), null, null, - null, null, null, null); + List rows = executeWithOptions( + table, null, CatalogLoader.hadoop("new_catalog", newWarehouse), null, null, null, null, null, null); assertRecords(rows, expectedRecords, SCHEMA); } @@ -308,8 +308,9 @@ public void testFilterExp() throws Exception { RandomGenericData.generate(SCHEMA, 2, 0L)); helper.appendToTable(dataFile1, dataFile2); List filters = Collections.singletonList(Expressions.equal("dt", "2020-03-20")); - assertRecords(executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20'"), - expectedRecords, SCHEMA); + assertRecords( + executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20'"), + expectedRecords, SCHEMA); } @Test From 8d080ad9c7e2a8a30e4797401552c03abc81de96 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 9 Sep 2020 17:58:44 +0800 Subject: [PATCH 4/8] Introduce FlinkSource --- .../flink/source/FlinkInputFormat.java | 122 +----------- .../iceberg/flink/source/FlinkSource.java | 188 ++++++++++++++++++ .../flink/source/TestFlinkInputFormat.java | 8 +- 3 files changed, 193 insertions(+), 125 deletions(-) create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index 72b2e054c49f..164db94020a9 100644 --- 
a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -20,7 +20,6 @@ package org.apache.iceberg.flink.source; import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.InputFormat; @@ -28,21 +27,15 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.expressions.Expression; -import org.apache.iceberg.flink.FlinkCatalogFactory; -import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.hadoop.SerializableConfiguration; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; -import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; /** * Flink {@link InputFormat} for Iceberg. @@ -61,7 +54,7 @@ public class FlinkInputFormat extends RichInputFormat private transient RowDataIterator iterator; - private FlinkInputFormat( + FlinkInputFormat( TableLoader tableLoader, Schema projectedSchema, FileIO io, EncryptionManager encryption, List filterExpressions, ScanOptions options, SerializableConfiguration serializableConf) { this.tableLoader = tableLoader; @@ -126,117 +119,4 @@ public void close() throws IOException { iterator.close(); } } - - public static Builder builder() { - return new Builder(); - } - - public static final class Builder { - private Table table; - private TableLoader tableLoader; - private List selectedFields; - private TableSchema projectedSchema; - private ScanOptions options = ScanOptions.builder().build(); - private List filterExpressions; - private org.apache.hadoop.conf.Configuration hadoopConf; - - private Builder() { - } - - // -------------------------- Required options ------------------------------- - - public Builder tableLoader(TableLoader newLoader) { - this.tableLoader = newLoader; - return this; - } - - // -------------------------- Optional options ------------------------------- - - public Builder table(Table newTable) { - this.table = newTable; - return this; - } - - public Builder filters(List newFilters) { - this.filterExpressions = newFilters; - return this; - } - - public Builder project(TableSchema schema) { - this.projectedSchema = schema; - return this; - } - - public Builder select(String... fields) { - this.selectedFields = Lists.newArrayList(fields); - return this; - } - - public Builder select(List fields) { - this.selectedFields = fields; - return this; - } - - public Builder options(ScanOptions newOptions) { - this.options = newOptions; - return this; - } - - public Builder hadoopConf(org.apache.hadoop.conf.Configuration newConf) { - this.hadoopConf = newConf; - return this; - } - - public FlinkInputFormat build() { - Preconditions.checkNotNull(tableLoader, "TableLoader should not be null."); - - hadoopConf = hadoopConf == null ? 
FlinkCatalogFactory.clusterHadoopConf() : hadoopConf; - - Schema icebergSchema; - FileIO io; - EncryptionManager encryption; - if (table == null) { - // load required fields by table loader. - tableLoader.open(hadoopConf); - try (TableLoader loader = tableLoader) { - Table loadedTable = loader.loadTable(); - icebergSchema = loadedTable.schema(); - io = loadedTable.io(); - encryption = loadedTable.encryption(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } else { - icebergSchema = table.schema(); - io = table.io(); - encryption = table.encryption(); - } - - if (projectedSchema != null && selectedFields != null) { - throw new IllegalArgumentException( - "Cannot using both requestedSchema and projectedFields to project."); - } - - TableSchema flinkProjectedSchema = projectedSchema; - - if (selectedFields != null) { - TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)); - TableSchema.Builder builder = TableSchema.builder(); - for (String field : selectedFields) { - TableColumn column = tableSchema.getTableColumn(field).orElseThrow(() -> new IllegalArgumentException( - String.format("The field(%s) can not be found in the table schema: %s", field, tableSchema))); - builder.field(column.getName(), column.getType()); - } - flinkProjectedSchema = builder.build(); - } - - Schema icebergProjectedSchema = icebergSchema; - if (flinkProjectedSchema != null) { - icebergProjectedSchema = FlinkSchemaUtil.convert(icebergSchema, flinkProjectedSchema); - } - - return new FlinkInputFormat(tableLoader, icebergProjectedSchema, io, encryption, filterExpressions, options, - new SerializableConfiguration(hadoopConf)); - } - } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java new file mode 100644 index 000000000000..ae70c6d16cf2 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.FlinkCatalogFactory; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.hadoop.SerializableConfiguration; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class FlinkSource { + private FlinkSource() { + } + + /** + * Initialize a {@link Builder} to read the data from iceberg table in bounded mode. Reading a snapshot of the table. + * + * @return {@link Builder} to connect the iceberg table. + */ + public static Builder forBounded() { + return new BoundedBuilder(); + } + + /** + * Source builder to build {@link DataStream}. + */ + public abstract static class Builder { + StreamExecutionEnvironment env; + private Table table; + private TableLoader tableLoader; + private List selectedFields; + private TableSchema projectedSchema; + private ScanOptions options = ScanOptions.builder().build(); + private List filterExpressions; + private org.apache.hadoop.conf.Configuration hadoopConf; + + private Schema expectedSchema; + RowDataTypeInfo outputTypeInfo; + + // -------------------------- Required options ------------------------------- + + public Builder tableLoader(TableLoader newLoader) { + this.tableLoader = newLoader; + return this; + } + + // -------------------------- Optional options ------------------------------- + + public Builder table(Table newTable) { + this.table = newTable; + return this; + } + + public Builder filters(List newFilters) { + this.filterExpressions = newFilters; + return this; + } + + public Builder project(TableSchema schema) { + this.projectedSchema = schema; + return this; + } + + public Builder select(String... fields) { + this.selectedFields = Lists.newArrayList(fields); + return this; + } + + public Builder select(List fields) { + this.selectedFields = fields; + return this; + } + + public Builder options(ScanOptions newOptions) { + this.options = newOptions; + return this; + } + + public Builder hadoopConf(org.apache.hadoop.conf.Configuration newConf) { + this.hadoopConf = newConf; + return this; + } + + public Builder env(StreamExecutionEnvironment newEnv) { + this.env = newEnv; + return this; + } + + public FlinkInputFormat buildFormat() { + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null."); + + hadoopConf = hadoopConf == null ? FlinkCatalogFactory.clusterHadoopConf() : hadoopConf; + + Schema icebergSchema; + FileIO io; + EncryptionManager encryption; + if (table == null) { + // load required fields by table loader. 
+ tableLoader.open(hadoopConf); + try (TableLoader loader = tableLoader) { + table = loader.loadTable(); + icebergSchema = table.schema(); + io = table.io(); + encryption = table.encryption(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else { + icebergSchema = table.schema(); + io = table.io(); + encryption = table.encryption(); + } + + if (projectedSchema != null && selectedFields != null) { + throw new IllegalArgumentException( + "Cannot using both requestedSchema and projectedFields to project."); + } + + TableSchema projectedTableSchema = projectedSchema; + TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)); + if (selectedFields != null) { + TableSchema.Builder builder = TableSchema.builder(); + for (String field : selectedFields) { + TableColumn column = tableSchema.getTableColumn(field).orElseThrow( + () -> new IllegalArgumentException(String.format("The field(%s) can not be found in the table schema: %s", + field, tableSchema))); + builder.field(column.getName(), column.getType()); + } + projectedTableSchema = builder.build(); + } + + outputTypeInfo = RowDataTypeInfo.of((RowType) (projectedTableSchema == null ? tableSchema : projectedTableSchema) + .toRowDataType().getLogicalType()); + + expectedSchema = icebergSchema; + if (projectedTableSchema != null) { + expectedSchema = FlinkSchemaUtil.convert(icebergSchema, projectedTableSchema); + } + + return new FlinkInputFormat(tableLoader, expectedSchema, io, encryption, filterExpressions, options, + new SerializableConfiguration(hadoopConf)); + } + + public abstract DataStream build(); + } + + private static final class BoundedBuilder extends Builder { + @Override + public DataStream build() { + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + FlinkInputFormat format = buildFormat(); + return env.createInput(format, outputTypeInfo); + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java index 5824825308dc..aa8dd233076f 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -44,7 +44,7 @@ */ public class TestFlinkInputFormat extends TestFlinkScan { - private FlinkInputFormat.Builder builder; + private FlinkSource.Builder builder; public TestFlinkInputFormat(String fileFormat) { super(fileFormat); @@ -53,7 +53,7 @@ public TestFlinkInputFormat(String fileFormat) { @Override public void before() throws IOException { super.before(); - builder = FlinkInputFormat.builder().tableLoader(TableLoader.fromHadoopTable(warehouse + "/default/t")); + builder = FlinkSource.forBounded().tableLoader(TableLoader.fromHadoopTable(warehouse + "/default/t")); } @Override @@ -66,7 +66,7 @@ protected List executeWithOptions( builder.tableLoader(TableLoader.fromCatalog(loader, TableIdentifier.of("default", "t"))); } - return run(builder.select(projectFields).filters(filters).options(options).build()); + return run(builder.select(projectFields).filters(filters).options(options).buildFormat()); } @Override @@ -81,7 +81,7 @@ protected void assertNestedProjection(Table table, List records) throws TableSchema projectedSchema = TableSchema.builder() .field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING()))) .field("data", DataTypes.STRING()).build(); - List result = 
run(builder.project(projectedSchema).build()); + List result = run(builder.project(projectedSchema).buildFormat()); List expected = Lists.newArrayList(); for (Record record : records) { From 3c727faafb1c097b5ac7248f7f575dabe2f38c95 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 9 Sep 2020 18:04:39 +0800 Subject: [PATCH 5/8] Checkstyle --- .../iceberg/flink/source/FlinkSource.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index ae70c6d16cf2..caa9c86758c7 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -58,7 +58,7 @@ public static Builder forBounded() { * Source builder to build {@link DataStream}. */ public abstract static class Builder { - StreamExecutionEnvironment env; + private StreamExecutionEnvironment env; private Table table; private TableLoader tableLoader; private List selectedFields; @@ -67,8 +67,7 @@ public abstract static class Builder { private List filterExpressions; private org.apache.hadoop.conf.Configuration hadoopConf; - private Schema expectedSchema; - RowDataTypeInfo outputTypeInfo; + private RowDataTypeInfo rowTypeInfo; // -------------------------- Required options ------------------------------- @@ -119,8 +118,16 @@ public Builder env(StreamExecutionEnvironment newEnv) { return this; } + StreamExecutionEnvironment getEnv() { + return env; + } + + RowDataTypeInfo getRowTypeInfo() { + return rowTypeInfo; + } + public FlinkInputFormat buildFormat() { - Preconditions.checkNotNull(tableLoader, "TableLoader should not be null."); + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); hadoopConf = hadoopConf == null ? FlinkCatalogFactory.clusterHadoopConf() : hadoopConf; @@ -146,7 +153,7 @@ public FlinkInputFormat buildFormat() { if (projectedSchema != null && selectedFields != null) { throw new IllegalArgumentException( - "Cannot using both requestedSchema and projectedFields to project."); + "Cannot using both requestedSchema and projectedFields to project"); } TableSchema projectedTableSchema = projectedSchema; @@ -162,10 +169,10 @@ public FlinkInputFormat buildFormat() { projectedTableSchema = builder.build(); } - outputTypeInfo = RowDataTypeInfo.of((RowType) (projectedTableSchema == null ? tableSchema : projectedTableSchema) + rowTypeInfo = RowDataTypeInfo.of((RowType) (projectedTableSchema == null ? 
tableSchema : projectedTableSchema) .toRowDataType().getLogicalType()); - expectedSchema = icebergSchema; + Schema expectedSchema = icebergSchema; if (projectedTableSchema != null) { expectedSchema = FlinkSchemaUtil.convert(icebergSchema, projectedTableSchema); } @@ -180,9 +187,9 @@ public FlinkInputFormat buildFormat() { private static final class BoundedBuilder extends Builder { @Override public DataStream build() { - Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(getEnv(), "StreamExecutionEnvironment should not be null"); FlinkInputFormat format = buildFormat(); - return env.createInput(format, outputTypeInfo); + return getEnv().createInput(format, getRowTypeInfo()); } } } From 6c5f830244961f39f355d26529ce7d923908c121 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 15 Sep 2020 12:02:16 +0800 Subject: [PATCH 6/8] tmp --- .../org/apache/iceberg/flink/source/DataIterator.java | 8 ++++---- .../org/apache/iceberg/flink/source/FlinkInputFormat.java | 4 ++-- .../java/org/apache/iceberg/flink/source/FlinkSource.java | 5 +++-- .../org/apache/iceberg/flink/source/RowDataIterator.java | 4 ++-- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java index 4dfc94fab4c4..f2cecf1e6f98 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java @@ -50,14 +50,14 @@ abstract class DataIterator implements CloseableIterator { private final Iterator tasks; - private final FileIO fileIo; + private final FileIO io; private final EncryptionManager encryption; private CloseableIterator currentIterator; - DataIterator(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption) { + DataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption) { this.tasks = task.files().iterator(); - this.fileIo = fileIo; + this.io = io; this.encryption = encryption; this.currentIterator = CloseableIterator.empty(); } @@ -65,7 +65,7 @@ abstract class DataIterator implements CloseableIterator { InputFile getInputFile(FileScanTask task) { Preconditions.checkArgument(!task.isDataTask(), "Invalid task type"); return encryption.decrypt(EncryptedFiles.encryptedInput( - fileIo.newInputFile(task.file().path().toString()), + io.newInputFile(task.file().path().toString()), task.file().keyMetadata())); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index 164db94020a9..26e26b9f736a 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -59,10 +59,10 @@ public class FlinkInputFormat extends RichInputFormat List filterExpressions, ScanOptions options, SerializableConfiguration serializableConf) { this.tableLoader = tableLoader; this.projectedSchema = projectedSchema; - this.options = options; - this.filterExpressions = filterExpressions; this.io = io; this.encryption = encryption; + this.filterExpressions = filterExpressions; + this.options = options; this.serializableConf = serializableConf; } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index caa9c86758c7..06f6d6737e38 
100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -29,6 +29,7 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; import org.apache.flink.table.types.logical.RowType; +import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.encryption.EncryptionManager; @@ -65,7 +66,7 @@ public abstract static class Builder { private TableSchema projectedSchema; private ScanOptions options = ScanOptions.builder().build(); private List filterExpressions; - private org.apache.hadoop.conf.Configuration hadoopConf; + private Configuration hadoopConf; private RowDataTypeInfo rowTypeInfo; @@ -108,7 +109,7 @@ public Builder options(ScanOptions newOptions) { return this; } - public Builder hadoopConf(org.apache.hadoop.conf.Configuration newConf) { + public Builder hadoopConf(Configuration newConf) { this.hadoopConf = newConf; return this; } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java index 26c6b32502c5..dc7b38dd00a6 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java @@ -47,9 +47,9 @@ class RowDataIterator extends DataIterator { private final String nameMapping; private final boolean caseSensitive; - RowDataIterator(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption, Schema projectedSchema, + RowDataIterator(CombinedScanTask task, FileIO io, EncryptionManager encryption, Schema projectedSchema, String nameMapping, boolean caseSensitive) { - super(task, fileIo, encryption); + super(task, io, encryption); this.projectedSchema = projectedSchema; this.nameMapping = nameMapping; this.caseSensitive = caseSensitive; From e27e42149acabd483967dad6aae00c097d7ea44e Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 15 Sep 2020 14:43:32 +0800 Subject: [PATCH 7/8] Address comments --- .../flink/source/FlinkInputFormat.java | 2 +- .../iceberg/flink/source/FlinkInputSplit.java | 17 ---- .../iceberg/flink/source/FlinkSource.java | 86 ++++++------------- .../flink/source/FlinkSplitGenerator.java | 30 +++---- .../iceberg/flink/source/ScanOptions.java | 30 ++++--- .../flink/source/TestFlinkInputFormat.java | 27 +++--- .../iceberg/flink/source/TestFlinkScan.java | 54 ++++-------- 7 files changed, 90 insertions(+), 156 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index 26e26b9f736a..f4a56fd54e57 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -100,7 +100,7 @@ public void configure(Configuration parameters) { @Override public void open(FlinkInputSplit split) { this.iterator = new RowDataIterator( - split.getTask(), io, encryption, projectedSchema, options.getNameMapping(), options.isCaseSensitive()); + split.getTask(), io, encryption, projectedSchema, options.nameMapping(), options.caseSensitive()); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java index 
fa61e987fbe8..21a2f71ac86d 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java @@ -44,21 +44,4 @@ public int getSplitNumber() { CombinedScanTask getTask() { return task; } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - FlinkInputSplit that = (FlinkInputSplit) o; - return splitNumber == that.splitNumber; - } - - @Override - public int hashCode() { - return splitNumber; - } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java index 06f6d6737e38..6bcc96f254c8 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSource.java @@ -24,7 +24,6 @@ import java.util.List; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; @@ -32,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; import org.apache.iceberg.encryption.EncryptionManager; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkCatalogFactory; @@ -40,29 +40,38 @@ import org.apache.iceberg.hadoop.SerializableConfiguration; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; public class FlinkSource { private FlinkSource() { } /** - * Initialize a {@link Builder} to read the data from iceberg table in bounded mode. Reading a snapshot of the table. + * Initialize a {@link Builder} to read the data from iceberg table. Equivalent to {@link TableScan}. + * See more options in {@link ScanOptions}. + *

+ * The Source can read static data in bounded mode. It can also continuously check the arrival of new data and
+ * read records incrementally.
+ * Whether the source is bounded or unbounded depends on the {@link Builder#options(ScanOptions)}: a
+ * startSnapshotId without an endSnapshotId requests an unbounded (incremental) read, which is not supported yet;
+ * every other combination of options produces a bounded read.
+ *

* * @return {@link Builder} to connect the iceberg table. */ - public static Builder forBounded() { - return new BoundedBuilder(); + public static Builder forRowData() { + return new Builder(); } /** * Source builder to build {@link DataStream}. */ - public abstract static class Builder { + public static class Builder { private StreamExecutionEnvironment env; private Table table; private TableLoader tableLoader; - private List selectedFields; private TableSchema projectedSchema; private ScanOptions options = ScanOptions.builder().build(); private List filterExpressions; @@ -70,15 +79,11 @@ public abstract static class Builder { private RowDataTypeInfo rowTypeInfo; - // -------------------------- Required options ------------------------------- - public Builder tableLoader(TableLoader newLoader) { this.tableLoader = newLoader; return this; } - // -------------------------- Optional options ------------------------------- - public Builder table(Table newTable) { this.table = newTable; return this; @@ -94,16 +99,6 @@ public Builder project(TableSchema schema) { return this; } - public Builder select(String... fields) { - this.selectedFields = Lists.newArrayList(fields); - return this; - } - - public Builder select(List fields) { - this.selectedFields = fields; - return this; - } - public Builder options(ScanOptions newOptions) { this.options = newOptions; return this; @@ -119,14 +114,6 @@ public Builder env(StreamExecutionEnvironment newEnv) { return this; } - StreamExecutionEnvironment getEnv() { - return env; - } - - RowDataTypeInfo getRowTypeInfo() { - return rowTypeInfo; - } - public FlinkInputFormat buildFormat() { Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); @@ -152,45 +139,28 @@ public FlinkInputFormat buildFormat() { encryption = table.encryption(); } - if (projectedSchema != null && selectedFields != null) { - throw new IllegalArgumentException( - "Cannot using both requestedSchema and projectedFields to project"); - } - - TableSchema projectedTableSchema = projectedSchema; - TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)); - if (selectedFields != null) { - TableSchema.Builder builder = TableSchema.builder(); - for (String field : selectedFields) { - TableColumn column = tableSchema.getTableColumn(field).orElseThrow( - () -> new IllegalArgumentException(String.format("The field(%s) can not be found in the table schema: %s", - field, tableSchema))); - builder.field(column.getName(), column.getType()); - } - projectedTableSchema = builder.build(); - } - - rowTypeInfo = RowDataTypeInfo.of((RowType) (projectedTableSchema == null ? tableSchema : projectedTableSchema) - .toRowDataType().getLogicalType()); + rowTypeInfo = RowDataTypeInfo.of((RowType) ( + projectedSchema == null ? 
+ FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)) : + projectedSchema).toRowDataType().getLogicalType()); Schema expectedSchema = icebergSchema; - if (projectedTableSchema != null) { - expectedSchema = FlinkSchemaUtil.convert(icebergSchema, projectedTableSchema); + if (projectedSchema != null) { + expectedSchema = FlinkSchemaUtil.convert(icebergSchema, projectedSchema); } return new FlinkInputFormat(tableLoader, expectedSchema, io, encryption, filterExpressions, options, new SerializableConfiguration(hadoopConf)); } - public abstract DataStream build(); - } - - private static final class BoundedBuilder extends Builder { - @Override public DataStream build() { - Preconditions.checkNotNull(getEnv(), "StreamExecutionEnvironment should not be null"); + Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null"); FlinkInputFormat format = buildFormat(); - return getEnv().createInput(format, getRowTypeInfo()); + if (options.startSnapshotId() != null && options.endSnapshotId() == null) { + throw new UnsupportedOperationException("The Unbounded mode is not supported yet"); + } else { + return env.createInput(format, rowTypeInfo); + } } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java index 3502055d39fc..af22edce10d7 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java @@ -57,35 +57,35 @@ FlinkInputSplit[] createInputSplits() { private List tasks() { TableScan scan = table .newScan() - .caseSensitive(options.isCaseSensitive()) + .caseSensitive(options.caseSensitive()) .project(projectedSchema); - if (options.getSnapshotId() != null) { - scan = scan.useSnapshot(options.getSnapshotId()); + if (options.snapshotId() != null) { + scan = scan.useSnapshot(options.snapshotId()); } - if (options.getAsOfTimestamp() != null) { - scan = scan.asOfTime(options.getAsOfTimestamp()); + if (options.asOfTimestamp() != null) { + scan = scan.asOfTime(options.asOfTimestamp()); } - if (options.getStartSnapshotId() != null) { - if (options.getEndSnapshotId() != null) { - scan = scan.appendsBetween(options.getStartSnapshotId(), options.getEndSnapshotId()); + if (options.startSnapshotId() != null) { + if (options.endSnapshotId() != null) { + scan = scan.appendsBetween(options.startSnapshotId(), options.endSnapshotId()); } else { - scan = scan.appendsAfter(options.getStartSnapshotId()); + scan = scan.appendsAfter(options.startSnapshotId()); } } - if (options.getSplitSize() != null) { - scan = scan.option(TableProperties.SPLIT_SIZE, options.getSplitSize().toString()); + if (options.splitSize() != null) { + scan = scan.option(TableProperties.SPLIT_SIZE, options.splitSize().toString()); } - if (options.getSplitLookback() != null) { - scan = scan.option(TableProperties.SPLIT_LOOKBACK, options.getSplitLookback().toString()); + if (options.splitLookback() != null) { + scan = scan.option(TableProperties.SPLIT_LOOKBACK, options.splitLookback().toString()); } - if (options.getSplitOpenFileCost() != null) { - scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, options.getSplitOpenFileCost().toString()); + if (options.splitOpenFileCost() != null) { + scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, options.splitOpenFileCost().toString()); } if (filterExpressions != null) { diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java index 6da2368292b2..309b4e58aa08 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java @@ -31,6 +31,8 @@ public class ScanOptions implements Serializable { private static final long serialVersionUID = 1L; + public static final long UNBOUNDED_PRECEDING = -1L; + public static final ConfigOption SNAPSHOT_ID = ConfigOptions.key("snapshot-id").longType().defaultValue(null); @@ -79,39 +81,39 @@ public ScanOptions(boolean caseSensitive, Long snapshotId, Long startSnapshotId, this.nameMapping = nameMapping; } - public boolean isCaseSensitive() { + public boolean caseSensitive() { return caseSensitive; } - public Long getSnapshotId() { + public Long snapshotId() { return snapshotId; } - public Long getStartSnapshotId() { + public Long startSnapshotId() { return startSnapshotId; } - public Long getEndSnapshotId() { + public Long endSnapshotId() { return endSnapshotId; } - public Long getAsOfTimestamp() { + public Long asOfTimestamp() { return asOfTimestamp; } - public Long getSplitSize() { + public Long splitSize() { return splitSize; } - public Integer getSplitLookback() { + public Integer splitLookback() { return splitLookback; } - public Long getSplitOpenFileCost() { + public Long splitOpenFileCost() { return splitOpenFileCost; } - public String getNameMapping() { + public String nameMapping() { return nameMapping; } @@ -119,8 +121,8 @@ public static Builder builder() { return new Builder(); } - public static ScanOptions of(Map options) { - return builder().options(options).build(); + public static ScanOptions fromProperties(Map properties) { + return builder().options(properties).build(); } public static final class Builder { @@ -137,9 +139,9 @@ public static final class Builder { private Builder() { } - public Builder options(Map options) { + public Builder options(Map properties) { Configuration config = new Configuration(); - options.forEach(config::setString); + properties.forEach(config::setString); this.caseSensitive = config.get(CASE_SENSITIVE); this.snapshotId = config.get(SNAPSHOT_ID); this.asOfTimestamp = config.get(AS_OF_TIMESTAMP); @@ -148,7 +150,7 @@ public Builder options(Map options) { this.splitSize = config.get(SPLIT_SIZE); this.splitLookback = config.get(SPLIT_LOOKBACK); this.splitOpenFileCost = config.get(SPLIT_FILE_OPEN_COST); - this.nameMapping = options.get(DEFAULT_NAME_MAPPING); + this.nameMapping = properties.get(DEFAULT_NAME_MAPPING); return this; } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java index aa8dd233076f..d359aab4a8a9 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; +import java.util.stream.Collectors; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; @@ -31,10 +32,8 @@ import org.apache.flink.types.Row; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expression; -import 
org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -53,20 +52,24 @@ public TestFlinkInputFormat(String fileFormat) { @Override public void before() throws IOException { super.before(); - builder = FlinkSource.forBounded().tableLoader(TableLoader.fromHadoopTable(warehouse + "/default/t")); + builder = FlinkSource.forRowData().tableLoader(TableLoader.fromHadoopTable(warehouse + "/default/t")); } @Override - protected List executeWithOptions( - Table table, List projectFields, CatalogLoader loader, Long snapshotId, Long startSnapshotId, - Long endSnapshotId, Long asOfTimestamp, List filters, String sqlFilter) throws IOException { - ScanOptions options = ScanOptions.builder().snapshotId(snapshotId).startSnapshotId(startSnapshotId) - .endSnapshotId(endSnapshotId).asOfTimestamp(asOfTimestamp).build(); - if (loader != null) { - builder.tableLoader(TableLoader.fromCatalog(loader, TableIdentifier.of("default", "t"))); - } + protected List execute(Table table, List projectFields) throws IOException { + Schema projected = new Schema(projectFields.stream().map(f -> + table.schema().asStruct().field(f)).collect(Collectors.toList())); + return run(builder.project(FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(projected))).buildFormat()); + } + + @Override + protected List execute(Table table, ScanOptions options) throws IOException { + return run(builder.options(options).buildFormat()); + } - return run(builder.select(projectFields).filters(filters).options(options).buildFormat()); + @Override + protected List execute(Table table, List filters, String sqlFilter) throws IOException { + return run(builder.filters(filters).buildFormat()); } @Override diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index 0812e8b4472f..9b1dbf9dcb29 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -49,7 +49,6 @@ import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; -import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.RowDataConverter; import org.apache.iceberg.hadoop.HadoopCatalog; @@ -78,10 +77,8 @@ public abstract class TestFlinkScan extends AbstractTestBase { .bucket("id", 1) .build(); - // before variables - private Configuration conf; - String warehouse; private HadoopCatalog catalog; + protected String warehouse; // parametrized variables private final FileFormat fileFormat; @@ -99,23 +96,21 @@ public static Object[] parameters() { public void before() throws IOException { File warehouseFile = TEMPORARY_FOLDER.newFolder(); Assert.assertTrue(warehouseFile.delete()); - conf = new Configuration(); + // before variables + Configuration conf = new Configuration(); warehouse = "file:" + warehouseFile; catalog = new HadoopCatalog(conf, warehouse); } private List execute(Table table) throws IOException { - return executeWithOptions(table, null, null, null, null, null, null, null, null); + return execute(table, ScanOptions.builder().build()); } - private List execute(Table table, List projectFields) throws IOException { - return executeWithOptions(table, projectFields, null, null, null, null, 
null, null, null); - } + protected abstract List execute(Table table, List projectFields) throws IOException; + + protected abstract List execute(Table table, ScanOptions options) throws IOException; - protected abstract List executeWithOptions( - Table table, List projectFields, CatalogLoader loader, Long snapshotId, - Long startSnapshotId, Long endSnapshotId, Long asOfTimestamp, List filters, String sqlFilter) - throws IOException; + protected abstract List execute(Table table, List filters, String sqlFilter) throws IOException; /** * The Flink SQL has no residuals, because there will be operator to filter all the data that should be filtered. @@ -239,9 +234,9 @@ public void testSnapshotReads() throws Exception { helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); assertRecords( - executeWithOptions(table, null, null, snapshotId, null, null, null, null, null), expectedRecords, SCHEMA); + execute(table, ScanOptions.builder().snapshotId(snapshotId).build()), expectedRecords, SCHEMA); assertRecords( - executeWithOptions(table, null, null, null, null, null, timestampMillis, null, null), expectedRecords, SCHEMA); + execute(table, ScanOptions.builder().asOfTimestamp(timestampMillis).build()), expectedRecords, SCHEMA); } @Test @@ -270,28 +265,13 @@ public void testIncrementalRead() throws Exception { expected1.addAll(records2); expected1.addAll(records3); expected1.addAll(records4); - assertRecords(executeWithOptions(table, null, null, null, snapshotId1, null, null, null, null), expected1, SCHEMA); + assertRecords(execute(table, ScanOptions.builder().startSnapshotId(snapshotId1).build()), expected1, SCHEMA); List expected2 = Lists.newArrayList(); expected2.addAll(records2); expected2.addAll(records3); - assertRecords( - executeWithOptions(table, null, null, null, snapshotId1, snapshotId3, null, null, null), expected2, SCHEMA); - } - - @Test - public void testCustomCatalog() throws IOException { - String newWarehouse = TEMPORARY_FOLDER.newFolder().toURI().toString(); - Table table = new HadoopCatalog(conf, newWarehouse).createTable(TableIdentifier.of("default", "t"), SCHEMA); - - List expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); - expectedRecords.get(0).set(2, "2020-03-20"); - new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( - org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); - - List rows = executeWithOptions( - table, null, CatalogLoader.hadoop("new_catalog", newWarehouse), null, null, null, null, null, null); - assertRecords(rows, expectedRecords, SCHEMA); + assertRecords(execute(table, ScanOptions.builder().startSnapshotId(snapshotId1).endSnapshotId(snapshotId3).build()), + expected2, SCHEMA); } @Test @@ -308,9 +288,7 @@ public void testFilterExp() throws Exception { RandomGenericData.generate(SCHEMA, 2, 0L)); helper.appendToTable(dataFile1, dataFile2); List filters = Collections.singletonList(Expressions.equal("dt", "2020-03-20")); - assertRecords( - executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20'"), - expectedRecords, SCHEMA); + assertRecords(execute(table, filters, "dt='2020-03-20'"), expectedRecords, SCHEMA); } @Test @@ -334,9 +312,7 @@ public void testResiduals() throws Exception { helper.appendToTable(dataFile1, dataFile2); List filters = Arrays.asList(Expressions.equal("dt", "2020-03-20"), Expressions.equal("id", 123)); - assertResiduals( - SCHEMA, executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20' and id=123"), - writeRecords, 
expectedRecords); + assertResiduals(SCHEMA, execute(table, filters, "dt='2020-03-20' and id=123"), writeRecords, expectedRecords); } @Test From 45af8c7b3377c310ea367958cbe63c2aadf50bdf Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 24 Sep 2020 10:17:37 +0800 Subject: [PATCH 8/8] Set tasks to null in DataIterator --- .../org/apache/iceberg/flink/source/DataIterator.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java index f2cecf1e6f98..88a324fda14f 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java @@ -49,7 +49,7 @@ */ abstract class DataIterator implements CloseableIterator { - private final Iterator tasks; + private Iterator tasks; private final FileIO io; private final EncryptionManager encryption; @@ -101,12 +101,8 @@ private void updateCurrentIterator() { @Override public void close() throws IOException { // close the current iterator - this.currentIterator.close(); - - // exhaust the task iterator - while (tasks.hasNext()) { - tasks.next(); - } + currentIterator.close(); + tasks = null; } static Object convertConstant(Type type, Object value) {
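
For reference, the builder API as it stands after the final patch in this series can be wired into a job roughly as follows. This is a minimal sketch under stated assumptions, not code from the patches themselves: the warehouse path, table name, projected column names, and snapshot id are illustrative placeholders, and the projection/options calls simply exercise the FlinkSource.Builder and ScanOptions.Builder methods shown above.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;
import org.apache.iceberg.flink.source.ScanOptions;

public class FlinkSourceUsageSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Load the table the same way the tests do; the warehouse path is a placeholder.
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/default/t");

    // Bounded read pinned to a specific snapshot; the id is a placeholder.
    ScanOptions options = ScanOptions.builder()
        .snapshotId(1L)
        .build();

    // Project two columns; names and types are illustrative only.
    TableSchema projection = TableSchema.builder()
        .field("data", DataTypes.STRING())
        .field("id", DataTypes.BIGINT())
        .build();

    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .project(projection)
        .options(options)
        .build();

    stream.print();
    env.execute("iceberg-bounded-read-sketch");
  }
}

For incremental reads, ScanOptions.builder().startSnapshotId(...).endSnapshotId(...) selects an append range, which FlinkSplitGenerator translates to TableScan.appendsBetween; a startSnapshotId without an endSnapshotId is rejected by build() until the unbounded mode lands.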