feat(schema): phase 17 - Remove AvroSchemaUtils usage (part 1) #17535
Conversation
Force-pushed 228d0dc to ac69a39
Force-pushed 1531814 to 72aa418
Force-pushed f9ba546 to 89cd44a
Force-pushed 00b71d6 to a1695a2
```java
HoodieSchemaField tableField = tableSchema.getField(columnName).get();

if (tableField == null) {
```
Reviewer: Let's update this to first return an Option and then we can check if it is present instead of checking for null here.
Author: Done
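A minimal sketch of the Option-based pattern the reviewer suggests, using Hudi's org.apache.hudi.common.util.Option; variable names are illustrative:

```java
import org.apache.hudi.common.util.Option;

// Check presence on the returned Option instead of calling get() and
// comparing the result against null.
Option<HoodieSchemaField> tableFieldOpt = tableSchema.getField(columnName);
if (tableFieldOpt.isPresent()) {
  HoodieSchemaField tableField = tableFieldOpt.get();
  // ... existing checks against the writer field ...
}
```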
```java
if (writerField != null && !tableField.schema().equals(writerField.schema())) {
  // Check if this is just making the field nullable/non-nullable, which is safe from SI perspective
  if (getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema()))) {
    HoodieSchema nonNullTableField = HoodieSchemaUtils.getNonNullTypeFromUnion(tableField.schema());
```
Reviewer: Let's use the getNonNullType method on the HoodieSchema when possible.
Author: Done
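A sketch of the suggested call, assuming `getNonNullType()` is the instance-method counterpart of the static `HoodieSchemaUtils.getNonNullTypeFromUnion` (the reviewer's suggested change later in this conversation uses it the same way):

```java
// Static utility form:
//   HoodieSchema nonNull = HoodieSchemaUtils.getNonNullTypeFromUnion(tableField.schema());
// Instance-method form per the review suggestion:
HoodieSchema nonNullTableSchema = tableField.schema().getNonNullType();
```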
```java
@Override
public ClosableIterator<HoodieRecord<InternalRow>> getRecordIterator(HoodieSchema schema) throws IOException {
  //TODO boundary to revisit in later pr to use HoodieSchema directly
```
Reviewer: Let's remove this TODO now.
Author: Done
```java
 * @return the writer field, if any does correspond, or None.
 */
public static HoodieSchemaField lookupWriterField(final HoodieSchema writerSchema, final HoodieSchemaField readerField) {
  assert (writerSchema.getType() == HoodieSchemaType.RECORD);
```
Reviewer: Let's use the ValidationUtils here so we can return a more customized error message if the type is not RECORD.
Author: Done
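The requested pattern matches the line that appears later in this PR's diff; ValidationUtils here is Hudi's org.apache.hudi.common.util.ValidationUtils:

```java
import org.apache.hudi.common.util.ValidationUtils;

// Unlike assert (a no-op unless the JVM runs with -ea), this always
// validates and fails with a descriptive message.
ValidationUtils.checkArgument(writerSchema.getType() == HoodieSchemaType.RECORD,
    writerSchema + " is not a record");
```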
```java
 */
public static HoodieSchemaField lookupWriterField(final HoodieSchema writerSchema, final HoodieSchemaField readerField) {
  assert (writerSchema.getType() == HoodieSchemaType.RECORD);
  final List<HoodieSchemaField> writerFields = new ArrayList<>();
```
Reviewer: It looks like the list is expected to have 0 or 1 elements, so an Option may fit this use case better. What do you think?
Author: Yep, makes sense!
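A sketch of the Option-returning shape, consistent with the final version quoted later in this review (alias handling elided):

```java
public static Option<HoodieSchemaField> lookupWriterField(final HoodieSchema writerSchema,
                                                          final HoodieSchemaField readerField) {
  ValidationUtils.checkArgument(writerSchema.getType() == HoodieSchemaType.RECORD,
      writerSchema + " is not a record");
  // 0-or-1 matches: an Option models this directly, no List needed.
  return writerSchema.getField(readerField.name());
}
```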
```java
Option<HoodieSchemaField> fieldOpt = schema.getField(partitionField);
// if the field is not present in the schema, we assume it is a string
Schema fieldSchema = field == null ? Schema.create(Schema.Type.STRING) : getNonNullTypeFromUnion(field.schema());
LogicalType logicalType = fieldSchema.getLogicalType();
HoodieSchema fieldSchema = fieldOpt.isEmpty() ? HoodieSchema.create(HoodieSchemaType.STRING) : HoodieSchemaUtils.getNonNullTypeFromUnion(fieldOpt.get().schema());
HoodieSchemaType logicalType = fieldSchema.getType();
```
Reviewer suggested change:

```java
Option<HoodieSchemaField> field = schema.getField(partitionField);
// if the field is not present in the schema, we assume it is a string
HoodieSchema fieldSchema = field.map(f -> f.schema().getNonNullType()).orElseGet(() -> HoodieSchema.create(HoodieSchemaType.STRING));
```
Author: Done
```java
    || logicalType instanceof LogicalTypes.TimeMicros
    || logicalType instanceof LogicalTypes.LocalTimestampMicros
    || logicalType instanceof LogicalTypes.LocalTimestampMillis;

private static boolean isTimeBasedLogicalType(HoodieSchemaType logicalType) {
```
Reviewer suggested renaming the method, since the logical type is an Avro concept and we have proper types for date, time, timestamp, etc. now:

```java
private static boolean isTimeBasedType(HoodieSchemaType type) {
```
Author: Will modify. Once your PR is merged, I'll prioritise your changes over mine for this.
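A sketch of the renamed helper, branching on dedicated schema types; the constant names mirror the Avro logical types quoted above but are assumptions:

```java
// Illustrative: dispatch on HoodieSchemaType instead of instanceof
// checks against Avro LogicalTypes.
private static boolean isTimeBasedType(HoodieSchemaType type) {
  switch (type) {
    case TIME_MICROS:             // was LogicalTypes.TimeMicros
    case LOCAL_TIMESTAMP_MICROS:  // was LogicalTypes.LocalTimestampMicros
    case LOCAL_TIMESTAMP_MILLIS:  // was LogicalTypes.LocalTimestampMillis
      return true;
    default:
      return false;
  }
}
```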
```java
assertEquals(f.schema(), HoodieSchema.createNullable(HoodieSchemaType.STRING));

// case5: user_partition is in originSchema, but partition_path is in originSchema
String[] pts4 = {"user_partition", "partition_path"};
```
Reviewer: Do you think this is an error in the original test? It seems like this should be used below.
Author: Using pts4 for the last test will cause the test to fail. I checked the git blame; this variable has been there since the creation of this file. I'd hazard that this is a WIP variable that the original author forgot to remove.
```java
}

@Override
public HoodieSchema processSchema(HoodieSchema schema) {
```
Reviewer: This was not overridden on purpose, in case a user is extending the deprecated method.
Author: Okay, will remove it then. I'll keep the AvroSchemaUtils usage in the old deprecated method, but will fully qualify it.
Force-pushed 8e94f63 to 0354017
```java
    .or(() -> Option.of(schema))
    .get();
```
Reviewer: Nitpick: while we're updating this, let's change it to use orElseGet instead of or and get.
Author: Done
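The nitpick in code form (variable names illustrative): orElseGet supplies the fallback value directly, instead of wrapping it in an Option only to unwrap it with get():

```java
// Before: schemaOpt.or(() -> Option.of(schema)).get();
// After: supply the fallback directly, no extra wrapping.
HoodieSchema resolved = schemaOpt.orElseGet(() -> schema);
```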
```java
private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
  return metaClient.getTableConfig().getTableCreateSchema()
      .map(HoodieSchema::fromAvroSchema)
      .or(Option.empty());
```
Reviewer: We shouldn't need the or in this case.
Author: Done
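The simplified method implied by the comment; `.or(Option.empty())` is a no-op, since an empty Option stays empty and a present one ignores the fallback:

```java
private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
  // map() already propagates emptiness, so no explicit fallback is needed.
  return metaClient.getTableConfig().getTableCreateSchema()
      .map(HoodieSchema::fromAvroSchema);
}
```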
```java
AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());

HoodieSchema writerSchema = HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), false);
HoodieSchema tableSchema = HoodieSchema.createHoodieWriteSchema(existingTableSchema.get().toString(), false);
```
Reviewer: Instead of converting existingTableSchema to a string, let's create a method that takes in a HoodieSchema, similar to the AvroSchemaUtils one.
Author: It invokes org.apache.hudi.avro.HoodieAvroUtils#addMetadataFields(org.apache.avro.Schema, boolean), so I don't think we need a new method; let's just call org.apache.hudi.common.schema.HoodieSchemaUtils#addMetadataFields(org.apache.hudi.common.schema.HoodieSchema).
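A sketch of the agreed resolution, assuming the `addMetadataFields(HoodieSchema)` overload the author references; whether existingTableSchema already holds a HoodieSchema is not shown, so the fromAvroSchema conversion is an assumption:

```java
HoodieSchema writerSchema = HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), false);
// Avoid the schema -> String -> schema round trip: add the metadata
// fields to the existing table schema directly.
HoodieSchema tableSchema = HoodieSchemaUtils.addMetadataFields(
    HoodieSchema.fromAvroSchema(existingTableSchema.get()));
```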
```java
private final boolean writeLegacyListFormat;
private final ValueWriter[] rootFieldWriters;
private final Schema avroSchema;
private final HoodieSchema hoodieSchema;
```
Reviewer: Nitpick: let's rename the hoodieSchema variables to simply schema.
Author: Done
```java
  return (row, ordinal) -> recordConsumer.addLong((long) timestampRebaseFunction.apply(row.getLong(ordinal)));
} else if (logicalType.getName().equals(LogicalTypes.timestampMillis().getName())) {
  return (row, ordinal) -> recordConsumer.addLong(DateTimeUtils.microsToMillis((long) timestampRebaseFunction.apply(row.getLong(ordinal))));
if (resolvedSchema instanceof HoodieSchema.Timestamp) {
```
Reviewer: Let's use the HoodieSchemaType instead of instanceof checks to determine the type of the field.
Author: Done
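A sketch of the type-based check; the TIMESTAMP constant name and the cast target are assumptions:

```java
// Illustrative: branch on the declared schema type rather than the
// concrete HoodieSchema subclass.
if (resolvedSchema.getType() == HoodieSchemaType.TIMESTAMP) {
  HoodieSchema.Timestamp tsSchema = (HoodieSchema.Timestamp) resolvedSchema;
  // ... choose the millis vs micros writer from tsSchema's precision ...
}
```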
```java
  return (row, ordinal) -> recordConsumer.addLong(row.getLong(ordinal));
} else if (logicalType.getName().equals(LogicalTypes.localTimestampMillis().getName())) {
  return (row, ordinal) -> recordConsumer.addLong(DateTimeUtils.microsToMillis(row.getLong(ordinal)));
if (resolvedSchema instanceof HoodieSchema.Timestamp) {
```
Reviewer: Similarly here, let's use the type.
Author: Done
```java
ValidationUtils.checkArgument(writerSchema.getType() == HoodieSchemaType.RECORD, writerSchema + " is not a record");
Option<HoodieSchemaField> result = Option.empty();
final Option<HoodieSchemaField> directOpt = writerSchema.getField(readerField.name());
if (directOpt.isPresent()) {
```
Reviewer: Do we need a separate variable for the result? Can we just use directOpt directly?
Author: Nope, simplified this. I was too fixated on keeping directOpt final, my bad.
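A sketch of the simplification, returning the lookup result directly rather than copying it into a separate mutable variable (the fallback branch after the early return is assumed from context):

```java
ValidationUtils.checkArgument(writerSchema.getType() == HoodieSchemaType.RECORD,
    writerSchema + " is not a record");
// Use the direct name match as-is when present; no intermediate `result` needed.
Option<HoodieSchemaField> directOpt = writerSchema.getField(readerField.name());
if (directOpt.isPresent()) {
  return directOpt;
}
```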
```java
Object javaInput = ObjectInspectorConverters.getConverter(writableOIOld, oldObjectInspector).convert(oldWritable);
if (isDecimalSchema(oldSchema)) {
  javaInput = HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(getDecimalValue(javaInput, oldSchema), oldSchema, oldSchema.getLogicalType());
if (oldSchema instanceof HoodieSchema.Decimal) {
```
Reviewer: Let's use the schema type in this class as well.
Author: Done
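The same type-based pattern applied to the decimal check; the DECIMAL constant name is an assumption:

```java
// Illustrative: branch on the schema type instead of
// instanceof HoodieSchema.Decimal.
if (oldSchema.getType() == HoodieSchemaType.DECIMAL) {
  // ... decimal-specific fixed/bytes conversion ...
}
```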
Force-pushed e88014e to 219eb35
Author: Had to squash as I had too many commits and resolving conflicts one by one was too painful.
Force-pushed 219eb35 to 264c04c
```java
import static org.apache.hudi.avro.HoodieAvroUtils.toJavaDate;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;

public class HoodieArrayWritableAvroUtils {
```
Reviewer: Should we update this class name to just be HoodieArrayWritableSchemaUtils?
Author: Yep, cool with me.
```java
HoodieSchemaField oldField = oldFieldOpt.get();
values[i] = rewriteRecordWithNewSchema(arrayWritable.get()[oldField.pos()], oldField.schema(), newField.schema(), renameCols, fieldNames);
} else if (newField.defaultVal() instanceof JsonProperties.Null) {
} else if (newField.defaultVal().isPresent() && newField.defaultVal().get() instanceof JsonProperties.Null) {
```
Reviewer: JsonProperties needs to be HoodieJsonProperties now.
Author: Changing this to HoodieSchema.NULL_VALUE; they are the same underlying value.
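A sketch of the resulting check, assuming HoodieSchema.NULL_VALUE is the same sentinel Avro exposes via JsonProperties.Null, as the author states:

```java
// Illustrative: detect a null default via the HoodieSchema-level sentinel
// instead of referencing Avro's JsonProperties directly.
boolean hasNullDefault = newField.defaultVal().isPresent()
    && newField.defaultVal().get() == HoodieSchema.NULL_VALUE;
```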
```java
 * @return true if reader schema can read data written with writer schema
 * @throws IllegalArgumentException if schemas are null
 */
public static boolean isSchemaCompatible(HoodieSchema readerSchema, HoodieSchema writerSchema,
```
Reviewer: Are there existing test cases we can migrate to TestHoodieSchemaCompatibility as part of this PR?
Author: Nope, I'll add some.
Force-pushed fb8531b to 4567f05
- Fix checkstyle
- Add tests
- Address comments
- Fix failing azure IT
- Fix compilation errors after rebase
- Fix failing TestHoodieFileGroupReaderOnFlink tests
- Fix failing TestHoodieFileGroupReaderOnHive
- Fix failing TestPartitionPathParser tests
- Address comments
- Fix Java17 compilation error
- Fix checkstyle
- Fix direct #get() usages in HoodieRowParquetWriteSupport
- Fix removal
- Remove AvroSchemaUtils usage from TestParquetUtils
- Remove AvroSchemaUtils usage from HoodieSparkParquetReader
- Remove AvroSchemaUtils usage from HoodieTable
- Remove AvroSchemaUtils usage from HoodieTestDataGenerator
- Fix tests from missing namespace and doc
- Remove Avro.Schema and AvroSchemaUtils usage from HoodieArrayWritableAvroUtils
- Adapt KafkaOffsetPostProcessor
- Ignore AvroSchemaUtils#getAvroRecordQualifiedName usages
- Remove AvroSchemaUtils from TestHoodieCommitMetadata
- Remove AvroSchemaUtils from HoodieRowParquetWriteSupport
- Remove AvroSchemaUtils from HoodieRealtimeRecordReaderUtils
- Remove AvroSchemaUtils from TestMergeHandle
- Ignore ParquetRowIndexBasedSchemaHandler
- Add ignore flag in FileGroupReaderSchemaHandler
- Add ignore flag in OrderingValueEngineTypeConverter
- Remove AvroSchemaUtils usage from ConcurrentSchemaEvolutionTableSchemaGetter and TableSchemaResolver
- Remove AvroSchemaUtils usages from PartitionPathParser and TestPartitionPathParser
- Remove AvroSchemaUtils usages from TestHoodieSchemaCompatibility
- Remove AvroSchemaUtils usages from TestSparkSortAndSizeClustering
- Remove AvroSchemaUtils usages from TestHoodieAvroReaderContext
- Ignore AvroSchemaRepair and TestAvroSchemaRepair
- Remove AvroSchemaUtils usage from FlinkRowDataReaderContext
- Ignore SchemaBackwardsCompatibilityException
- Ignore HiveAvroSerializer and HiveTypeUtils + fully qualify AvroSchemaUtils usages
- Ignore HoodieSchemaUtils
- Ignore MissingSchemaFieldException
- Remove AvroSchemaUtils from HoodieMergeHelper
- Ignore classes:
- HoodieAvroReaderContext
- AvroRecordContext
- HoodieSchemaCompatibility
- Fully qualify AvroSchemaUtils call in HoodieAvroUtils
- Remove flags from *AvroSchemaUtils
- Add more references to AvroSchemaUtils missed out on first scan
- Remove AvroSchemaUtils and Avro.Schema usage in HoodieTable and HoodieWriteHandle
- Remove AvroSchemaUtils and Avro.Schema usage in TestTableSchemaEvolution
- Find all AvroSchemaUtils usages and prefix them with comment /*~~>*/
Force-pushed 4567f05 to 2654cac
```java
} else {
  throw new UnsupportedOperationException("Unsupported Avro logical type for TimestampType: " + logicalType);
  // Default to micros precision when no timestamp schema is available
  return (row, ordinal) -> recordConsumer.addLong((long) timestampRebaseFunction.apply(row.getLong(ordinal)));
```
Reviewer: Previously it looks like this threw an UnsupportedOperationException; should we keep that?
Author: Sure, I believe it's unlikely that we'll step into this clause, so an exception might be safer. I was trying to capture the case of logicalType == null; I am not sure whether HoodieSchema will resolve an Avro schema to a case where logicalType == null.
Author: Comment addressed.
Force-pushed bcd1764 to 3be646d

Describe the issue this Pull Request addresses

Reference issue: #14282

This PR focuses on migrating usages of `AvroSchemaUtils` to the internal `HoodieSchema` abstraction and `HoodieSchemaUtils`.

Key Changes:
- Updated core write-path classes (`HoodieTable`, `HoodieWriteHandle`, `HoodieRowParquetWriteSupport`) to use `HoodieSchema` and `HoodieSchemaCompatibility`.
- Remaining `AvroSchemaUtils` static functions have been fully qualified. This explicitly marks technical debt and makes these usages easily searchable for future refactoring.
- Extended `HoodieSchemaUtils` and `HoodieSchemaCompatibility` to support `HoodieSchema` objects directly.

Classes NOT included in migration (fully qualified)

The following classes retain `AvroSchemaUtils` usage but are now fully qualified:
- AvroRecordContext
- TestAvroSchemaUtils
- AvroSchemaUtils (self)
- AvroSchemaComparatorForRecordProjection
- HoodieAvroUtils
- HoodieSchemaCompatibility
- MissingSchemaFieldException
- HoodieSchemaUtils
- HiveAvroSerializer
- HiveTypeUtils
- SchemaBackwardsCompatibilityException
- AvroSchemaRepair
- TestAvroSchemaRepair
- TestHoodieSchemaCompatibility (for consistency testing)

Classes addressed in dependent PR #17536
- FileGroupReaderSchemaHandler
- OrderingValueEngineTypeConverter
- ParquetRowIndexBasedSchemaHandler
- HoodieAvroReaderContext

Specific ignored usages (getAvroRecordQualifiedName)

The following classes ignore `AvroSchemaUtils.getAvroRecordQualifiedName`:
- BaseHoodieWriteClient
- HoodieCatalog
- HoodieHiveCatalog
- HoodieTableFactory
- StreamSync
- TestHoodieTableFactory

Summary and Changelog

This PR is a refactoring effort to improve schema abstraction within Hudi. By moving away from raw Avro utils, we pave the way for better type safety and cleaner internal APIs.

Impact

`HoodieSchemaUtils` and `HoodieSchemaCompatibility` now play a larger role in schema validation and evolution logic.

Risk Level

Low

Documentation Update

none

Contributor's checklist