From 8190ce7e6b66656ccb283859cd14ecca97063230 Mon Sep 17 00:00:00 2001 From: Walaa Eldin Moustafa Date: Fri, 4 Oct 2024 14:56:33 -0700 Subject: [PATCH] API, Core: Add default value APIs and Avro implementation (#9502) Co-authored-by: Walaa Eldin Moustafa Co-authored-by: Ryan Blue --- .palantir/revapi.yml | 5 + .../iceberg/expressions/Expressions.java | 12 ++ .../java/org/apache/iceberg/types/Types.java | 124 +++++++++++-- .../iceberg/avro/GenericAvroReader.java | 14 +- .../iceberg/avro/TestReadDefaultValues.java | 166 ++++++++++++++++++ 5 files changed, 304 insertions(+), 17 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/avro/TestReadDefaultValues.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 05ec59226fdd..fade79326a49 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -873,6 +873,11 @@ acceptedBreaks: new: "method void org.apache.iceberg.encryption.Ciphers::()" justification: "Static utility class - should not have public constructor" "1.4.0": + org.apache.iceberg:iceberg-api: + - code: "java.class.defaultSerializationChanged" + old: "class org.apache.iceberg.types.Types.NestedField" + new: "class org.apache.iceberg.types.Types.NestedField" + justification: "Add default value APIs." org.apache.iceberg:iceberg-core: - code: "java.class.defaultSerializationChanged" old: "class org.apache.iceberg.PartitionData" diff --git a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java index f21a7705968b..deeba664ec07 100644 --- a/api/src/main/java/org/apache/iceberg/expressions/Expressions.java +++ b/api/src/main/java/org/apache/iceberg/expressions/Expressions.java @@ -309,6 +309,18 @@ public static UnboundTerm transform(String name, Transform transfor return new UnboundTransform<>(ref(name), transform); } + /** + * Create a {@link Literal} from an Object. + * + * @param value a value + * @param Java type of value + * @return a Literal for the given value + * @throws IllegalArgumentException if the value has no literal implementation + */ + public static Literal lit(T value) { + return Literals.from(value); + } + public static UnboundAggregate count(String name) { return new UnboundAggregate<>(Operation.COUNT, ref(name)); } diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java index 8c636221bd14..4bb1674f3be5 100644 --- a/api/src/main/java/org/apache/iceberg/types/Types.java +++ b/api/src/main/java/org/apache/iceberg/types/Types.java @@ -27,6 +27,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -469,27 +470,94 @@ public int hashCode() { public static class NestedField implements Serializable { public static NestedField optional(int id, String name, Type type) { - return new NestedField(true, id, name, type, null); + return new NestedField(true, id, name, type, null, null, null); } public static NestedField optional(int id, String name, Type type, String doc) { - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, doc, null, null); } public static NestedField required(int id, String name, Type type) { - return new NestedField(false, id, name, type, null); + return new NestedField(false, id, name, type, null, null, null); } public static NestedField required(int id, String name, Type type, String doc) { - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, doc, null, null); } public static NestedField of(int id, boolean isOptional, String name, Type type) { - return new NestedField(isOptional, id, name, type, null); + return new NestedField(isOptional, id, name, type, null, null, null); } public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) { - return new NestedField(isOptional, id, name, type, doc); + return new NestedField(isOptional, id, name, type, doc, null, null); + } + + public static Builder from(NestedField field) { + return new Builder(field); + } + + public static Builder required(String name) { + return new Builder(false, name); + } + + public static Builder optional(String name) { + return new Builder(true, name); + } + + public static class Builder { + private final boolean isOptional; + private final String name; + private Integer id = null; + private Type type = null; + private String doc = null; + private Object initialDefault = null; + private Object writeDefault = null; + + private Builder(boolean isFieldOptional, String fieldName) { + isOptional = isFieldOptional; + name = fieldName; + } + + private Builder(NestedField toCopy) { + this.isOptional = toCopy.isOptional; + this.name = toCopy.name; + this.id = toCopy.id; + this.type = toCopy.type; + this.doc = toCopy.doc; + this.initialDefault = toCopy.initialDefault; + this.writeDefault = toCopy.writeDefault; + } + + public Builder withId(int fieldId) { + id = fieldId; + return this; + } + + public Builder ofType(Type fieldType) { + type = fieldType; + return this; + } + + public Builder withDoc(String fieldDoc) { + doc = fieldDoc; + return this; + } + + public Builder withInitialDefault(Object fieldInitialDefault) { + initialDefault = fieldInitialDefault; + return this; + } + + public Builder withWriteDefault(Object fieldWriteDefault) { + writeDefault = fieldWriteDefault; + return this; + } + + public NestedField build() { + // the constructor validates the fields + return new NestedField(isOptional, id, name, type, doc, initialDefault, writeDefault); + } } private final boolean isOptional; @@ -497,8 +565,17 @@ public static NestedField of(int id, boolean isOptional, String name, Type type, private final String name; private final Type type; private final String doc; - - private NestedField(boolean isOptional, int id, String name, Type type, String doc) { + private final Object initialDefault; + private final Object writeDefault; + + private NestedField( + boolean isOptional, + int id, + String name, + Type type, + String doc, + Object initialDefault, + Object writeDefault) { Preconditions.checkNotNull(name, "Name cannot be null"); Preconditions.checkNotNull(type, "Type cannot be null"); this.isOptional = isOptional; @@ -506,6 +583,19 @@ private NestedField(boolean isOptional, int id, String name, Type type, String d this.name = name; this.type = type; this.doc = doc; + this.initialDefault = castDefault(initialDefault, type); + this.writeDefault = castDefault(writeDefault, type); + } + + private static Object castDefault(Object defaultValue, Type type) { + if (type.isNestedType() && defaultValue != null) { + throw new IllegalArgumentException( + String.format("Invalid default value for %s: %s (must be null)", type, defaultValue)); + } else if (defaultValue != null) { + return Expressions.lit(defaultValue).to(type).value(); + } + + return null; } public boolean isOptional() { @@ -516,7 +606,7 @@ public NestedField asOptional() { if (isOptional) { return this; } - return new NestedField(true, id, name, type, doc); + return new NestedField(true, id, name, type, doc, initialDefault, writeDefault); } public boolean isRequired() { @@ -527,11 +617,15 @@ public NestedField asRequired() { if (!isOptional) { return this; } - return new NestedField(false, id, name, type, doc); + return new NestedField(false, id, name, type, doc, initialDefault, writeDefault); } + /** + * @deprecated will be removed in 2.0.0; use {@link Builder#withId(int)} instead + */ + @Deprecated public NestedField withFieldId(int newId) { - return new NestedField(isOptional, newId, name, type, doc); + return new NestedField(isOptional, newId, name, type, doc, initialDefault, writeDefault); } public int fieldId() { @@ -550,6 +644,14 @@ public String doc() { return doc; } + public Object initialDefault() { + return initialDefault; + } + + public Object writeDefault() { + return writeDefault; + } + @Override public String toString() { return String.format( diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java index 93bfa2398466..f630129dc50f 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroReader.java @@ -47,16 +47,16 @@ public class GenericAvroReader private Schema fileSchema = null; private ValueReader reader = null; - public static GenericAvroReader create(org.apache.iceberg.Schema schema) { - return new GenericAvroReader<>(schema); + public static GenericAvroReader create(org.apache.iceberg.Schema expectedSchema) { + return new GenericAvroReader<>(expectedSchema); } - public static GenericAvroReader create(Schema schema) { - return new GenericAvroReader<>(schema); + public static GenericAvroReader create(Schema readSchema) { + return new GenericAvroReader<>(readSchema); } - GenericAvroReader(org.apache.iceberg.Schema readSchema) { - this.expectedType = readSchema.asStruct(); + GenericAvroReader(org.apache.iceberg.Schema expectedSchema) { + this.expectedType = expectedSchema.asStruct(); } GenericAvroReader(Schema readSchema) { @@ -140,6 +140,8 @@ public ValueReader record(Type partner, Schema record, List> f Types.NestedField field = expected.field(fieldId); if (constant != null) { readPlan.add(Pair.of(pos, ValueReaders.constant(constant))); + } else if (field.initialDefault() != null) { + readPlan.add(Pair.of(pos, ValueReaders.constant(field.initialDefault()))); } else if (fieldId == MetadataColumns.IS_DELETED.fieldId()) { readPlan.add(Pair.of(pos, ValueReaders.constant(false))); } else if (fieldId == MetadataColumns.ROW_POSITION.fieldId()) { diff --git a/core/src/test/java/org/apache/iceberg/avro/TestReadDefaultValues.java b/core/src/test/java/org/apache/iceberg/avro/TestReadDefaultValues.java new file mode 100644 index 000000000000..1cfe88206b1b --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/avro/TestReadDefaultValues.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.avro; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import org.apache.avro.generic.GenericData.Record; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SingleValueParser; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TestReadDefaultValues { + + @TempDir public Path temp; + + private static final Object[][] TYPES_WITH_DEFAULTS = + new Object[][] { + {Types.BooleanType.get(), "true"}, + {Types.IntegerType.get(), "1"}, + {Types.LongType.get(), "9999999"}, + {Types.FloatType.get(), "1.23"}, + {Types.DoubleType.get(), "123.456"}, + {Types.DateType.get(), "\"2007-12-03\""}, + {Types.TimeType.get(), "\"10:15:30\""}, + {Types.TimestampType.withoutZone(), "\"2007-12-03T10:15:30\""}, + {Types.TimestampType.withZone(), "\"2007-12-03T10:15:30+00:00\""}, + {Types.StringType.get(), "\"foo\""}, + {Types.UUIDType.get(), "\"eb26bdb1-a1d8-4aa6-990e-da940875492c\""}, + {Types.FixedType.ofLength(2), "\"111f\""}, + {Types.BinaryType.get(), "\"0000ff\""}, + {Types.DecimalType.of(9, 4), "\"123.4500\""}, + {Types.DecimalType.of(9, 0), "\"2\""}, + // Avro doesn't support negative scale + // {Types.DecimalType.of(9, -20), "\"2E+20\""}, + // Nested type defaults are not currently allowed + }; + + @Test + public void testDefaultAppliedWhenMissingColumn() throws IOException { + for (Object[] typeAndDefault : TYPES_WITH_DEFAULTS) { + Type type = (Type) typeAndDefault[0]; + String defaultValueJson = (String) typeAndDefault[1]; + Object defaultValue = SingleValueParser.fromJson(type, defaultValueJson); + + // note that this schema does not have column "defaulted" + Schema writerSchema = new Schema(required(999, "written", Types.IntegerType.get())); + + File testFile = temp.resolve("test.avro").toFile(); + testFile.delete(); + + try (FileAppender writer = + Avro.write(Files.localOutput(testFile)) + .schema(writerSchema) + .createWriterFunc(GenericAvroWriter::create) + .named("test") + .build()) { + Record record = new Record(AvroSchemaUtil.convert(writerSchema.asStruct())); + record.put(0, 1); + writer.add(record); + } + + Schema readerSchema = + new Schema( + Types.NestedField.required("written") + .withId(999) + .ofType(Types.IntegerType.get()) + .build(), + Types.NestedField.optional("defaulted") + .withId(1000) + .ofType(type) + .withInitialDefault(defaultValue) + .build()); + + Record expectedRecord = new Record(AvroSchemaUtil.convert(readerSchema.asStruct())); + expectedRecord.put(0, 1); + expectedRecord.put(1, defaultValue); + + List rows; + try (AvroIterable reader = + Avro.read(Files.localInput(testFile)) + .project(readerSchema) + .createResolvingReader(schema -> GenericAvroReader.create(schema)) + .build()) { + rows = Lists.newArrayList(reader); + } + + AvroTestHelpers.assertEquals(readerSchema.asStruct(), expectedRecord, rows.get(0)); + } + } + + @Test + public void testDefaultDoesNotOverrideExplicitValue() throws IOException { + for (Object[] typeAndDefault : TYPES_WITH_DEFAULTS) { + Type type = (Type) typeAndDefault[0]; + String defaultValueJson = (String) typeAndDefault[1]; + Object defaultValue = SingleValueParser.fromJson(type, defaultValueJson); + + Schema readerSchema = + new Schema( + Types.NestedField.required("written_1") + .withId(999) + .ofType(Types.IntegerType.get()) + .build(), + Types.NestedField.optional("written_2") + .withId(1000) + .ofType(type) + .withInitialDefault(defaultValue) + .build()); + + // Create a record with null value for the column with default value + Record expectedRecord = new Record(AvroSchemaUtil.convert(readerSchema.asStruct())); + expectedRecord.put(0, 1); + expectedRecord.put(1, null); + + File testFile = temp.resolve("test.avro").toFile(); + testFile.delete(); + + try (FileAppender writer = + Avro.write(Files.localOutput(testFile)) + .schema(readerSchema) + .createWriterFunc(GenericAvroWriter::create) + .named("test") + .build()) { + writer.add(expectedRecord); + } + + List rows; + try (AvroIterable reader = + Avro.read(Files.localInput(testFile)) + .project(readerSchema) + .createReaderFunc(GenericAvroReader::create) + .build()) { + rows = Lists.newArrayList(reader); + } + + // Existence of default value should not affect the read result + AvroTestHelpers.assertEquals(readerSchema.asStruct(), expectedRecord, rows.get(0)); + } + } +}