diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java index 9655fdca5b13d..8487a1f01ee25 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java @@ -97,7 +97,7 @@ public Function, HoodieInsertValueGenResult> g } public static Function, HoodieInsertValueGenResult> getTransformerInternal(HoodieSchema schema, - HoodieWriteConfig writeConfig) { + HoodieWriteConfig writeConfig) { // NOTE: Whether record have to be cloned here is determined based on the executor type used // for writing: executors relying on an inner queue, will be keeping references to the records // and therefore in the environments where underlying buffer holding the record could be diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java index 5d7917fc25a98..7f4fdd0e5caf4 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java @@ -34,14 +34,13 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.table.format.FlinkRecordContext; -import org.apache.hudi.util.RowDataAvroQueryContexts; -import org.apache.hudi.util.RowDataAvroQueryContexts.RowDataQueryContext; +import org.apache.hudi.util.RowDataQueryContexts; +import org.apache.hudi.util.RowDataQueryContexts.RowDataQueryContext; import org.apache.hudi.util.RowProjection; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.GenericRowData; @@ -103,7 +102,7 @@ protected Comparable doGetOrderingValue(HoodieSchema recordSchema, Properties if (recordSchema.getField(field).isEmpty()) { return OrderingValues.getDefault(); } - return (Comparable) getColumnValue(recordSchema.toAvroSchema(), field, props); + return (Comparable) getColumnValue(recordSchema, field, props); }); } } @@ -141,7 +140,7 @@ public String getRecordKey(HoodieSchema recordSchema, Option k @Override public String getRecordKey(HoodieSchema recordSchema, String keyFieldName) { if (key == null) { - String recordKey = Objects.toString(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema()).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data)); + String recordKey = Objects.toString(RowDataQueryContexts.fromSchema(recordSchema).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data)); key = new HoodieKey(recordKey, null); } return getRecordKey(); @@ -164,9 +163,9 @@ public Object convertColumnValueForLogicalType(HoodieSchema fieldSchema, if (fieldValue == null) { return null; } - + HoodieSchemaType schemaType = fieldSchema.getType(); - + if (schemaType == HoodieSchemaType.DATE) { return LocalDate.ofEpochDay(((Integer) fieldValue).longValue()); } else if (schemaType == HoodieSchemaType.TIMESTAMP && keepConsistentLogicalTimestamp) { @@ -190,20 +189,20 @@ 
public Object[] getColumnValues(HoodieSchema recordSchema, String[] columns, boo @Override public Object getColumnValueAsJava(HoodieSchema recordSchema, String column, Properties props) { - return getColumnValueAsJava(recordSchema.toAvroSchema(), column, props, true); + return getColumnValueAsJava(recordSchema, column, props, true); } - private Object getColumnValueAsJava(Schema recordSchema, String column, Properties props, boolean allowsNull) { + private Object getColumnValueAsJava(HoodieSchema recordSchema, String column, Properties props, boolean allowsNull) { boolean utcTimezone = Boolean.parseBoolean(props.getProperty( HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString())); - RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone); + RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone); return rowDataQueryContext.getFieldQueryContext(column).getValAsJava(data, allowsNull); } - private Object getColumnValue(Schema recordSchema, String column, Properties props) { + private Object getColumnValue(HoodieSchema recordSchema, String column, Properties props) { boolean utcTimezone = Boolean.parseBoolean(props.getProperty( HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString())); - RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone); + RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone); return rowDataQueryContext.getFieldQueryContext(column).getFieldGetter().getFieldOrNull(data); } @@ -236,7 +235,7 @@ public HoodieRecord updateMetaField(HoodieSchema recordSchema, int ordinal, Stri @Override public HoodieRecord rewriteRecordWithNewSchema(HoodieSchema recordSchema, Properties props, HoodieSchema newSchema, Map renameCols) { - RowProjection rowProjection = RowDataAvroQueryContexts.getRowProjection(recordSchema.toAvroSchema(), newSchema.toAvroSchema(), renameCols); + RowProjection rowProjection = RowDataQueryContexts.getRowProjection(recordSchema, newSchema, renameCols); RowData newRow = rowProjection.project(getData()); return new HoodieFlinkRecord(getKey(), getOperation(), newRow); } @@ -285,8 +284,8 @@ public HoodieRecord truncateRecordKey(HoodieSchema recordSchema, Properties prop public Option toIndexedRecord(HoodieSchema recordSchema, Properties props) { boolean utcTimezone = Boolean.parseBoolean(props.getProperty( HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString())); - RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema(), utcTimezone); - IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema.toAvroSchema(), getData()); + RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone); + IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema, getData()); return Option.of(new HoodieAvroIndexedRecord(getKey(), indexedRecord, getOperation(), getMetadata(), orderingValue, isDelete)); } @@ -294,8 +293,8 @@ public Option toIndexedRecord(HoodieSchema recordSchema public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) { boolean utcTimezone = Boolean.parseBoolean(props.getProperty( 
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString())); - RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema(), utcTimezone); - IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema.toAvroSchema(), getData()); + RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone); + IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema, getData()); return HoodieAvroUtils.avroToBytesStream(indexedRecord); } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java index a08c7f39a5498..6bae6c2e8eac8 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java @@ -24,7 +24,7 @@ import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.read.BufferedRecord; import org.apache.hudi.common.table.read.BufferedRecords; -import org.apache.hudi.util.RowDataAvroQueryContexts; +import org.apache.hudi.util.RowDataQueryContexts; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; @@ -129,7 +129,7 @@ private BufferedRecord mergeRecord( // later in the file writer. int mergedArity = newSchema.getFields().size(); boolean utcTimezone = Boolean.parseBoolean(props.getProperty("read.utc-timezone", "true")); - RowData.FieldGetter[] fieldGetters = RowDataAvroQueryContexts.fromAvroSchema(newSchema.toAvroSchema(), utcTimezone).fieldGetters(); + RowData.FieldGetter[] fieldGetters = RowDataQueryContexts.fromSchema(newSchema, utcTimezone).fieldGetters(); int lowOrderIdx = 0; int highOrderIdx = 0; @@ -138,10 +138,10 @@ private BufferedRecord mergeRecord( // shift start index for merging if there is schema discrepancy if (lowOrderArity != mergedArity) { lowOrderIdx += lowOrderArity - mergedArity; - lowOrderFieldGetters = RowDataAvroQueryContexts.fromAvroSchema(lowOrderSchema.toAvroSchema(), utcTimezone).fieldGetters(); + lowOrderFieldGetters = RowDataQueryContexts.fromSchema(lowOrderSchema, utcTimezone).fieldGetters(); } else if (highOrderArity != mergedArity) { highOrderIdx += highOrderArity - mergedArity; - highOrderFieldGetters = RowDataAvroQueryContexts.fromAvroSchema(highOrderSchema.toAvroSchema(), utcTimezone).fieldGetters(); + highOrderFieldGetters = RowDataQueryContexts.fromSchema(highOrderSchema, utcTimezone).fieldGetters(); } RowData lowOrderRow = (RowData) lowOrderRecord.getRecord(); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java index dd98d8886e666..dbeb24c550ec3 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java @@ -18,11 +18,11 @@ package org.apache.hudi.execution; -import org.apache.hudi.common.schema.HoodieSchema; -import org.apache.hudi.common.schema.HoodieSchemaCache; import 
org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaCache; import org.apache.hudi.common.util.queue.HoodieExecutor; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java index 39ec2d08eb183..3dbc65c3290e1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java @@ -32,7 +32,7 @@ import org.apache.hudi.storage.HoodieStorage; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; -import org.apache.hudi.util.RowDataAvroQueryContexts; +import org.apache.hudi.util.RowDataQueryContexts; import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.conf.Configuration; @@ -67,7 +67,7 @@ protected HoodieFileWriter newParquetFileWriter( HoodieConfig config, HoodieSchema schema) throws IOException { //TODO boundary to revisit in follow up to use HoodieSchema directly - final RowType rowType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(schema.getAvroSchema()).getRowType().getLogicalType(); + final RowType rowType = (RowType) RowDataQueryContexts.fromSchema(schema).getRowType().getLogicalType(); HoodieRowDataParquetWriteSupport writeSupport = new HoodieRowDataParquetWriteSupport( storage.getConf().unwrapAs(Configuration.class), rowType, null); @@ -94,7 +94,7 @@ public HoodieFileWriter newParquetFileWriter( HoodieSchema schema, TaskContextSupplier taskContextSupplier) throws IOException { //TODO boundary to revisit in follow up to use HoodieSchema directly - final RowType rowType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(schema.getAvroSchema()).getRowType().getLogicalType(); + final RowType rowType = (RowType) RowDataQueryContexts.fromSchema(schema).getRowType().getLogicalType(); return newParquetFileWriter(instantTime, storagePath, config, rowType, taskContextSupplier); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java index 927ff5093b860..d242dcbfb9d60 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java @@ -34,7 +34,7 @@ import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.OrderingValueEngineTypeConverter; import org.apache.hudi.util.RecordKeyToRowDataConverter; -import org.apache.hudi.util.RowDataAvroQueryContexts; +import org.apache.hudi.util.RowDataQueryContexts; import org.apache.hudi.util.RowDataUtils; import org.apache.hudi.util.RowProjection; import org.apache.hudi.util.SchemaEvolvingRowDataProjection; @@ -82,8 +82,8 @@ public static FlinkRecordContext getDeleteCheckingInstance() { @Override public Object getValue(RowData record, HoodieSchema schema, String fieldName) { - RowDataAvroQueryContexts.FieldQueryContext 
fieldQueryContext = - RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema(), utcTimezone).getFieldQueryContext(fieldName); + RowDataQueryContexts.FieldQueryContext fieldQueryContext = + RowDataQueryContexts.fromSchema(schema, utcTimezone).getFieldQueryContext(fieldName); if (fieldQueryContext == null) { return null; } else { @@ -108,7 +108,7 @@ public Comparable convertOrderingValueToEngineType(Comparable value) { @Override public GenericRecord convertToAvroRecord(RowData record, HoodieSchema schema) { - return (GenericRecord) RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema()).getRowDataToAvroConverter().convert(schema.toAvroSchema(), record); + return (GenericRecord) RowDataQueryContexts.fromSchema(schema).getRowDataToAvroConverter().convert(schema, record); } @Override @@ -125,7 +125,7 @@ public RowData getDeleteRow(String recordKey) { @Override public RowData convertAvroRecord(IndexedRecord avroRecord) { Schema recordSchema = avroRecord.getSchema(); - AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone).getAvroToRowDataConverter(); + AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieSchema.fromAvroSchema(recordSchema), utcTimezone).getAvroToRowDataConverter(); RowData rowData = (RowData) converter.convert(avroRecord); Schema.Field operationField = recordSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD); if (operationField != null) { @@ -180,7 +180,7 @@ public RowData toBinaryRow(HoodieSchema schema, RowData record) { if (record instanceof BinaryRowData) { return record; } - RowDataSerializer rowDataSerializer = RowDataAvroQueryContexts.getRowDataSerializer(schema.toAvroSchema()); + RowDataSerializer rowDataSerializer = RowDataQueryContexts.getRowDataSerializer(schema); return rowDataSerializer.toBinaryRow(record); } @@ -197,8 +197,8 @@ public RowData toBinaryRow(HoodieSchema schema, RowData record) { */ @Override public UnaryOperator projectRecord(HoodieSchema from, HoodieSchema to, Map renamedColumns) { - RowType fromType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(from.toAvroSchema()).getRowType().getLogicalType(); - RowType toType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(to.toAvroSchema()).getRowType().getLogicalType(); + RowType fromType = (RowType) RowDataQueryContexts.fromSchema(from).getRowType().getLogicalType(); + RowType toType = (RowType) RowDataQueryContexts.fromSchema(to).getRowType().getLogicalType(); RowProjection rowProjection = SchemaEvolvingRowDataProjection.instance(fromType, toType, renamedColumns); return rowProjection::project; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java deleted file mode 100644 index 1729044a1464e..0000000000000 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.util; - -import org.apache.hudi.common.util.ReflectionUtils; - -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.Types; -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.ArrayType; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.IntType; -import org.apache.flink.table.types.logical.LocalZonedTimestampType; -import org.apache.flink.table.types.logical.LogicalType; -import org.apache.flink.table.types.logical.LogicalTypeFamily; -import org.apache.flink.table.types.logical.MapType; -import org.apache.flink.table.types.logical.MultisetType; -import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.types.logical.TimeType; -import org.apache.flink.table.types.logical.TimestampType; - -import java.util.List; -import java.util.stream.Collectors; - -/** - * Converts an Avro schema into Flink's type information. It uses {@link org.apache.flink.api.java.typeutils.RowTypeInfo} for - * representing objects and converts Avro types into types that are compatible with Flink's Table & - * SQL API. - * - *
Note: Changes in this class need to be kept in sync with the corresponding runtime classes - * {@code org.apache.flink.formats.avro.AvroRowDeserializationSchema} and {@code org.apache.flink.formats.avro.AvroRowSerializationSchema}. - * - *
NOTE: reference from Flink release 1.12.0, should remove when Flink version upgrade to that. - */ -public class AvroSchemaConverter { - - /** - * Converts an Avro schema {@code schema} into a nested row structure with deterministic field order and - * data types that are compatible with Flink's Table & SQL API. - * - * @param schema Avro schema definition - * @return data type matching the schema - */ - public static DataType convertToDataType(Schema schema) { - switch (schema.getType()) { - case RECORD: - final List schemaFields = schema.getFields(); - - final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()]; - for (int i = 0; i < schemaFields.size(); i++) { - final Schema.Field field = schemaFields.get(i); - fields[i] = DataTypes.FIELD(field.name(), convertToDataType(field.schema())); - } - return DataTypes.ROW(fields).notNull(); - case ENUM: - case STRING: - // convert Avro's Utf8/CharSequence to String - return DataTypes.STRING().notNull(); - case ARRAY: - return DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull(); - case MAP: - return DataTypes.MAP( - DataTypes.STRING().notNull(), - convertToDataType(schema.getValueType())) - .notNull(); - case UNION: - final Schema actualSchema; - final boolean nullable; - if (schema.getTypes().size() == 2 - && schema.getTypes().get(0).getType() == Schema.Type.NULL) { - actualSchema = schema.getTypes().get(1); - nullable = true; - } else if (schema.getTypes().size() == 2 - && schema.getTypes().get(1).getType() == Schema.Type.NULL) { - actualSchema = schema.getTypes().get(0); - nullable = true; - } else if (schema.getTypes().size() == 1) { - actualSchema = schema.getTypes().get(0); - nullable = false; - } else { - List nonNullTypes = schema.getTypes().stream() - .filter(s -> s.getType() != Schema.Type.NULL) - .collect(Collectors.toList()); - nullable = schema.getTypes().size() > nonNullTypes.size(); - - // use Kryo for serialization - DataType rawDataType = (DataType) ReflectionUtils.invokeStaticMethod( - "org.apache.hudi.utils.DataTypeUtils", - "createAtomicRawType", - new Object[] {false, Types.GENERIC(Object.class)}, - Boolean.class, - TypeInformation.class); - - if (recordTypesOfSameNumFields(nonNullTypes)) { - DataType converted = DataTypes.ROW( - DataTypes.FIELD("wrapper", rawDataType)) - .notNull(); - return nullable ? converted.nullable() : converted; - } - // use Kryo for serialization - return nullable ? rawDataType.nullable() : rawDataType; - } - DataType converted = convertToDataType(actualSchema); - return nullable ? 
converted.nullable() : converted; - case FIXED: - // logical decimal type - if (schema.getLogicalType() instanceof LogicalTypes.Decimal) { - final LogicalTypes.Decimal decimalType = - (LogicalTypes.Decimal) schema.getLogicalType(); - return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()) - .notNull(); - } - // convert fixed size binary data to primitive byte arrays - return DataTypes.VARBINARY(schema.getFixedSize()).notNull(); - case BYTES: - // logical decimal type - if (schema.getLogicalType() instanceof LogicalTypes.Decimal) { - final LogicalTypes.Decimal decimalType = - (LogicalTypes.Decimal) schema.getLogicalType(); - return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()) - .notNull(); - } - return DataTypes.BYTES().notNull(); - case INT: - // logical date and time type - final org.apache.avro.LogicalType logicalType = schema.getLogicalType(); - if (logicalType == LogicalTypes.date()) { - return DataTypes.DATE().notNull(); - } else if (logicalType == LogicalTypes.timeMillis()) { - return DataTypes.TIME(3).notNull(); - } - return DataTypes.INT().notNull(); - case LONG: - // logical timestamp type - if (schema.getLogicalType() == LogicalTypes.timestampMillis()) { - return DataTypes.TIMESTAMP(3).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) { - return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { - return DataTypes.TIMESTAMP(6).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { - return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) { - return DataTypes.TIME(3).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { - return DataTypes.TIME(6).notNull(); - } - return DataTypes.BIGINT().notNull(); - case FLOAT: - return DataTypes.FLOAT().notNull(); - case DOUBLE: - return DataTypes.DOUBLE().notNull(); - case BOOLEAN: - return DataTypes.BOOLEAN().notNull(); - case NULL: - return DataTypes.NULL(); - default: - throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'."); - } - } - - /** - * Returns true if all the types are RECORD type with same number of fields. - */ - private static boolean recordTypesOfSameNumFields(List types) { - if (types == null || types.size() == 0) { - return false; - } - if (types.stream().anyMatch(s -> s.getType() != Schema.Type.RECORD)) { - return false; - } - int numFields = types.get(0).getFields().size(); - return types.stream().allMatch(s -> s.getFields().size() == numFields); - } - - /** - * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. - * - *
Use "record" as the type name. - * - * @param schema the schema type, usually it should be the top level record type, e.g. not a - * nested type - * @return Avro's {@link Schema} matching this logical type. - */ - public static Schema convertToSchema(LogicalType schema) { - return convertToSchema(schema, "record"); - } - - /** - * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. - * - *
The "{rowName}." is used as the nested row type name prefix in order to generate the right - * schema. Nested record type that only differs with type name is still compatible. - * - * @param logicalType logical type - * @param rowName the record name - * @return Avro's {@link Schema} matching this logical type. - */ - public static Schema convertToSchema(LogicalType logicalType, String rowName) { - int precision; - boolean nullable = logicalType.isNullable(); - switch (logicalType.getTypeRoot()) { - case NULL: - return SchemaBuilder.builder().nullType(); - case BOOLEAN: - Schema bool = SchemaBuilder.builder().booleanType(); - return nullable ? nullableSchema(bool) : bool; - case TINYINT: - case SMALLINT: - case INTEGER: - Schema integer = SchemaBuilder.builder().intType(); - return nullable ? nullableSchema(integer) : integer; - case BIGINT: - Schema bigint = SchemaBuilder.builder().longType(); - return nullable ? nullableSchema(bigint) : bigint; - case FLOAT: - Schema f = SchemaBuilder.builder().floatType(); - return nullable ? nullableSchema(f) : f; - case DOUBLE: - Schema d = SchemaBuilder.builder().doubleType(); - return nullable ? nullableSchema(d) : d; - case CHAR: - case VARCHAR: - Schema str = SchemaBuilder.builder().stringType(); - return nullable ? nullableSchema(str) : str; - case BINARY: - case VARBINARY: - Schema binary = SchemaBuilder.builder().bytesType(); - return nullable ? nullableSchema(binary) : binary; - case TIMESTAMP_WITHOUT_TIME_ZONE: - // use long to represents Timestamp - final TimestampType timestampType = (TimestampType) logicalType; - precision = timestampType.getPrecision(); - org.apache.avro.LogicalType timestampLogicalType; - if (precision <= 3) { - timestampLogicalType = LogicalTypes.timestampMillis(); - } else if (precision <= 6) { - timestampLogicalType = LogicalTypes.timestampMicros(); - } else { - throw new IllegalArgumentException( - "Avro does not support TIMESTAMP type with precision: " - + precision - + ", it only support precisions <= 6."); - } - Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType()); - return nullable ? nullableSchema(timestamp) : timestamp; - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - // use long to represents LocalZonedTimestampType - final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType; - precision = localZonedTimestampType.getPrecision(); - org.apache.avro.LogicalType localZonedTimestampLogicalType; - if (precision <= 3) { - localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis(); - } else if (precision <= 6) { - localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros(); - } else { - throw new IllegalArgumentException( - "Avro does not support LOCAL TIMESTAMP type with precision: " - + precision - + ", it only support precisions <= 6."); - } - Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType()); - return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp; - case DATE: - // use int to represents Date - Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); - return nullable ? 
nullableSchema(date) : date; - case TIME_WITHOUT_TIME_ZONE: - precision = ((TimeType) logicalType).getPrecision(); - if (precision > 3) { - throw new IllegalArgumentException( - "Avro does not support TIME type with precision: " - + precision - + ", it only supports precision less than 3."); - } - // use int to represents Time, we only support millisecond when deserialization - Schema time = - LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()); - return nullable ? nullableSchema(time) : time; - case DECIMAL: - DecimalType decimalType = (DecimalType) logicalType; - // store BigDecimal as Fixed - // for spark compatibility. - Schema decimal = - LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale()) - .addToSchema(SchemaBuilder - .fixed(String.format("%s.fixed", rowName)) - .size(computeMinBytesForDecimalPrecision(decimalType.getPrecision()))); - return nullable ? nullableSchema(decimal) : decimal; - case ROW: - RowType rowType = (RowType) logicalType; - List fieldNames = rowType.getFieldNames(); - // we have to make sure the record name is different in a Schema - SchemaBuilder.FieldAssembler builder = - SchemaBuilder.builder().record(rowName).fields(); - for (int i = 0; i < rowType.getFieldCount(); i++) { - String fieldName = fieldNames.get(i); - LogicalType fieldType = rowType.getTypeAt(i); - SchemaBuilder.GenericDefault fieldBuilder = - builder.name(fieldName) - .type(convertToSchema(fieldType, rowName + "." + fieldName)); - - if (fieldType.isNullable()) { - builder = fieldBuilder.withDefault(null); - } else { - builder = fieldBuilder.noDefault(); - } - } - Schema record = builder.endRecord(); - return nullable ? nullableSchema(record) : record; - case MULTISET: - case MAP: - Schema map = - SchemaBuilder.builder() - .map() - .values( - convertToSchema( - extractValueTypeToAvroMap(logicalType), rowName)); - return nullable ? nullableSchema(map) : map; - case ARRAY: - ArrayType arrayType = (ArrayType) logicalType; - Schema array = - SchemaBuilder.builder() - .array() - .items(convertToSchema(arrayType.getElementType(), rowName)); - return nullable ? nullableSchema(array) : array; - case RAW: - default: - throw new UnsupportedOperationException( - "Unsupported to derive Schema for type: " + logicalType); - } - } - - public static LogicalType extractValueTypeToAvroMap(LogicalType type) { - LogicalType keyType; - LogicalType valueType; - if (type instanceof MapType) { - MapType mapType = (MapType) type; - keyType = mapType.getKeyType(); - valueType = mapType.getValueType(); - } else { - MultisetType multisetType = (MultisetType) type; - keyType = multisetType.getElementType(); - valueType = new IntType(); - } - if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) { - throw new UnsupportedOperationException( - "Avro format doesn't support non-string as key type of map. " - + "The key type is: " - + keyType.asSummaryString()); - } - return valueType; - } - - /** - * Returns whether the given logical type belongs to the family. - */ - public static boolean isFamily(LogicalType logicalType, LogicalTypeFamily family) { - return logicalType.getTypeRoot().getFamilies().contains(family); - } - - /** - * Returns schema with nullable true. - */ - private static Schema nullableSchema(Schema schema) { - return schema.isNullable() - ? 
schema - : Schema.createUnion(SchemaBuilder.builder().nullType(), schema); - } - - private static int computeMinBytesForDecimalPrecision(int precision) { - int numBytes = 1; - while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) { - numBytes += 1; - } - return numBytes; - } -} - diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java index f9107e6df53e9..fee24e520382d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java @@ -198,7 +198,7 @@ private static AvroToRowDataConverter createMapConverter(LogicalType type, boole final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType(), utcTimezone); final AvroToRowDataConverter valueConverter = - createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type), utcTimezone); + createNullableConverter(HoodieSchemaConverter.extractValueTypeToMap(type), utcTimezone); return avroObject -> { final Map map = (Map) avroObject; diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java index 14c59d5789a85..5e211c4851a0d 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java @@ -420,7 +420,7 @@ private static DataType convertUnion(HoodieSchema schema) { return convertToDataType(unionTypes.get(0)); } - // Complex multi-type unions - use RAW type (matches AvroSchemaConverter logic) + // Complex multi-type unions - use RAW type List nonNullTypes = unionTypes.stream() .filter(t -> t.getType() != HoodieSchemaType.NULL) .collect(Collectors.toList()); @@ -458,4 +458,25 @@ private static boolean recordTypesOfSameNumFields(List types) { int numFields = types.get(0).getFields().size(); return types.stream().allMatch(s -> s.getFields().size() == numFields); } -} \ No newline at end of file + + public static LogicalType extractValueTypeToMap(LogicalType type) { + LogicalType keyType; + LogicalType valueType; + if (type instanceof MapType) { + MapType mapType = (MapType) type; + keyType = mapType.getKeyType(); + valueType = mapType.getValueType(); + } else { + MultisetType multisetType = (MultisetType) type; + keyType = multisetType.getElementType(); + valueType = new IntType(); + } + if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) { + throw new UnsupportedOperationException( + "Avro format doesn't support non-string as key type of map. 
" + + "The key type is: " + + keyType.asSummaryString()); + } + return valueType; + } +} diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataQueryContexts.java similarity index 76% rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java rename to hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataQueryContexts.java index ad020b41912d3..ace597859a3a0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataQueryContexts.java @@ -18,6 +18,7 @@ package org.apache.hudi.util; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Triple; import org.apache.hudi.exception.HoodieException; @@ -26,7 +27,6 @@ import lombok.AllArgsConstructor; import lombok.Getter; -import org.apache.avro.Schema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.types.DataType; @@ -41,21 +41,21 @@ /** * Maintains auxiliary utilities for row data fields handling. */ -public class RowDataAvroQueryContexts { - private static final Map, RowDataQueryContext> QUERY_CONTEXT_MAP = new ConcurrentHashMap<>(); +public class RowDataQueryContexts { + private static final Map, RowDataQueryContext> QUERY_CONTEXT_MAP = new ConcurrentHashMap<>(); // BinaryRowWriter in RowDataSerializer are reused, and it's not thread-safe. - private static final ThreadLocal> ROWDATA_SERIALIZER_CACHE = ThreadLocal.withInitial(HashMap::new); + private static final ThreadLocal> ROWDATA_SERIALIZER_CACHE = ThreadLocal.withInitial(HashMap::new); - private static final Map>, RowProjection> ROW_PROJECTION_CACHE = new ConcurrentHashMap<>(); + private static final Map>, RowProjection> ROW_PROJECTION_CACHE = new ConcurrentHashMap<>(); - public static RowDataQueryContext fromAvroSchema(Schema avroSchema) { - return fromAvroSchema(avroSchema, true); + public static RowDataQueryContext fromSchema(HoodieSchema schema) { + return fromSchema(schema, true); } - public static RowDataQueryContext fromAvroSchema(Schema avroSchema, boolean utcTimezone) { - return QUERY_CONTEXT_MAP.computeIfAbsent(Pair.of(avroSchema, utcTimezone), k -> { - DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema); + public static RowDataQueryContext fromSchema(HoodieSchema schema, boolean utcTimezone) { + return QUERY_CONTEXT_MAP.computeIfAbsent(Pair.of(schema, utcTimezone), k -> { + DataType dataType = HoodieSchemaConverter.convertToDataType(schema); RowType rowType = (RowType) dataType.getLogicalType(); RowType.RowField[] rowFields = rowType.getFields().toArray(new RowType.RowField[0]); RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowFields.length]; @@ -72,18 +72,18 @@ public static RowDataQueryContext fromAvroSchema(Schema avroSchema, boolean utcT }); } - public static RowDataSerializer getRowDataSerializer(Schema avroSchema) { - return ROWDATA_SERIALIZER_CACHE.get().computeIfAbsent(avroSchema, schema -> { - RowType rowType = (RowType) fromAvroSchema(schema).getRowType().getLogicalType(); + public static RowDataSerializer getRowDataSerializer(HoodieSchema schema) { + return ROWDATA_SERIALIZER_CACHE.get().computeIfAbsent(schema, providedSchema -> { + 
RowType rowType = (RowType) fromSchema(providedSchema).getRowType().getLogicalType(); return new RowDataSerializer(rowType); }); } - public static RowProjection getRowProjection(Schema from, Schema to, Map renameCols) { - Triple> cacheKey = Triple.of(from, to, renameCols); + public static RowProjection getRowProjection(HoodieSchema from, HoodieSchema to, Map renameCols) { + Triple> cacheKey = Triple.of(from, to, renameCols); return ROW_PROJECTION_CACHE.computeIfAbsent(cacheKey, key -> { - RowType fromType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(from).getRowType().getLogicalType(); - RowType toType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(to).getRowType().getLogicalType(); + RowType fromType = (RowType) RowDataQueryContexts.fromSchema(from).getRowType().getLogicalType(); + RowType toType = (RowType) RowDataQueryContexts.fromSchema(to).getRowType().getLogicalType(); return SchemaEvolvingRowDataProjection.instance(fromType, toType, renameCols); }); } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java index bc86e28248f28..9eb2979e19aa0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java @@ -18,8 +18,11 @@ package org.apache.hudi.util; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; + import org.apache.avro.Conversions; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; @@ -63,7 +66,7 @@ public class RowDataToAvroConverters { */ @FunctionalInterface public interface RowDataToAvroConverter extends Serializable { - Object convert(Schema schema, Object object); + Object convert(HoodieSchema schema, Object object); } // -------------------------------------------------------------------------------- @@ -90,7 +93,7 @@ public static RowDataToAvroConverter createConverter(LogicalType type, boolean u private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { return null; } }; @@ -101,7 +104,7 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { return ((Byte) object).intValue(); } }; @@ -112,7 +115,7 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { return ((Short) object).intValue(); } }; @@ -131,7 +134,7 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { return object; } }; @@ -143,7 +146,7 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, 
Object object) { return new Utf8(((BinaryStringData) object).toBytes()); } }; @@ -155,7 +158,7 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { return ByteBuffer.wrap((byte[]) object); } }; @@ -167,7 +170,7 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { return ((TimestampData) object).toInstant().toEpochMilli(); } }; @@ -176,7 +179,7 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { Instant instant = ((TimestampData) object).toInstant(); return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), instant.getNano() / 1000); } @@ -193,7 +196,7 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { return utcTimezone ? ((TimestampData) object).toInstant().toEpochMilli() : ((TimestampData) object).toTimestamp().getTime(); } }; @@ -203,7 +206,7 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { Instant instant = utcTimezone ? ((TimestampData) object).toInstant() : ((TimestampData) object).toTimestamp().toInstant(); return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), instant.getNano() / 1000); } @@ -218,9 +221,9 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { BigDecimal javaDecimal = ((DecimalData) object).toBigDecimal(); - return DECIMAL_CONVERSION.toFixed(javaDecimal, schema, schema.getLogicalType()); + return DECIMAL_CONVERSION.toFixed(javaDecimal, schema.toAvroSchema(), schema.toAvroSchema().getLogicalType()); } }; break; @@ -244,19 +247,19 @@ public Object convert(Schema schema, Object object) { private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { if (object == null) { return null; } // get actual schema if it is a nullable schema - Schema actualSchema; - if (schema.getType() == Schema.Type.UNION) { - List types = schema.getTypes(); + HoodieSchema actualSchema; + if (schema.getType() == HoodieSchemaType.UNION) { + List types = schema.getTypes(); int size = types.size(); - if (size == 2 && types.get(1).getType() == Schema.Type.NULL) { + if (size == 2 && types.get(1).getType() == HoodieSchemaType.NULL) { actualSchema = types.get(0); - } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) { + } else if (size == 2 && types.get(0).getType() == HoodieSchemaType.NULL) { actualSchema = types.get(1); } else { throw new IllegalArgumentException( @@ -289,12 +292,12 @@ private static RowDataToAvroConverter createRowConverter(RowType rowType, 
boolea private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { + public Object convert(HoodieSchema schema, Object object) { final RowData row = (RowData) object; - final List fields = schema.getFields(); - final GenericRecord record = new GenericData.Record(schema); + final List fields = schema.getFields(); + final GenericRecord record = new GenericData.Record(schema.toAvroSchema()); for (int i = 0; i < length; ++i) { - final Schema.Field schemaField = fields.get(i); + final HoodieSchemaField schemaField = fields.get(i); Object avroObject = fieldConverters[i].convert( schemaField.schema(), fieldGetters[i].getFieldOrNull(row)); @@ -314,8 +317,8 @@ private static RowDataToAvroConverter createArrayConverter(ArrayType arrayType, private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { - final Schema elementSchema = schema.getElementType(); + public Object convert(HoodieSchema schema, Object object) { + final HoodieSchema elementSchema = schema.getElementType(); ArrayData arrayData = (ArrayData) object; List list = new ArrayList<>(); for (int i = 0; i < arrayData.size(); ++i) { @@ -329,7 +332,7 @@ public Object convert(Schema schema, Object object) { } private static RowDataToAvroConverter createMapConverter(LogicalType type, boolean utcTimezone) { - LogicalType valueType = AvroSchemaConverter.extractValueTypeToAvroMap(type); + LogicalType valueType = HoodieSchemaConverter.extractValueTypeToMap(type); final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType); final RowDataToAvroConverter valueConverter = createConverter(valueType, utcTimezone); @@ -337,8 +340,8 @@ private static RowDataToAvroConverter createMapConverter(LogicalType type, boole private static final long serialVersionUID = 1L; @Override - public Object convert(Schema schema, Object object) { - final Schema valueSchema = schema.getValueType(); + public Object convert(HoodieSchema schema, Object object) { + final HoodieSchema valueSchema = schema.getValueType(); final MapData mapData = (MapData) object; final ArrayData keyArray = mapData.keyArray(); final ArrayData valueArray = mapData.valueArray(); diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java index a6c3f5031affb..e932a077da618 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.read.BufferedRecord; import org.apache.hudi.common.table.read.BufferedRecords; -import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.data.GenericRowData; @@ -68,7 +68,7 @@ public class TestHoodieFlinkRecordMerger { @Test void testMergingWithNewRecordAsDelete() throws IOException { - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE)); + HoodieSchema schema = HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE); HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION); RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 1L); BufferedRecord 
oldRecord = BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 1L, RECORD_KEY, false); @@ -82,7 +82,7 @@ void testMergingWithNewRecordAsDelete() throws IOException { @Test void testMergingWithOldRecordAsDelete() throws IOException { - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE)); + HoodieSchema schema = HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE); HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION); RowData newRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 1L); BufferedRecord newRecord = BufferedRecords.fromEngineRecord(newRow, schema, recordContext, 1L, RECORD_KEY, false); @@ -96,7 +96,7 @@ void testMergingWithOldRecordAsDelete() throws IOException { @Test void testMergingWithOldRecordAccepted() throws IOException { - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE)); + HoodieSchema schema = HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE); HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION); RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 3L); BufferedRecord oldRecord = BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 3L, RECORD_KEY, false); @@ -111,7 +111,7 @@ void testMergingWithOldRecordAccepted() throws IOException { @Test void testMergingWithNewRecordAccepted() throws IOException { - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE)); + HoodieSchema schema = HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE); HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION); RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 1L); BufferedRecord oldRecord = BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 1L, RECORD_KEY, false); @@ -126,7 +126,7 @@ void testMergingWithNewRecordAccepted() throws IOException { @Test void testMergingWithCommitTimeRecordMerger() throws IOException { - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE)); + HoodieSchema schema = HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE); HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION); RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 2L); BufferedRecord oldRecord = BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 2L, RECORD_KEY, false); diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java index c4892c48e6968..a6fc063857ebc 100644 --- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java +++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java @@ -19,9 +19,11 @@ package org.apache.hudi.util; +import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaField; import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.metadata.HoodieMetadataPayload; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; @@ -248,43 +250,6 @@ public void testNestedRecordType() { assertEquals(2, addressSchema.getFields().size()); } - @Test - public void testCompareWithAvroConversion() { - // Test that HoodieSchemaConverter produces the same result as - // AvroSchemaConverter + 
HoodieSchema.fromAvroSchema() - - RowType flinkRowType = (RowType) DataTypes.ROW( - DataTypes.FIELD("id", DataTypes.BIGINT().notNull()), - DataTypes.FIELD("name", DataTypes.STRING().nullable()), - DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3).notNull()), - DataTypes.FIELD("decimal_val", DataTypes.DECIMAL(10, 2).notNull()) - ).notNull().getLogicalType(); - - // Method 1: Direct HoodieSchema conversion - HoodieSchema directSchema = HoodieSchemaConverter.convertToSchema(flinkRowType, "TestRecord"); - - // Method 2: Via Avro conversion - HoodieSchema viaAvroSchema = HoodieSchema.fromAvroSchema( - AvroSchemaConverter.convertToSchema(flinkRowType, "TestRecord")); - - // Both should produce equivalent schemas - assertNotNull(directSchema); - assertNotNull(viaAvroSchema); - assertEquals(HoodieSchemaType.RECORD, directSchema.getType()); - assertEquals(HoodieSchemaType.RECORD, viaAvroSchema.getType()); - assertEquals(4, directSchema.getFields().size()); - assertEquals(4, viaAvroSchema.getFields().size()); - - // Verify field types match - for (int i = 0; i < 4; i++) { - assertEquals( - viaAvroSchema.getFields().get(i).schema().getType(), - directSchema.getFields().get(i).schema().getType(), - "Field " + i + " type mismatch" - ); - } - } - @Test public void testComplexNestedStructure() { LogicalType complexType = DataTypes.ROW( @@ -317,26 +282,6 @@ public void testComplexNestedStructure() { assertEquals(2, nestedRecord.getFields().size()); } - @Test - public void testNativeConversionMatchesAvroPath() { - // Verify native conversion produces same result as Avro path - RowType originalRowType = (RowType) DataTypes.ROW( - DataTypes.FIELD("id", DataTypes.BIGINT().notNull()), - DataTypes.FIELD("name", DataTypes.STRING().nullable()), - DataTypes.FIELD("age", DataTypes.INT().notNull()) - ).notNull().getLogicalType(); - - HoodieSchema hoodieSchema = HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord"); - - // Native conversion - DataType nativeResult = HoodieSchemaConverter.convertToDataType(hoodieSchema); - - // Avro path (for comparison) - DataType avroResult = AvroSchemaConverter.convertToDataType(hoodieSchema.getAvroSchema()); - - assertEquals(avroResult.getLogicalType(), nativeResult.getLogicalType()); - } - @Test public void testRoundTripConversion() { RowType originalRowType = (RowType) DataTypes.ROW( @@ -528,4 +473,61 @@ public void testFixedConversion() { DataType dataType = HoodieSchemaConverter.convertToDataType(fixedSchema); assertTrue(dataType.getLogicalType() instanceof VarBinaryType); } -} \ No newline at end of file + + @Test + void testUnionSchemaWithMultipleRecordTypes() { + HoodieSchema schema = HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$); + DataType dataType = HoodieSchemaConverter.convertToDataType(schema); + int pos = HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos(); + final String expected = "ROW<" + + "`fileName` STRING, " + + "`columnName` STRING, " + + "`minValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, " + + "`maxValue` ROW<`wrapper` RAW('java.lang.Object', ?) 
NOT NULL>, " + + "`valueCount` BIGINT, " + + "`nullCount` BIGINT, " + + "`totalSize` BIGINT, " + + "`totalUncompressedSize` BIGINT, " + + "`isDeleted` BOOLEAN NOT NULL, " + + "`isTightBound` BOOLEAN NOT NULL, " + + "`valueType` ROW<`typeOrdinal` INT NOT NULL, `additionalInfo` STRING>>"; + assertEquals(expected, dataType.getChildren().get(pos).toString()); + } + + @Test + void testLocalTimestampType() { + DataType dataType = DataTypes.ROW( + DataTypes.FIELD("f_localtimestamp_millis", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), + DataTypes.FIELD("f_localtimestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) + ); + // convert to avro schema + HoodieSchema schema = HoodieSchemaConverter.convertToSchema(dataType.getLogicalType()); + final String expectedSchema = "" + + "[ \"null\", {\n" + + " \"type\" : \"record\",\n" + + " \"name\" : \"record\",\n" + + " \"fields\" : [ {\n" + + " \"name\" : \"f_localtimestamp_millis\",\n" + + " \"type\" : [ \"null\", {\n" + + " \"type\" : \"long\",\n" + + " \"logicalType\" : \"local-timestamp-millis\"\n" + + " } ],\n" + + " \"default\" : null\n" + + " }, {\n" + + " \"name\" : \"f_localtimestamp_micros\",\n" + + " \"type\" : [ \"null\", {\n" + + " \"type\" : \"long\",\n" + + " \"logicalType\" : \"local-timestamp-micros\"\n" + + " } ],\n" + + " \"default\" : null\n" + + " } ]\n" + + "} ]"; + assertEquals(expectedSchema, schema.toString(true)); + // convert it back + DataType convertedDataType = HoodieSchemaConverter.convertToDataType(schema); + final String expectedDataType = "ROW<" + + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), " + + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>"; + assertEquals(expectedDataType, convertedDataType.toString()); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java index 1188c241990cf..f4fdf1dbc6e14 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java @@ -19,8 +19,9 @@ package org.apache.hudi.source.stats; import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.metadata.HoodieMetadataPayload; -import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.RowType; @@ -53,7 +54,7 @@ public class ColumnStatsSchemas { public static final int ORD_COL_NAME = 5; private static DataType getMetadataDataType() { - return AvroSchemaConverter.convertToDataType(HoodieMetadataRecord.SCHEMA$); + return HoodieSchemaConverter.convertToDataType(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$)); } private static DataType getColStatsDataType() { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index 3ca968dc3dbf7..87006a03d6301 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -18,8 +18,8 @@ package org.apache.hudi.table; -import org.apache.hudi.avro.AvroSchemaUtils; import 
org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieIndexConfig; @@ -31,8 +31,8 @@ import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; import org.apache.hudi.storage.StoragePath; -import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.DataTypeUtils; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.util.SerializableSchema; import org.apache.hudi.util.StreamerUtil; @@ -434,9 +434,9 @@ private void setupSortOptions(Configuration conf, ReadableConfig contextConfig) * @param rowType The specified table row type */ private static void inferAvroSchema(Configuration conf, LogicalType rowType) { - if (!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent() - && !conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) { - String inferredSchema = AvroSchemaConverter.convertToSchema(rowType, AvroSchemaUtils.getAvroRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME))).toString(); + if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isEmpty() + && conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isEmpty()) { + String inferredSchema = HoodieSchemaConverter.convertToSchema(rowType, HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME))).toString(); conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index b9b4c2fff34d2..b1bba78f0b076 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -544,7 +544,7 @@ private MergeOnReadInputFormat cdcInputFormat( return CdcInputFormat.builder() .config(this.conf) .tableState(hoodieTableState) - // use the explicit fields' data type because the AvroSchemaConverter + // use the explicit fields' data type because the HoodieSchemaConverter // is not very stable. .fieldTypes(rowDataType.getChildren()) .predicates(this.predicates) @@ -569,7 +569,7 @@ private MergeOnReadInputFormat mergeOnReadInputFormat( return MergeOnReadInputFormat.builder() .config(this.conf) .tableState(hoodieTableState) - // use the explicit fields' data type because the AvroSchemaConverter + // use the explicit fields' data type because the HoodieSchemaConverter // is not very stable. 
.fieldTypes(rowDataType.getChildren()) .predicates(this.predicates) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java index 10bdaa7bdf794..b7fbe94fb76d9 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java @@ -20,6 +20,7 @@ import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.util.CollectionUtils; @@ -29,13 +30,12 @@ import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; -import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.DataTypeUtils; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.CatalogUtils; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; @@ -259,12 +259,12 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep final String path = inferTablePath(catalogPathStr, tablePath); Map options = TableOptionProperties.loadFromProperties(path, hadoopConf); - final Schema latestSchema = getLatestTableSchema(path); + final HoodieSchema latestSchema = getLatestTableSchema(path); if (latestSchema != null) { List pkColumns = TableOptionProperties.getPkColumns(options); // if the table is initialized from spark, the write schema is nullable for pk columns. 
DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable( - AvroSchemaConverter.convertToDataType(latestSchema), pkColumns); + HoodieSchemaConverter.convertToDataType(latestSchema), pkColumns); org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder() .fromRowDataType(tableDataType); final String pkConstraintName = TableOptionProperties.getPkConstraintName(options); @@ -316,7 +316,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable catalogTable, boo if (!resolvedSchema.getPrimaryKey().isPresent() && !conf.containsKey(RECORD_KEY_FIELD.key())) { throw new CatalogException("Primary key definition is missing"); } - final String avroSchema = AvroSchemaConverter.convertToSchema( + final String avroSchema = HoodieSchemaConverter.convertToSchema( resolvedSchema.toPhysicalRowDataType().getLogicalType(), AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString(); conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema); @@ -592,11 +592,11 @@ public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitio throw new UnsupportedOperationException("alterPartitionColumnStatistics is not implemented."); } - private @Nullable Schema getLatestTableSchema(String path) { + private @Nullable HoodieSchema getLatestTableSchema(String path) { if (path != null && StreamerUtil.tableExists(path, hadoopConf)) { try { HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, hadoopConf); - return new TableSchemaResolver(metaClient).getTableAvroSchema(false); // change log mode is not supported now + return new TableSchemaResolver(metaClient).getTableSchema(false); // change log mode is not supported now } catch (Throwable throwable) { log.warn("Failed to resolve the latest table schema.", throwable); // ignored @@ -616,7 +616,7 @@ private Map applyOptionsHook(String tablePath, Map options = newCatalogTable.getOptions(); ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable) newCatalogTable; - final String avroSchema = AvroSchemaConverter.convertToSchema( + final String avroSchema = HoodieSchemaConverter.convertToSchema( resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType(), AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString(); options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), avroSchema); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java index 3b8aaf48d12b5..ab54f3b393e06 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java @@ -22,7 +22,6 @@ import org.apache.hudi.client.HoodieFlinkWriteClient; import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig; import org.apache.hudi.common.model.WriteOperationType; -import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.configuration.FlinkOptions; @@ -31,8 +30,8 @@ import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.Type; import org.apache.hudi.internal.schema.convert.InternalSchemaConverter; -import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.FlinkWriteClients; +import 
org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.util.StreamerUtil; import lombok.extern.slf4j.Slf4j; @@ -232,7 +231,7 @@ protected static void alterTable( HoodieFlinkWriteClient writeClient = createWriteClient(tablePath, oldTable, hadoopConf, inferTablePathFunc); Pair pair = writeClient.getInternalSchemaAndMetaClient(); InternalSchema oldSchema = pair.getLeft(); - Function convertFunc = (LogicalType logicalType) -> InternalSchemaConverter.convertToField(HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(logicalType))); + Function convertFunc = (LogicalType logicalType) -> InternalSchemaConverter.convertToField(HoodieSchemaConverter.convertToSchema(logicalType)); InternalSchema newSchema = Utils.applyTableChange(oldSchema, tableChanges, convertFunc); if (!oldSchema.equals(newSchema)) { writeClient.setOperationType(WriteOperationType.ALTER_SCHEMA); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java index 0c1674247a27d..b2f5d428cdaf6 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java @@ -27,8 +27,8 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils; -import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.DataTypeUtils; +import org.apache.hudi.util.HoodieSchemaConverter; import lombok.extern.slf4j.Slf4j; import org.apache.flink.table.catalog.CatalogTable; @@ -203,7 +203,7 @@ public static Map translateFlinkTableProperties2Spark( List partitionKeys, boolean withOperationField) { RowType rowType = supplementMetaFields(DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema()), withOperationField); - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(rowType)); + HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType); String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION); Map sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties( partitionKeys, diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java index 8310dbe4f52a8..991f099d89814 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java @@ -47,7 +47,7 @@ import org.apache.hudi.storage.StoragePath; import org.apache.hudi.util.Lazy; import org.apache.hudi.util.RecordKeyToRowDataConverter; -import org.apache.hudi.util.RowDataAvroQueryContexts; +import org.apache.hudi.util.RowDataQueryContexts; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.utils.JoinedRowData; @@ -106,7 +106,7 @@ public ClosableIterator getFileRecordIterator( (HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage) .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK) .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, Option.empty()); - DataType rowType = 
RowDataAvroQueryContexts.fromAvroSchema(dataSchema.toAvroSchema()).getRowType(); + DataType rowType = RowDataQueryContexts.fromSchema(dataSchema).getRowType(); return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, requiredSchema, getSafePredicates(requiredSchema)); } @@ -203,7 +203,7 @@ public void setSchemaHandler(FileGroupReaderSchemaHandler schemaHandler // For e.g, if the pk fields are [a, b] but user only select a, then the pk // semantics is lost. RecordKeyToRowDataConverter recordKeyRowConverter = new RecordKeyToRowDataConverter( - pkFieldsPos, (RowType) RowDataAvroQueryContexts.fromAvroSchema(requiredSchema.toAvroSchema()).getRowType().getLogicalType()); + pkFieldsPos, (RowType) RowDataQueryContexts.fromSchema(requiredSchema).getRowType().getLogicalType()); ((FlinkRecordContext) recordContext).setRecordKeyRowConverter(recordKeyRowConverter); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java index 76d45a390723f..93b79750a21e8 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java @@ -37,7 +37,7 @@ import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; import org.apache.hudi.storage.HoodieStorageUtils; import org.apache.hudi.storage.StorageConfiguration; -import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.HoodieSchemaConverter; import lombok.Getter; import org.apache.flink.table.types.DataType; @@ -157,8 +157,8 @@ CastMap getCastMap(InternalSchema mergeSchema, String[] queryFieldNames, DataTyp } List selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList()); // mergeSchema is built with useColumnTypeFromFileSchema = true - List mergeSchemaAsDataTypes = AvroSchemaConverter.convertToDataType( - InternalSchemaConverter.convert(mergeSchema, "tableName").toAvroSchema()).getChildren(); + List mergeSchemaAsDataTypes = HoodieSchemaConverter.convertToDataType( + InternalSchemaConverter.convert(mergeSchema, "tableName")).getChildren(); DataType[] fileFieldTypes = new DataType[queryFieldTypes.length]; for (int i = 0; i < queryFieldTypes.length; i++) { // position of ChangedType in querySchema diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java index cc9d8dbfa70cb..745af94485b17 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java @@ -141,7 +141,7 @@ private ClosableIterator getFileSliceIterator(MergeOnReadInputSplit spl try { // get full schema iterator. 
final HoodieSchema schema = HoodieSchemaCache.intern( - HoodieSchema.parse(tableState.getAvroSchema())); + HoodieSchema.parse(tableState.getTableSchema())); // before/after images have assumption of snapshot scan, so `emitDelete` is set as false return getSplitRowIterator(split, schema, schema, FlinkOptions.REALTIME_PAYLOAD_COMBINE, false); } catch (IOException e) { @@ -181,7 +181,7 @@ private ClosableIterator getRecordIterator( MergeOnReadInputSplit inputSplit = fileSlice2Split(tablePath, fileSlice, maxCompactionMemoryInBytes); return new RemoveBaseFileIterator(tableState, getFileSliceIterator(inputSplit)); case AS_IS: - HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getAvroSchema())); + HoodieSchema dataSchema = HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getTableSchema())); HoodieSchema cdcSchema = HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema); switch (mode) { case DATA_BEFORE_AFTER: @@ -216,7 +216,7 @@ private ClosableIterator getRecordIterator( */ private ClosableIterator> getSplitRecordIterator(MergeOnReadInputSplit split) throws IOException { final HoodieSchema schema = HoodieSchemaCache.intern( - HoodieSchema.parse(tableState.getAvroSchema())); + HoodieSchema.parse(tableState.getTableSchema())); HoodieFileGroupReader fileGroupReader = createFileGroupReader(split, schema, schema, FlinkOptions.REALTIME_PAYLOAD_COMBINE, true); return fileGroupReader.getClosableHoodieRecordIterator(); @@ -363,7 +363,7 @@ static class DataLogFileIterator implements ClosableIterator { MergeOnReadTableState tableState, ClosableIterator> logRecordIterator, HoodieTableMetaClient metaClient) throws IOException { - this.tableSchema = HoodieSchema.parse(tableState.getAvroSchema()); + this.tableSchema = HoodieSchema.parse(tableState.getTableSchema()); this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes; this.imageManager = imageManager; this.projection = tableState.getRequiredRowType().equals(tableState.getRowType()) @@ -495,8 +495,8 @@ abstract static class BaseImageIterator implements ClosableIterator { MergeOnReadTableState tableState, HoodieSchema cdcSchema, HoodieCDCFileSplit fileSplit) { - this.requiredSchema = HoodieSchema.parse(tableState.getRequiredAvroSchema()); - this.requiredPos = getRequiredPos(tableState.getAvroSchema(), this.requiredSchema); + this.requiredSchema = HoodieSchema.parse(tableState.getRequiredSchema()); + this.requiredPos = getRequiredPos(tableState.getTableSchema(), this.requiredSchema); this.recordBuilder = new GenericRecordBuilder(requiredSchema.getAvroSchema()); this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType()); StoragePath hadoopTablePath = new StoragePath(tablePath); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java index 165cc6a7d57ac..1fd6b3491128a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java @@ -190,9 +190,9 @@ protected ClosableIterator initIterator(MergeOnReadInputSplit split) th + "flink partition Index: " + split.getSplitNumber() + "merge type: " + split.getMergeType()); final HoodieSchema tableSchema = HoodieSchemaCache.intern( - 
HoodieSchema.parse(tableState.getAvroSchema())); + HoodieSchema.parse(tableState.getTableSchema())); final HoodieSchema requiredSchema = HoodieSchemaCache.intern( - HoodieSchema.parse(tableState.getRequiredAvroSchema())); + HoodieSchema.parse(tableState.getRequiredSchema())); return getSplitRowIterator(split, tableSchema, requiredSchema, mergeType, emitDelete); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java index e5e7e99dc24b7..868a7f814abaf 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java @@ -36,21 +36,21 @@ public class MergeOnReadTableState implements Serializable { private final RowType rowType; private final RowType requiredRowType; - private final String avroSchema; - private final String requiredAvroSchema; + private final String tableSchema; + private final String requiredSchema; private final List inputSplits; private final int operationPos; public MergeOnReadTableState( RowType rowType, RowType requiredRowType, - String avroSchema, - String requiredAvroSchema, + String tableSchema, + String requiredSchema, List inputSplits) { this.rowType = rowType; this.requiredRowType = requiredRowType; - this.avroSchema = avroSchema; - this.requiredAvroSchema = requiredAvroSchema; + this.tableSchema = tableSchema; + this.requiredSchema = requiredSchema; this.inputSplits = inputSplits; this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java index 56082f107bb4f..e540cd55b0069 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java @@ -26,7 +26,7 @@ import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; import org.apache.hudi.table.format.mor.MergeOnReadTableState; -import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -253,13 +253,13 @@ private OneInputStreamOperatorTestHarness create } catch (Exception e) { throw new HoodieException("Get table avro schema error", e); } - final DataType rowDataType = AvroSchemaConverter.convertToDataType(tableSchema.getAvroSchema()); + final DataType rowDataType = HoodieSchemaConverter.convertToDataType(tableSchema); final RowType rowType = (RowType) rowDataType.getLogicalType(); final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState( rowType, TestConfigurations.ROW_TYPE, tableSchema.toString(), - AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), + HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(), Collections.emptyList()); MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder() .config(conf) diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java index cebc24651ec24..1598b4d7ed89e 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java @@ -40,8 +40,8 @@ import org.apache.hudi.sink.compact.CompactionCommitEvent; import org.apache.hudi.sink.compact.CompactionCommitSink; import org.apache.hudi.sink.compact.CompactionPlanSourceFunction; -import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.FlinkWriteClients; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.utils.FlinkMiniCluster; import lombok.extern.slf4j.Slf4j; @@ -313,7 +313,7 @@ private void changeTableSchema(TableOptions tableOptions, boolean shouldCompactB private void writeTableWithSchema2(TableOptions tableOptions) throws ExecutionException, InterruptedException { tableOptions.withOption( FlinkOptions.SOURCE_AVRO_SCHEMA.key(), - AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_AFTER, "hoodie.t1.t1_record")); + HoodieSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_AFTER, "hoodie.t1.t1_record")); //language=SQL tEnv.executeSql("drop table t1"); @@ -384,7 +384,7 @@ private TableOptions defaultTableOptions(String tablePath) { KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition", KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true, FlinkOptions.WRITE_BATCH_SIZE.key(), 0.000001, // each record triggers flush - FlinkOptions.SOURCE_AVRO_SCHEMA.key(), AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_BEFORE), + FlinkOptions.SOURCE_AVRO_SCHEMA.key(), HoodieSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_BEFORE), FlinkOptions.READ_TASKS.key(), 1, FlinkOptions.WRITE_TASKS.key(), 1, FlinkOptions.INDEX_BOOTSTRAP_TASKS.key(), 1, diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java index f7ff64d3dfcbd..45a9b8ecab0be 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java @@ -49,7 +49,7 @@ import org.apache.hudi.table.format.InternalSchemaManager; import org.apache.hudi.util.AvroToRowDataConverters; import org.apache.hudi.util.HoodieSchemaConverter; -import org.apache.hudi.util.RowDataAvroQueryContexts; +import org.apache.hudi.util.RowDataQueryContexts; import org.apache.hudi.utils.TestData; import org.apache.flink.configuration.Configuration; @@ -131,7 +131,7 @@ protected void readWithFileGroupReader( HoodieSchema recordSchema, HoodieReaderContext readerContext, boolean sortOutput) throws IOException { - RowDataSerializer rowDataSerializer = RowDataAvroQueryContexts.getRowDataSerializer(recordSchema.toAvroSchema()); + RowDataSerializer rowDataSerializer = RowDataQueryContexts.getRowDataSerializer(recordSchema); try (ClosableIterator iterator = fileGroupReader.getClosableIterator()) { while (iterator.hasNext()) { RowData rowData = rowDataSerializer.copy(iterator.next()); @@ -148,7 +148,7 @@ public void commitToTable(List recordList, String operation, boole HoodieSchema localSchema = getRecordSchema(schemaStr); 
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, localSchema.toString()); AvroToRowDataConverters.AvroToRowDataConverter avroConverter = - RowDataAvroQueryContexts.fromAvroSchema(localSchema.getAvroSchema()).getAvroToRowDataConverter(); + RowDataQueryContexts.fromSchema(localSchema).getAvroToRowDataConverter(); List rowDataList = recordList.stream().map(record -> { try { return (RowData) avroConverter.convert(record.toIndexedRecord(localSchema, CollectionUtils.emptyProps()).get().getData()); @@ -169,7 +169,7 @@ public void assertRecordsEqual(HoodieSchema schema, RowData expected, RowData ac TestData.assertRowDataEquals( Collections.singletonList(actual), Collections.singletonList(expected), - RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema()).getRowType()); + RowDataQueryContexts.fromSchema(schema).getRowType()); } @Override @@ -324,7 +324,7 @@ public void testFilterFileWithInstantRange(WriteOperationType firstCommitOperati } private static HoodieSchema getRecordSchema(String schemaStr) { - HoodieSchema recordSchema = new HoodieSchema.Parser().parse(schemaStr); - return HoodieSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.getAvroSchema()).getRowType().getLogicalType()); + HoodieSchema recordSchema = HoodieSchema.parse(schemaStr); + return HoodieSchemaConverter.convertToSchema(RowDataQueryContexts.fromSchema(recordSchema).getRowType().getLogicalType()); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index a4a69706e4fe2..adce67c00f306 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -28,7 +28,7 @@ import org.apache.hudi.keygen.ComplexAvroKeyGenerator; import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; -import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.CatalogUtils; import org.apache.hudi.utils.SchemaBuilder; @@ -364,7 +364,7 @@ void testInferAvroSchemaForSource() { final HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, schema3, "")); final Configuration conf3 = tableSink3.getConf(); - final String expected = AvroSchemaConverter.convertToSchema(schema3.toSourceRowDataType().getLogicalType(), AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString(); + final String expected = HoodieSchemaConverter.convertToSchema(schema3.toSourceRowDataType().getLogicalType(), AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString(); assertThat(conf3.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(expected)); } @@ -565,7 +565,7 @@ void testInferAvroSchemaForSink() { final HoodieTableSink tableSink3 = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, schema3, "")); final Configuration conf3 = tableSink3.getConf(); - final String expected = AvroSchemaConverter.convertToSchema(schema3.toSinkRowDataType().getLogicalType(), AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString(); + final String expected = HoodieSchemaConverter.convertToSchema(schema3.toSinkRowDataType().getLogicalType(), 
AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString(); assertThat(conf3.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(expected)); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index fc4694bd3a6fa..8431a8e0490bd 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -50,8 +50,8 @@ import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; -import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.FlinkWriteClients; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.util.SerializableSchema; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; @@ -1207,7 +1207,7 @@ void testReadArchivedCommitsIncrementally() throws Exception { void testReadWithWiderSchema(HoodieTableType tableType) throws Exception { Map options = new HashMap<>(); options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), - AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString()); + HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString()); beforeEach(tableType, options); TestData.writeData(TestData.DATA_SET_INSERT, conf); @@ -1223,7 +1223,7 @@ void testOrderingValueWithDecimalType() throws Exception { conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable"); conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name()); conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition"); - conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_DECIMAL_ORDERING).toString()); + conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_DECIMAL_ORDERING).toString()); conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // by default close the async compaction StreamerUtil.initTableIfNotExists(conf); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java deleted file mode 100644 index afe75e0615661..0000000000000 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hudi.utils; - -import org.apache.hudi.avro.model.HoodieMetadataRecord; -import org.apache.hudi.common.schema.HoodieSchema; -import org.apache.hudi.metadata.HoodieMetadataPayload; -import org.apache.hudi.util.AvroSchemaConverter; - -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.types.DataType; -import org.junit.jupiter.api.Test; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * Test cases for {@link org.apache.hudi.util.AvroSchemaConverter}. - */ -public class TestAvroSchemaConverter { - - @Test - void testUnionSchemaWithMultipleRecordTypes() { - HoodieSchema schema = HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$); - DataType dataType = AvroSchemaConverter.convertToDataType(schema.getAvroSchema()); - int pos = HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos(); - final String expected = "ROW<" - + "`fileName` STRING, " - + "`columnName` STRING, " - + "`minValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, " - + "`maxValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, " - + "`valueCount` BIGINT, " - + "`nullCount` BIGINT, " - + "`totalSize` BIGINT, " - + "`totalUncompressedSize` BIGINT, " - + "`isDeleted` BOOLEAN NOT NULL, " - + "`isTightBound` BOOLEAN NOT NULL, " - + "`valueType` ROW<`typeOrdinal` INT NOT NULL, `additionalInfo` STRING>>"; - assertThat(dataType.getChildren().get(pos).toString(), is(expected)); - } - - @Test - void testLocalTimestampType() { - DataType dataType = DataTypes.ROW( - DataTypes.FIELD("f_localtimestamp_millis", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)), - DataTypes.FIELD("f_localtimestamp_micros", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6)) - ); - // convert to avro schema - HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(dataType.getLogicalType())); - final String expectedSchema = "" - + "[ \"null\", {\n" - + " \"type\" : \"record\",\n" - + " \"name\" : \"record\",\n" - + " \"fields\" : [ {\n" - + " \"name\" : \"f_localtimestamp_millis\",\n" - + " \"type\" : [ \"null\", {\n" - + " \"type\" : \"long\",\n" - + " \"logicalType\" : \"local-timestamp-millis\"\n" - + " } ],\n" - + " \"default\" : null\n" - + " }, {\n" - + " \"name\" : \"f_localtimestamp_micros\",\n" - + " \"type\" : [ \"null\", {\n" - + " \"type\" : \"long\",\n" - + " \"logicalType\" : \"local-timestamp-micros\"\n" - + " } ],\n" - + " \"default\" : null\n" - + " } ]\n" - + "} ]"; - assertThat(schema.toString(true), is(expectedSchema)); - // convert it back - DataType convertedDataType = AvroSchemaConverter.convertToDataType(schema.getAvroSchema()); - final String expectedDataType = "ROW<" - + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), " - + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>"; - assertThat(convertedDataType.toString(), is(expectedDataType)); - } -} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index 2430a14045294..46c90afa7a10f 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -21,8 +21,8 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.streamer.FlinkStreamerConfig; -import 
org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.DataTypeUtils; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.utils.factory.CollectSinkTableFactory; import org.apache.hudi.utils.factory.ContinuousFileSourceFactory; @@ -374,7 +374,7 @@ public static Configuration getDefaultConf(String tablePath) { public static Configuration getDefaultConf(String tablePath, DataType dataType) { Configuration conf = new Configuration(); conf.set(FlinkOptions.PATH, tablePath); - conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, AvroSchemaConverter.convertToSchema(dataType.getLogicalType()).toString()); + conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, HoodieSchemaConverter.convertToSchema(dataType.getLogicalType()).toString()); conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable"); conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition"); return conf; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java index 9020073f2a376..8dfbd1d724a9e 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java @@ -37,18 +37,17 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.OptionsResolver; import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper; -import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.sink.utils.BulkInsertFunctionWrapper; import org.apache.hudi.sink.utils.ConsistentBucketStreamWriteFunctionWrapper; import org.apache.hudi.sink.utils.InsertFunctionWrapper; +import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper; import org.apache.hudi.sink.utils.TestFunctionWrapper; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.format.FormatUtils; import org.apache.hudi.table.format.InternalSchemaManager; import org.apache.hudi.util.AvroToRowDataConverters; -import org.apache.hudi.util.RowDataAvroQueryContexts; +import org.apache.hudi.util.RowDataQueryContexts; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -977,8 +976,7 @@ public static void checkWrittenDataMOR( HoodieTableMetaClient metaClient = createMetaClient(basePath); HoodieFlinkTable table = HoodieFlinkTable.create(config, HoodieFlinkEngineContext.DEFAULT, metaClient); - Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema(); - HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema); + HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema(); String latestInstant = metaClient.getActiveTimeline().filterCompletedInstants() .lastInstant().map(HoodieInstant::requestedTime).orElse(null); @@ -993,7 +991,7 @@ public static void checkWrittenDataMOR( List readBuffer = new ArrayList<>(); List fileSlices = table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(), latestInstant).collect(Collectors.toList()); for (FileSlice fileSlice : fileSlices) { - try (ClosableIterator rowIterator = getRecordIterator(fileSlice, hoodieSchema, metaClient, config)) { + try (ClosableIterator rowIterator = getRecordIterator(fileSlice, schema, metaClient, config)) { while (rowIterator.hasNext()) { RowData rowData = rowIterator.next(); readBuffer.add(filterOutVariables(schema, 
rowData)); @@ -1048,8 +1046,8 @@ private static String filterOutVariables(GenericRecord genericRecord) { return String.join(",", fields); } - private static String filterOutVariables(Schema schema, RowData record) { - RowDataAvroQueryContexts.RowDataQueryContext queryContext = RowDataAvroQueryContexts.fromAvroSchema(schema); + private static String filterOutVariables(HoodieSchema schema, RowData record) { + RowDataQueryContexts.RowDataQueryContext queryContext = RowDataQueryContexts.fromSchema(schema); List fields = new ArrayList<>(); fields.add(getFieldValue(queryContext, record, "_hoodie_record_key")); fields.add(getFieldValue(queryContext, record, "_hoodie_partition_path")); @@ -1061,7 +1059,7 @@ private static String filterOutVariables(Schema schema, RowData record) { return String.join(",", fields); } - private static String getFieldValue(RowDataAvroQueryContexts.RowDataQueryContext queryContext, RowData rowData, String fieldName) { + private static String getFieldValue(RowDataQueryContexts.RowDataQueryContext queryContext, RowData rowData, String fieldName) { return String.valueOf(queryContext.getFieldQueryContext(fieldName).getValAsJava(rowData, true)); } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java index 9fa2bd3aa3b88..25967f2246ef9 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java @@ -19,9 +19,9 @@ package org.apache.hudi.utils; import org.apache.hudi.keygen.KeyGenUtils; -import org.apache.hudi.util.AvroSchemaConverter; -import org.apache.hudi.util.RowDataToAvroConverters; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.util.RecordKeyToRowDataConverter; +import org.apache.hudi.util.RowDataToAvroConverters; import org.apache.avro.generic.GenericRecord; import org.apache.flink.table.api.DataTypes; @@ -103,7 +103,7 @@ void testRowDataToAvroStringToRowData() { RowDataToAvroConverters.RowDataToAvroConverter converter = RowDataToAvroConverters.createConverter(rowType); GenericRecord avroRecord = - (GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData); + (GenericRecord) converter.convert(HoodieSchemaConverter.convertToSchema(rowType), rowData); RecordKeyToRowDataConverter keyToRowDataConverter = new RecordKeyToRowDataConverter(new int[]{0, 1, 2, 3, 4, 5, 6}, rowType); final String recordKey = KeyGenUtils.getRecordKey(avroRecord, rowType.getFieldNames(), false); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java index 471c0686b60f6..185c5830616c3 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java @@ -18,7 +18,7 @@ package org.apache.hudi.utils; -import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.HoodieSchemaConverter; import org.apache.hudi.util.RowDataToAvroConverters; import org.apache.avro.generic.GenericRecord; @@ -58,7 +58,7 @@ void testRowDataToAvroStringToRowDataWithLocalTimezone() throws JsonProcessingEx 
RowDataToAvroConverters.RowDataToAvroConverter converter = RowDataToAvroConverters.createConverter(rowType, false); GenericRecord avroRecord = - (GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData); + (GenericRecord) converter.convert(HoodieSchemaConverter.convertToSchema(rowType), rowData); Assertions.assertEquals(timestampFromLocal, formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) avroRecord.get(0)), ZoneId.systemDefault()))); } @@ -76,7 +76,7 @@ void testRowDataToAvroStringToRowDataWithUtcTimezone() throws JsonProcessingExce RowDataToAvroConverters.RowDataToAvroConverter converter = RowDataToAvroConverters.createConverter(rowType); GenericRecord avroRecord = - (GenericRecord) converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData); + (GenericRecord) converter.convert(HoodieSchemaConverter.convertToSchema(rowType), rowData); Assertions.assertEquals(timestampFromUtc0, formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) avroRecord.get(0)), ZoneId.of("UTC")))); Assertions.assertEquals("2021-03-30 08:44:29", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) avroRecord.get(0)), ZoneId.of("UTC+1")))); Assertions.assertEquals("2021-03-30 15:44:29", formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) avroRecord.get(0)), ZoneId.of("Asia/Shanghai"))));
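
Reviewer note (not part of the patch): for anyone tracing the migration from AvroSchemaConverter to the new HoodieSchemaConverter entry points, the round trip exercised by the tests above looks roughly like the sketch below. This is a minimal, hedged example assembled from the usages visible in this diff (convertToSchema(LogicalType[, String]), convertToDataType(HoodieSchema), HoodieSchema.toString(boolean)); exact signatures may differ in the final patch, and the class and record names used here are illustrative only.

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.util.HoodieSchemaConverter;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

public class SchemaConverterRoundTripSketch {
  public static void main(String[] args) {
    // Flink row type describing the writer schema (pattern taken from the tests in this diff).
    RowType rowType = (RowType) DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3))
    ).notNull().getLogicalType();

    // LogicalType -> HoodieSchema, replacing AvroSchemaConverter.convertToSchema(...);
    // the two-argument overload additionally takes a qualified record name.
    HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType, "hoodie.t1.t1_record");

    // HoodieSchema -> Flink DataType, replacing AvroSchemaConverter.convertToDataType(...).
    DataType dataType = HoodieSchemaConverter.convertToDataType(schema);

    // The pretty-printed schema string is what gets stored in FlinkOptions.SOURCE_AVRO_SCHEMA.
    System.out.println(schema.toString(true));
    System.out.println(dataType);
  }
}

The same pattern applies to the query-context helpers touched in this diff: RowDataQueryContexts.fromSchema(hoodieSchema) replaces RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema()), so callers no longer round-trip through Avro to obtain the Flink row type, field getters, or the RowData serializer.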