From ad42994b7afadc5540319cf52bf2aeaadf2acdc6 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Mon, 17 Aug 2020 13:51:07 +0800 Subject: [PATCH 1/6] Flink: Introduce Flink InputFormat --- .../apache/iceberg/flink/FlinkFixupTypes.java | 51 +++ .../apache/iceberg/flink/FlinkSchemaUtil.java | 23 + .../iceberg/flink/data/FlinkOrcReader.java | 8 +- .../iceberg/flink/data/FlinkOrcReaders.java | 12 +- .../flink/data/FlinkParquetReaders.java | 2 +- .../iceberg/flink/source/DataIterator.java | 144 +++++++ .../flink/source/FlinkInputFormat.java | 243 +++++++++++ .../iceberg/flink/source/FlinkInputSplit.java | 64 +++ .../flink/source/FlinkSplitGenerator.java | 103 +++++ .../iceberg/flink/source/RowDataIterator.java | 141 ++++++ .../iceberg/flink/source/ScanOptions.java | 221 ++++++++++ .../flink/data/TestFlinkOrcReaderWriter.java | 2 +- .../flink/source/TestFlinkInputFormat.java | 115 +++++ .../iceberg/flink/source/TestFlinkScan.java | 408 ++++++++++++++++++ 14 files changed, 1528 insertions(+), 9 deletions(-) create mode 100644 flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java create mode 100644 flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java create mode 100644 flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java new file mode 100644 index 000000000000..6501c0226e44 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkFixupTypes.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.FixupTypes; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; + +/** + * The uuid and fixed are converted to the same Flink type. Conversion back can produce only one, + * which may not be correct. 
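+ * <p>
+ * For example, a uuid and a fixed(16) are both represented as a 16-byte binary type in Flink, so
+ * converting back from Flink always produces fixed(16); this visitor restores uuid wherever the
+ * reference schema declares one.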
+ */ +class FlinkFixupTypes extends FixupTypes { + + private FlinkFixupTypes(Schema referenceSchema) { + super(referenceSchema); + } + + static Schema fixup(Schema schema, Schema referenceSchema) { + return new Schema(TypeUtil.visit(schema, + new FlinkFixupTypes(referenceSchema)).asStructType().fields()); + } + + @Override + protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) { + if (type instanceof Types.FixedType) { + int length = ((Types.FixedType) type).length(); + return source.typeId() == Type.TypeID.UUID && length == 16; + } + return false; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java index 90534d714630..fa871e505129 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java @@ -27,6 +27,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; /** * Converter between Flink types and Iceberg type. @@ -63,6 +64,28 @@ public static Schema convert(TableSchema schema) { return new Schema(converted.asStructType().fields()); } + /** + * Convert a Flink {@link TableSchema} to a {@link Schema} based on the given schema. + *
<p>
+ * This conversion does not assign new ids; it uses ids from the base schema. + *
<p>
+ * Data types, field order, and nullability will match the Flink type. This conversion may return + * a schema that is not compatible with base schema. + * + * @param baseSchema a Schema on which conversion is based + * @param flinkSchema a Flink TableSchema + * @return the equivalent Schema + * @throws IllegalArgumentException if the type cannot be converted or there are missing ids + */ + public static Schema convert(Schema baseSchema, TableSchema flinkSchema) { + // convert to a type with fresh ids + Types.StructType struct = convert(flinkSchema).asStruct(); + // reassign ids to match the base schema + Schema schema = TypeUtil.reassignIds(new Schema(struct.fields()), baseSchema); + // fix types that can't be represented in Flink (UUID) + return FlinkFixupTypes.fixup(schema, baseSchema); + } + /** * Convert a {@link Schema} to a {@link RowType Flink type}. * diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java index 2f5db1967ef2..4c4e2050263b 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java @@ -39,18 +39,14 @@ public class FlinkOrcReader implements OrcRowReader { private final OrcValueReader reader; - private FlinkOrcReader(Schema iSchema, TypeDescription readSchema) { + public FlinkOrcReader(Schema iSchema, TypeDescription readSchema) { this(iSchema, readSchema, ImmutableMap.of()); } - private FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map idToConstant) { + public FlinkOrcReader(Schema iSchema, TypeDescription readSchema, Map idToConstant) { this.reader = OrcSchemaWithTypeVisitor.visit(iSchema, readSchema, new ReadBuilder(idToConstant)); } - public static OrcRowReader buildReader(Schema schema, TypeDescription readSchema) { - return new FlinkOrcReader(schema, readSchema); - } - @Override public RowData read(VectorizedRowBatch batch, int row) { return (RowData) reader.read(new StructColumnVector(batch.size, batch.cols), row); diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java index a434bddfe265..744a05eb2d21 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReaders.java @@ -36,6 +36,7 @@ import org.apache.flink.table.data.TimestampData; import org.apache.iceberg.orc.OrcValueReader; import org.apache.iceberg.orc.OrcValueReaders; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Types; @@ -127,6 +128,11 @@ private static class Decimal18Reader implements OrcValueReader { @Override public DecimalData nonNullRead(ColumnVector vector, int row) { HiveDecimalWritable value = ((DecimalColumnVector) vector).vector[row]; + + // The hive ORC writer may will adjust the scale of decimal data. 
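+      // e.g. 3.1400 may come back as 3.14 with a reduced scale; serialize64(scale) below
+      // rescales the unscaled value to the declared scale, so only the precision has to be
+      // validated here.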
+ Preconditions.checkArgument(value.precision() <= precision, + "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value); + return DecimalData.fromUnscaledLong(value.serialize64(scale), precision, scale); } } @@ -143,6 +149,10 @@ private static class Decimal38Reader implements OrcValueReader { @Override public DecimalData nonNullRead(ColumnVector vector, int row) { BigDecimal value = ((DecimalColumnVector) vector).vector[row].getHiveDecimal().bigDecimalValue(); + + Preconditions.checkArgument(value.precision() <= precision, + "Cannot read value as decimal(%s,%s), too large: %s", precision, scale, value); + return DecimalData.fromBigDecimal(value, precision, scale); } } @@ -246,7 +256,7 @@ private static class StructReader extends OrcValueReaders.StructReader StructReader(List> readers, Types.StructType struct, Map idToConstant) { super(readers, struct, idToConstant); - this.numFields = readers.size(); + this.numFields = struct.fields().size(); } @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java index 3012544cba83..720a842e32a8 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java @@ -52,7 +52,7 @@ import org.apache.parquet.schema.PrimitiveType; import org.apache.parquet.schema.Type; -class FlinkParquetReaders { +public class FlinkParquetReaders { private FlinkParquetReaders() { } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java new file mode 100644 index 000000000000..4dfc94fab4c4 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.Iterator; +import org.apache.avro.generic.GenericData; +import org.apache.avro.util.Utf8; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.ByteBuffers; +import org.apache.iceberg.util.DateTimeUtil; + +/** + * Base class of Flink iterators. + * + * @param is the Java class returned by this iterator whose objects contain one or more rows. + */ +abstract class DataIterator implements CloseableIterator { + + private final Iterator tasks; + private final FileIO fileIo; + private final EncryptionManager encryption; + + private CloseableIterator currentIterator; + + DataIterator(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption) { + this.tasks = task.files().iterator(); + this.fileIo = fileIo; + this.encryption = encryption; + this.currentIterator = CloseableIterator.empty(); + } + + InputFile getInputFile(FileScanTask task) { + Preconditions.checkArgument(!task.isDataTask(), "Invalid task type"); + return encryption.decrypt(EncryptedFiles.encryptedInput( + fileIo.newInputFile(task.file().path().toString()), + task.file().keyMetadata())); + } + + @Override + public boolean hasNext() { + updateCurrentIterator(); + return currentIterator.hasNext(); + } + + @Override + public T next() { + updateCurrentIterator(); + return currentIterator.next(); + } + + /** + * Updates the current iterator field to ensure that the current Iterator + * is not exhausted. 
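+   * Each exhausted file iterator is closed before the iterator for the next {@link FileScanTask}
+   * is opened, so at most one file of the combined task is held open at a time.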
+ */ + private void updateCurrentIterator() { + try { + while (!currentIterator.hasNext() && tasks.hasNext()) { + currentIterator.close(); + currentIterator = openTaskIterator(tasks.next()); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + abstract CloseableIterator openTaskIterator(FileScanTask scanTask) throws IOException; + + @Override + public void close() throws IOException { + // close the current iterator + this.currentIterator.close(); + + // exhaust the task iterator + while (tasks.hasNext()) { + tasks.next(); + } + } + + static Object convertConstant(Type type, Object value) { + if (value == null) { + return null; + } + + switch (type.typeId()) { + case DECIMAL: // DecimalData + Types.DecimalType decimal = (Types.DecimalType) type; + return DecimalData.fromBigDecimal((BigDecimal) value, decimal.precision(), decimal.scale()); + case STRING: // StringData + if (value instanceof Utf8) { + Utf8 utf8 = (Utf8) value; + return StringData.fromBytes(utf8.getBytes(), 0, utf8.getByteLength()); + } + return StringData.fromString(value.toString()); + case FIXED: // byte[] + if (value instanceof byte[]) { + return value; + } else if (value instanceof GenericData.Fixed) { + return ((GenericData.Fixed) value).bytes(); + } + return ByteBuffers.toByteArray((ByteBuffer) value); + case BINARY: // byte[] + return ByteBuffers.toByteArray((ByteBuffer) value); + case TIME: // int mills instead of long + return (int) ((Long) value / 1000); + case TIMESTAMP: // TimestampData + return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value)); + default: + } + return value; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java new file mode 100644 index 000000000000..ebd7da4e72c4 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.RichInputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.table.api.TableColumn; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.FlinkCatalogFactory; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.hadoop.SerializableConfiguration; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * Flink {@link InputFormat} for Iceberg. + */ +public class FlinkInputFormat extends RichInputFormat { + + private static final long serialVersionUID = 1L; + + private final TableLoader tableLoader; + private final Schema projectedSchema; + private final ScanOptions options; + private final List filterExpressions; + private final FileIO io; + private final EncryptionManager encryption; + private final SerializableConfiguration serializableConf; + + private transient RowDataIterator iterator; + + private FlinkInputFormat( + TableLoader tableLoader, Schema projectedSchema, FileIO io, EncryptionManager encryption, + List filterExpressions, ScanOptions options, SerializableConfiguration serializableConf) { + this.tableLoader = tableLoader; + this.projectedSchema = projectedSchema; + this.options = options; + this.filterExpressions = filterExpressions; + this.io = io; + this.encryption = encryption; + this.serializableConf = serializableConf; + } + + @VisibleForTesting + Schema projectedSchema() { + return projectedSchema; + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + // Legacy method, not be used. + return null; + } + + @Override + public FlinkInputSplit[] createInputSplits(int minNumSplits) throws IOException { + // Called in Job manager, so it is OK to load table from catalog. 
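+    // The loader is opened with the serialized Hadoop conf and closed again via try-with-resources
+    // once the splits are planned, so no catalog connection stays open on the job manager.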
+ tableLoader.open(serializableConf.get()); + try (TableLoader loader = tableLoader) { + Table table = loader.loadTable(); + FlinkSplitGenerator generator = new FlinkSplitGenerator(table, projectedSchema, options, filterExpressions); + return generator.createInputSplits(); + } + } + + @Override + public InputSplitAssigner getInputSplitAssigner(FlinkInputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + @Override + public void configure(Configuration parameters) { + } + + @Override + public void open(FlinkInputSplit split) { + this.iterator = new RowDataIterator(split.getTask(), io, encryption, projectedSchema, + options.getNameMapping(), options.isCaseSensitive()); + } + + @Override + public boolean reachedEnd() { + return !iterator.hasNext(); + } + + @Override + public RowData nextRecord(RowData reuse) { + return iterator.next(); + } + + @Override + public void close() throws IOException { + if (iterator != null) { + iterator.close(); + } + } + + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private Table table; + private TableLoader tableLoader; + private List selectedFields; + private TableSchema projectedSchema; + private ScanOptions options = ScanOptions.builder().build(); + private List filterExpressions; + private org.apache.hadoop.conf.Configuration hadoopConf; + + private Builder() { + } + + // -------------------------- Required options ------------------------------- + + public Builder tableLoader(TableLoader newLoader) { + this.tableLoader = newLoader; + return this; + } + + // -------------------------- Optional options ------------------------------- + + public Builder table(Table newTable) { + this.table = newTable; + return this; + } + + public Builder filters(List newFilters) { + this.filterExpressions = newFilters; + return this; + } + + public Builder project(TableSchema schema) { + this.projectedSchema = schema; + return this; + } + + public Builder select(String... fields) { + this.selectedFields = Lists.newArrayList(fields); + return this; + } + + public Builder select(List fields) { + this.selectedFields = fields; + return this; + } + + public Builder options(ScanOptions newOptions) { + this.options = newOptions; + return this; + } + + public Builder hadoopConf(org.apache.hadoop.conf.Configuration newConf) { + this.hadoopConf = newConf; + return this; + } + + public FlinkInputFormat build() { + Preconditions.checkNotNull(tableLoader, "TableLoader should not be null."); + + hadoopConf = hadoopConf == null ? FlinkCatalogFactory.clusterHadoopConf() : hadoopConf; + + Schema icebergSchema; + FileIO io; + EncryptionManager encryption; + if (table == null) { + // load required fields by table loader. 
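+        // Only the schema, FileIO and EncryptionManager are kept from the loaded table; they are
+        // serialized into the input format so that readers can be opened on the task managers.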
+ tableLoader.open(hadoopConf); + try (TableLoader loader = tableLoader) { + Table loadedTable = loader.loadTable(); + icebergSchema = loadedTable.schema(); + io = loadedTable.io(); + encryption = loadedTable.encryption(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } else { + icebergSchema = table.schema(); + io = table.io(); + encryption = table.encryption(); + } + + if (projectedSchema != null && selectedFields != null) { + throw new IllegalArgumentException( + "Cannot using both requestedSchema and projectedFields to project."); + } + + TableSchema flinkProjectedSchema = projectedSchema; + + if (selectedFields != null) { + TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)); + TableSchema.Builder builder = TableSchema.builder(); + for (String field : selectedFields) { + TableColumn column = tableSchema.getTableColumn(field).orElseThrow( + () -> new IllegalArgumentException(String.format("The field(%s) can not be found in the table schema: %s", + field, tableSchema))); + builder.field(column.getName(), column.getType()); + } + flinkProjectedSchema = builder.build(); + } + + Schema icebergProjectedSchema = icebergSchema; + if (flinkProjectedSchema != null) { + icebergProjectedSchema = FlinkSchemaUtil.convert(icebergSchema, flinkProjectedSchema); + } + + return new FlinkInputFormat(tableLoader, icebergProjectedSchema, io, encryption, filterExpressions, options, + new SerializableConfiguration(hadoopConf)); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java new file mode 100644 index 000000000000..fa61e987fbe8 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputSplit.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import org.apache.flink.core.io.InputSplit; +import org.apache.flink.core.io.LocatableInputSplit; +import org.apache.iceberg.CombinedScanTask; + +/** + * TODO Implement {@link LocatableInputSplit}. 
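+ * Exposing host information per split would allow Flink's split assigner to schedule splits close
+ * to the data, which can matter for HDFS-backed tables.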
+ */ +public class FlinkInputSplit implements InputSplit { + + private final int splitNumber; + private final CombinedScanTask task; + + FlinkInputSplit(int splitNumber, CombinedScanTask task) { + this.splitNumber = splitNumber; + this.task = task; + } + + @Override + public int getSplitNumber() { + return splitNumber; + } + + CombinedScanTask getTask() { + return task; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FlinkInputSplit that = (FlinkInputSplit) o; + return splitNumber == that.splitNumber; + } + + @Override + public int hashCode() { + return splitNumber; + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java new file mode 100644 index 000000000000..3502055d39fc --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitGenerator.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +class FlinkSplitGenerator { + + private final Table table; + private final Schema projectedSchema; + private final ScanOptions options; + private final List filterExpressions; + + FlinkSplitGenerator(Table table, Schema projectedSchema, ScanOptions options, List filterExpressions) { + this.table = table; + this.projectedSchema = projectedSchema; + this.options = options; + this.filterExpressions = filterExpressions; + } + + FlinkInputSplit[] createInputSplits() { + List tasks = tasks(); + FlinkInputSplit[] splits = new FlinkInputSplit[tasks.size()]; + for (int i = 0; i < tasks.size(); i++) { + splits[i] = new FlinkInputSplit(i, tasks.get(i)); + } + return splits; + } + + private List tasks() { + TableScan scan = table + .newScan() + .caseSensitive(options.isCaseSensitive()) + .project(projectedSchema); + + if (options.getSnapshotId() != null) { + scan = scan.useSnapshot(options.getSnapshotId()); + } + + if (options.getAsOfTimestamp() != null) { + scan = scan.asOfTime(options.getAsOfTimestamp()); + } + + if (options.getStartSnapshotId() != null) { + if (options.getEndSnapshotId() != null) { + scan = scan.appendsBetween(options.getStartSnapshotId(), options.getEndSnapshotId()); + } else { + scan = scan.appendsAfter(options.getStartSnapshotId()); + } + } + + if (options.getSplitSize() != null) { + scan = scan.option(TableProperties.SPLIT_SIZE, options.getSplitSize().toString()); + } + + if (options.getSplitLookback() != null) { + scan = scan.option(TableProperties.SPLIT_LOOKBACK, options.getSplitLookback().toString()); + } + + if (options.getSplitOpenFileCost() != null) { + scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, options.getSplitOpenFileCost().toString()); + } + + if (filterExpressions != null) { + for (Expression filter : filterExpressions) { + scan = scan.filter(filter); + } + } + + try (CloseableIterable tasksIterable = scan.planTasks()) { + return Lists.newArrayList(tasksIterable); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table scan: " + scan, e); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java new file mode 100644 index 000000000000..26c6b32502c5 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataIterator.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.util.Map; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.CombinedScanTask; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.encryption.EncryptionManager; +import org.apache.iceberg.flink.data.FlinkAvroReader; +import org.apache.iceberg.flink.data.FlinkOrcReader; +import org.apache.iceberg.flink.data.FlinkParquetReaders; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.mapping.NameMappingParser; +import org.apache.iceberg.orc.ORC; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.util.PartitionUtil; + +class RowDataIterator extends DataIterator { + + private final Schema projectedSchema; + private final String nameMapping; + private final boolean caseSensitive; + + RowDataIterator(CombinedScanTask task, FileIO fileIo, EncryptionManager encryption, Schema projectedSchema, + String nameMapping, boolean caseSensitive) { + super(task, fileIo, encryption); + this.projectedSchema = projectedSchema; + this.nameMapping = nameMapping; + this.caseSensitive = caseSensitive; + } + + @Override + protected CloseableIterator openTaskIterator(FileScanTask task) { + Schema partitionSchema = TypeUtil.select(projectedSchema, task.spec().identitySourceIds()); + + Map idToConstant = partitionSchema.columns().isEmpty() ? 
ImmutableMap.of() : + PartitionUtil.constantsMap(task, RowDataIterator::convertConstant); + CloseableIterable iterable = newIterable(task, idToConstant); + return iterable.iterator(); + } + + private CloseableIterable newIterable(FileScanTask task, Map idToConstant) { + CloseableIterable iter; + if (task.isDataTask()) { + throw new UnsupportedOperationException("Cannot read data task."); + } else { + switch (task.file().format()) { + case PARQUET: + iter = newParquetIterable(task, idToConstant); + break; + + case AVRO: + iter = newAvroIterable(task, idToConstant); + break; + + case ORC: + iter = newOrcIterable(task, idToConstant); + break; + + default: + throw new UnsupportedOperationException( + "Cannot read unknown format: " + task.file().format()); + } + } + + return iter; + } + + private CloseableIterable newAvroIterable(FileScanTask task, Map idToConstant) { + Avro.ReadBuilder builder = Avro.read(getInputFile(task)) + .reuseContainers() + .project(projectedSchema) + .split(task.start(), task.length()) + .createReaderFunc(readSchema -> new FlinkAvroReader(projectedSchema, readSchema, idToConstant)); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newParquetIterable(FileScanTask task, Map idToConstant) { + Parquet.ReadBuilder builder = Parquet.read(getInputFile(task)) + .split(task.start(), task.length()) + .project(projectedSchema) + .createReaderFunc(fileSchema -> FlinkParquetReaders.buildReader(projectedSchema, fileSchema, idToConstant)) + .filter(task.residual()) + .caseSensitive(caseSensitive); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } + + private CloseableIterable newOrcIterable(FileScanTask task, Map idToConstant) { + Schema readSchemaWithoutConstantAndMetadataFields = TypeUtil.selectNot(projectedSchema, + Sets.union(idToConstant.keySet(), MetadataColumns.metadataFieldIds())); + + ORC.ReadBuilder builder = ORC.read(getInputFile(task)) + .project(readSchemaWithoutConstantAndMetadataFields) + .split(task.start(), task.length()) + .createReaderFunc(readOrcSchema -> new FlinkOrcReader(projectedSchema, readOrcSchema, idToConstant)) + .filter(task.residual()) + .caseSensitive(caseSensitive); + + if (nameMapping != null) { + builder.withNameMapping(NameMappingParser.fromJson(nameMapping)); + } + + return builder.build(); + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java new file mode 100644 index 000000000000..c13150232356 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.Serializable; +import java.util.Map; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; + +import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING; + +public class ScanOptions implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final ConfigOption SNAPSHOT_ID = + ConfigOptions.key("snapshot-id").longType().defaultValue(null); + + public static final ConfigOption CASE_SENSITIVE = + ConfigOptions.key("case-sensitive").booleanType().defaultValue(false); + + public static final ConfigOption AS_OF_TIMESTAMP = + ConfigOptions.key("as-of-timestamp").longType().defaultValue(null); + + public static final ConfigOption START_SNAPSHOT_ID = + ConfigOptions.key("start-snapshot-id").longType().defaultValue(null); + + public static final ConfigOption END_SNAPSHOT_ID = + ConfigOptions.key("end-snapshot-id").longType().defaultValue(null); + + public static final ConfigOption SPLIT_SIZE = + ConfigOptions.key("split-size").longType().defaultValue(null); + + public static final ConfigOption SPLIT_LOOKBACK = + ConfigOptions.key("split-lookback").intType().defaultValue(null); + + public static final ConfigOption SPLIT_FILE_OPEN_COST = + ConfigOptions.key("split-file-open-cost").longType().defaultValue(null); + + private final boolean caseSensitive; + private final Long snapshotId; + private final Long startSnapshotId; + private final Long endSnapshotId; + private final Long asOfTimestamp; + private final Long splitSize; + private final Integer splitLookback; + private final Long splitOpenFileCost; + private final String nameMapping; + + public ScanOptions(boolean caseSensitive, Long snapshotId, Long startSnapshotId, Long endSnapshotId, + Long asOfTimestamp, Long splitSize, Integer splitLookback, Long splitOpenFileCost, + String nameMapping) { + this.caseSensitive = caseSensitive; + this.snapshotId = snapshotId; + this.startSnapshotId = startSnapshotId; + this.endSnapshotId = endSnapshotId; + this.asOfTimestamp = asOfTimestamp; + this.splitSize = splitSize; + this.splitLookback = splitLookback; + this.splitOpenFileCost = splitOpenFileCost; + this.nameMapping = nameMapping; + } + + public boolean isCaseSensitive() { + return caseSensitive; + } + + public Long getSnapshotId() { + return snapshotId; + } + + public Long getStartSnapshotId() { + return startSnapshotId; + } + + public Long getEndSnapshotId() { + return endSnapshotId; + } + + public Long getAsOfTimestamp() { + return asOfTimestamp; + } + + public Long getSplitSize() { + return splitSize; + } + + public Integer getSplitLookback() { + return splitLookback; + } + + public Long getSplitOpenFileCost() { + return splitOpenFileCost; + } + + public String getNameMapping() { + return nameMapping; + } + + public static Builder builder() { + return new Builder(); + } + + public static ScanOptions of(Map options) { + return builder().options(options).build(); + } + + public static final class Builder { + private boolean caseSensitive = CASE_SENSITIVE.defaultValue(); + private Long snapshotId = SNAPSHOT_ID.defaultValue(); + private Long startSnapshotId = START_SNAPSHOT_ID.defaultValue(); + private Long endSnapshotId = END_SNAPSHOT_ID.defaultValue(); + private Long asOfTimestamp = AS_OF_TIMESTAMP.defaultValue(); + private Long 
splitSize = SPLIT_SIZE.defaultValue(); + private Integer splitLookback = SPLIT_LOOKBACK.defaultValue(); + private Long splitOpenFileCost = SPLIT_FILE_OPEN_COST.defaultValue(); + private String nameMapping; + + private Builder() { + } + + public Builder options(Map options) { + Configuration config = new Configuration(); + options.forEach(config::setString); + this.caseSensitive = config.get(CASE_SENSITIVE); + this.snapshotId = config.get(SNAPSHOT_ID); + this.asOfTimestamp = config.get(AS_OF_TIMESTAMP); + this.startSnapshotId = config.get(START_SNAPSHOT_ID); + this.endSnapshotId = config.get(END_SNAPSHOT_ID); + this.splitSize = config.get(SPLIT_SIZE); + this.splitLookback = config.get(SPLIT_LOOKBACK); + this.splitOpenFileCost = config.get(SPLIT_FILE_OPEN_COST); + this.nameMapping = options.get(DEFAULT_NAME_MAPPING); + return this; + } + + public Builder caseSensitive(boolean newCaseSensitive) { + this.caseSensitive = newCaseSensitive; + return this; + } + + public Builder snapshotId(Long newSnapshotId) { + this.snapshotId = newSnapshotId; + return this; + } + + public Builder startSnapshotId(Long newStartSnapshotId) { + this.startSnapshotId = newStartSnapshotId; + return this; + } + + public Builder endSnapshotId(Long newEndSnapshotId) { + this.endSnapshotId = newEndSnapshotId; + return this; + } + + public Builder asOfTimestamp(Long newAsOfTimestamp) { + this.asOfTimestamp = newAsOfTimestamp; + return this; + } + + public Builder splitSize(Long newSplitSize) { + this.splitSize = newSplitSize; + return this; + } + + public Builder splitLookback(Integer newSplitLookback) { + this.splitLookback = newSplitLookback; + return this; + } + + public Builder splitOpenFileCost(Long newSplitOpenFileCost) { + this.splitOpenFileCost = newSplitOpenFileCost; + return this; + } + + public Builder nameMapping(String newNameMapping) { + this.nameMapping = newNameMapping; + return this; + } + + public ScanOptions build() { + if (snapshotId != null && asOfTimestamp != null) { + throw new IllegalArgumentException( + "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot"); + } + + if (snapshotId != null || asOfTimestamp != null) { + if (startSnapshotId != null || endSnapshotId != null) { + throw new IllegalArgumentException( + "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id" + + " or as-of-timestamp is specified"); + } + } else { + if (startSnapshotId == null && endSnapshotId != null) { + throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan"); + } + } + return new ScanOptions(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, + splitLookback, splitOpenFileCost, nameMapping); + } + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java index d86469a9115a..987e070f78b6 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java +++ b/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java @@ -61,7 +61,7 @@ protected void writeAndValidate(Schema schema) throws IOException { try (CloseableIterable reader = ORC.read(Files.localInput(recordsFile)) .project(schema) - .createReaderFunc(type -> FlinkOrcReader.buildReader(schema, type)) + .createReaderFunc(type -> new FlinkOrcReader(schema, type)) .build()) { Iterator expected = expectedRecords.iterator(); Iterator rows = 
reader.iterator(); diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java new file mode 100644 index 000000000000..a145c5033eaf --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.util.List; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.types.Row; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.TableLoader; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * Test {@link FlinkInputFormat}. + */ +public class TestFlinkInputFormat extends TestFlinkScan { + + private FlinkInputFormat.Builder builder; + + public TestFlinkInputFormat(String fileFormat) { + super(fileFormat); + } + + @Override + public void before() throws IOException { + super.before(); + builder = FlinkInputFormat.builder().tableLoader(TableLoader.fromHadoopTable(warehouse + "/default/t")); + } + + @Override + protected List executeWithOptions( + Table table, List projectFields, CatalogLoader loader, Long snapshotId, Long startSnapshotId, + Long endSnapshotId, Long asOfTimestamp, List filters, String sqlFilter) throws IOException { + ScanOptions options = ScanOptions.builder().snapshotId(snapshotId).startSnapshotId(startSnapshotId) + .endSnapshotId(endSnapshotId).asOfTimestamp(asOfTimestamp).build(); + if (loader != null) { + builder.tableLoader(TableLoader.fromCatalog(loader, TableIdentifier.of("default", "t"))); + } + + return run(builder.select(projectFields).filters(filters).options(options).build()); + } + + @Override + protected void assertResiduals( + Schema shcema, List results, List writeRecords, List filteredRecords) { + // can not filter the data. 
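+    // Filters only prune files during planning; the residual expression is not applied per row by
+    // the InputFormat, so every record written to the matching partition is expected back.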
+ assertRecords(results, writeRecords, shcema); + } + + @Override + protected void assertNestedProjection(Table table, List records) throws IOException { + TableSchema projectedSchema = TableSchema.builder() + .field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING()))) + .field("data", DataTypes.STRING()) + .build(); + List result = run(builder.project(projectedSchema).build()); + + List expected = Lists.newArrayList(); + for (Record record : records) { + Row nested = Row.of(((Record) record.get(1)).get(1)); + expected.add(Row.of(nested, record.get(0))); + } + + assertRows(result, expected); + } + + private List run(FlinkInputFormat inputFormat) throws IOException { + FlinkInputSplit[] splits = inputFormat.createInputSplits(0); + List results = Lists.newArrayList(); + + RowType rowType = FlinkSchemaUtil.convert(inputFormat.projectedSchema()); + + DataStructureConverter converter = DataStructureConverters.getConverter( + TypeConversions.fromLogicalToDataType(rowType)); + + for (FlinkInputSplit s : splits) { + inputFormat.open(s); + while (!inputFormat.reachedEnd()) { + RowData row = inputFormat.nextRecord(null); + results.add((Row) converter.toExternal(row)); + } + } + inputFormat.close(); + return results; + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java new file mode 100644 index 000000000000..6032339713f4 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -0,0 +1,408 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.io.File; +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Locale; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.types.utils.TypeConversions; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TestHelpers; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.RandomGenericData; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.hadoop.HadoopCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.iceberg.types.Types.NestedField.required; + +@RunWith(Parameterized.class) +public abstract class TestFlinkScan extends AbstractTestBase { + + private static final Schema SCHEMA = new Schema( + required(1, "data", Types.StringType.get()), + required(2, "id", Types.LongType.get()), + required(3, "dt", Types.StringType.get())); + + private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA) + .identity("dt") + .bucket("id", 1) + .build(); + + // before variables + private Configuration conf; + String warehouse; + private HadoopCatalog catalog; + + // parametrized variables + private final FileFormat fileFormat; + + @Parameterized.Parameters(name = "format={0}") + public static Object[] parameters() { + return new Object[] {"avro", "parquet", "orc"}; + } + + TestFlinkScan(String fileFormat) { + this.fileFormat = FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH)); + } + + @Before + public void before() throws IOException { + File warehouseFile = TEMPORARY_FOLDER.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + conf = new Configuration(); + warehouse = "file:" + warehouseFile; + catalog = new HadoopCatalog(conf, warehouse); + } + + private List execute(Table table) throws IOException { + return executeWithOptions(table, null, null, null, null, null, null, null, null); + } + + private List execute(Table table, List projectFields) throws IOException { + return executeWithOptions(table, projectFields, null, null, null, null, null, null, null); + } + + protected abstract List executeWithOptions( + Table table, List projectFields, CatalogLoader loader, Long snapshotId, + Long startSnapshotId, Long 
endSnapshotId, Long asOfTimestamp, List filters, String sqlFilter) + throws IOException; + + /** + * The Flink SQL has no residuals, because there will be operator to filter all the data that should be filtered. + * But the FlinkInputFormat can't. + */ + protected abstract void assertResiduals(Schema schema, List results, List writeRecords, + List filteredRecords) throws IOException; + + /** + * Schema: [data, nested[f1, f2, f3], id] + * Projection: [nested.f2, data] + * The Flink SQL output: [f2, data] + * The FlinkInputFormat output: [nested[f2], data]. + */ + protected abstract void assertNestedProjection(Table table, List records) throws IOException; + + @Test + public void testUnpartitionedTable() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + List expectedRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(expectedRecords); + assertRecords(execute(table), expectedRecords, SCHEMA); + } + + @Test + public void testPartitionedTable() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + List expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( + org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + assertRecords(execute(table), expectedRecords, SCHEMA); + } + + @Test + public void testProjection() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + List inputRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( + org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), inputRecords); + assertRows(execute(table, Collections.singletonList("data")), Row.of(inputRecords.get(0).get(0))); + } + + @Test + public void testIdentityPartitionProjections() throws Exception { + Schema logSchema = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "level", Types.StringType.get()), + Types.NestedField.optional(4, "message", Types.StringType.get()) + ); + PartitionSpec spec = + PartitionSpec.builderFor(logSchema).identity("dt").identity("level").build(); + + Table table = catalog.createTable(TableIdentifier.of("default", "t"), logSchema, spec); + List inputRecords = RandomGenericData.generate(logSchema, 10, 0L); + + int idx = 0; + AppendFiles append = table.newAppend(); + for (Record record : inputRecords) { + record.set(1, "2020-03-2" + idx); + record.set(2, Integer.toString(idx)); + append.appendFile(new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).writeFile( + org.apache.iceberg.TestHelpers.Row.of("2020-03-2" + idx, Integer.toString(idx)), ImmutableList.of(record))); + idx += 1; + } + append.commit(); + + // individual fields + validateIdentityPartitionProjections(table, Collections.singletonList("dt"), inputRecords); + validateIdentityPartitionProjections(table, Collections.singletonList("level"), inputRecords); + validateIdentityPartitionProjections(table, Collections.singletonList("message"), inputRecords); + validateIdentityPartitionProjections(table, Collections.singletonList("id"), inputRecords); + // field pairs + validateIdentityPartitionProjections(table, Arrays.asList("dt", "message"), 
inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("level", "message"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("dt", "level"), inputRecords); + // out-of-order pairs + validateIdentityPartitionProjections(table, Arrays.asList("message", "dt"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("message", "level"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("level", "dt"), inputRecords); + // out-of-order triplets + validateIdentityPartitionProjections(table, Arrays.asList("dt", "level", "message"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("level", "dt", "message"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("dt", "message", "level"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("level", "message", "dt"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("message", "dt", "level"), inputRecords); + validateIdentityPartitionProjections(table, Arrays.asList("message", "level", "dt"), inputRecords); + } + + private void validateIdentityPartitionProjections(Table table, List projectedFields, + List inputRecords) throws IOException { + List rows = execute(table, projectedFields); + + for (int pos = 0; pos < inputRecords.size(); pos++) { + Record inputRecord = inputRecords.get(pos); + Row actualRecord = rows.get(pos); + + for (int i = 0; i < projectedFields.size(); i++) { + String name = projectedFields.get(i); + Assert.assertEquals( + "Projected field " + name + " should match", inputRecord.getField(name), actualRecord.getField(i)); + } + } + } + + @Test + public void testSnapshotReads() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(expectedRecords); + long snapshotId = table.currentSnapshot().snapshotId(); + + long timestampMillis = table.currentSnapshot().timestampMillis(); + + // produce another timestamp + Thread.sleep(10); + helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); + + assertRecords(executeWithOptions(table, null, null, snapshotId, null, null, null, null, null), expectedRecords, + SCHEMA); + assertRecords(executeWithOptions(table, null, null, null, null, null, timestampMillis, null, null), + expectedRecords, SCHEMA); + } + + @Test + public void testIncrementalRead() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List records1 = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(records1); + long snapshotId1 = table.currentSnapshot().snapshotId(); + + // snapshot 2 + List records2 = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(records2); + + List records3 = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(records3); + long snapshotId3 = table.currentSnapshot().snapshotId(); + + // snapshot 4 + List records4 = RandomGenericData.generate(SCHEMA, 1, 0L); + helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); + + List expected1 = Lists.newArrayList(); + expected1.addAll(records2); + expected1.addAll(records3); + expected1.addAll(records4); + assertRecords(executeWithOptions(table, 
null, null, null, snapshotId1, null, null, null, null), expected1, SCHEMA); + + List expected2 = Lists.newArrayList(); + expected2.addAll(records2); + expected2.addAll(records3); + assertRecords(executeWithOptions(table, null, null, null, snapshotId1, snapshotId3, null, null, null), expected2, + SCHEMA); + } + + @Test + public void testCustomCatalog() throws IOException { + String newWarehouse = TEMPORARY_FOLDER.newFolder().toURI().toString(); + Table table = new HadoopCatalog(conf, newWarehouse).createTable(TableIdentifier.of("default", "t"), SCHEMA); + + List expectedRecords = RandomGenericData.generate(SCHEMA, 1, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( + org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + + List rows = executeWithOptions(table, null, CatalogLoader.hadoop("new_catalog", newWarehouse), null, null, + null, null, null, null); + assertRecords(rows, expectedRecords, SCHEMA); + } + + @Test + public void testFilterExp() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + + List expectedRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + expectedRecords.get(0).set(2, "2020-03-20"); + expectedRecords.get(1).set(2, "2020-03-20"); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), expectedRecords); + DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), + RandomGenericData.generate(SCHEMA, 2, 0L)); + helper.appendToTable(dataFile1, dataFile2); + List filters = Collections.singletonList(Expressions.equal("dt", "2020-03-20")); + assertRecords(executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20'"), + expectedRecords, SCHEMA); + } + + @Test + public void testResiduals() throws Exception { + Table table = catalog.createTable(TableIdentifier.of("default", "t"), SCHEMA, SPEC); + + List writeRecords = RandomGenericData.generate(SCHEMA, 2, 0L); + writeRecords.get(0).set(1, 123L); + writeRecords.get(0).set(2, "2020-03-20"); + writeRecords.get(1).set(1, 456L); + writeRecords.get(1).set(2, "2020-03-20"); + + GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.add(writeRecords.get(0)); + + DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), writeRecords); + DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0), + RandomGenericData.generate(SCHEMA, 2, 0L)); + helper.appendToTable(dataFile1, dataFile2); + + List filters = Arrays.asList(Expressions.equal("dt", "2020-03-20"), Expressions.equal("id", 123)); + assertResiduals( + SCHEMA, executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20' and id=123"), + writeRecords, expectedRecords); + } + + @Test + public void testPartitionTypes() throws Exception { + Schema typesSchema = new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "decimal", Types.DecimalType.of(38, 18)), + Types.NestedField.optional(3, "str", Types.StringType.get()), + Types.NestedField.optional(4, "binary", Types.BinaryType.get()), + Types.NestedField.optional(5, "date", Types.DateType.get()), + Types.NestedField.optional(6, "time", Types.TimeType.get()), + Types.NestedField.optional(7, 
"timestamp", Types.TimestampType.withoutZone()) + ); + PartitionSpec spec = PartitionSpec.builderFor(typesSchema).identity("decimal").identity("str").identity("binary") + .identity("date").identity("time").identity("timestamp").build(); + + Table table = catalog.createTable(TableIdentifier.of("default", "t"), typesSchema, spec); + List records = RandomGenericData.generate(typesSchema, 10, 0L); + GenericAppenderHelper appender = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER); + for (Record record : records) { + TestHelpers.Row partition = TestHelpers.Row.of( + record.get(1), + record.get(2), + record.get(3), + record.get(4) == null ? null : DateTimeUtil.daysFromDate((LocalDate) record.get(4)), + record.get(5) == null ? null : DateTimeUtil.microsFromTime((LocalTime) record.get(5)), + record.get(6) == null ? null : DateTimeUtil.microsFromTimestamp((LocalDateTime) record.get(6))); + appender.appendToTable(partition, Collections.singletonList(record)); + } + + assertRecords(execute(table), records, typesSchema); + } + + @Test + public void testNestedProjection() throws Exception { + Schema schema = new Schema( + required(1, "data", Types.StringType.get()), + required(2, "nested", Types.StructType.of( + Types.NestedField.required(3, "f1", Types.StringType.get()), + Types.NestedField.required(4, "f2", Types.StringType.get()), + Types.NestedField.required(5, "f3", Types.LongType.get()))), + required(6, "id", Types.LongType.get())); + + Table table = catalog.createTable(TableIdentifier.of("default", "t"), schema); + + List writeRecords = RandomGenericData.generate(schema, 2, 0L); + new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable(writeRecords); + + assertNestedProjection(table, writeRecords); + } + + static void assertRecords(List results, List expectedRecords, Schema schema) { + List expected = Lists.newArrayList(); + @SuppressWarnings("unchecked") + DataStructureConverter converter = (DataStructureConverter) DataStructureConverters.getConverter( + TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema))); + expectedRecords.forEach(r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r)))); + assertRows(results, expected); + } + + private static void assertRows(List results, Row... 
expected) { + assertRows(results, Arrays.asList(expected)); + } + + static void assertRows(List results, List expected) { + expected.sort(Comparator.comparing(Row::toString)); + results.sort(Comparator.comparing(Row::toString)); + Assert.assertEquals(expected, results); + } +} From dfb682500bdded7669982c310453aebd603d9da7 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Mon, 17 Aug 2020 13:51:07 +0800 Subject: [PATCH 2/6] Flink: Integrate Flink input format to FlinkCatalog --- .../apache/iceberg/flink/FlinkCatalog.java | 35 +++-- .../iceberg/flink/FlinkTableFactory.java | 75 ++++++++++ .../apache/iceberg/flink/TypeToFlinkType.java | 4 +- .../flink/source/FlinkTableSource.java | 134 ++++++++++++++++++ .../iceberg/flink/source/TestFlinkScan.java | 2 +- .../flink/source/TestFlinkScanSql.java | 123 ++++++++++++++++ 6 files changed, 362 insertions(+), 11 deletions(-) create mode 100644 flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java create mode 100644 flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java create mode 100644 flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 4182964a8679..348166b796a8 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.flink.table.api.TableSchema; @@ -46,6 +47,7 @@ import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.TableFactory; import org.apache.flink.util.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CachingCatalog; @@ -119,7 +121,7 @@ private Namespace toNamespace(String database) { return Namespace.of(namespace); } - private TableIdentifier toIdentifier(ObjectPath path) { + public TableIdentifier toIdentifier(ObjectPath path) { return TableIdentifier.of(toNamespace(path.getDatabaseName()), path.getObjectName()); } @@ -278,14 +280,18 @@ public List listTables(String databaseName) throws DatabaseNotExistExcep @Override public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException { - try { - Table table = icebergCatalog.loadTable(toIdentifier(tablePath)); - TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema())); + Table table = getIcebergTable(tablePath); + TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(table.schema())); + + // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new + // catalog table. + // Let's re-loading table from Iceberg catalog when creating source/sink operators. + return new CatalogTableImpl(tableSchema, table.properties(), null); + } - // NOTE: We can not create a IcebergCatalogTable, because Flink optimizer may use CatalogTableImpl to copy a new - // catalog table. - // Let's re-loading table from Iceberg catalog when creating source/sink operators. 
- return new CatalogTableImpl(tableSchema, table.properties(), null); + Table getIcebergTable(ObjectPath tablePath) throws TableNotExistException { + try { + return icebergCatalog.loadTable(toIdentifier(tablePath)); } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { throw new TableNotExistException(getName(), tablePath, e); } @@ -335,6 +341,19 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean throw new UnsupportedOperationException("Not support alterTable now."); } + CatalogLoader getCatalogLoader() { + return catalogLoader; + } + + Configuration getHadoopConf() { + return hadoopConf; + } + + @Override + public Optional getTableFactory() { + return Optional.of(new FlinkTableFactory(this)); + } + // ------------------------------ Unsupported methods --------------------------------------------- @Override diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java new file mode 100644 index 000000000000..30d1c5848e2b --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink; + +import java.util.List; +import java.util.Map; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.TableSourceFactory; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.utils.TableSchemaUtils; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.source.FlinkTableSource; + +/** + * Flink Iceberg table factory to create table source and sink. + * Only works for catalog, can not be loaded from Java SPI(Service Provider Interface). 
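+ *
+ * <p>The factory is wired in through {@link FlinkCatalog#getTableFactory()} rather than factory discovery,
+ * which is why {@code requiredContext()} and {@code supportedProperties()} throw
+ * {@link UnsupportedOperationException}. Illustrative lookup only:
+ *
+ * <pre>
+ *   FlinkCatalog catalog = ...;  // an Iceberg-backed catalog registered in the TableEnvironment
+ *   TableFactory factory = catalog.getTableFactory().orElseThrow(IllegalStateException::new);
+ * </pre>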
+ */ +class FlinkTableFactory implements TableSourceFactory { + + private final FlinkCatalog catalog; + + FlinkTableFactory(FlinkCatalog catalog) { + this.catalog = catalog; + } + + @Override + public Map requiredContext() { + throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI"); + } + + @Override + public List supportedProperties() { + throw new UnsupportedOperationException("Iceberg Table Factory can not be loaded from Java SPI"); + } + + @Override + public TableSource createTableSource(Context context) { + ObjectIdentifier identifier = context.getObjectIdentifier(); + ObjectPath objectPath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName()); + TableIdentifier icebergIdentifier = catalog.toIdentifier(objectPath); + try { + Table table = catalog.getIcebergTable(objectPath); + // Excludes computed columns + TableSchema icebergSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); + return new FlinkTableSource( + icebergIdentifier, table, catalog.getCatalogLoader(), catalog.getHadoopConf(), icebergSchema, + context.getTable().getOptions()); + } catch (TableNotExistException e) { + throw new ValidationException(String.format("Iceberg Table(%s) not exist.", icebergIdentifier), e); + } + } +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java b/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java index dfd8ffb9668c..e481d60c8565 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java +++ b/flink/src/main/java/org/apache/iceberg/flink/TypeToFlinkType.java @@ -100,8 +100,8 @@ public LogicalType primitive(Type.PrimitiveType primitive) { case DATE: return new DateType(); case TIME: - // MICROS - return new TimeType(6); + // Flink only support TimeType with default precision now. + return new TimeType(); case TIMESTAMP: Types.TimestampType timestamp = (Types.TimestampType) primitive; if (timestamp.shouldAdjustToUTC()) { diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java new file mode 100644 index 000000000000..c9556f452e01 --- /dev/null +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.flink.source; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.table.sources.FilterableTableSource; +import org.apache.flink.table.sources.LimitableTableSource; +import org.apache.flink.table.sources.ProjectableTableSource; +import org.apache.flink.table.sources.StreamTableSource; +import org.apache.flink.table.sources.TableSource; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.utils.TableConnectorUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.TableLoader; + +/** + * Flink Iceberg table source. + * TODO: Implement {@link FilterableTableSource} and {@link LimitableTableSource}. + */ +public class FlinkTableSource implements StreamTableSource, ProjectableTableSource { + + private final TableIdentifier identifier; + private final Table table; + private final CatalogLoader catalogLoader; + private final Configuration hadoopConf; + private final TableSchema schema; + private final Map options; + private final int[] projectedFields; + + public FlinkTableSource( + TableIdentifier identifier, Table table, CatalogLoader catalogLoader, Configuration hadoopConf, + TableSchema schema, Map options) { + this(identifier, table, catalogLoader, hadoopConf, schema, options, null); + } + + private FlinkTableSource( + TableIdentifier identifier, Table table, CatalogLoader catalogLoader, Configuration hadoopConf, + TableSchema schema, Map options, int[] projectedFields) { + this.identifier = identifier; + this.table = table; + this.catalogLoader = catalogLoader; + this.hadoopConf = hadoopConf; + this.schema = schema; + this.options = options; + this.projectedFields = projectedFields; + } + + @Override + public boolean isBounded() { + return true; + } + + @Override + public TableSource projectFields(int[] fields) { + return new FlinkTableSource(identifier, table, catalogLoader, hadoopConf, schema, options, fields); + } + + @Override + public DataStream getDataStream(StreamExecutionEnvironment execEnv) { + Schema icebergSchema = table.schema(); + List projectNames = null; + if (projectedFields != null) { + projectNames = Arrays.stream(projectedFields) + .mapToObj(project -> icebergSchema.asStruct().fields().get(project).name()) + .collect(Collectors.toList()); + } + FlinkInputFormat inputFormat = FlinkInputFormat.builder().table(table) + .tableLoader(TableLoader.fromCatalog(catalogLoader, identifier)).hadoopConf(hadoopConf) + .select(projectNames).options(ScanOptions.of(options)).build(); + return execEnv.createInput(inputFormat, RowDataTypeInfo.of((RowType) getProducedDataType().getLogicalType())); + } + + @Override + public TableSchema getTableSchema() { + return schema; + } + + @Override + public DataType getProducedDataType() { + return getProjectedSchema().toRowDataType().bridgedTo(RowData.class); + } + + private TableSchema getProjectedSchema() { + TableSchema fullSchema = getTableSchema(); + if 
(projectedFields == null) { + return fullSchema; + } else { + String[] fullNames = fullSchema.getFieldNames(); + DataType[] fullTypes = fullSchema.getFieldDataTypes(); + return TableSchema.builder().fields( + Arrays.stream(projectedFields).mapToObj(i -> fullNames[i]).toArray(String[]::new), + Arrays.stream(projectedFields).mapToObj(i -> fullTypes[i]).toArray(DataType[]::new)).build(); + } + } + + @Override + public String explainSource() { + String explain = "Iceberg table: " + identifier; + if (projectedFields != null) { + explain += ", ProjectedFields: " + Arrays.toString(projectedFields); + } + return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) + explain; + } +} diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index 6032339713f4..1cb710fc51f1 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -79,7 +79,7 @@ public abstract class TestFlinkScan extends AbstractTestBase { .build(); // before variables - private Configuration conf; + Configuration conf; String warehouse; private HadoopCatalog catalog; diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java new file mode 100644 index 000000000000..f16cf4043483 --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.types.Row; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkCatalog; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * Test Flink SELECT SQLs. 
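+ *
+ * <p>Scan options ({@code ScanOptions.SNAPSHOT_ID}, {@code START_SNAPSHOT_ID}, {@code END_SNAPSHOT_ID},
+ * {@code AS_OF_TIMESTAMP}) are passed to the source through dynamic table option hints, which is why
+ * {@code TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED} is switched on in {@code before()}; filters and projections
+ * are expressed directly in the SELECT statement.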
+ */ +public class TestFlinkScanSql extends TestFlinkScan { + + private TableEnvironment tEnv; + + public TestFlinkScanSql(String fileFormat) { + super(fileFormat); + } + + @Override + public void before() throws IOException { + super.before(); + tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); + tEnv.executeSql(String.format("create catalog iceberg_catalog with (" + + "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", warehouse)); + tEnv.executeSql("use catalog iceberg_catalog"); + tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); + } + + @Override + protected List executeWithOptions( + Table table, List projectFields, CatalogLoader loader, Long snapshotId, Long startSnapshotId, + Long endSnapshotId, Long asOfTimestamp, List filters, String sqlFilter) { + String fields = projectFields == null ? "*" : String.join(",", projectFields); + + if (loader != null) { + tEnv.registerCatalog( + "new_catalog", new FlinkCatalog("new_catalog", "default", new String[0], loader, conf, true)); + tEnv.executeSql("use catalog new_catalog"); + } + + Map optionMap = Maps.newHashMap(); + if (snapshotId != null) { + optionMap.put(ScanOptions.SNAPSHOT_ID.key(), snapshotId.toString()); + } + if (startSnapshotId != null) { + optionMap.put(ScanOptions.START_SNAPSHOT_ID.key(), startSnapshotId.toString()); + } + if (endSnapshotId != null) { + optionMap.put(ScanOptions.END_SNAPSHOT_ID.key(), endSnapshotId.toString()); + } + if (asOfTimestamp != null) { + optionMap.put(ScanOptions.AS_OF_TIMESTAMP.key(), asOfTimestamp.toString()); + } + + StringBuilder builder = new StringBuilder(); + optionMap.forEach((key, value) -> + builder.append("'").append(key).append("'").append("=").append("'").append(value).append("'").append(",")); + String options = builder.toString(); + if (options.endsWith(",")) { + options = options.substring(0, options.length() - 1); + } + if (!options.isEmpty()) { + options = String.format("/*+ OPTIONS(%s)*/", options); + } + + String filter = ""; + if (sqlFilter != null) { + filter = " where " + sqlFilter; + } + + String sql = String.format("select %s from t %s %s", fields, options, filter); + return Lists.newArrayList(tEnv.executeSql(sql).collect()); + } + + @Override + protected void assertResiduals(Schema schema, List results, List writeRecords, + List filteredRecords) { + // should filter the data. 
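+ // The SQL planner applies the residual predicate (e.g. id = 123) in a downstream filter operator,
+ // so only the fully filtered records are expected here, unlike the raw FlinkInputFormat results.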
+ assertRecords(results, filteredRecords, schema); + } + + @Override + protected void assertNestedProjection(Table table, List records) { + List result = Lists.newArrayList(tEnv.executeSql("select nested.f2,data from t").collect()); + + List expected = Lists.newArrayList(); + for (Record record : records) { + expected.add(Row.of(((Record) record.get(1)).get(1), record.get(0))); + } + + assertRows(result, expected); + } +} From 1194d43e73ad82ce6a575d500312e0562a9ee310 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Mon, 31 Aug 2020 12:12:05 +0800 Subject: [PATCH 3/6] Fix TestFlinkSchemaUtil case --- .../test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java index 320d3bb30861..3272ed65d35d 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java +++ b/flink/src/test/java/org/apache/iceberg/flink/TestFlinkSchemaUtil.java @@ -252,7 +252,7 @@ public void testInconsistentTypes() { Types.BinaryType.get(), new VarBinaryType(VarBinaryType.MAX_LENGTH), new VarBinaryType(100), Types.BinaryType.get()); checkInconsistentType( - Types.TimeType.get(), new TimeType(6), + Types.TimeType.get(), new TimeType(), new TimeType(3), Types.TimeType.get()); checkInconsistentType( Types.TimestampType.withoutZone(), new TimestampType(6), From c826b03fdfd9274fd5dd292e6ff912202846fe55 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Mon, 31 Aug 2020 17:44:14 +0800 Subject: [PATCH 4/6] Address comments --- .../iceberg/flink/source/FlinkTableSource.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java index c9556f452e01..9029e69e39d8 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java @@ -54,24 +54,24 @@ public class FlinkTableSource implements StreamTableSource, Projectable private final CatalogLoader catalogLoader; private final Configuration hadoopConf; private final TableSchema schema; - private final Map options; + private final Map scanOptions; private final int[] projectedFields; public FlinkTableSource( TableIdentifier identifier, Table table, CatalogLoader catalogLoader, Configuration hadoopConf, - TableSchema schema, Map options) { - this(identifier, table, catalogLoader, hadoopConf, schema, options, null); + TableSchema schema, Map scanOptions) { + this(identifier, table, catalogLoader, hadoopConf, schema, scanOptions, null); } private FlinkTableSource( TableIdentifier identifier, Table table, CatalogLoader catalogLoader, Configuration hadoopConf, - TableSchema schema, Map options, int[] projectedFields) { + TableSchema schema, Map scanOptions, int[] projectedFields) { this.identifier = identifier; this.table = table; this.catalogLoader = catalogLoader; this.hadoopConf = hadoopConf; this.schema = schema; - this.options = options; + this.scanOptions = scanOptions; this.projectedFields = projectedFields; } @@ -82,7 +82,7 @@ public boolean isBounded() { @Override public TableSource projectFields(int[] fields) { - return new FlinkTableSource(identifier, table, catalogLoader, hadoopConf, schema, options, fields); + return new FlinkTableSource(identifier, table, catalogLoader, 
hadoopConf, schema, scanOptions, fields); } @Override @@ -95,8 +95,9 @@ public DataStream getDataStream(StreamExecutionEnvironment execEnv) { .collect(Collectors.toList()); } FlinkInputFormat inputFormat = FlinkInputFormat.builder().table(table) - .tableLoader(TableLoader.fromCatalog(catalogLoader, identifier)).hadoopConf(hadoopConf) - .select(projectNames).options(ScanOptions.of(options)).build(); + .tableLoader(TableLoader.fromCatalog(catalogLoader, identifier)) + .hadoopConf(hadoopConf).select(projectNames) + .options(ScanOptions.of(scanOptions)).build(); return execEnv.createInput(inputFormat, RowDataTypeInfo.of((RowType) getProducedDataType().getLogicalType())); } From 9bd4248212e4c557c8691d974ee2a2e485c93154 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 1 Sep 2020 15:08:51 +0800 Subject: [PATCH 5/6] Minor fix to source --- .../iceberg/flink/FlinkTableFactory.java | 22 +++++++--------- .../flink/source/FlinkTableSource.java | 26 +++++++------------ 2 files changed, 19 insertions(+), 29 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java index 30d1c5848e2b..42ad2b7d627c 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java @@ -23,15 +23,12 @@ import java.util.Map; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.TableSourceFactory; import org.apache.flink.table.sources.TableSource; import org.apache.flink.table.utils.TableSchemaUtils; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.source.FlinkTableSource; /** @@ -58,18 +55,17 @@ public List supportedProperties() { @Override public TableSource createTableSource(Context context) { - ObjectIdentifier identifier = context.getObjectIdentifier(); - ObjectPath objectPath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName()); - TableIdentifier icebergIdentifier = catalog.toIdentifier(objectPath); + ObjectPath objectPath = context.getObjectIdentifier().toObjectPath(); + TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); try { - Table table = catalog.getIcebergTable(objectPath); - // Excludes computed columns - TableSchema icebergSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); - return new FlinkTableSource( - icebergIdentifier, table, catalog.getCatalogLoader(), catalog.getHadoopConf(), icebergSchema, - context.getTable().getOptions()); + return new FlinkTableSource(catalog.getIcebergTable(objectPath), createTableLoader(objectPath), + catalog.getHadoopConf(), tableSchema, context.getTable().getOptions()); } catch (TableNotExistException e) { - throw new ValidationException(String.format("Iceberg Table(%s) not exist.", icebergIdentifier), e); + throw new ValidationException(String.format("Iceberg Table(%s) not exist.", objectPath), e); } } + + private TableLoader createTableLoader(ObjectPath objectPath) { + return TableLoader.fromCatalog(catalog.getCatalogLoader(), catalog.toIdentifier(objectPath)); + } } diff --git 
a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java index 9029e69e39d8..5cf78dc558c9 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java @@ -39,8 +39,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.TableLoader; /** @@ -49,26 +47,22 @@ */ public class FlinkTableSource implements StreamTableSource, ProjectableTableSource { - private final TableIdentifier identifier; private final Table table; - private final CatalogLoader catalogLoader; + private final TableLoader loader; private final Configuration hadoopConf; private final TableSchema schema; private final Map scanOptions; private final int[] projectedFields; - public FlinkTableSource( - TableIdentifier identifier, Table table, CatalogLoader catalogLoader, Configuration hadoopConf, - TableSchema schema, Map scanOptions) { - this(identifier, table, catalogLoader, hadoopConf, schema, scanOptions, null); + public FlinkTableSource(Table table, TableLoader loader, Configuration hadoopConf, TableSchema schema, + Map scanOptions) { + this(table, loader, hadoopConf, schema, scanOptions, null); } - private FlinkTableSource( - TableIdentifier identifier, Table table, CatalogLoader catalogLoader, Configuration hadoopConf, - TableSchema schema, Map scanOptions, int[] projectedFields) { - this.identifier = identifier; + private FlinkTableSource(Table table, TableLoader loader, Configuration hadoopConf, TableSchema schema, + Map scanOptions, int[] projectedFields) { this.table = table; - this.catalogLoader = catalogLoader; + this.loader = loader; this.hadoopConf = hadoopConf; this.schema = schema; this.scanOptions = scanOptions; @@ -82,7 +76,7 @@ public boolean isBounded() { @Override public TableSource projectFields(int[] fields) { - return new FlinkTableSource(identifier, table, catalogLoader, hadoopConf, schema, scanOptions, fields); + return new FlinkTableSource(table, loader, hadoopConf, schema, scanOptions, fields); } @Override @@ -95,7 +89,7 @@ public DataStream getDataStream(StreamExecutionEnvironment execEnv) { .collect(Collectors.toList()); } FlinkInputFormat inputFormat = FlinkInputFormat.builder().table(table) - .tableLoader(TableLoader.fromCatalog(catalogLoader, identifier)) + .tableLoader(loader) .hadoopConf(hadoopConf).select(projectNames) .options(ScanOptions.of(scanOptions)).build(); return execEnv.createInput(inputFormat, RowDataTypeInfo.of((RowType) getProducedDataType().getLogicalType())); @@ -126,7 +120,7 @@ private TableSchema getProjectedSchema() { @Override public String explainSource() { - String explain = "Iceberg table: " + identifier; + String explain = "Iceberg table: " + table.toString(); if (projectedFields != null) { explain += ", ProjectedFields: " + Arrays.toString(projectedFields); } From 02bc0249d5e2030d2202321097f9290302bdce3c Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 1 Sep 2020 16:11:25 +0800 Subject: [PATCH 6/6] checkstyles --- .../iceberg/flink/FlinkTableFactory.java | 5 +++-- .../flink/source/FlinkInputFormat.java | 11 +++++----- .../flink/source/FlinkTableSource.java | 10 ++++----- .../iceberg/flink/source/ScanOptions.java | 2 +- .../flink/source/TestFlinkInputFormat.java | 5 ++--- 
.../iceberg/flink/source/TestFlinkScan.java | 21 ++++++++++--------- .../flink/source/TestFlinkScanSql.java | 5 +++-- 7 files changed, 29 insertions(+), 30 deletions(-) diff --git a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java index 42ad2b7d627c..9792c06be514 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/FlinkTableFactory.java @@ -58,8 +58,9 @@ public TableSource createTableSource(Context context) { ObjectPath objectPath = context.getObjectIdentifier().toObjectPath(); TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()); try { - return new FlinkTableSource(catalog.getIcebergTable(objectPath), createTableLoader(objectPath), - catalog.getHadoopConf(), tableSchema, context.getTable().getOptions()); + return new FlinkTableSource( + catalog.getIcebergTable(objectPath), createTableLoader(objectPath), catalog.getHadoopConf(), tableSchema, + context.getTable().getOptions()); } catch (TableNotExistException e) { throw new ValidationException(String.format("Iceberg Table(%s) not exist.", objectPath), e); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java index ebd7da4e72c4..72b2e054c49f 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkInputFormat.java @@ -106,8 +106,8 @@ public void configure(Configuration parameters) { @Override public void open(FlinkInputSplit split) { - this.iterator = new RowDataIterator(split.getTask(), io, encryption, projectedSchema, - options.getNameMapping(), options.isCaseSensitive()); + this.iterator = new RowDataIterator( + split.getTask(), io, encryption, projectedSchema, options.getNameMapping(), options.isCaseSensitive()); } @Override @@ -223,9 +223,8 @@ public FlinkInputFormat build() { TableSchema tableSchema = FlinkSchemaUtil.toSchema(FlinkSchemaUtil.convert(icebergSchema)); TableSchema.Builder builder = TableSchema.builder(); for (String field : selectedFields) { - TableColumn column = tableSchema.getTableColumn(field).orElseThrow( - () -> new IllegalArgumentException(String.format("The field(%s) can not be found in the table schema: %s", - field, tableSchema))); + TableColumn column = tableSchema.getTableColumn(field).orElseThrow(() -> new IllegalArgumentException( + String.format("The field(%s) can not be found in the table schema: %s", field, tableSchema))); builder.field(column.getName(), column.getType()); } flinkProjectedSchema = builder.build(); @@ -237,7 +236,7 @@ public FlinkInputFormat build() { } return new FlinkInputFormat(tableLoader, icebergProjectedSchema, io, encryption, filterExpressions, options, - new SerializableConfiguration(hadoopConf)); + new SerializableConfiguration(hadoopConf)); } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java index 5cf78dc558c9..d52edef061c8 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/FlinkTableSource.java @@ -85,13 +85,11 @@ public DataStream getDataStream(StreamExecutionEnvironment execEnv) { List projectNames = null; if (projectedFields != null) { projectNames = 
Arrays.stream(projectedFields) - .mapToObj(project -> icebergSchema.asStruct().fields().get(project).name()) - .collect(Collectors.toList()); + .mapToObj(project -> icebergSchema.asStruct().fields().get(project).name()) + .collect(Collectors.toList()); } - FlinkInputFormat inputFormat = FlinkInputFormat.builder().table(table) - .tableLoader(loader) - .hadoopConf(hadoopConf).select(projectNames) - .options(ScanOptions.of(scanOptions)).build(); + FlinkInputFormat inputFormat = FlinkInputFormat.builder().table(table).tableLoader(loader).hadoopConf(hadoopConf) + .select(projectNames).options(ScanOptions.of(scanOptions)).build(); return execEnv.createInput(inputFormat, RowDataTypeInfo.of((RowType) getProducedDataType().getLogicalType())); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java index c13150232356..6da2368292b2 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java +++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanOptions.java @@ -215,7 +215,7 @@ public ScanOptions build() { } } return new ScanOptions(caseSensitive, snapshotId, startSnapshotId, endSnapshotId, asOfTimestamp, splitSize, - splitLookback, splitOpenFileCost, nameMapping); + splitLookback, splitOpenFileCost, nameMapping); } } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java index a145c5033eaf..63629f5efe3a 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkInputFormat.java @@ -79,9 +79,8 @@ protected void assertResiduals( @Override protected void assertNestedProjection(Table table, List records) throws IOException { TableSchema projectedSchema = TableSchema.builder() - .field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING()))) - .field("data", DataTypes.STRING()) - .build(); + .field("nested", DataTypes.ROW(DataTypes.FIELD("f2", DataTypes.STRING()))) + .field("data", DataTypes.STRING()).build(); List result = run(builder.project(projectedSchema).build()); List expected = Lists.newArrayList(); diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java index 1cb710fc51f1..f9f3c00310cc 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScan.java @@ -238,10 +238,10 @@ public void testSnapshotReads() throws Exception { Thread.sleep(10); helper.appendToTable(RandomGenericData.generate(SCHEMA, 1, 0L)); - assertRecords(executeWithOptions(table, null, null, snapshotId, null, null, null, null, null), expectedRecords, - SCHEMA); - assertRecords(executeWithOptions(table, null, null, null, null, null, timestampMillis, null, null), - expectedRecords, SCHEMA); + assertRecords( + executeWithOptions(table, null, null, snapshotId, null, null, null, null, null), expectedRecords, SCHEMA); + assertRecords( + executeWithOptions(table, null, null, null, null, null, timestampMillis, null, null), expectedRecords, SCHEMA); } @Test @@ -275,8 +275,8 @@ public void testIncrementalRead() throws Exception { List expected2 = Lists.newArrayList(); expected2.addAll(records2); expected2.addAll(records3); - assertRecords(executeWithOptions(table, null, null, null, snapshotId1, 
snapshotId3, null, null, null), expected2, - SCHEMA); + assertRecords( + executeWithOptions(table, null, null, null, snapshotId1, snapshotId3, null, null, null), expected2, SCHEMA); } @Test @@ -289,8 +289,8 @@ public void testCustomCatalog() throws IOException { new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER).appendToTable( org.apache.iceberg.TestHelpers.Row.of("2020-03-20", 0), expectedRecords); - List rows = executeWithOptions(table, null, CatalogLoader.hadoop("new_catalog", newWarehouse), null, null, - null, null, null, null); + List rows = executeWithOptions( + table, null, CatalogLoader.hadoop("new_catalog", newWarehouse), null, null, null, null, null, null); assertRecords(rows, expectedRecords, SCHEMA); } @@ -308,8 +308,9 @@ public void testFilterExp() throws Exception { RandomGenericData.generate(SCHEMA, 2, 0L)); helper.appendToTable(dataFile1, dataFile2); List filters = Collections.singletonList(Expressions.equal("dt", "2020-03-20")); - assertRecords(executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20'"), - expectedRecords, SCHEMA); + assertRecords( + executeWithOptions(table, null, null, null, null, null, null, filters, "dt='2020-03-20'"), + expectedRecords, SCHEMA); } @Test diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java index f16cf4043483..b5265c120fdc 100644 --- a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkScanSql.java @@ -50,8 +50,9 @@ public TestFlinkScanSql(String fileFormat) { public void before() throws IOException { super.before(); tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()); - tEnv.executeSql(String.format("create catalog iceberg_catalog with (" + - "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", warehouse)); + tEnv.executeSql(String.format( + "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", + warehouse)); tEnv.executeSql("use catalog iceberg_catalog"); tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); }
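Taken together, the patches let Flink SQL read an Iceberg table once the catalog is registered. Below is a minimal usage sketch modeled on TestFlinkScanSql; it is not part of the patch series. The warehouse path and snapshot id are placeholders, and the 'snapshot-id' hint key is assumed to match the key exposed by ScanOptions.SNAPSHOT_ID.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;

public class IcebergFlinkSqlReadExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

    // Register an Iceberg Hadoop catalog; "use catalog" makes its default database current.
    tEnv.executeSql("create catalog iceberg_catalog with ("
        + "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:///tmp/iceberg_warehouse')");
    tEnv.executeSql("use catalog iceberg_catalog");

    // Scan options are supplied as SQL hints, so dynamic table options must be enabled.
    tEnv.getConfig().getConfiguration().set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);

    // Bounded scan with a projection and a partition filter, as exercised by testProjection/testFilterExp.
    tEnv.executeSql("select data from t where dt = '2020-03-20'").print();

    // Time-travel read pinned to a snapshot id through a dynamic table option hint.
    tEnv.executeSql("select * from t /*+ OPTIONS('snapshot-id'='1234567890123') */").print();
  }
}

The same scan can be built programmatically, as FlinkTableSource#getDataStream does, via FlinkInputFormat.builder() with a TableLoader, an optional projection, and ScanOptions.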