@@ -18,7 +18,7 @@

package org.apache.hudi.client.clustering.run.strategy;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -306,7 +306,7 @@ private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
// broadcast reader context.
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
ReaderContextFactory<InternalRow> readerContextFactory = getEngineContext().getReaderContextFactory(metaClient);
StructType sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields.toAvroSchema());
StructType sparkSchemaWithMetaFields = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(tableSchemaWithMetaFields);

RDD<InternalRow> internalRowRDD = jsc.parallelize(clusteringOps, clusteringOps.size()).flatMap(new FlatMapFunction<ClusteringOperation, InternalRow>() {
@Override
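As a reading aid: every call-site hunk in this PR follows the same pattern, so here is a minimal before/after sketch of it in isolation. It relies only on the signatures visible in this diff (convertAvroSchemaToStructType, convertHoodieSchemaToStructType, HoodieSchema.toAvroSchema); the wrapper class and method names are illustrative, not part of the change.

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.spark.sql.types.StructType;

class SchemaConversionMigrationSketch {
  // Before: unwrap the HoodieSchema to an Avro Schema, then convert that to a Catalyst StructType.
  static StructType toStructTypeOld(HoodieSchema tableSchema) {
    return AvroConversionUtils.convertAvroSchemaToStructType(tableSchema.toAvroSchema());
  }

  // After: hand the HoodieSchema to the new utility directly, with no Avro detour at the call site.
  static StructType toStructTypeNew(HoodieSchema tableSchema) {
    return HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(tableSchema);
  }
}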
@@ -18,7 +18,7 @@

package org.apache.hudi.client.common;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
@@ -146,7 +146,7 @@ private static SparkColumnarFileReader getOrcFileReader(TableSchemaResolver reso
Configuration configs,
SparkAdapter sparkAdapter) {
try {
StructType dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema());
StructType dataSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(resolver.getTableSchema());
return sparkAdapter.createOrcFileReader(false, sqlConf, options, configs, dataSchema);
} catch (Exception e) {
throw new HoodieException("Failed to broadcast ORC file reader", e);
@@ -19,7 +19,7 @@

package org.apache.hudi.client.utils;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkRowSerDe;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -295,7 +295,7 @@ public static ExpressionIndexComputationMetadata getExprIndexRecords(
getExpressionIndexRecordsIterator(readerContextFactory.getContext(), metaClient, tableSchema, readerSchema, dataWriteConfig, entry));

// Generate dataset with expression index metadata
StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(readerSchema.toAvroSchema())
StructType structType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(readerSchema)
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION, DataTypes.StringType, false, Metadata.empty()))
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH, DataTypes.StringType, false, Metadata.empty()))
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE, DataTypes.LongType, false, Metadata.empty()));
@@ -18,7 +18,7 @@

package org.apache.hudi.client.utils;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
@@ -41,7 +41,7 @@ public static Object[] getPartitionFieldVals(Option<String[]> partitionFields,
partitionFields.get(),
partitionPath,
new StoragePath(basePath),
AvroConversionUtils.convertAvroSchemaToStructType(writerSchema.toAvroSchema()),
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(writerSchema),
hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()),
hadoopConf.getBoolean("spark.sql.sources.validatePartitionColumns", true));
}
@@ -18,7 +18,7 @@

package org.apache.hudi.client.utils;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
@@ -137,8 +137,8 @@ public static Dataset<Row> getRecordsFromCommittedFiles(SQLContext sqlContext,
try {
return sqlContext.createDataFrame(
sqlContext.emptyDataFrame().rdd(),
AvroConversionUtils.convertAvroSchemaToStructType(
new TableSchemaResolver(table.getMetaClient()).getTableAvroSchema()));
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(
new TableSchemaResolver(table.getMetaClient()).getTableSchema()));
} catch (Exception e) {
LOG.warn("Cannot get table schema from before state.", e);
LOG.warn("Using the schema from after state (current transaction) to create the empty Spark dataframe: {}", newStructTypeSchema);
@@ -19,6 +19,7 @@
package org.apache.hudi.common.model;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRecordContext;
import org.apache.hudi.client.model.HoodieInternalRow;
@@ -323,7 +324,7 @@ public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Prop
if (data == null) {
return Option.empty();
}
StructType structType = schema == null ? AvroConversionUtils.convertAvroSchemaToStructType(recordSchema) : schema;
StructType structType = schema == null ? HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchema.fromAvroSchema(recordSchema)) : schema;
GenericRecord convertedRecord = AvroConversionUtils.createInternalRowToAvroConverter(structType, recordSchema, false).apply(data);
return Option.of(new HoodieAvroIndexedRecord(key, convertedRecord));
}
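Where a caller still receives a raw Avro Schema, as toIndexedRecord does above, the diff bridges it through HoodieSchema.fromAvroSchema before converting. A short sketch of that bridge, assuming fromAvroSchema behaves as used in the hunk; the helper class is illustrative only.

import org.apache.avro.Schema;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.spark.sql.types.StructType;

class AvroToHoodieSchemaBridgeSketch {
  // Wrap the plain Avro record schema in the HoodieSchema abstraction, then
  // derive the Catalyst StructType that the Spark-side row handling needs.
  static StructType structTypeFromAvro(Schema recordSchema) {
    HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(recordSchema);
    return HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(hoodieSchema);
  }
}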
@@ -19,7 +19,7 @@

package org.apache.hudi.merge;

import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
@@ -218,8 +218,8 @@ public static Pair<Map<Integer, StructField>, Pair<StructType, HoodieSchema>> ge
}
}
StructType mergedStructType = new StructType(mergedFieldList.toArray(new StructField[0]));
HoodieSchema mergedSchema = HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
mergedStructType, readerSchema.getName(), readerSchema.getNamespace().orElse(null))));
HoodieSchema mergedSchema = HoodieSchemaCache.intern(HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
mergedStructType, readerSchema.getName(), readerSchema.getNamespace().orElse(null)));
return Pair.of(mergedMapping, Pair.of(mergedStructType, mergedSchema));
});
}
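The merge helper above also exercises the reverse direction: the merged Catalyst StructType is now turned into a HoodieSchema in one call instead of building an Avro schema and re-wrapping it. A sketch under the same assumptions, reusing the reader schema's name and namespace exactly as the hunk does; the wrapper class is illustrative.

import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.spark.sql.types.StructType;

class StructTypeToHoodieSchemaSketch {
  // Convert a merged StructType back into a HoodieSchema, carrying over the
  // record name and namespace from the reader schema.
  static HoodieSchema toHoodieSchema(StructType mergedStructType, HoodieSchema readerSchema) {
    return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
        mergedStructType, readerSchema.getName(), readerSchema.getNamespace().orElse(null));
  }
}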
@@ -20,28 +20,22 @@ package org.apache.hudi

import org.apache.hudi.HoodieSparkUtils.{getCatalystRowSerDe, sparkAdapter}
import org.apache.hudi.avro.AvroSchemaUtils
import org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.internal.schema.HoodieSchemaException

import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
import org.apache.avro.Schema.Type
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.types.StructType

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

object AvroConversionUtils {
private val ROW_TO_AVRO_CONVERTER_CACHE =
new ConcurrentHashMap[Tuple3[StructType, Schema, Boolean], Function1[InternalRow, GenericRecord]]
private val AVRO_SCHEMA_CACHE = new ConcurrentHashMap[Schema, StructType]

/**
* Creates converter to transform Avro payload into Spark's Catalyst one
@@ -102,10 +96,10 @@ object AvroConversionUtils {
structName: String,
recordNamespace: String): Row => GenericRecord = {
val serde = getCatalystRowSerDe(sourceSqlType)
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(avroSchema) != avroSchema
val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sourceSqlType, structName, recordNamespace)
val nullable = schema.isNullable

val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, nullable)
val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, schema.toAvroSchema, nullable)

row => converter.apply(serde.serializeRow(row))
}
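The nullability check in createRowToAvroConverter changes shape as well: rather than comparing the Avro schema against its non-null union branch, the converter now asks the HoodieSchema directly. A sketch of the two checks; it assumes isNullable is exposed as an accessor on the Java side, matching the Scala usage above.

import org.apache.avro.Schema;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.schema.HoodieSchema;

class NullabilityCheckSketch {
  // Before: nullable iff stripping the null branch from the union yields a different schema.
  static boolean isNullableOld(Schema avroSchema) {
    return AvroSchemaUtils.getNonNullTypeFromUnion(avroSchema) != avroSchema;
  }

  // After: the HoodieSchema wrapper tracks nullability itself.
  static boolean isNullableNew(HoodieSchema schema) {
    return schema.isNullable();
  }
}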
@@ -119,77 +113,12 @@
ss.createDataFrame(rdd.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
val schema = new Schema.Parser().parse(schemaStr)
val dataType = convertAvroSchemaToStructType(schema)
val converter = createConverterToRow(schema, dataType)
val schema = HoodieSchema.parse(schemaStr)
val dataType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
val converter = createConverterToRow(schema.toAvroSchema, dataType)
records.map { r => converter(r) }
}
}, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr)))
}

/**
* Converts [[StructType]] into Avro's [[Schema]]
*
* @param structType Catalyst's [[StructType]]
* @param qualifiedName Avro's schema qualified name
* @return Avro schema corresponding to given struct type.
*/
def convertStructTypeToAvroSchema(structType: DataType,
qualifiedName: String): Schema = {
val (namespace, name) = {
val parts = qualifiedName.split('.')
(parts.init.mkString("."), parts.last)
}
convertStructTypeToAvroSchema(structType, name, namespace)
}


/**
* Converts [[StructType]] into Avro's [[Schema]]
*
* @param structType Catalyst's [[StructType]]
* @param structName Avro record name
* @param recordNamespace Avro record namespace
* @return Avro schema corresponding to given struct type.
*/
def convertStructTypeToAvroSchema(structType: DataType,
structName: String,
recordNamespace: String): Schema = {
try {
HoodieSparkAvroSchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
} catch {
case a: AvroRuntimeException => throw new HoodieSchemaException(a.getMessage, a)
case e: Exception => throw new HoodieSchemaException("Failed to convert struct type to avro schema: " + structType, e)
}
}

/**
* Converts Avro's [[Schema]] to Catalyst's [[StructType]]
*/
def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
val loader: java.util.function.Function[Schema, StructType] = key => {
try {
HoodieSparkAvroSchemaConverters.toSqlType(key) match {
case (dataType, _) => dataType.asInstanceOf[StructType]
}
} catch {
case e: Exception => throw new HoodieSchemaException("Failed to convert avro schema to struct type: " + avroSchema, e)
}
}
AVRO_SCHEMA_CACHE.computeIfAbsent(avroSchema, loader)
}

/**
* Converts Avro's [[Schema]] to Catalyst's [[DataType]]
*/
def convertAvroSchemaToDataType(avroSchema: Schema): DataType = {
try {
HoodieSparkAvroSchemaConverters.toSqlType(avroSchema) match {
case (dataType, _) => dataType
}
} catch {
case e: Exception => throw new HoodieSchemaException("Failed to convert avro schema to DataType: " + avroSchema, e)
}
}, HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchema.parse(schemaStr)))
}
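The DataFrame-creation path above no longer round-trips through new Schema.Parser(): the schema string is parsed straight into a HoodieSchema and the StructType is derived from that. A minimal sketch, assuming HoodieSchema.parse accepts the same JSON schema string the Avro parser did, as the hunk implies; the helper class is illustrative.

import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.spark.sql.types.StructType;

class SchemaStringParsingSketch {
  // Parse the serialized schema once, then convert it for use with createDataFrame.
  static StructType structTypeFromSchemaString(String schemaStr) {
    HoodieSchema schema = HoodieSchema.parse(schemaStr);
    return HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema);
  }
}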

/**
@@ -201,106 +130,4 @@ object AvroConversionUtils {
val nameParts = qualifiedName.split('.')
(nameParts.last, nameParts.init.mkString("."))
}

/**
* Recursively aligns the nullable property of hoodie table schema, supporting nested structures
*/
def alignFieldsNullability(sourceSchema: StructType, avroSchema: Schema): StructType = {
// Converts Avro fields to a Map for efficient lookup
val avroFieldsMap = avroSchema.getFields.asScala.map(f => (f.name, f)).toMap

// Recursively process fields
val alignedFields = sourceSchema.fields.map { field =>
avroFieldsMap.get(field.name) match {
case Some(avroField) =>
// Process the nullable property of the current field
val alignedField = field.copy(nullable = avroField.schema.isNullable)

// Recursively handle nested structures
field.dataType match {
case structType: StructType =>
// For struct type, recursively process its internal fields
val nestedAvroSchema = unwrapNullableSchema(avroField.schema)
if (nestedAvroSchema.getType == Schema.Type.RECORD) {
alignedField.copy(dataType = alignFieldsNullability(structType, nestedAvroSchema))
} else {
alignedField
}

case ArrayType(elementType, containsNull) =>
// For array type, process element type
val arraySchema = unwrapNullableSchema(avroField.schema)
if (arraySchema.getType == Schema.Type.ARRAY) {
val elemSchema = arraySchema.getElementType
val newElementType = updateElementType(elementType, elemSchema)
alignedField.copy(dataType = ArrayType(newElementType, elemSchema.isNullable))
} else {
alignedField
}

case MapType(keyType, valueType, valueContainsNull) =>
// For Map type, process value type
val mapSchema = unwrapNullableSchema(avroField.schema)
if (mapSchema.getType == Schema.Type.MAP) {
val valueSchema = mapSchema.getValueType
val newValueType = updateElementType(valueType, valueSchema)
alignedField.copy(dataType = MapType(keyType, newValueType, valueSchema.isNullable))
} else {
alignedField
}

case _ => alignedField // Basic types are returned directly
}

case None => field.copy() // Field not found in Avro schema remains unchanged
}
}

StructType(alignedFields)
}

/**
* Returns the non-null schema if the schema is a UNION type containing NULL
*/
private def unwrapNullableSchema(schema: Schema): Schema = {
if (schema.getType == Schema.Type.UNION) {
val types = schema.getTypes.asScala
val nonNullTypes = types.filter(_.getType != Schema.Type.NULL)
if (nonNullTypes.size == 1) nonNullTypes.head else schema
} else {
schema
}
}

/**
* Updates the element type, handling nested structures
*/
private def updateElementType(dataType: DataType, avroSchema: Schema): DataType = {
dataType match {
case structType: StructType =>
if (avroSchema.getType == Schema.Type.RECORD) {
alignFieldsNullability(structType, avroSchema)
} else {
structType
}

case ArrayType(elemType, containsNull) =>
if (avroSchema.getType == Schema.Type.ARRAY) {
val elemSchema = avroSchema.getElementType
ArrayType(updateElementType(elemType, elemSchema), elemSchema.isNullable)
} else {
dataType
}

case MapType(keyType, valueType, valueContainsNull) =>
if (avroSchema.getType == Schema.Type.MAP) {
val valueSchema = avroSchema.getValueType
MapType(keyType, updateElementType(valueType, valueSchema), valueSchema.isNullable)
} else {
dataType
}

case _ => dataType // Basic types are returned directly
}
}
}