diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java index cce53001ea621..9fc1495e62e2a 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java @@ -174,7 +174,8 @@ public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType spa return new FixedLenByteArrayAsIntUpdater(arrayLen); } else if (canReadAsLongDecimal(descriptor, sparkType)) { return new FixedLenByteArrayAsLongUpdater(arrayLen); - } else if (canReadAsBinaryDecimal(descriptor, sparkType)) { + } else if (canReadAsBinaryDecimal(descriptor, sparkType) || + sparkType == DataTypes.BinaryType) { return new FixedLenByteArrayUpdater(arrayLen); } break; diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index 34a4eb8c002d6..b3318985a32b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -274,6 +274,7 @@ class ParquetToSparkSchemaConverter( case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.maxPrecisionForBytes(parquetType.getTypeLength)) case _: IntervalLogicalTypeAnnotation => typeNotImplemented() + case null => BinaryType case _ => illegalType() } diff --git a/sql/core/src/test/avro/parquet-compat.avdl b/sql/core/src/test/avro/parquet-compat.avdl index c5eb5b5164cf4..02e0fc889aebf 100644 --- a/sql/core/src/test/avro/parquet-compat.avdl +++ b/sql/core/src/test/avro/parquet-compat.avdl @@ -34,6 
+34,8 @@ protocol CompatibilityTest { string nested_string_column; } + fixed FixedType(8); + record AvroPrimitives { boolean bool_column; int int_column; @@ -42,6 +44,7 @@ protocol CompatibilityTest { double double_column; bytes binary_column; string string_column; + FixedType fixed_column; } record AvroOptionalPrimitives { @@ -52,6 +55,7 @@ protocol CompatibilityTest { union { null, double } maybe_double_column; union { null, bytes } maybe_binary_column; union { null, string } maybe_string_column; + union { null, FixedType} maybe_fixed_column; } record AvroNonNullableArrays { diff --git a/sql/core/src/test/avro/parquet-compat.avpr b/sql/core/src/test/avro/parquet-compat.avpr index 9ad315b74fb41..3c90a5893020d 100644 --- a/sql/core/src/test/avro/parquet-compat.avpr +++ b/sql/core/src/test/avro/parquet-compat.avpr @@ -25,6 +25,10 @@ "name" : "nested_string_column", "type" : "string" } ] + }, { + "type" : "fixed", + "name" : "FixedType", + "size" : 8 }, { "type" : "record", "name" : "AvroPrimitives", @@ -49,6 +53,9 @@ }, { "name" : "string_column", "type" : "string" + }, { + "name" : "fixed_column", + "type" : "FixedType" } ] }, { "type" : "record", @@ -74,6 +81,9 @@ }, { "name" : "maybe_string_column", "type" : [ "null", "string" ] + }, { + "name" : "maybe_fixed_column", + "type" : [ "null", "FixedType" ] } ] }, { "type" : "record", diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/AvroOptionalPrimitives.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/AvroOptionalPrimitives.java index e4d1ead8dd15f..1fe87fcff6d2e 100644 --- a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/AvroOptionalPrimitives.java +++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/AvroOptionalPrimitives.java @@ -7,7 +7,7 @@ @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class 
AvroOptionalPrimitives extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroOptionalPrimitives\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]}]}"); + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroOptionalPrimitives\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"maybe_fixed_column\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"FixedType\",\"size\":8}]}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } @Deprecated public java.lang.Boolean maybe_bool_column; @Deprecated public java.lang.Integer maybe_int_column; @@ -17,6 +17,8 @@ public class AvroOptionalPrimitives extends 
org.apache.avro.specific.SpecificRec @Deprecated public java.nio.ByteBuffer maybe_binary_column; @Deprecated public java.lang.String maybe_string_column; + private org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType maybe_fixed_column; + /** * Default constructor. Note that this does not initialize fields * to their default values from the schema. If that is desired then @@ -27,7 +29,7 @@ public AvroOptionalPrimitives() {} /** * All-args constructor. */ - public AvroOptionalPrimitives(java.lang.Boolean maybe_bool_column, java.lang.Integer maybe_int_column, java.lang.Long maybe_long_column, java.lang.Float maybe_float_column, java.lang.Double maybe_double_column, java.nio.ByteBuffer maybe_binary_column, java.lang.String maybe_string_column) { + public AvroOptionalPrimitives(java.lang.Boolean maybe_bool_column, java.lang.Integer maybe_int_column, java.lang.Long maybe_long_column, java.lang.Float maybe_float_column, java.lang.Double maybe_double_column, java.nio.ByteBuffer maybe_binary_column, java.lang.String maybe_string_column, org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType maybe_fixed_column) { this.maybe_bool_column = maybe_bool_column; this.maybe_int_column = maybe_int_column; this.maybe_long_column = maybe_long_column; @@ -35,6 +37,7 @@ public AvroOptionalPrimitives(java.lang.Boolean maybe_bool_column, java.lang.Int this.maybe_double_column = maybe_double_column; this.maybe_binary_column = maybe_binary_column; this.maybe_string_column = maybe_string_column; + this.maybe_fixed_column = maybe_fixed_column; } public org.apache.avro.Schema getSchema() { return SCHEMA$; } @@ -48,6 +51,7 @@ public java.lang.Object get(int field$) { case 4: return maybe_double_column; case 5: return maybe_binary_column; case 6: return maybe_string_column; + case 7: return maybe_fixed_column; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -62,6 +66,7 @@ public void put(int field$, java.lang.Object value$) { case 
4: maybe_double_column = (java.lang.Double)value$; break; case 5: maybe_binary_column = (java.nio.ByteBuffer)value$; break; case 6: maybe_string_column = (java.lang.String)value$; break; + case 7: maybe_fixed_column = (org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -171,6 +176,22 @@ public void setMaybeStringColumn(java.lang.String value) { this.maybe_string_column = value; } + /** + * Gets the value of the 'maybe_fixed_column' field. + * @return The value of the 'maybe_fixed_column' field. + */ + public org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType getMaybeFixedColumn() { + return maybe_fixed_column; + } + + /** + * Sets the value of the 'maybe_fixed_column' field. + * @param value the value to set. + */ + public void setMaybeFixedColumn(org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType value) { + this.maybe_fixed_column = value; + } + /** Creates a new AvroOptionalPrimitives RecordBuilder */ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroOptionalPrimitives.Builder newBuilder() { return new org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroOptionalPrimitives.Builder(); @@ -199,6 +220,7 @@ public static class Builder extends org.apache.avro.specific.SpecificRecordBuild private java.lang.Double maybe_double_column; private java.nio.ByteBuffer maybe_binary_column; private java.lang.String maybe_string_column; + private org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType maybe_fixed_column; /** Creates a new Builder */ private Builder() { @@ -236,6 +258,10 @@ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Avr this.maybe_string_column = data().deepCopy(fields()[6].schema(), other.maybe_string_column); fieldSetFlags()[6] = true; } + if (isValidValue(fields()[7], other.maybe_fixed_column)) { + this.maybe_fixed_column 
= data().deepCopy(fields()[7].schema(), other.maybe_fixed_column); + fieldSetFlags()[7] = true; + } } /** Creates a Builder by copying an existing AvroOptionalPrimitives instance */ @@ -269,6 +295,10 @@ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Avr this.maybe_string_column = data().deepCopy(fields()[6].schema(), other.maybe_string_column); fieldSetFlags()[6] = true; } + if (isValidValue(fields()[7], other.maybe_fixed_column)) { + this.maybe_fixed_column = data().deepCopy(fields()[7].schema(), other.maybe_fixed_column); + fieldSetFlags()[7] = true; + } } /** Gets the value of the 'maybe_bool_column' field */ @@ -446,6 +476,45 @@ public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroOptional return this; } + /** + * Gets the value of the 'maybe_fixed_column' field. + * @return The value. + */ + public org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType getMaybeFixedColumn() { + return maybe_fixed_column; + } + + /** + * Sets the value of the 'maybe_fixed_column' field. + * @param value The value of 'maybe_fixed_column'. + * @return This builder. + */ + public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroOptionalPrimitives.Builder setMaybeFixedColumn(org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType value) { + validate(fields()[7], value); + this.maybe_fixed_column = value; + fieldSetFlags()[7] = true; + return this; + } + + /** + * Checks whether the 'maybe_fixed_column' field has been set. + * @return True if the 'maybe_fixed_column' field has been set, false otherwise. + */ + public boolean hasMaybeFixedColumn() { + return fieldSetFlags()[7]; + } + + + /** + * Clears the value of the 'maybe_fixed_column' field. + * @return This builder. 
+ */ + public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroOptionalPrimitives.Builder clearMaybeFixedColumn() { + maybe_fixed_column = null; + fieldSetFlags()[7] = false; + return this; + } + @Override public AvroOptionalPrimitives build() { try { @@ -457,6 +526,7 @@ public AvroOptionalPrimitives build() { record.maybe_double_column = fieldSetFlags()[4] ? this.maybe_double_column : (java.lang.Double) defaultValue(fields()[4]); record.maybe_binary_column = fieldSetFlags()[5] ? this.maybe_binary_column : (java.nio.ByteBuffer) defaultValue(fields()[5]); record.maybe_string_column = fieldSetFlags()[6] ? this.maybe_string_column : (java.lang.String) defaultValue(fields()[6]); + record.maybe_fixed_column = fieldSetFlags()[7] ? this.maybe_fixed_column : (org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType) defaultValue(fields()[7]); return record; } catch (Exception e) { throw new org.apache.avro.AvroRuntimeException(e); diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/AvroPrimitives.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/AvroPrimitives.java index 1c2afed16781e..fd21cab1c2669 100644 --- a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/AvroPrimitives.java +++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/AvroPrimitives.java @@ -7,7 +7,7 @@ @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public class AvroPrimitives extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { - public static final org.apache.avro.Schema SCHEMA$ = new 
org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroPrimitives\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"AvroPrimitives\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"fixed_column\",\"type\":{\"type\":\"fixed\",\"name\":\"FixedType\",\"size\":8}}]}"); public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } @Deprecated public boolean bool_column; @Deprecated public int int_column; @@ -17,6 +17,8 @@ public class AvroPrimitives extends org.apache.avro.specific.SpecificRecordBase @Deprecated public java.nio.ByteBuffer binary_column; @Deprecated public java.lang.String string_column; + private org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType fixed_column; + /** * Default constructor. Note that this does not initialize fields * to their default values from the schema. If that is desired then @@ -27,7 +29,7 @@ public AvroPrimitives() {} /** * All-args constructor. 
*/ - public AvroPrimitives(java.lang.Boolean bool_column, java.lang.Integer int_column, java.lang.Long long_column, java.lang.Float float_column, java.lang.Double double_column, java.nio.ByteBuffer binary_column, java.lang.String string_column) { + public AvroPrimitives(java.lang.Boolean bool_column, java.lang.Integer int_column, java.lang.Long long_column, java.lang.Float float_column, java.lang.Double double_column, java.nio.ByteBuffer binary_column, java.lang.String string_column, org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType fixed_column) { this.bool_column = bool_column; this.int_column = int_column; this.long_column = long_column; @@ -35,6 +37,7 @@ public AvroPrimitives(java.lang.Boolean bool_column, java.lang.Integer int_colum this.double_column = double_column; this.binary_column = binary_column; this.string_column = string_column; + this.fixed_column = fixed_column; } public org.apache.avro.Schema getSchema() { return SCHEMA$; } @@ -48,6 +51,7 @@ public java.lang.Object get(int field$) { case 4: return double_column; case 5: return binary_column; case 6: return string_column; + case 7: return fixed_column; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -62,6 +66,7 @@ public void put(int field$, java.lang.Object value$) { case 4: double_column = (java.lang.Double)value$; break; case 5: binary_column = (java.nio.ByteBuffer)value$; break; case 6: string_column = (java.lang.String)value$; break; + case 7: fixed_column = (org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType)value$; break; default: throw new org.apache.avro.AvroRuntimeException("Bad index"); } } @@ -171,6 +176,21 @@ public void setStringColumn(java.lang.String value) { this.string_column = value; } + /** + * Gets the value of the 'fixed_column' field. 
+ */ + public org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType getFixedColumn() { + return fixed_column; + } + + /** + * Sets the value of the 'fixed_column' field. + * @param value the value to set. + */ + public void setFixedColumn(org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType value) { + this.fixed_column = value; + } + /** Creates a new AvroPrimitives RecordBuilder */ public static org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroPrimitives.Builder newBuilder() { return new org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroPrimitives.Builder(); @@ -199,6 +219,7 @@ public static class Builder extends org.apache.avro.specific.SpecificRecordBuild private double double_column; private java.nio.ByteBuffer binary_column; private java.lang.String string_column; + private org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType fixed_column; /** Creates a new Builder */ private Builder() { @@ -236,6 +257,10 @@ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Avr this.string_column = data().deepCopy(fields()[6].schema(), other.string_column); fieldSetFlags()[6] = true; } + if (isValidValue(fields()[7], other.fixed_column)) { + this.fixed_column = data().deepCopy(fields()[7].schema(), other.fixed_column); + fieldSetFlags()[7] = true; + } } /** Creates a Builder by copying an existing AvroPrimitives instance */ @@ -269,6 +294,10 @@ private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Avr this.string_column = data().deepCopy(fields()[6].schema(), other.string_column); fieldSetFlags()[6] = true; } + if (isValidValue(fields()[7], other.fixed_column)) { + this.fixed_column = data().deepCopy(fields()[7].schema(), other.fixed_column); + fieldSetFlags()[7] = true; + } } /** Gets the value of the 'bool_column' field */ @@ -441,6 +470,44 @@ public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroPrimitiv return this; } + /** 
+ * Gets the value of the 'fixed_column' field. + * @return The value. + */ + public org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType getFixedColumn() { + return fixed_column; + } + + /** + * Sets the value of the 'fixed_column' field. + * @param value The value of 'fixed_column'. + * @return This builder. + */ + public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroPrimitives.Builder setFixedColumn(org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType value) { + validate(fields()[7], value); + this.fixed_column = value; + fieldSetFlags()[7] = true; + return this; + } + + /** + * Checks whether the 'fixed_column' field has been set. + * @return True if the 'fixed_column' field has been set, false otherwise. + */ + public boolean hasFixedColumn() { + return fieldSetFlags()[7]; + } + + /** + * Clears the value of the 'fixed_column' field. + * @return This builder. + */ + public org.apache.spark.sql.execution.datasources.parquet.test.avro.AvroPrimitives.Builder clearFixedColumn() { + fixed_column = null; + fieldSetFlags()[7] = false; + return this; + } + @Override public AvroPrimitives build() { try { @@ -452,6 +519,7 @@ public AvroPrimitives build() { record.double_column = fieldSetFlags()[4] ? this.double_column : (java.lang.Double) defaultValue(fields()[4]); record.binary_column = fieldSetFlags()[5] ? this.binary_column : (java.nio.ByteBuffer) defaultValue(fields()[5]); record.string_column = fieldSetFlags()[6] ? this.string_column : (java.lang.String) defaultValue(fields()[6]); + record.fixed_column = fieldSetFlags()[7] ? 
this.fixed_column : (org.apache.spark.sql.execution.datasources.parquet.test.avro.FixedType) defaultValue(fields()[7]); return record; } catch (Exception e) { throw new org.apache.avro.AvroRuntimeException(e); diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java index 28fdc1dfb911c..40c18e72e6d12 100644 --- a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java +++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/CompatibilityTest.java @@ -8,7 +8,7 @@ @SuppressWarnings("all") @org.apache.avro.specific.AvroGenerated public interface CompatibilityTest { - public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol.parse("{\"protocol\":\"CompatibilityTest\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"types\":[{\"type\":\"enum\",\"name\":\"Suit\",\"symbols\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]},{\"type\":\"record\",\"name\":\"ParquetEnum\",\"fields\":[{\"name\":\"suit\",\"type\":\"Suit\"}]},{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"AvroPrimitives\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"AvroOptionalPrimitives\",\"fields\":[{\"name\
":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]}]},{\"type\":\"record\",\"name\":\"AvroNonNullableArrays\",\"fields\":[{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"maybe_ints_column\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"int\"}]}]},{\"type\":\"record\",\"name\":\"AvroArrayOfArray\",\"fields\":[{\"name\":\"int_arrays_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"array\",\"items\":\"int\"}}}]},{\"type\":\"record\",\"name\":\"AvroMapOfArray\",\"fields\":[{\"name\":\"string_to_ints_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"int\"},\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"fields\":[{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"Nested\"},\"avro.java.string\":\"String\"}}]}],\"messages\":{}}"); + public static final org.apache.avro.Protocol PROTOCOL = 
org.apache.avro.Protocol.parse("{\"protocol\":\"CompatibilityTest\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"types\":[{\"type\":\"enum\",\"name\":\"Suit\",\"symbols\":[\"SPADES\",\"HEARTS\",\"DIAMONDS\",\"CLUBS\"]},{\"type\":\"record\",\"name\":\"ParquetEnum\",\"fields\":[{\"name\":\"suit\",\"type\":\"Suit\"}]},{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"fixed\",\"name\":\"FixedType\",\"size\":8},{\"type\":\"record\",\"name\":\"AvroPrimitives\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"fixed_column\",\"type\":\"FixedType\"}]},{\"type\":\"record\",\"name\":\"AvroOptionalPrimitives\",\"fields\":[{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"maybe_fixed_column\",\"type\":[\"null\",\"FixedType\"]}]},{\"type\":\"record\",\"name\":\"AvroNonNullableArrays\",\"fields\":[{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"maybe_ints_column\",\"type\":[\"null
\",{\"type\":\"array\",\"items\":\"int\"}]}]},{\"type\":\"record\",\"name\":\"AvroArrayOfArray\",\"fields\":[{\"name\":\"int_arrays_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"array\",\"items\":\"int\"}}}]},{\"type\":\"record\",\"name\":\"AvroMapOfArray\",\"fields\":[{\"name\":\"string_to_ints_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"int\"},\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"fields\":[{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":\"Nested\"},\"avro.java.string\":\"String\"}}]}],\"messages\":{}}"); @SuppressWarnings("all") public interface Callback extends CompatibilityTest { diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/FixedType.java b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/FixedType.java new file mode 100644 index 0000000000000..035538b5d7c8d --- /dev/null +++ b/sql/core/src/test/gen-java/org/apache/spark/sql/execution/datasources/parquet/test/avro/FixedType.java @@ -0,0 +1,44 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package org.apache.spark.sql.execution.datasources.parquet.test.avro; +@org.apache.avro.specific.FixedSize(8) +@org.apache.avro.specific.AvroGenerated +public class FixedType extends org.apache.avro.specific.SpecificFixed { + private static final long serialVersionUID = -7223470326441566598L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"fixed\",\"name\":\"FixedType\",\"namespace\":\"org.apache.spark.sql.execution.datasources.parquet.test.avro\",\"size\":8}"); + 
public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + + /** Creates a new FixedType */ + public FixedType() { + super(); + } + + /** + * Creates a new FixedType with the given bytes. + * @param bytes The bytes to create the new FixedType. + */ + public FixedType(byte[] bytes) { + super(bytes); + } + + private static final org.apache.avro.io.DatumWriter + WRITER$ = new org.apache.avro.specific.SpecificDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, org.apache.avro.specific.SpecificData.getEncoder(out)); + } + + private static final org.apache.avro.io.DatumReader + READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, org.apache.avro.specific.SpecificData.getDecoder(in)); + } + +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala index d7727d93ddf98..fd06372826403 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala @@ -47,7 +47,15 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared try f(writer) finally writer.close() } - test("required primitives") { + private def generateFixedLengthByteArray(i: Int): Array[Byte] = { + val fixedLengthByteArray = Array[Byte](0, 0, 0, 0, 0, 0, 0, 0) + val fixedLengthByteArrayComponent = s"val_$i".getBytes(StandardCharsets.UTF_8) + Array.copy(fixedLengthByteArrayComponent, 0, fixedLengthByteArray, 0, + 
Math.min(fixedLengthByteArrayComponent.length, fixedLengthByteArray.length)) + fixedLengthByteArray + } + + private def testRequiredPrimitives(enableVectorizedReader: String): Unit = { withTempPath { dir => val path = dir.getCanonicalPath @@ -62,12 +70,14 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared .setDoubleColumn(i.toDouble + 0.2d) .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes(StandardCharsets.UTF_8))) .setStringColumn(s"val_$i") + .setFixedColumn(new FixedType(generateFixedLengthByteArray(i))) .build()) } } logParquetSchema(path) + spark.conf.set("spark.sql.parquet.enableVectorizedReader", enableVectorizedReader) checkAnswer(spark.read.parquet(path), (0 until 10).map { i => Row( i % 2 == 0, @@ -76,13 +86,22 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared i.toFloat + 0.1f, i.toDouble + 0.2d, s"val_$i".getBytes(StandardCharsets.UTF_8), - s"val_$i") + s"val_$i", + generateFixedLengthByteArray(i)) }) } } - test("optional primitives") { - withTempPath { dir => + test("required primitives enableVectorizedReader true") { + testRequiredPrimitives("true") + } + + test("required primitives enableVectorizedReader false") { + testRequiredPrimitives("false") + } + + private def testOptionalPrimitives(enableVectorizedReader: String): Unit = { + withTempPath { dir => val path = dir.getCanonicalPath withWriter[AvroOptionalPrimitives](path, AvroOptionalPrimitives.getClassSchema) { writer => @@ -96,6 +115,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared .setMaybeDoubleColumn(null) .setMaybeBinaryColumn(null) .setMaybeStringColumn(null) + .setMaybeFixedColumn(null) .build() } else { AvroOptionalPrimitives.newBuilder() @@ -106,6 +126,7 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared .setMaybeDoubleColumn(i.toDouble + 0.2d) .setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes(StandardCharsets.UTF_8))) 
.setMaybeStringColumn(s"val_$i") + .setMaybeFixedColumn(new FixedType(generateFixedLengthByteArray(i))) + .build() } @@ -117,7 +138,8 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared + spark.conf.set("spark.sql.parquet.enableVectorizedReader", enableVectorizedReader) checkAnswer(spark.read.parquet(path), (0 until 10).map { i => if (i % 3 == 0) { - Row.apply(Seq.fill(7)(null): _*) + Row.apply(Seq.fill(8)(null): _*) } else { Row( i % 2 == 0, @@ -126,12 +148,21 @@ class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest with Shared i.toFloat + 0.1f, i.toDouble + 0.2d, s"val_$i".getBytes(StandardCharsets.UTF_8), - s"val_$i") + s"val_$i", + generateFixedLengthByteArray(i)) } }) } } + test("optional primitives with enableVectorizedReader true") { + testOptionalPrimitives("true") + } + + test("optional primitives with enableVectorizedReader false") { + testOptionalPrimitives("false") + } + test("non-nullable arrays") { withTempPath { dir => val path = dir.getCanonicalPath