diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index f378ef46a5a34..be6d516a303f0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -18,7 +18,7 @@ package org.apache.hudi.client.clustering.run.strategy; -import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.avro.model.HoodieClusteringGroup; import org.apache.hudi.avro.model.HoodieClusteringPlan; @@ -306,7 +306,7 @@ private Dataset readRecordsForGroupAsRow(JavaSparkContext jsc, // broadcast reader context. HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient(); ReaderContextFactory readerContextFactory = getEngineContext().getReaderContextFactory(metaClient); - StructType sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields.toAvroSchema()); + StructType sparkSchemaWithMetaFields = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(tableSchemaWithMetaFields); RDD internalRowRDD = jsc.parallelize(clusteringOps, clusteringOps.size()).flatMap(new FlatMapFunction() { @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java index a4e88b06915f9..4a2f52707121a 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java @@ -18,7 +18,7 @@ package org.apache.hudi.client.common; -import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.SparkFileFormatInternalRowReaderContext; @@ -146,7 +146,7 @@ private static SparkColumnarFileReader getOrcFileReader(TableSchemaResolver reso Configuration configs, SparkAdapter sparkAdapter) { try { - StructType dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema()); + StructType dataSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(resolver.getTableSchema()); return sparkAdapter.createOrcFileReader(false, sqlConf, options, configs, dataSchema); } catch (Exception e) { throw new HoodieException("Failed to broadcast ORC file reader", e); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java index b86fa9612d1e3..11da57ce3f02c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java @@ -19,7 +19,7 @@ package org.apache.hudi.client.utils; -import org.apache.hudi.AvroConversionUtils; +import 
org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.SparkRowSerDe; import org.apache.hudi.avro.model.HoodieMetadataRecord; @@ -295,7 +295,7 @@ public static ExpressionIndexComputationMetadata getExprIndexRecords( getExpressionIndexRecordsIterator(readerContextFactory.getContext(), metaClient, tableSchema, readerSchema, dataWriteConfig, entry)); // Generate dataset with expression index metadata - StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(readerSchema.toAvroSchema()) + StructType structType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(readerSchema) .add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION, DataTypes.StringType, false, Metadata.empty())) .add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH, DataTypes.StringType, false, Metadata.empty())) .add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE, DataTypes.LongType, false, Metadata.empty())); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java index 8cf96101bf1a5..d3f7f99083244 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java @@ -18,7 +18,7 @@ package org.apache.hudi.client.utils; -import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.HoodieSparkUtils; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.Option; @@ -41,7 +41,7 @@ public static Object[] getPartitionFieldVals(Option partitionFields, partitionFields.get(), partitionPath, new StoragePath(basePath), - AvroConversionUtils.convertAvroSchemaToStructType(writerSchema.toAvroSchema()), + HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(writerSchema), hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()), hadoopConf.getBoolean("spark.sql.sources.validatePartitionColumns", true)); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java index f3de90d587be5..76fd622ec7fc3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java @@ -18,7 +18,7 @@ package org.apache.hudi.client.utils; -import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.validator.SparkPreCommitValidator; @@ -137,8 +137,8 @@ public static Dataset getRecordsFromCommittedFiles(SQLContext sqlContext, try { return sqlContext.createDataFrame( sqlContext.emptyDataFrame().rdd(), - AvroConversionUtils.convertAvroSchemaToStructType( - new TableSchemaResolver(table.getMetaClient()).getTableAvroSchema())); + HoodieSchemaConversionUtils.convertHoodieSchemaToStructType( + new TableSchemaResolver(table.getMetaClient()).getTableSchema())); } catch (Exception e) { LOG.warn("Cannot get table schema from before state.", e); 
LOG.warn("Using the schema from after state (current transaction) to create the empty Spark dataframe: {}", newStructTypeSchema); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java index 6747e1512a38f..64d48f2ca8a0f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java @@ -19,6 +19,7 @@ package org.apache.hudi.common.model; import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.SparkFileFormatInternalRecordContext; import org.apache.hudi.client.model.HoodieInternalRow; @@ -323,7 +324,7 @@ public Option toIndexedRecord(Schema recordSchema, Prop if (data == null) { return Option.empty(); } - StructType structType = schema == null ? AvroConversionUtils.convertAvroSchemaToStructType(recordSchema) : schema; + StructType structType = schema == null ? HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchema.fromAvroSchema(recordSchema)) : schema; GenericRecord convertedRecord = AvroConversionUtils.createInternalRowToAvroConverter(structType, recordSchema, false).apply(data); return Option.of(new HoodieAvroIndexedRecord(key, convertedRecord)); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java index a838e1378d63f..29408c131dfae 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java @@ -19,7 +19,7 @@ package org.apache.hudi.merge; -import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.common.engine.RecordContext; import org.apache.hudi.common.model.HoodieRecordMerger; import org.apache.hudi.common.model.HoodieSparkRecord; @@ -218,8 +218,8 @@ public static Pair, Pair> ge } } StructType mergedStructType = new StructType(mergedFieldList.toArray(new StructField[0])); - HoodieSchema mergedSchema = HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema( - mergedStructType, readerSchema.getName(), readerSchema.getNamespace().orElse(null)))); + HoodieSchema mergedSchema = HoodieSchemaCache.intern(HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema( + mergedStructType, readerSchema.getName(), readerSchema.getNamespace().orElse(null))); return Pair.of(mergedMapping, Pair.of(mergedStructType, mergedSchema)); }); } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 94b06f551a744..26b1dbf584dd2 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -20,28 +20,22 @@ package org.apache.hudi import org.apache.hudi.HoodieSparkUtils.{getCatalystRowSerDe, sparkAdapter} import org.apache.hudi.avro.AvroSchemaUtils -import org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField import 
org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.exception.SchemaCompatibilityException import org.apache.hudi.internal.schema.HoodieSchemaException -import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema} -import org.apache.avro.Schema.Type +import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.spark.rdd.RDD import org.apache.spark.sql.{Dataset, Row, SparkSession} -import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} +import org.apache.spark.sql.types.StructType import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConverters._ - object AvroConversionUtils { private val ROW_TO_AVRO_CONVERTER_CACHE = new ConcurrentHashMap[Tuple3[StructType, Schema, Boolean], Function1[InternalRow, GenericRecord]] - private val AVRO_SCHEMA_CACHE = new ConcurrentHashMap[Schema, StructType] /** * Creates converter to transform Avro payload into Spark's Catalyst one @@ -102,10 +96,10 @@ object AvroConversionUtils { structName: String, recordNamespace: String): Row => GenericRecord = { val serde = getCatalystRowSerDe(sourceSqlType) - val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace) - val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(avroSchema) != avroSchema + val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sourceSqlType, structName, recordNamespace) + val nullable = schema.isNullable - val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, nullable) + val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, schema.toAvroSchema, nullable) row => converter.apply(serde.serializeRow(row)) } @@ -119,77 +113,12 @@ object AvroConversionUtils { ss.createDataFrame(rdd.mapPartitions { records => if (records.isEmpty) Iterator.empty else { - val schema = new Schema.Parser().parse(schemaStr) - val dataType = convertAvroSchemaToStructType(schema) - val converter = createConverterToRow(schema, dataType) + val schema = HoodieSchema.parse(schemaStr) + val dataType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema) + val converter = createConverterToRow(schema.toAvroSchema, dataType) records.map { r => converter(r) } } - }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr))) - } - - /** - * Converts [[StructType]] into Avro's [[Schema]] - * - * @param structType Catalyst's [[StructType]] - * @param qualifiedName Avro's schema qualified name - * @return Avro schema corresponding to given struct type. - */ - def convertStructTypeToAvroSchema(structType: DataType, - qualifiedName: String): Schema = { - val (namespace, name) = { - val parts = qualifiedName.split('.') - (parts.init.mkString("."), parts.last) - } - convertStructTypeToAvroSchema(structType, name, namespace) - } - - - /** - * Converts [[StructType]] into Avro's [[Schema]] - * - * @param structType Catalyst's [[StructType]] - * @param structName Avro record name - * @param recordNamespace Avro record namespace - * @return Avro schema corresponding to given struct type. 
- */ - def convertStructTypeToAvroSchema(structType: DataType, - structName: String, - recordNamespace: String): Schema = { - try { - HoodieSparkAvroSchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace) - } catch { - case a: AvroRuntimeException => throw new HoodieSchemaException(a.getMessage, a) - case e: Exception => throw new HoodieSchemaException("Failed to convert struct type to avro schema: " + structType, e) - } - } - - /** - * Converts Avro's [[Schema]] to Catalyst's [[StructType]] - */ - def convertAvroSchemaToStructType(avroSchema: Schema): StructType = { - val loader: java.util.function.Function[Schema, StructType] = key => { - try { - HoodieSparkAvroSchemaConverters.toSqlType(key) match { - case (dataType, _) => dataType.asInstanceOf[StructType] - } - } catch { - case e: Exception => throw new HoodieSchemaException("Failed to convert avro schema to struct type: " + avroSchema, e) - } - } - AVRO_SCHEMA_CACHE.computeIfAbsent(avroSchema, loader) - } - - /** - * Converts Avro's [[Schema]] to Catalyst's [[DataType]] - */ - def convertAvroSchemaToDataType(avroSchema: Schema): DataType = { - try { - HoodieSparkAvroSchemaConverters.toSqlType(avroSchema) match { - case (dataType, _) => dataType - } - } catch { - case e: Exception => throw new HoodieSchemaException("Failed to convert avro schema to DataType: " + avroSchema, e) - } + }, HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchema.parse(schemaStr))) } /** @@ -201,106 +130,4 @@ object AvroConversionUtils { val nameParts = qualifiedName.split('.') (nameParts.last, nameParts.init.mkString(".")) } - - /** - * Recursively aligns the nullable property of hoodie table schema, supporting nested structures - */ - def alignFieldsNullability(sourceSchema: StructType, avroSchema: Schema): StructType = { - // Converts Avro fields to a Map for efficient lookup - val avroFieldsMap = avroSchema.getFields.asScala.map(f => (f.name, f)).toMap - - // Recursively process fields - val alignedFields = sourceSchema.fields.map { field => - avroFieldsMap.get(field.name) match { - case Some(avroField) => - // Process the nullable property of the current field - val alignedField = field.copy(nullable = avroField.schema.isNullable) - - // Recursively handle nested structures - field.dataType match { - case structType: StructType => - // For struct type, recursively process its internal fields - val nestedAvroSchema = unwrapNullableSchema(avroField.schema) - if (nestedAvroSchema.getType == Schema.Type.RECORD) { - alignedField.copy(dataType = alignFieldsNullability(structType, nestedAvroSchema)) - } else { - alignedField - } - - case ArrayType(elementType, containsNull) => - // For array type, process element type - val arraySchema = unwrapNullableSchema(avroField.schema) - if (arraySchema.getType == Schema.Type.ARRAY) { - val elemSchema = arraySchema.getElementType - val newElementType = updateElementType(elementType, elemSchema) - alignedField.copy(dataType = ArrayType(newElementType, elemSchema.isNullable)) - } else { - alignedField - } - - case MapType(keyType, valueType, valueContainsNull) => - // For Map type, process value type - val mapSchema = unwrapNullableSchema(avroField.schema) - if (mapSchema.getType == Schema.Type.MAP) { - val valueSchema = mapSchema.getValueType - val newValueType = updateElementType(valueType, valueSchema) - alignedField.copy(dataType = MapType(keyType, newValueType, valueSchema.isNullable)) - } else { - alignedField - } - - case _ => alignedField // Basic types are returned 
directly - } - - case None => field.copy() // Field not found in Avro schema remains unchanged - } - } - - StructType(alignedFields) - } - - /** - * Returns the non-null schema if the schema is a UNION type containing NULL - */ - private def unwrapNullableSchema(schema: Schema): Schema = { - if (schema.getType == Schema.Type.UNION) { - val types = schema.getTypes.asScala - val nonNullTypes = types.filter(_.getType != Schema.Type.NULL) - if (nonNullTypes.size == 1) nonNullTypes.head else schema - } else { - schema - } - } - - /** - * Updates the element type, handling nested structures - */ - private def updateElementType(dataType: DataType, avroSchema: Schema): DataType = { - dataType match { - case structType: StructType => - if (avroSchema.getType == Schema.Type.RECORD) { - alignFieldsNullability(structType, avroSchema) - } else { - structType - } - - case ArrayType(elemType, containsNull) => - if (avroSchema.getType == Schema.Type.ARRAY) { - val elemSchema = avroSchema.getElementType - ArrayType(updateElementType(elemType, elemSchema), elemSchema.isNullable) - } else { - dataType - } - - case MapType(keyType, valueType, valueContainsNull) => - if (avroSchema.getType == Schema.Type.MAP) { - val valueSchema = avroSchema.getValueType - MapType(keyType, updateElementType(valueType, valueSchema), valueSchema.isNullable) - } else { - dataType - } - - case _ => dataType // Basic types are returned directly - } - } } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala index 1af72b9c7b917..86061f1cc4145 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala @@ -22,12 +22,16 @@ import org.apache.avro.generic.GenericRecord import org.apache.hudi.HoodieSparkUtils.sparkAdapter import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType, HoodieSchemaUtils} import org.apache.hudi.internal.schema.HoodieSchemaException + +import org.apache.avro.{AvroRuntimeException, Schema} import org.apache.spark.rdd.RDD import org.apache.spark.sql.avro.HoodieSparkSchemaConverters import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} import org.apache.spark.sql.{Dataset, Row, SparkSession} +import java.util.concurrent.ConcurrentHashMap + import scala.collection.JavaConverters._ /** @@ -37,6 +41,8 @@ import scala.collection.JavaConverters._ * handling defaults and nullability alignment. */ object HoodieSchemaConversionUtils { + private val SCHEMA_CACHE = new ConcurrentHashMap[HoodieSchema, StructType] + /** * Converts HoodieSchema to Catalyst's StructType. 
@@ -46,14 +52,20 @@ object HoodieSchemaConversionUtils { * @throws HoodieSchemaException if conversion fails */ def convertHoodieSchemaToStructType(hoodieSchema: HoodieSchema): StructType = { - try { - HoodieSparkSchemaConverters.toSqlType(hoodieSchema) match { - case (dataType, _) => dataType.asInstanceOf[StructType] + val loader: java.util.function.Function[HoodieSchema, StructType] = + new java.util.function.Function[HoodieSchema, StructType]() { + override def apply(schema: HoodieSchema): StructType = { + try { + HoodieSparkSchemaConverters.toSqlType(schema) match { + case (dataType, _) => dataType.asInstanceOf[StructType] + } + } catch { + case e: Exception => throw new HoodieSchemaException( + s"Failed to convert HoodieSchema to StructType: $schema", e) + } + } } - } catch { - case e: Exception => throw new HoodieSchemaException( - s"Failed to convert HoodieSchema to StructType: $hoodieSchema", e) - } + SCHEMA_CACHE.computeIfAbsent(hoodieSchema, loader) } /** @@ -126,6 +138,7 @@ object HoodieSchemaConversionUtils { try { HoodieSparkSchemaConverters.toHoodieType(structType, nullable, structName, recordNamespace) } catch { + case a: AvroRuntimeException => throw new HoodieSchemaException(a.getMessage, a) case e: Exception => throw new HoodieSchemaException( s"Failed to convert struct type to HoodieSchema: $structType", e) } diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java index aebf8740d46b9..271f5a1fa4cee 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java @@ -18,7 +18,7 @@ package org.apache.hudi.util; -import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.SparkAdapterSupport$; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; @@ -68,7 +68,7 @@ private static List> createConverters(HoodieSch if (fieldSchemaOpt.isEmpty()) { return Function.identity(); } else { - DataType fieldType = AvroConversionUtils.convertAvroSchemaToDataType(fieldSchemaOpt.get().toAvroSchema()); + DataType fieldType = HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(fieldSchemaOpt.get()); return createConverter(fieldType, fieldSchemaOpt.get()); } }).collect(Collectors.toList()); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala deleted file mode 100644 index a853c1fdecde4..0000000000000 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.spark.sql.avro - -import org.apache.avro.Schema -import org.apache.spark.sql.avro.SchemaConverters.SchemaType -import org.apache.spark.sql.types.DataType - -/** - * This interface is simply a facade abstracting away Spark's [[SchemaConverters]] implementation, allowing - * the rest of the code-base to not depend on it directly - */ -object HoodieSparkAvroSchemaConverters extends HoodieAvroSchemaConverters { - - override def toSqlType(avroSchema: Schema): (DataType, Boolean) = - SchemaConverters.toSqlType(avroSchema) match { - case SchemaType(dataType, nullable) => (dataType, nullable) - } - - override def toAvroType(catalystType: DataType, nullable: Boolean, recordName: String, nameSpace: String): Schema = - SchemaConverters.toAvroType(catalystType, nullable, recordName, nameSpace) - -} diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index af45ebc534441..1afcc78df058e 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -23,8 +23,7 @@ import org.apache.hudi.client.model.HoodieInternalRow import org.apache.hudi.common.model.FileSlice import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit -import org.apache.hudi.storage.{StorageConfiguration, StoragePath} -import org.apache.avro.Schema +import org.apache.hudi.storage.StorageConfiguration import org.apache.hadoop.conf.Configuration import org.apache.hudi.common.schema.HoodieSchema import org.apache.parquet.schema.MessageType @@ -166,7 +165,7 @@ trait SparkAdapter extends Serializable { */ def createRelation(sqlContext: SQLContext, metaClient: HoodieTableMetaClient, - schema: Schema, + schema: HoodieSchema, parameters: java.util.Map[String, String]): BaseRelation /** diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala index e111c9306f407..d4586cac82d5f 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala @@ -19,14 +19,15 @@ package org.apache.hudi.integ.testsuite.utils -import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils} +import org.apache.hudi.HoodieSchemaConversionUtils +import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.model.HoodieRecord +import org.apache.hudi.common.schema.HoodieSchema import org.apache.hudi.common.util.Option import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config import org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator import org.apache.hudi.utilities.schema.RowBasedSchemaProvider -import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import 
org.apache.spark.api.java.JavaRDD import org.apache.spark.sql.SparkSession @@ -137,8 +138,8 @@ object SparkSqlUtils { * @return an array of field names and types */ def getFieldNamesAndTypes(avroSchemaString: String): Array[(String, String)] = { - val schema = new Schema.Parser().parse(avroSchemaString) - val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema) + val schema = HoodieSchema.parse(avroSchemaString) + val structType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema) structType.fields.map(field => (field.name, field.dataType.simpleString)) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java index d035f453fe4f7..3fc95008cd589 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java @@ -74,8 +74,7 @@ void testMergerWithNewRecordAccepted() throws IOException { HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION); Row oldValue = getSpecificValue(key, "001", 1L, "file1", 1, "1"); Row newValue = getSpecificValue(key, "002", 2L, "file2", 2, "2"); - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema( - SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE)); + HoodieSchema schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE); BufferedRecord oldRecord = BufferedRecords.fromEngineRecord(InternalRow.apply(oldValue.toSeq()), ANY_KEY, schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null); BufferedRecord newRecord = BufferedRecords.fromEngineRecord(InternalRow.apply(newValue.toSeq()), ANY_KEY, schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null); @@ -100,8 +99,7 @@ void testMergerWithOldRecordAccepted() throws IOException { HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION); Row oldValue = getSpecificValue(key, "001", 1L, "file1", 3, "1"); Row newValue = getSpecificValue(key, "002", 2L, "file2", 2, "2"); - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema( - SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE)); + HoodieSchema schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE); BufferedRecord oldRecord = BufferedRecords.fromEngineRecord(InternalRow.apply(oldValue.toSeq()), ANY_KEY, schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null); BufferedRecord newRecord = BufferedRecords.fromEngineRecord(InternalRow.apply(newValue.toSeq()), ANY_KEY, schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null); @@ -123,8 +121,7 @@ void testMergerWithOldRecordAccepted() throws IOException { void testMergerWithNewRecordAsDelete() throws IOException { HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION); Row oldValue = getSpecificValue(key, "001", 1L, "file1", 1, "1"); - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema( - SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE)); + HoodieSchema schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE); BufferedRecord oldRecord = BufferedRecords.fromEngineRecord(InternalRow.apply(oldValue.toSeq()), ANY_KEY, schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), 
null); BufferedRecord newRecord = BufferedRecords.createDelete(key.getRecordKey(), OrderingValues.getDefault()); @@ -143,8 +140,7 @@ void testMergerWithNewRecordAsDelete() throws IOException { void testMergerWithOldRecordAsDelete() throws IOException { HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION); Row newValue = getSpecificValue(key, "001", 1L, "file1", 1, "1"); - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema( - SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE)); + HoodieSchema schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE); BufferedRecord oldRecord = BufferedRecords.createDelete(key.getRecordKey(), OrderingValues.getDefault()); BufferedRecord newRecord = BufferedRecords.fromEngineRecord(InternalRow.apply(newValue.toSeq()), ANY_KEY, schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala index a717bca8110bd..b8a26c04fa104 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala @@ -176,12 +176,12 @@ class TestAvroConversionUtils extends FunSuite with Matchers { .add("nullableMap", mapType, true).add("map", mapType, false) .add("nullableArray", arrayType, true).add("array", arrayType, false) - val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS") + val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(struct, "SchemaName", "SchemaNS") val expectedSchemaStr = complexSchemaStr - val expectedAvroSchema = HoodieSchema.parse(expectedSchemaStr).toAvroSchema + val expectedSchema = HoodieSchema.parse(expectedSchemaStr) - assert(avroSchema.equals(expectedAvroSchema)) + assert(schema.equals(expectedSchema)) } test("test convertStructTypeToAvroSchema with Nested StructField comment") { @@ -194,7 +194,7 @@ class TestAvroConversionUtils extends FunSuite with Matchers { .add("nullableMap", mapType, true).add("map",mapType,false) .add("nullableArray", arrayType, true).add("array",arrayType,false) - val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS") + val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(struct, "SchemaName", "SchemaNS") val expectedSchemaStr = s""" { @@ -395,9 +395,9 @@ class TestAvroConversionUtils extends FunSuite with Matchers { } """ - val expectedAvroSchema = HoodieSchema.parse(expectedSchemaStr).toAvroSchema + val expectedSchema = HoodieSchema.parse(expectedSchemaStr) - assert(avroSchema.equals(expectedAvroSchema)) + assert(schema.equals(expectedSchema)) } test("test converter with binary") { @@ -456,105 +456,7 @@ class TestAvroConversionUtils extends FunSuite with Matchers { .add("name", DataTypes.StringType, true) .add("name", DataTypes.StringType, true) the[HoodieSchemaException] thrownBy { - AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS") + HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(struct, "SchemaName", "SchemaNS") } should have message "Duplicate field name in record SchemaNS.SchemaName: name type:UNION pos:2 and name type:UNION pos:1." 
} - - test("test alignFieldsNullability function") { - val sourceTableSchema: StructType = - StructType( - Seq( - StructField("intType", IntegerType, nullable = false), - StructField("longType", LongType), - StructField("stringType", StringType, nullable = false), - StructField("doubleType", DoubleType), - StructField("floatType", FloatType, nullable = true), - StructField("structType", new StructType( - Array(StructField("structType_1", StringType), StructField("structType_2", StringType)))), - StructField("dateType", DateType), - StructField("listType", new ArrayType(StringType, true)), - StructField("decimalType", new DecimalType(7, 3), nullable = false), - StructField("timeStampType", TimestampType), - StructField("mapType", new MapType(StringType, IntegerType, true)) - ) - ) - - val writeStructSchema: StructType = - StructType( - Seq( - StructField("intType", IntegerType, nullable = false), - StructField("longType", LongType), - StructField("stringType", StringType, nullable = true), - StructField("doubleType", DoubleType), - StructField("floatType", FloatType, nullable = false), - StructField("structType", new StructType( - Array(StructField("structType_1", StringType, nullable = false), StructField("structType_2", StringType)))), - StructField("dateType", DateType, nullable = false), - StructField("listType", new ArrayType(StringType, true)), - StructField("decimalType", new DecimalType(7, 3)), - StructField("timeStampType", TimestampType), - StructField("mapType", new MapType(StringType, IntegerType, true)), - StructField("notInTableSchemaTimeStampType_1", TimestampType), - StructField("notInTableSchemaIntType", IntegerType, nullable = false), - StructField("notInTableSchemaMapType", new MapType(StringType, IntegerType, true)) - ) - ) - val tableAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceTableSchema, "data") - - val alignedSchema = AvroConversionUtils.alignFieldsNullability(writeStructSchema, tableAvroSchema) - - val nameToNullableSourceSchema = sourceTableSchema.fields.map(item => (item.name, item.nullable)).toMap - - val nameToNullableWriteSchema = writeStructSchema.fields.map(item => (item.name, item.nullable)).toMap - - // Validate alignment rules: - // 1. For fields existing in both schemas: use source table's nullability - // 2. 
For fields only in write schema: retain original nullability - for (field <- alignedSchema.fields) { - if (nameToNullableSourceSchema.contains(field.name) && nameToNullableWriteSchema.contains(field.name)) { - assertTrue(field.nullable == nameToNullableSourceSchema(field.name)) - } - if (!nameToNullableSourceSchema.contains(field.name) && nameToNullableWriteSchema.contains(field.name)) { - assertTrue(field.nullable == nameToNullableWriteSchema(field.name)) - } - } - - for (field <- alignedSchema.fields) { - if (field.name.equals("intType")) { - // Common field: both schemas specify nullable=false → aligned nullable=false - assertFalse(field.nullable) - } - if (field.name.equals("longType")) { - // Common field: both schemas default to nullable=true → aligned nullable=true - assertTrue(field.nullable) - } - if (field.name.equals("stringType")) { - // Conflicting case: - // Write schema (nullable=true) overridden by table schema (nullable=false) → aligned nullable=false - assertFalse(field.nullable) - } - - if (field.name.equals("structType")) { - val fields = field.dataType.asInstanceOf[StructType].fields - assertTrue(fields.apply(0).nullable) - assertTrue(fields.apply(1).nullable) - } - - if (field.name.equals("dateType")) { - // Conflicting case: - // Write schema specifies nullable=false but table schema defaults to true → aligned nullable=true - assertTrue(field.nullable) - } - - if (field.name.equals("notInTableSchemaIntType")) { - // Write-exclusive field: retains original nullability=false - assertFalse(field.nullable) - } - - if (field.name.equals("notInTableSchemaMapType")) { - // Write-exclusive field: retains original nullability=true - assertTrue(field.nullable) - } - } - } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index dd3ea6b45df59..561e829dbe35b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -507,7 +507,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = { // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema val structType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema) - val modifiedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", "example.schema") + val modifiedSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "trip", "example.schema") val records = DataSourceTestUtils.generateRandomRows(100) val recordsSeq = convertRowListToSeq(records) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala index f81209820e5c4..2cf0dc20ad08e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala @@ -378,7 +378,7 @@ class TestHoodieSparkSqlWriterWithTestFormat extends HoodieSparkWriterTestBase { // generate the inserts val schema = DataSourceTestUtils.getStructTypeExampleSchema val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema) - val modifiedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", "example.schema") + val modifiedSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "trip", "example.schema") val records = DataSourceTestUtils.generateRandomRows(100) val recordsSeq = convertRowListToSeq(records) val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala index 44f6b03f27481..562f484956f6e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala @@ -18,8 +18,8 @@ package org.apache.hudi.common.model -import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport, SparkRowSerDe} -import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, createInternalRowToAvroConverter} +import org.apache.hudi.{HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, SparkRowSerDe} +import org.apache.hudi.AvroConversionUtils.createInternalRowToAvroConverter import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType import org.apache.hudi.common.model.TestHoodieRecordSerialization.{cloneUsingKryo, convertToAvroRecord, toUnsafeRow, OverwriteWithLatestAvroPayloadWithEquality} import org.apache.hudi.common.schema.HoodieSchema @@ -183,9 +183,9 @@ object TestHoodieRecordSerialization { } private def convertToAvroRecord(row: Row): GenericRecord = { - val schema = convertStructTypeToAvroSchema(row.schema, "testRecord", "testNamespace") + val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(row.schema, "testRecord", "testNamespace") - createInternalRowToAvroConverter(row.schema, schema, nullable = false) + createInternalRowToAvroConverter(row.schema, schema.toAvroSchema, nullable = false) .apply(toUnsafeRow(row, row.schema)) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala index 74c547d991b6b..9a527b693c6ec 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala @@ -19,7 +19,7 @@ package org.apache.hudi.common.table.read -import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} +import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSchemaConversionUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD, TABLE_TYPE} import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode, TypedProperties} import org.apache.hudi.common.engine.HoodieReaderContext @@ -41,7 +41,7 @@ import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf} import 
org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, Row, SaveMode, SparkSession} -import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters +import org.apache.spark.sql.avro.HoodieSparkSchemaConverters import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.execution.datasources.SparkColumnarFileReader @@ -108,7 +108,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int override def getHoodieReaderContext(tablePath: String, schema: HoodieSchema, storageConf: StorageConfiguration[_], metaClient: HoodieTableMetaClient): HoodieReaderContext[InternalRow] = { val parquetReader = sparkAdapter.createParquetFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration])) - val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(schema.toAvroSchema) + val dataSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema) val orcReader = sparkAdapter.createOrcFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration]), dataSchema) val multiFormatReader = new MultipleColumnarFileFormatReader(parquetReader, orcReader) new SparkFileFormatInternalRowReaderContext(multiFormatReader, Seq.empty, Seq.empty, getStorageConf, metaClient.getTableConfig) @@ -138,7 +138,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int override def assertRecordsEqual(schema: HoodieSchema, expected: InternalRow, actual: InternalRow): Unit = { assertEquals(expected.numFields, actual.numFields) - val expectedStruct = HoodieSparkAvroSchemaConverters.toSqlType(schema.toAvroSchema)._1.asInstanceOf[StructType] + val expectedStruct = HoodieSparkSchemaConverters.toSqlType(schema)._1.asInstanceOf[StructType] expected.toSeq(expectedStruct).zip(actual.toSeq(expectedStruct)).zipWithIndex.foreach { case ((v1, v2), i) => diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala index 2f18a202639c1..50686448b4080 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala @@ -374,7 +374,7 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase { assertEquals(asJson(sort(pExpectedColStatsIndexTableDf.drop(colsToDrop: _*), pValidationSortColumns)), asJson(sort(pTransposedColStatsDF.drop(colsToDrop: _*), pValidationSortColumns))) - val convertedSchema = AvroConversionUtils.convertAvroSchemaToStructType(AvroConversionUtils.convertStructTypeToAvroSchema(pExpectedColStatsSchema, "col_stats_schema")) + val convertedSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(pExpectedColStatsSchema, "col_stats_schema")) if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) { val manualColStatsTableDF = diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala index 1ce99067f9c45..81fd89ec6d41a 100644 --- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala @@ -19,9 +19,8 @@ package org.apache.hudi.functional -import org.apache.hudi.AvroConversionUtils +import org.apache.hudi.{AvroConversionUtils, HoodieSchemaConversionUtils, PartitionStatsIndexSupport} import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.PartitionStatsIndexSupport import org.apache.hudi.TestHoodieSparkUtils.dropMetaFields import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.schema.HoodieSchema @@ -114,7 +113,7 @@ class PartitionStatsIndexTestBase extends HoodieStatsIndexTestBase { val partitionStatsIndex = new PartitionStatsIndexSupport( spark, inputDf.schema, - HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(inputDf.schema, "record", "")), + HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(inputDf.schema, "record", ""), HoodieMetadataConfig.newBuilder().enable(true).build(), metaClient) val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(List("partition", "trip_type"), shouldReadInMemory = true).collectAsList() diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala index 26fa1c722344e..34963c7b4e225 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala @@ -17,9 +17,9 @@ package org.apache.hudi.functional -import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport, SparkAdapterSupport} +import org.apache.hudi.{DataSourceWriteOptions, HoodieSchemaConversionUtils, ScalaAssertionSupport, SparkAdapterSupport} import org.apache.hudi.HoodieConversionUtils.toJavaOption -import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode} +import org.apache.hudi.common.config.RecordMergeMode import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver} import org.apache.hudi.common.util.Option @@ -125,7 +125,7 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser tableMetaClient.reloadActiveTimeline() val resolver = new TableSchemaResolver(tableMetaClient) - val latestTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema(false)) + val latestTableSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(resolver.getTableSchema(false)) val df = spark.read.format("org.apache.hudi") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index a49591f429b9f..6473bb3de3431 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -17,7 +17,7 @@ package org.apache.hudi.functional -import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, 
HoodieSparkUtils, QuickstartUtils, ScalaAssertionSupport} +import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSchemaConversionUtils, HoodieSparkUtils, QuickstartUtils, ScalaAssertionSupport} import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, KEYGENERATOR_CLASS_NAME} import org.apache.hudi.HoodieConversionUtils.toJavaOption import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs} @@ -696,12 +696,12 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup val tableMetaClient = createMetaClient(spark, basePath) assertFalse(tableMetaClient.getArchivedTimeline.empty()) - val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false) + val actualSchema = new TableSchemaResolver(tableMetaClient).getTableSchema(false) val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(CommonOptionUtils.commonOpts(HoodieWriteConfig.TBL_NAME.key)) spark.sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema])) - val schema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, structName, nameSpace) + val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, structName, nameSpace) assertTrue(actualSchema != null) assertEquals(schema, actualSchema) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala index e5ff78e4488dc..12712b4db6171 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala @@ -18,7 +18,7 @@ package org.apache.hudi.functional -import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex} +import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, HoodieSchemaConversionUtils} import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, RECORDKEY_FIELD} import org.apache.hudi.client.SparkRDDWriteClient import org.apache.hudi.client.common.HoodieSparkEngineContext @@ -485,7 +485,7 @@ class TestColumnStatsIndexWithSQL extends ColumnStatIndexTestBase { var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + ("path" -> basePath), includeLogFiles = true) val metadataConfig = HoodieMetadataConfig.newBuilder.withMetadataIndexColumnStats(true).enable(true).build - val hoodieSchema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(fileIndex.schema, "record", "")) + val hoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(fileIndex.schema, "record", "") val cis = new ColumnStatsIndexSupport(spark, fileIndex.schema, hoodieSchema, metadataConfig, metaClient) // unpartitioned table - get all file slices val fileSlices = fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq()) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala index ec8ba1d8e54db..85f20ad3050fb 
100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional
-import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, PartitionStatsIndexSupport}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, HoodieSchemaConversionUtils, PartitionStatsIndexSupport}
 import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
 import org.apache.hudi.avro.model.HoodieCleanMetadata
 import org.apache.hudi.client.SparkRDDWriteClient
@@ -28,7 +28,6 @@ import org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictReso
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode, WriteOperationType}
-import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadataLegacy
@@ -465,7 +464,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
     val partitionStatsIndex = new PartitionStatsIndexSupport(
       spark,
       latestDf.schema,
-      HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(latestDf.schema, "record", "")),
+      HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(latestDf.schema, "record", ""),
       HoodieMetadataConfig.newBuilder()
         .enable(true)
         .build(),
@@ -476,7 +475,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase {
       .collectAsList()
     assertTrue(partitionStats.size() > 0)
     // Assert column stats after restore.
-    val hoodieSchema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(latestDf.schema, "record", ""))
+    val hoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(latestDf.schema, "record", "")
     val columnStatsIndex = new ColumnStatsIndexSupport(
       spark, latestDf.schema, hoodieSchema,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
index 9dbe116dc27b7..5c8ea92be0082 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.internal.schema.convert.TestInternalSchemaConverter._
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
@@ -33,7 +33,7 @@ import org.junit.jupiter.api.Test
 class TestSparkInternalSchemaConverter extends HoodieSparkClientTestHarness with SparkAdapterSupport {
   private def getStructType(schema: HoodieSchema): DataType = {
-    HoodieSparkAvroSchemaConverters.toSqlType(schema.toAvroSchema)._1
+    HoodieSparkSchemaConverters.toSqlType(schema)._1
   }
   @Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
index dfd97349a8ba3..4a488ca1a0eb0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
@@ -80,8 +80,8 @@ object AvroSerDerBenchmark extends HoodieBenchmarkBase {
     spark.sparkContext.getConf.registerAvroSchemas(schema.toAvroSchema)
     benchmark.addCase("deserialize avro Record to internalRow") { _ =>
       testRdd.mapPartitions { iter =>
-        val schema = AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, "record", "my")
-        val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(schema, sparkSchema)
+        val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sparkSchema, "record", "my")
+        val avroToRowConverter = AvroConversionUtils.createAvroToInternalRowConverter(schema.toAvroSchema, sparkSchema)
         iter.map(record => avroToRowConverter.apply(record).get)
       }.foreach(f => f)
     }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index dbce843fa936c..4f2bbb4b5b75c 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -17,22 +17,20 @@ package org.apache.spark.sql.adapter
-import org.apache.hudi.{AvroConversionUtils, DefaultSource, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, Spark3HoodiePartitionCDCFileGroupMapping, Spark3HoodiePartitionFileSliceMapping}
+import org.apache.hudi.{DefaultSource, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieSchemaConversionUtils, Spark3HoodiePartitionCDCFileGroupMapping, Spark3HoodiePartitionFileSliceMapping}
 import org.apache.hudi.client.model.{HoodieInternalRow, Spark3HoodieInternalRow}
 import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.spark.internal.ReflectUtil
-import org.apache.hudi.storage.StoragePath
-import org.apache.avro.Schema
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, DataFrameUtil, Dataset, HoodieUnsafeUtils, HoodieUTF8StringFactory, Spark3DataFrameUtil, Spark3HoodieUnsafeUtils, Spark3HoodieUTF8StringFactory, SparkSession, SQLContext}
 import org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -101,9 +99,9 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
   override def createRelation(sqlContext: SQLContext,
                               metaClient: HoodieTableMetaClient,
-                              schema: Schema,
+                              schema: HoodieSchema,
                               parameters: java.util.Map[String, String]): BaseRelation = {
-    val dataSchema = Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull
+    val dataSchema = Option(schema).map(HoodieSchemaConversionUtils.convertHoodieSchemaToStructType).orNull
     DefaultSource.createRelation(sqlContext, metaClient, dataSchema, parameters.asScala.toMap)
   }
diff --git a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index 1d0391aa219c2..58ed3eb5b88c9 100644
--- a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -17,24 +17,22 @@ package org.apache.spark.sql.adapter
-import org.apache.hudi.{AvroConversionUtils, DefaultSource, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, Spark4HoodiePartitionCDCFileGroupMapping, Spark4HoodiePartitionFileSliceMapping}
+import org.apache.hudi.{DefaultSource, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieSchemaConversionUtils, Spark4HoodiePartitionCDCFileGroupMapping, Spark4HoodiePartitionFileSliceMapping}
 import org.apache.hudi.client.model.{HoodieInternalRow, Spark4HoodieInternalRow}
 import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.spark.internal.ReflectUtil
 import org.apache.hudi.storage.StorageConfiguration
-import org.apache.hudi.storage.StoragePath
-import org.apache.avro.Schema
 import org.apache.parquet.schema.MessageType
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, DataFrameUtil, ExpressionColumnNodeWrapper, HoodieUnsafeUtils, HoodieUTF8StringFactory, Spark4DataFrameUtil, Spark4HoodieUnsafeUtils, Spark4HoodieUTF8StringFactory, SparkSession, SQLContext}
 import org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -106,9 +104,9 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging {
   override def createRelation(sqlContext: SQLContext,
                               metaClient: HoodieTableMetaClient,
-                              schema: Schema,
+                              schema: HoodieSchema,
                               parameters: java.util.Map[String, String]): BaseRelation = {
-    val dataSchema = Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull
+    val dataSchema = Option(schema).map(HoodieSchemaConversionUtils.convertHoodieSchemaToStructType).orNull
     DefaultSource.createRelation(sqlContext, metaClient, dataSchema, parameters.asScala.toMap)
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b17116374bff5..02fc43dd37fc7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -18,8 +18,8 @@ package org.apache.hudi.utilities;
-import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.PartitionStatsIndexSupport;
 import org.apache.hudi.async.HoodieAsyncService;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -67,7 +67,6 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.FileFormatUtils;
-import org.apache.hudi.io.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -83,6 +82,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.util.FileIOUtils;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieIndexVersion;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
@@ -1032,7 +1032,7 @@ private void validatePartitionStats(HoodieMetadataValidationContext metadataTabl
     PartitionStatsIndexSupport partitionStatsIndexSupport = new PartitionStatsIndexSupport(engineContext.getSqlContext().sparkSession(),
-        AvroConversionUtils.convertAvroSchemaToStructType(metadataTableBasedContext.getSchema().toAvroSchema()),
+        HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(metadataTableBasedContext.getSchema()),
         metadataTableBasedContext.getSchema(),
         metadataTableBasedContext.getMetadataConfig(), metaClientOpt.get(), false);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
index 6c7b5c1b810aa..6c58fb2739dc4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -19,8 +19,9 @@ package org.apache.hudi.utilities.schema;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.utilities.config.HiveSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
@@ -43,8 +44,8 @@
  */
 public class HiveSchemaProvider extends SchemaProvider {
-  private final Schema sourceSchema;
-  private Schema targetSchema;
+  private final HoodieSchema sourceSchema;
+  private HoodieSchema targetSchema;
   public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
@@ -57,7 +58,7 @@ public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     try {
       TableIdentifier sourceSchemaTable = new TableIdentifier(sourceSchemaTableName, scala.Option.apply(sourceSchemaDatabaseName));
       StructType sourceSchema = spark.sessionState().catalog().getTableMetadata(sourceSchemaTable).schema();
-      this.sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+      this.sourceSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
          sourceSchema,
          sourceSchemaTableName,
          "hoodie." + sourceSchemaDatabaseName);
@@ -72,7 +73,7 @@ public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     try {
       TableIdentifier targetSchemaTable = new TableIdentifier(targetSchemaTableName, scala.Option.apply(targetSchemaDatabaseName));
       StructType targetSchema = spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
-      this.targetSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+      this.targetSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
          targetSchema,
          targetSchemaTableName,
          "hoodie." + targetSchemaDatabaseName);
@@ -84,13 +85,13 @@ public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
   @Override
   public Schema getSourceSchema() {
-    return sourceSchema;
+    return sourceSchema.toAvroSchema();
   }
   @Override
   public Schema getTargetSchema() {
     if (targetSchema != null) {
-      return targetSchema;
+      return targetSchema.toAvroSchema();
     } else {
       return super.getTargetSchema();
     }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
index 29504c01c0061..b783ba32c4d9a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
@@ -18,7 +18,7 @@ package org.apache.hudi.utilities.schema;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.avro.Schema;
@@ -44,6 +44,6 @@ public RowBasedSchemaProvider(StructType rowStruct) {
   @Override
   public Schema getSourceSchema() {
-    return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
+    return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(rowStruct, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toAvroSchema();
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index 2704dc132a2ea..d25f3082d0822 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -18,13 +18,13 @@ package org.apache.hudi.utilities.sources.helpers;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -34,7 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -194,8 +194,8 @@ void loadDatasetWithNestedSchemaAndCoalesceAliases() throws IOException {
     List expected = Arrays.asList(person1, person2, person3);
     List actual = result.get().collectAsList();
     Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-    Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
-    StructType expectedSchema = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    HoodieSchema schema = HoodieSchema.parse(new FileInputStream(schemaFilePath));
+    StructType expectedSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema);
     // assert final output schema matches with the source schema
     Assertions.assertEquals(expectedSchema, result.get().schema(), "output dataset schema should match source schema");
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
index 36e53bfafc465..eb195de5a8c9d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
@@ -18,7 +18,7 @@ package org.apache.hudi.utilities.sources.helpers;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.MercifulJsonConverterTestBase;
 import org.apache.hudi.common.schema.HoodieSchema;
@@ -686,7 +686,7 @@ void testEncodedDecimalAvroSparkPostProcessorCase(int size, int scale, int preci
   }
   private void validateSchemaCompatibility(List rows, HoodieSchema schema) {
-    StructType rowSchema = AvroConversionUtils.convertAvroSchemaToStructType(schema.toAvroSchema());
+    StructType rowSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema);
     Dataset dataset = spark.createDataFrame(rows, rowSchema);
     assertDoesNotThrow(dataset::collect, "Schema validation and dataset creation should not throw any exceptions.");
   }