From cd187c5718ba1eb0c8853d595eed82fa7230bc3d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 20 Dec 2024 16:31:48 -0800 Subject: [PATCH] Spark: Test reading default values in Spark (#11832) --- .../org/apache/iceberg/types/ReassignIds.java | 6 +- .../java/org/apache/iceberg/types/Types.java | 8 +- .../apache/iceberg/types/TestTypeUtil.java | 20 +- .../java/org/apache/iceberg/SchemaParser.java | 45 +- .../org/apache/iceberg/TestSchemaParser.java | 126 ++++++ .../iceberg/spark/source/BaseRowReader.java | 2 +- .../iceberg/spark/data/AvroDataTest.java | 53 +-- .../spark/data/ParameterizedAvroDataTest.java | 284 ------------ .../iceberg/spark/data/TestHelpers.java | 35 +- .../spark/source/DataFrameWriteTestBase.java | 140 ++++++ .../iceberg/spark/source/ScanTestBase.java | 126 ++++++ .../spark/source/TestAvroDataFrameWrite.java | 33 ++ .../iceberg/spark/source/TestAvroScan.java | 64 +-- .../spark/source/TestDataFrameWrites.java | 412 ------------------ .../spark/source/TestORCDataFrameWrite.java | 33 ++ .../source/TestParquetDataFrameWrite.java | 33 ++ .../iceberg/spark/source/TestParquetScan.java | 136 +----- .../source/TestParquetVectorizedScan.java | 26 ++ 18 files changed, 644 insertions(+), 938 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/TestSchemaParser.java delete mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/ParameterizedAvroDataTest.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/DataFrameWriteTestBase.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroDataFrameWrite.java delete mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java diff --git a/api/src/main/java/org/apache/iceberg/types/ReassignIds.java b/api/src/main/java/org/apache/iceberg/types/ReassignIds.java index d31fea98e53f..565ceee2a901 100644 --- a/api/src/main/java/org/apache/iceberg/types/ReassignIds.java +++ b/api/src/main/java/org/apache/iceberg/types/ReassignIds.java @@ -79,11 +79,7 @@ public Type struct(Types.StructType struct, Iterable fieldTypes) { for (int i = 0; i < length; i += 1) { Types.NestedField field = fields.get(i); int fieldId = id(sourceStruct, field.name()); - if (field.isRequired()) { - newFields.add(Types.NestedField.required(fieldId, field.name(), types.get(i), field.doc())); - } else { - newFields.add(Types.NestedField.optional(fieldId, field.name(), types.get(i), field.doc())); - } + newFields.add(Types.NestedField.from(field).withId(fieldId).ofType(types.get(i)).build()); } return Types.StructType.of(newFields); diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index 927b3a5065ad..3c03a3defb42 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -711,8 +711,14 @@ public boolean equals(Object o) { return false; } else if (!Objects.equals(doc, that.doc)) { return false; + } else if (!type.equals(that.type)) { + return false; + } else if 
(!Objects.equals(initialDefault, that.initialDefault)) { + return false; + } else if (!Objects.equals(writeDefault, that.writeDefault)) { + return false; } - return type.equals(that.type); + return true; } @Override diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java index e8db0937eb73..36384d232af3 100644 --- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java +++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java @@ -595,7 +595,12 @@ public void testReassignOrRefreshIds() { new Schema( Lists.newArrayList( required(10, "a", Types.IntegerType.get()), - required(11, "c", Types.IntegerType.get()), + Types.NestedField.required("c") + .withId(11) + .ofType(Types.IntegerType.get()) + .withInitialDefault(23) + .withWriteDefault(34) + .build(), required(12, "B", Types.IntegerType.get())), Sets.newHashSet(10)); Schema sourceSchema = @@ -603,13 +608,20 @@ public void testReassignOrRefreshIds() { Lists.newArrayList( required(1, "a", Types.IntegerType.get()), required(15, "B", Types.IntegerType.get()))); - final Schema actualSchema = TypeUtil.reassignOrRefreshIds(schema, sourceSchema); - final Schema expectedSchema = + + Schema actualSchema = TypeUtil.reassignOrRefreshIds(schema, sourceSchema); + Schema expectedSchema = new Schema( Lists.newArrayList( required(1, "a", Types.IntegerType.get()), - required(16, "c", Types.IntegerType.get()), + Types.NestedField.required("c") + .withId(16) + .ofType(Types.IntegerType.get()) + .withInitialDefault(23) + .withWriteDefault(34) + .build(), required(15, "B", Types.IntegerType.get()))); + assertThat(actualSchema.asStruct()).isEqualTo(expectedSchema.asStruct()); } diff --git a/core/src/main/java/org/apache/iceberg/SchemaParser.java b/core/src/main/java/org/apache/iceberg/SchemaParser.java index a4333af1be19..27e6ed048712 100644 --- a/core/src/main/java/org/apache/iceberg/SchemaParser.java +++ b/core/src/main/java/org/apache/iceberg/SchemaParser.java @@ -49,6 +49,8 @@ private SchemaParser() {} private static final String DOC = "doc"; private static final String NAME = "name"; private static final String ID = "id"; + private static final String INITIAL_DEFAULT = "initial-default"; + private static final String WRITE_DEFAULT = "write-default"; private static final String ELEMENT_ID = "element-id"; private static final String KEY_ID = "key-id"; private static final String VALUE_ID = "value-id"; @@ -88,6 +90,17 @@ private static void toJson( if (field.doc() != null) { generator.writeStringField(DOC, field.doc()); } + + if (field.initialDefault() != null) { + generator.writeFieldName(INITIAL_DEFAULT); + SingleValueParser.toJson(field.type(), field.initialDefault(), generator); + } + + if (field.writeDefault() != null) { + generator.writeFieldName(WRITE_DEFAULT); + SingleValueParser.toJson(field.type(), field.writeDefault(), generator); + } + generator.writeEndObject(); } generator.writeEndArray(); @@ -184,6 +197,22 @@ private static Type typeFromJson(JsonNode json) { throw new IllegalArgumentException("Cannot parse type from json: " + json); } + private static Object defaultFromJson(String defaultField, Type type, JsonNode json) { + if (json.has(defaultField)) { + return SingleValueParser.fromJson(type, json.get(defaultField)); + } + + return null; + } + + private static Types.NestedField.Builder fieldBuilder(boolean isRequired, String name) { + if (isRequired) { + return Types.NestedField.required(name); + } else { + return Types.NestedField.optional(name); + } 
+ } + private static Types.StructType structFromJson(JsonNode json) { JsonNode fieldArray = JsonUtil.get(FIELDS, json); Preconditions.checkArgument( @@ -200,13 +229,19 @@ private static Types.StructType structFromJson(JsonNode json) { String name = JsonUtil.getString(NAME, field); Type type = typeFromJson(JsonUtil.get(TYPE, field)); + Object initialDefault = defaultFromJson(INITIAL_DEFAULT, type, field); + Object writeDefault = defaultFromJson(WRITE_DEFAULT, type, field); + String doc = JsonUtil.getStringOrNull(DOC, field); boolean isRequired = JsonUtil.getBool(REQUIRED, field); - if (isRequired) { - fields.add(Types.NestedField.required(id, name, type, doc)); - } else { - fields.add(Types.NestedField.optional(id, name, type, doc)); - } + fields.add( + fieldBuilder(isRequired, name) + .withId(id) + .ofType(type) + .withDoc(doc) + .withInitialDefault(initialDefault) + .withWriteDefault(writeDefault) + .build()); } return Types.StructType.of(fields); diff --git a/core/src/test/java/org/apache/iceberg/TestSchemaParser.java b/core/src/test/java/org/apache/iceberg/TestSchemaParser.java new file mode 100644 index 000000000000..ebd197a68af0 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestSchemaParser.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.stream.Stream;
+import org.apache.iceberg.avro.AvroDataTest;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class TestSchemaParser extends AvroDataTest {
+  @Override
+  protected void writeAndValidate(Schema schema) throws IOException {
+    Schema serialized = SchemaParser.fromJson(SchemaParser.toJson(schema));
+    assertThat(serialized.asStruct()).isEqualTo(schema.asStruct());
+  }
+
+  @Test
+  public void testSchemaId() {
+    Schema schema = new Schema(34, required(1, "id", Types.LongType.get()));
+
+    Schema serialized = SchemaParser.fromJson(SchemaParser.toJson(schema));
+    assertThat(serialized.schemaId()).isEqualTo(schema.schemaId());
+  }
+
+  @Test
+  public void testIdentifierColumns() {
+    Schema schema =
+        new Schema(
+            Lists.newArrayList(
+                required(1, "id-1", Types.LongType.get()),
+                required(2, "id-2", Types.LongType.get()),
+                optional(3, "data", Types.StringType.get())),
+            Sets.newHashSet(1, 2));
+
+    Schema serialized = SchemaParser.fromJson(SchemaParser.toJson(schema));
+    assertThat(serialized.identifierFieldIds()).isEqualTo(Sets.newHashSet(1, 2));
+  }
+
+  @Test
+  public void testDocStrings() {
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get(), "unique identifier"),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withDoc("payload")
+                .build());
+
+    Schema serialized = SchemaParser.fromJson(SchemaParser.toJson(schema));
+    assertThat(serialized.findField("id").doc()).isEqualTo("unique identifier");
+    assertThat(serialized.findField("data").doc()).isEqualTo("payload");
+  }
+
+  private static Stream<Arguments> primitiveTypesAndDefaults() {
+    return Stream.of(
+        Arguments.of(Types.BooleanType.get(), false),
+        Arguments.of(Types.IntegerType.get(), 34),
+        Arguments.of(Types.LongType.get(), 4900000000L),
+        Arguments.of(Types.FloatType.get(), 12.21F),
+        Arguments.of(Types.DoubleType.get(), -0.0D),
+        Arguments.of(Types.DateType.get(), DateTimeUtil.isoDateToDays("2024-12-17")),
+        // Arguments.of(Types.TimeType.get(), DateTimeUtil.isoTimeToMicros("23:59:59.999999")),
+        Arguments.of(
+            Types.TimestampType.withZone(),
+            DateTimeUtil.isoTimestamptzToMicros("2024-12-17T23:59:59.999999+00:00")),
+        Arguments.of(
+            Types.TimestampType.withoutZone(),
+            DateTimeUtil.isoTimestampToMicros("2024-12-17T23:59:59.999999")),
+        Arguments.of(Types.StringType.get(), "iceberg"),
+        Arguments.of(Types.UUIDType.get(), UUID.randomUUID()),
+        Arguments.of(
+            Types.FixedType.ofLength(4), ByteBuffer.wrap(new byte[] {0x0a, 0x0b, 0x0c, 0x0d})),
+        Arguments.of(Types.BinaryType.get(), ByteBuffer.wrap(new byte[] {0x0a, 0x0b})),
+        Arguments.of(Types.DecimalType.of(9, 2), new BigDecimal("12.34")));
+  }
+
+  @ParameterizedTest
+  @MethodSource("primitiveTypesAndDefaults")
+  public void
testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defaultValue) { + Schema schema = + new Schema( + required(1, "id", Types.LongType.get()), + Types.NestedField.required("col_with_default") + .withId(2) + .ofType(type) + .withInitialDefault(defaultValue) + .withWriteDefault(defaultValue) + .build()); + + Schema serialized = SchemaParser.fromJson(SchemaParser.toJson(schema)); + assertThat(serialized.findField("col_with_default").initialDefault()).isEqualTo(defaultValue); + assertThat(serialized.findField("col_with_default").writeDefault()).isEqualTo(defaultValue); + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index eb97185e21f1..2d51992dd96a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -77,7 +77,7 @@ private CloseableIterable newAvroIterable( .reuseContainers() .project(projection) .split(start, length) - .createReaderFunc(readSchema -> SparkPlannedAvroReader.create(projection, idToConstant)) + .createResolvingReader(schema -> SparkPlannedAvroReader.create(schema, idToConstant)) .withNameMapping(nameMapping()) .build(); } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java index d6e8ae773b4b..ad969384c586 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java @@ -27,13 +27,11 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.file.Path; -import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.iceberg.Schema; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; @@ -42,8 +40,8 @@ import org.apache.iceberg.types.Types.MapType; import org.apache.iceberg.types.Types.StructType; import org.apache.iceberg.util.DateTimeUtil; -import org.apache.spark.sql.internal.SQLConf; import org.assertj.core.api.Assumptions; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; @@ -285,8 +283,13 @@ public void testMissingRequiredWithoutDefault() { .build()); assertThatThrownBy(() -> writeAndValidate(writeSchema, expectedSchema)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Missing required field: missing_str"); + .has( + new Condition<>( + t -> + IllegalArgumentException.class.isInstance(t) + || IllegalArgumentException.class.isInstance(t.getCause()), + "Expecting a throwable or cause that is an instance of IllegalArgumentException")) + .hasMessageContaining("Missing required field: missing_str"); } @Test @@ -542,44 +545,4 @@ public void testPrimitiveTypeDefaultValues(Type.PrimitiveType type, Object defau writeAndValidate(writeSchema, readSchema); } - - protected void withSQLConf(Map conf, Action action) throws IOException { - SQLConf sqlConf = SQLConf.get(); - - Map currentConfValues = Maps.newHashMap(); - conf.keySet() - .forEach( - confKey -> 
{ - if (sqlConf.contains(confKey)) { - String currentConfValue = sqlConf.getConfString(confKey); - currentConfValues.put(confKey, currentConfValue); - } - }); - - conf.forEach( - (confKey, confValue) -> { - if (SQLConf.isStaticConfigKey(confKey)) { - throw new RuntimeException("Cannot modify the value of a static config: " + confKey); - } - sqlConf.setConfString(confKey, confValue); - }); - - try { - action.invoke(); - } finally { - conf.forEach( - (confKey, confValue) -> { - if (currentConfValues.containsKey(confKey)) { - sqlConf.setConfString(confKey, currentConfValues.get(confKey)); - } else { - sqlConf.unsetConf(confKey); - } - }); - } - } - - @FunctionalInterface - protected interface Action { - void invoke() throws IOException; - } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/ParameterizedAvroDataTest.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/ParameterizedAvroDataTest.java deleted file mode 100644 index 85effe7d39a7..000000000000 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/ParameterizedAvroDataTest.java +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.iceberg.spark.data; - -import static org.apache.iceberg.types.Types.NestedField.optional; -import static org.apache.iceberg.types.Types.NestedField.required; - -import java.io.IOException; -import java.nio.file.Path; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.iceberg.Schema; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.iceberg.types.TypeUtil; -import org.apache.iceberg.types.Types; -import org.apache.iceberg.types.Types.ListType; -import org.apache.iceberg.types.Types.LongType; -import org.apache.iceberg.types.Types.MapType; -import org.apache.iceberg.types.Types.StructType; -import org.apache.spark.sql.internal.SQLConf; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.io.TempDir; - -/** - * Copy of {@link AvroDataTest} that marks tests with @{@link org.junit.jupiter.api.TestTemplate} - * instead of @{@link Test} to make the tests work in a parameterized environment. 
- */ -public abstract class ParameterizedAvroDataTest { - - protected abstract void writeAndValidate(Schema schema) throws IOException; - - protected static final StructType SUPPORTED_PRIMITIVES = - StructType.of( - required(100, "id", LongType.get()), - optional(101, "data", Types.StringType.get()), - required(102, "b", Types.BooleanType.get()), - optional(103, "i", Types.IntegerType.get()), - required(104, "l", LongType.get()), - optional(105, "f", Types.FloatType.get()), - required(106, "d", Types.DoubleType.get()), - optional(107, "date", Types.DateType.get()), - required(108, "ts", Types.TimestampType.withZone()), - required(110, "s", Types.StringType.get()), - required(111, "uuid", Types.UUIDType.get()), - required(112, "fixed", Types.FixedType.ofLength(7)), - optional(113, "bytes", Types.BinaryType.get()), - required(114, "dec_9_0", Types.DecimalType.of(9, 0)), // int encoded - required(115, "dec_11_2", Types.DecimalType.of(11, 2)), // long encoded - required(116, "dec_20_5", Types.DecimalType.of(20, 5)), // requires padding - required(117, "dec_38_10", Types.DecimalType.of(38, 10)) // Spark's maximum precision - ); - - @TempDir protected Path temp; - - @TestTemplate - public void testSimpleStruct() throws IOException { - writeAndValidate(TypeUtil.assignIncreasingFreshIds(new Schema(SUPPORTED_PRIMITIVES.fields()))); - } - - @TestTemplate - public void testStructWithRequiredFields() throws IOException { - writeAndValidate( - TypeUtil.assignIncreasingFreshIds( - new Schema( - Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asRequired)))); - } - - @TestTemplate - public void testStructWithOptionalFields() throws IOException { - writeAndValidate( - TypeUtil.assignIncreasingFreshIds( - new Schema( - Lists.transform(SUPPORTED_PRIMITIVES.fields(), Types.NestedField::asOptional)))); - } - - @TestTemplate - public void testNestedStruct() throws IOException { - writeAndValidate( - TypeUtil.assignIncreasingFreshIds(new Schema(required(1, "struct", SUPPORTED_PRIMITIVES)))); - } - - @TestTemplate - public void testArray() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional(1, "data", ListType.ofOptional(2, Types.StringType.get()))); - - writeAndValidate(schema); - } - - @TestTemplate - public void testArrayOfStructs() throws IOException { - Schema schema = - TypeUtil.assignIncreasingFreshIds( - new Schema( - required(0, "id", LongType.get()), - optional(1, "data", ListType.ofOptional(2, SUPPORTED_PRIMITIVES)))); - - writeAndValidate(schema); - } - - @TestTemplate - public void testMap() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional( - 1, - "data", - MapType.ofOptional(2, 3, Types.StringType.get(), Types.StringType.get()))); - - writeAndValidate(schema); - } - - @TestTemplate - public void testNumericMapKey() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional(1, "data", MapType.ofOptional(2, 3, LongType.get(), Types.StringType.get()))); - - writeAndValidate(schema); - } - - @TestTemplate - public void testComplexMapKey() throws IOException { - Schema schema = - new Schema( - required(0, "id", LongType.get()), - optional( - 1, - "data", - MapType.ofOptional( - 2, - 3, - StructType.of( - required(4, "i", Types.IntegerType.get()), - optional(5, "s", Types.StringType.get())), - Types.StringType.get()))); - - writeAndValidate(schema); - } - - @TestTemplate - public void testMapOfStructs() throws IOException { - Schema schema = - 
TypeUtil.assignIncreasingFreshIds( - new Schema( - required(0, "id", LongType.get()), - optional( - 1, - "data", - MapType.ofOptional(2, 3, Types.StringType.get(), SUPPORTED_PRIMITIVES)))); - - writeAndValidate(schema); - } - - @TestTemplate - public void testMixedTypes() throws IOException { - StructType structType = - StructType.of( - required(0, "id", LongType.get()), - optional( - 1, - "list_of_maps", - ListType.ofOptional( - 2, MapType.ofOptional(3, 4, Types.StringType.get(), SUPPORTED_PRIMITIVES))), - optional( - 5, - "map_of_lists", - MapType.ofOptional( - 6, 7, Types.StringType.get(), ListType.ofOptional(8, SUPPORTED_PRIMITIVES))), - required( - 9, - "list_of_lists", - ListType.ofOptional(10, ListType.ofOptional(11, SUPPORTED_PRIMITIVES))), - required( - 12, - "map_of_maps", - MapType.ofOptional( - 13, - 14, - Types.StringType.get(), - MapType.ofOptional(15, 16, Types.StringType.get(), SUPPORTED_PRIMITIVES))), - required( - 17, - "list_of_struct_of_nested_types", - ListType.ofOptional( - 19, - StructType.of( - Types.NestedField.required( - 20, - "m1", - MapType.ofOptional( - 21, 22, Types.StringType.get(), SUPPORTED_PRIMITIVES)), - Types.NestedField.optional( - 23, "l1", ListType.ofRequired(24, SUPPORTED_PRIMITIVES)), - Types.NestedField.required( - 25, "l2", ListType.ofRequired(26, SUPPORTED_PRIMITIVES)), - Types.NestedField.optional( - 27, - "m2", - MapType.ofOptional( - 28, 29, Types.StringType.get(), SUPPORTED_PRIMITIVES)))))); - - Schema schema = - new Schema( - TypeUtil.assignFreshIds(structType, new AtomicInteger(0)::incrementAndGet) - .asStructType() - .fields()); - - writeAndValidate(schema); - } - - @TestTemplate - public void testTimestampWithoutZone() throws IOException { - Schema schema = - TypeUtil.assignIncreasingFreshIds( - new Schema( - required(0, "id", LongType.get()), - optional(1, "ts_without_zone", Types.TimestampType.withoutZone()))); - - writeAndValidate(schema); - } - - protected void withSQLConf(Map conf, Action action) throws IOException { - SQLConf sqlConf = SQLConf.get(); - - Map currentConfValues = Maps.newHashMap(); - conf.keySet() - .forEach( - confKey -> { - if (sqlConf.contains(confKey)) { - String currentConfValue = sqlConf.getConfString(confKey); - currentConfValues.put(confKey, currentConfValue); - } - }); - - conf.forEach( - (confKey, confValue) -> { - if (SQLConf.isStaticConfigKey(confKey)) { - throw new RuntimeException("Cannot modify the value of a static config: " + confKey); - } - sqlConf.setConfString(confKey, confValue); - }); - - try { - action.invoke(); - } finally { - conf.forEach( - (confKey, confValue) -> { - if (currentConfValues.containsKey(confKey)) { - sqlConf.setConfString(confKey, currentConfValues.get(confKey)); - } else { - sqlConf.unsetConf(confKey); - } - }); - } - } - - @FunctionalInterface - protected interface Action { - void invoke() throws IOException; - } -} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index 64d0b85625a9..a0e77e2acbae 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -96,11 +96,20 @@ public static void assertEqualsSafe(Types.StructType struct, List recs, public static void assertEqualsSafe(Types.StructType struct, Record rec, Row row) { List fields = struct.fields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = 
fields.get(i).type(); + for (int readPos = 0; readPos < fields.size(); readPos += 1) { + Types.NestedField field = fields.get(readPos); + Field writeField = rec.getSchema().getField(field.name()); - Object expectedValue = rec.get(i); - Object actualValue = row.get(i); + Type fieldType = field.type(); + Object actualValue = row.get(readPos); + + Object expectedValue; + if (writeField != null) { + int writePos = writeField.pos(); + expectedValue = rec.get(writePos); + } else { + expectedValue = field.initialDefault(); + } assertEqualsSafe(fieldType, expectedValue, actualValue); } @@ -237,11 +246,21 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual) .isEqualTo(String.valueOf(expected)); break; case FIXED: - assertThat(expected).as("Should expect a Fixed").isInstanceOf(GenericData.Fixed.class); + // generated data is written using Avro or Parquet/Avro so generated rows use + // GenericData.Fixed, but default values are converted from Iceberg's internal + // representation so the expected value may be either class. + byte[] expectedBytes; + if (expected instanceof ByteBuffer) { + expectedBytes = ByteBuffers.toByteArray((ByteBuffer) expected); + } else if (expected instanceof GenericData.Fixed) { + expectedBytes = ((GenericData.Fixed) expected).bytes(); + } else { + throw new IllegalStateException( + "Invalid expected value, not byte[] or Fixed: " + expected); + } + assertThat(actual).as("Should be a byte[]").isInstanceOf(byte[].class); - assertThat(actual) - .as("Bytes should match") - .isEqualTo(((GenericData.Fixed) expected).bytes()); + assertThat(actual).as("Bytes should match").isEqualTo(expectedBytes); break; case BINARY: assertThat(expected).as("Should expect a ByteBuffer").isInstanceOf(ByteBuffer.class); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/DataFrameWriteTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/DataFrameWriteTestBase.java new file mode 100644 index 000000000000..756370eec0da --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/DataFrameWriteTestBase.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.spark.SparkSchemaUtil.convert;
+import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.generic.GenericData;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.Tables;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.data.SparkPlannedAvroReader;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.DataFrameWriter;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public abstract class DataFrameWriteTestBase extends ScanTestBase {
+  @TempDir private Path temp;
+
+  @Override
+  protected boolean supportsDefaultValues() {
+    // disable default value tests because this tests the write path
+    return false;
+  }
+
+  @Override
+  protected void writeRecords(Table table, List<GenericData.Record> records) throws IOException {
+    Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
+
+    Dataset<Row> df = createDataset(records, tableSchema);
+    DataFrameWriter<Row> writer = df.write().format("iceberg").mode("append");
+
+    writer.save(table.location());
+
+    // refresh the in-memory table state to pick up Spark's write
+    table.refresh();
+  }
+
+  private Dataset<Row> createDataset(List<GenericData.Record> records, Schema schema)
+      throws IOException {
+    // this uses the SparkAvroReader to create a DataFrame from the list of records
+    // it assumes that SparkAvroReader is correct
+    File testFile = File.createTempFile("junit", null, temp.toFile());
+    assertThat(testFile.delete()).as("Delete should succeed").isTrue();
+
+    try (FileAppender<GenericData.Record> writer =
+        Avro.write(Files.localOutput(testFile)).schema(schema).named("test").build()) {
+      for (GenericData.Record rec : records) {
+        writer.add(rec);
+      }
+    }
+
+    List<InternalRow> rows;
+    try (AvroIterable<InternalRow> reader =
+        Avro.read(Files.localInput(testFile))
+            .createResolvingReader(SparkPlannedAvroReader::create)
+            .project(schema)
+            .build()) {
+      rows = Lists.newArrayList(reader);
+    }
+
+    // verify that the dataframe matches
+    assertThat(rows.size()).isEqualTo(records.size());
+    Iterator<GenericData.Record> recordIter = records.iterator();
+    for (InternalRow row : rows) {
+      assertEqualsUnsafe(schema.asStruct(), recordIter.next(), row);
+    }
+
+    JavaRDD<InternalRow> rdd = sc.parallelize(rows);
+    return spark.internalCreateDataFrame(JavaRDD.toRDD(rdd), convert(schema), false);
+  }
+
+  @Test
+  public void testAlternateLocation() throws IOException {
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()));
+
+    File location = temp.resolve("table_location").toFile();
+    File altLocation = temp.resolve("alt_location").toFile();
+
+    Tables tables = new HadoopTables(spark.sessionState().newHadoopConf());
+    Table
table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); + + // override the table's data location + table + .updateProperties() + .set(TableProperties.WRITE_DATA_LOCATION, altLocation.getAbsolutePath()) + .commit(); + + writeRecords(table, RandomData.generateList(table.schema(), 100, 87112L)); + + table + .currentSnapshot() + .addedDataFiles(table.io()) + .forEach( + dataFile -> + assertThat(dataFile.location()) + .as( + String.format( + "File should have the parent directory %s, but has: %s.", + altLocation, dataFile.location())) + .startsWith(altLocation + "/")); + } +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java new file mode 100644 index 000000000000..3a269740b709 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.spark.data.AvroDataTest; +import org.apache.iceberg.spark.data.RandomData; +import org.apache.iceberg.spark.data.TestHelpers; +import org.apache.iceberg.types.TypeUtil; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.io.TempDir; + +/** An AvroDataScan test that validates data by reading through Spark */ +public abstract class ScanTestBase extends AvroDataTest { + private static final Configuration CONF = new Configuration(); + + protected static SparkSession spark = null; + protected static JavaSparkContext sc = null; + + @BeforeAll + public static void startSpark() { + ScanTestBase.spark = SparkSession.builder().master("local[2]").getOrCreate(); + ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + } + + @AfterAll + public static void stopSpark() { + SparkSession currentSpark = ScanTestBase.spark; + ScanTestBase.spark = null; + ScanTestBase.sc = null; + currentSpark.stop(); + } + + @TempDir private 
Path temp;
+
+  protected void configureTable(Table table) {}
+
+  protected abstract void writeRecords(Table table, List<GenericData.Record> records)
+      throws IOException;
+
+  @Override
+  protected void writeAndValidate(Schema schema) throws IOException {
+    writeAndValidate(schema, schema);
+  }
+
+  @Override
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+    File parent = temp.resolve("scan_test").toFile();
+    File location = new File(parent, "test");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    Table table = tables.create(writeSchema, PartitionSpec.unpartitioned(), location.toString());
+
+    // Important: use the table's schema for the rest of the test
+    // When tables are created, the column ids are reassigned.
+    List<GenericData.Record> expected = RandomData.generateList(table.schema(), 100, 1L);
+
+    writeRecords(table, expected);
+
+    // update the table schema to the expected schema
+    if (!expectedSchema.sameSchema(table.schema())) {
+      Schema expectedSchemaWithTableIds =
+          TypeUtil.reassignOrRefreshIds(expectedSchema, table.schema());
+      int highestFieldId =
+          Math.max(table.schema().highestFieldId(), expectedSchema.highestFieldId());
+
+      // don't use the table API because tests cover incompatible update cases
+      TableOperations ops = ((BaseTable) table).operations();
+      TableMetadata builder =
+          TableMetadata.buildFrom(ops.current())
+              .upgradeFormatVersion(3)
+              .setCurrentSchema(expectedSchemaWithTableIds, highestFieldId)
+              .build();
+      ops.commit(ops.current(), builder);
+    }
+
+    Dataset<Row> df = spark.read().format("iceberg").load(table.location());
+
+    List<Row> rows = df.collectAsList();
+    assertThat(rows).as("Should contain 100 rows").hasSize(100);
+
+    for (int i = 0; i < expected.size(); i += 1) {
+      TestHelpers.assertEqualsSafe(table.schema().asStruct(), expected.get(i), rows.get(i));
+    }
+  }
+
+  @Override
+  protected boolean supportsDefaultValues() {
+    return true;
+  }
+}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroDataFrameWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroDataFrameWrite.java
new file mode 100644
index 000000000000..110428d0a20c
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroDataFrameWrite.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.spark.source; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; + +public class TestAvroDataFrameWrite extends DataFrameWriteTestBase { + @Override + protected void configureTable(Table table) { + table + .updateProperties() + .set(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.AVRO.toString()) + .commit(); + } +} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java index 8345a4e0a697..96627c7f7334 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java @@ -19,93 +19,41 @@ package org.apache.iceberg.spark.source; import static org.apache.iceberg.Files.localOutput; -import static org.assertj.core.api.Assertions.assertThat; import java.io.File; import java.io.IOException; -import java.nio.file.Path; import java.util.List; import java.util.UUID; import org.apache.avro.generic.GenericData.Record; -import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.spark.data.AvroDataTest; -import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.TestHelpers; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.io.TempDir; - -public class TestAvroScan extends AvroDataTest { - private static final Configuration CONF = new Configuration(); - - @TempDir private Path temp; - - private static SparkSession spark = null; - - @BeforeAll - public static void startSpark() { - TestAvroScan.spark = SparkSession.builder().master("local[2]").getOrCreate(); - } - - @AfterAll - public static void stopSpark() { - SparkSession currentSpark = TestAvroScan.spark; - TestAvroScan.spark = null; - currentSpark.stop(); - } +public class TestAvroScan extends ScanTestBase { @Override - protected void writeAndValidate(Schema schema) throws IOException { - File parent = temp.resolve("avro").toFile(); - File location = new File(parent, "test"); - File dataFolder = new File(location, "data"); + protected void writeRecords(Table table, List records) throws IOException { + File dataFolder = new File(table.location(), "data"); dataFolder.mkdirs(); File avroFile = new File(dataFolder, FileFormat.AVRO.addExtension(UUID.randomUUID().toString())); - HadoopTables tables = new HadoopTables(CONF); - Table table = tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); - - // Important: use the table's schema for the rest of the test - // When tables are created, the column ids are reassigned. 
- Schema tableSchema = table.schema(); - - List expected = RandomData.generateList(tableSchema, 100, 1L); - try (FileAppender writer = - Avro.write(localOutput(avroFile)).schema(tableSchema).build()) { - writer.addAll(expected); + Avro.write(localOutput(avroFile)).schema(table.schema()).build()) { + writer.addAll(records); } DataFile file = DataFiles.builder(PartitionSpec.unpartitioned()) - .withRecordCount(100) .withFileSizeInBytes(avroFile.length()) + .withRecordCount(records.size()) .withPath(avroFile.toString()) .build(); table.newAppend().appendFile(file).commit(); - - Dataset df = spark.read().format("iceberg").load(location.toString()); - - List rows = df.collectAsList(); - assertThat(rows).as("Should contain 100 rows").hasSize(100); - - for (int i = 0; i < expected.size(); i += 1) { - TestHelpers.assertEqualsSafe(tableSchema.asStruct(), expected.get(i), rows.get(i)); - } } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java deleted file mode 100644 index 42552f385137..000000000000 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java +++ /dev/null @@ -1,412 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.iceberg.spark.source; - -import static org.apache.iceberg.spark.SparkSchemaUtil.convert; -import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsSafe; -import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.assertj.core.api.Assumptions.assumeThat; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Random; -import org.apache.avro.generic.GenericData.Record; -import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.Files; -import org.apache.iceberg.Parameter; -import org.apache.iceberg.ParameterizedTestExtension; -import org.apache.iceberg.Parameters; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.Table; -import org.apache.iceberg.TableProperties; -import org.apache.iceberg.avro.Avro; -import org.apache.iceberg.avro.AvroIterable; -import org.apache.iceberg.hadoop.HadoopTables; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.spark.SparkSQLProperties; -import org.apache.iceberg.spark.SparkSchemaUtil; -import org.apache.iceberg.spark.SparkWriteOptions; -import org.apache.iceberg.spark.data.ParameterizedAvroDataTest; -import org.apache.iceberg.spark.data.RandomData; -import org.apache.iceberg.spark.data.SparkPlannedAvroReader; -import org.apache.iceberg.types.Types; -import org.apache.spark.SparkException; -import org.apache.spark.TaskContext; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.MapPartitionsFunction; -import org.apache.spark.sql.DataFrameWriter; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Encoders; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SaveMode; -import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.catalyst.InternalRow; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; -import org.junit.jupiter.api.io.TempDir; - -@ExtendWith(ParameterizedTestExtension.class) -public class TestDataFrameWrites extends ParameterizedAvroDataTest { - private static final Configuration CONF = new Configuration(); - - @Parameters(name = "format = {0}") - public static Collection parameters() { - return Arrays.asList("parquet", "avro", "orc"); - } - - @Parameter private String format; - - @TempDir private File location; - - private static SparkSession spark = null; - private static JavaSparkContext sc = null; - - private Map tableProperties; - - private final org.apache.spark.sql.types.StructType sparkSchema = - new org.apache.spark.sql.types.StructType( - new org.apache.spark.sql.types.StructField[] { - new org.apache.spark.sql.types.StructField( - "optionalField", - org.apache.spark.sql.types.DataTypes.StringType, - true, - org.apache.spark.sql.types.Metadata.empty()), - new org.apache.spark.sql.types.StructField( - "requiredField", - org.apache.spark.sql.types.DataTypes.StringType, - 
false, - org.apache.spark.sql.types.Metadata.empty()) - }); - - private final Schema icebergSchema = - new Schema( - Types.NestedField.optional(1, "optionalField", Types.StringType.get()), - Types.NestedField.required(2, "requiredField", Types.StringType.get())); - - private final List data0 = - Arrays.asList( - "{\"optionalField\": \"a1\", \"requiredField\": \"bid_001\"}", - "{\"optionalField\": \"a2\", \"requiredField\": \"bid_002\"}"); - private final List data1 = - Arrays.asList( - "{\"optionalField\": \"d1\", \"requiredField\": \"bid_101\"}", - "{\"optionalField\": \"d2\", \"requiredField\": \"bid_102\"}", - "{\"optionalField\": \"d3\", \"requiredField\": \"bid_103\"}", - "{\"optionalField\": \"d4\", \"requiredField\": \"bid_104\"}"); - - @BeforeAll - public static void startSpark() { - TestDataFrameWrites.spark = SparkSession.builder().master("local[2]").getOrCreate(); - TestDataFrameWrites.sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); - } - - @AfterAll - public static void stopSpark() { - SparkSession currentSpark = TestDataFrameWrites.spark; - TestDataFrameWrites.spark = null; - TestDataFrameWrites.sc = null; - currentSpark.stop(); - } - - @Override - protected void writeAndValidate(Schema schema) throws IOException { - Table table = createTable(schema); - writeAndValidateWithLocations(table, new File(location, "data")); - } - - @TestTemplate - public void testWriteWithCustomDataLocation() throws IOException { - File tablePropertyDataLocation = temp.resolve("test-table-property-data-dir").toFile(); - Table table = createTable(new Schema(SUPPORTED_PRIMITIVES.fields())); - table - .updateProperties() - .set(TableProperties.WRITE_DATA_LOCATION, tablePropertyDataLocation.getAbsolutePath()) - .commit(); - writeAndValidateWithLocations(table, tablePropertyDataLocation); - } - - private Table createTable(Schema schema) { - HadoopTables tables = new HadoopTables(CONF); - return tables.create(schema, PartitionSpec.unpartitioned(), location.toString()); - } - - private void writeAndValidateWithLocations(Table table, File expectedDataDir) throws IOException { - Schema tableSchema = table.schema(); // use the table schema because ids are reassigned - - table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, format).commit(); - - Iterable expected = RandomData.generate(tableSchema, 100, 0L); - writeData(expected, tableSchema); - - table.refresh(); - - List actual = readTable(); - - Iterator expectedIter = expected.iterator(); - Iterator actualIter = actual.iterator(); - while (expectedIter.hasNext() && actualIter.hasNext()) { - assertEqualsSafe(tableSchema.asStruct(), expectedIter.next(), actualIter.next()); - } - assertThat(actualIter.hasNext()) - .as("Both iterators should be exhausted") - .isEqualTo(expectedIter.hasNext()); - - table - .currentSnapshot() - .addedDataFiles(table.io()) - .forEach( - dataFile -> - assertThat(URI.create(dataFile.location()).getPath()) - .as( - String.format( - "File should have the parent directory %s, but has: %s.", - expectedDataDir.getAbsolutePath(), dataFile.location())) - .startsWith(expectedDataDir.getAbsolutePath())); - } - - private List readTable() { - Dataset result = spark.read().format("iceberg").load(location.toString()); - - return result.collectAsList(); - } - - private void writeData(Iterable records, Schema schema) throws IOException { - Dataset df = createDataset(records, schema); - DataFrameWriter writer = df.write().format("iceberg").mode("append"); - writer.save(location.toString()); - } - - private void 
writeDataWithFailOnPartition(Iterable records, Schema schema) - throws IOException, SparkException { - final int numPartitions = 10; - final int partitionToFail = new Random().nextInt(numPartitions); - MapPartitionsFunction failOnFirstPartitionFunc = - input -> { - int partitionId = TaskContext.getPartitionId(); - - if (partitionId == partitionToFail) { - throw new SparkException( - String.format("Intended exception in partition %d !", partitionId)); - } - return input; - }; - - Dataset df = - createDataset(records, schema) - .repartition(numPartitions) - .mapPartitions(failOnFirstPartitionFunc, Encoders.row(convert(schema))); - // This trick is needed because Spark 3 handles decimal overflow in RowEncoder which "changes" - // nullability of the column to "true" regardless of original nullability. - // Setting "check-nullability" option to "false" doesn't help as it fails at Spark analyzer. - Dataset convertedDf = df.sqlContext().createDataFrame(df.rdd(), convert(schema)); - DataFrameWriter writer = convertedDf.write().format("iceberg").mode("append"); - writer.save(location.toString()); - } - - private Dataset createDataset(Iterable records, Schema schema) throws IOException { - // this uses the SparkAvroReader to create a DataFrame from the list of records - // it assumes that SparkAvroReader is correct - File testFile = File.createTempFile("junit", null, temp.toFile()); - assertThat(testFile.delete()).as("Delete should succeed").isTrue(); - - try (FileAppender writer = - Avro.write(Files.localOutput(testFile)).schema(schema).named("test").build()) { - for (Record rec : records) { - writer.add(rec); - } - } - - // make sure the dataframe matches the records before moving on - List rows = Lists.newArrayList(); - try (AvroIterable reader = - Avro.read(Files.localInput(testFile)) - .createResolvingReader(SparkPlannedAvroReader::create) - .project(schema) - .build()) { - - Iterator recordIter = records.iterator(); - Iterator readIter = reader.iterator(); - while (recordIter.hasNext() && readIter.hasNext()) { - InternalRow row = readIter.next(); - assertEqualsUnsafe(schema.asStruct(), recordIter.next(), row); - rows.add(row); - } - assertThat(readIter.hasNext()) - .as("Both iterators should be exhausted") - .isEqualTo(recordIter.hasNext()); - } - - JavaRDD rdd = sc.parallelize(rows); - return spark.internalCreateDataFrame(JavaRDD.toRDD(rdd), convert(schema), false); - } - - @TestTemplate - public void testNullableWithWriteOption() throws IOException { - assumeThat(spark.version()) - .as("Spark 3 rejects writing nulls to a required column") - .startsWith("2"); - - String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location); - String targetPath = String.format("%s/nullable_poc/targetFolder/", location); - - tableProperties = ImmutableMap.of(TableProperties.WRITE_DATA_LOCATION, targetPath); - - // read this and append to iceberg dataset - spark - .read() - .schema(sparkSchema) - .json(JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data1)) - .write() - .parquet(sourcePath); - - // this is our iceberg dataset to which we will append data - new HadoopTables(spark.sessionState().newHadoopConf()) - .create( - icebergSchema, - PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(), - tableProperties, - targetPath); - - // this is the initial data inside the iceberg dataset - spark - .read() - .schema(sparkSchema) - .json(JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data0)) - .write() - .format("iceberg") - 
.mode(SaveMode.Append) - .save(targetPath); - - // read from parquet and append to iceberg w/ nullability check disabled - spark - .read() - .schema(SparkSchemaUtil.convert(icebergSchema)) - .parquet(sourcePath) - .write() - .format("iceberg") - .option(SparkWriteOptions.CHECK_NULLABILITY, false) - .mode(SaveMode.Append) - .save(targetPath); - - // read all data - List rows = spark.read().format("iceberg").load(targetPath).collectAsList(); - assumeThat(rows).as("Should contain 6 rows").hasSize(6); - } - - @TestTemplate - public void testNullableWithSparkSqlOption() throws IOException { - assumeThat(spark.version()) - .as("Spark 3 rejects writing nulls to a required column") - .startsWith("2"); - - String sourcePath = String.format("%s/nullable_poc/sourceFolder/", location); - String targetPath = String.format("%s/nullable_poc/targetFolder/", location); - - tableProperties = ImmutableMap.of(TableProperties.WRITE_DATA_LOCATION, targetPath); - - // read this and append to iceberg dataset - spark - .read() - .schema(sparkSchema) - .json(JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data1)) - .write() - .parquet(sourcePath); - - SparkSession newSparkSession = - SparkSession.builder() - .master("local[2]") - .appName("NullableTest") - .config(SparkSQLProperties.CHECK_NULLABILITY, false) - .getOrCreate(); - - // this is our iceberg dataset to which we will append data - new HadoopTables(newSparkSession.sessionState().newHadoopConf()) - .create( - icebergSchema, - PartitionSpec.builderFor(icebergSchema).identity("requiredField").build(), - tableProperties, - targetPath); - - // this is the initial data inside the iceberg dataset - newSparkSession - .read() - .schema(sparkSchema) - .json(JavaSparkContext.fromSparkContext(spark.sparkContext()).parallelize(data0)) - .write() - .format("iceberg") - .mode(SaveMode.Append) - .save(targetPath); - - // read from parquet and append to iceberg - newSparkSession - .read() - .schema(SparkSchemaUtil.convert(icebergSchema)) - .parquet(sourcePath) - .write() - .format("iceberg") - .mode(SaveMode.Append) - .save(targetPath); - - // read all data - List rows = newSparkSession.read().format("iceberg").load(targetPath).collectAsList(); - assumeThat(rows).as("Should contain 6 rows").hasSize(6); - } - - @TestTemplate - public void testFaultToleranceOnWrite() throws IOException { - Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields()); - Table table = createTable(schema); - - Iterable records = RandomData.generate(schema, 100, 0L); - writeData(records, schema); - - table.refresh(); - - Snapshot snapshotBeforeFailingWrite = table.currentSnapshot(); - List resultBeforeFailingWrite = readTable(); - - Iterable records2 = RandomData.generate(schema, 100, 0L); - - assertThatThrownBy(() -> writeDataWithFailOnPartition(records2, schema)) - .isInstanceOf(SparkException.class); - - table.refresh(); - - Snapshot snapshotAfterFailingWrite = table.currentSnapshot(); - List resultAfterFailingWrite = readTable(); - - assertThat(snapshotBeforeFailingWrite).isEqualTo(snapshotAfterFailingWrite); - assertThat(resultBeforeFailingWrite).isEqualTo(resultAfterFailingWrite); - } -} diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java new file mode 100644 index 000000000000..35be6423ee23 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestORCDataFrameWrite.java @@ -0,0 +1,33 @@ +/* + * Licensed 
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+
+public class TestORCDataFrameWrite extends DataFrameWriteTestBase {
+  @Override
+  protected void configureTable(Table table) {
+    table
+        .updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.toString())
+        .commit();
+  }
+}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java
new file mode 100644
index 000000000000..90a9ac48a486
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetDataFrameWrite.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+
+public class TestParquetDataFrameWrite extends DataFrameWriteTestBase {
+  @Override
+  protected void configureTable(Table table) {
+    table
+        .updateProperties()
+        .set(TableProperties.DEFAULT_FILE_FORMAT, FileFormat.PARQUET.toString())
+        .commit();
+  }
+}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
index ebeed62acce4..c0dee43d6de1 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
@@ -19,140 +19,41 @@
 package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.Files.localOutput;
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.apache.spark.sql.functions.monotonically_increasing_id;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import org.apache.avro.generic.GenericData;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Parameter;
-import org.apache.iceberg.ParameterizedTestExtension;
-import org.apache.iceberg.Parameters;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.spark.data.ParameterizedAvroDataTest;
-import org.apache.iceberg.spark.data.RandomData;
-import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
 
-@ExtendWith(ParameterizedTestExtension.class)
-public class TestParquetScan extends ParameterizedAvroDataTest {
-  private static final Configuration CONF = new Configuration();
-
-  private static SparkSession spark = null;
-
-  @BeforeAll
-  public static void startSpark() {
-    TestParquetScan.spark = SparkSession.builder().master("local[2]").getOrCreate();
-  }
-
-  @AfterAll
-  public static void stopSpark() {
-    SparkSession currentSpark = TestParquetScan.spark;
-    TestParquetScan.spark = null;
-    currentSpark.stop();
-  }
-
-  @TempDir private Path temp;
-
-  @Parameter private boolean vectorized;
-
-  @Parameters(name = "vectorized = {0}")
-  public static Collection parameters() {
-    return Arrays.asList(false, true);
+public class TestParquetScan extends ScanTestBase {
+  protected boolean vectorized() {
+    return false;
   }
 
   @Override
-  protected void writeAndValidate(Schema schema) throws IOException {
-    assumeThat(
-            TypeUtil.find(
-                schema,
-                type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()))
-        .as("Cannot handle non-string map keys in parquet-avro")
-        .isNull();
-
-    assertThat(vectorized).as("should not be null").isNotNull();
-    Table table = createTable(schema);
-
-    // Important: use the table's schema for the rest of the test
-    // When tables are created, the column ids are reassigned.
-    List expected = RandomData.generateList(table.schema(), 100, 1L);
-    writeRecords(table, expected);
-
-    configureVectorization(table);
-
-    Dataset<Row> df = spark.read().format("iceberg").load(table.location());
-
-    List<Row> rows = df.collectAsList();
-    assertThat(rows).as("Should contain 100 rows").hasSize(100);
-
-    for (int i = 0; i < expected.size(); i += 1) {
-      TestHelpers.assertEqualsSafe(table.schema().asStruct(), expected.get(i), rows.get(i));
-    }
-  }
-
-  @TestTemplate
-  public void testEmptyTableProjection() throws IOException {
-    Types.StructType structType =
-        Types.StructType.of(
-            required(100, "id", Types.LongType.get()),
-            optional(101, "data", Types.StringType.get()),
-            required(102, "b", Types.BooleanType.get()),
-            optional(103, "i", Types.IntegerType.get()));
-    Table table = createTable(new Schema(structType.fields()));
-
-    List expected = RandomData.generateList(table.schema(), 100, 1L);
-    writeRecords(table, expected);
-
-    configureVectorization(table);
-
-    List<Row> rows =
-        spark
-            .read()
-            .format("iceberg")
-            .load(table.location())
-            .select(monotonically_increasing_id())
-            .collectAsList();
-    assertThat(rows).hasSize(100);
-  }
-
-  private Table createTable(Schema schema) throws IOException {
-    File parent = temp.resolve("parquet").toFile();
-    File location = new File(parent, "test");
-    HadoopTables tables = new HadoopTables(CONF);
-    return tables.create(schema, PartitionSpec.unpartitioned(), location.toString());
+  protected void configureTable(Table table) {
+    table
+        .updateProperties()
+        .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized()))
+        .commit();
   }
 
-  private void writeRecords(Table table, List records) throws IOException {
+  @Override
+  protected void writeRecords(Table table, List records) throws IOException {
     File dataFolder = new File(table.location(), "data");
-    dataFolder.mkdirs();
-
     File parquetFile =
         new File(dataFolder, FileFormat.PARQUET.addExtension(UUID.randomUUID().toString()));
@@ -165,16 +66,21 @@ private void writeRecords(Table table, List records) throws
         DataFiles.builder(PartitionSpec.unpartitioned())
             .withFileSizeInBytes(parquetFile.length())
            .withPath(parquetFile.toString())
-            .withRecordCount(100)
+            .withRecordCount(records.size())
             .build();
 
     table.newAppend().appendFile(file).commit();
   }
 
-  private void configureVectorization(Table table) {
-    table
-        .updateProperties()
-        .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized))
-        .commit();
+  @Override
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+    assumeThat(
+            TypeUtil.find(
+                writeSchema,
+                type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get()))
+        .as("Cannot handle non-string map keys in parquet-avro")
+        .isNull();
+
+    super.writeAndValidate(writeSchema, expectedSchema);
   }
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java
new file mode 100644
index 000000000000..a6b5166b3a4e
--- /dev/null
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetVectorizedScan.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+public class TestParquetVectorizedScan extends TestParquetScan {
+  @Override
+  protected boolean vectorized() {
+    return true;
+  }
+}