-
Notifications
You must be signed in to change notification settings - Fork 2.5k
feat(schema): Phase 18 - HoodieAvroUtils removal (Part 2) #17763
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
feat(schema): Phase 18 - HoodieAvroUtils removal (Part 2) #17763
Conversation
1c8ac53 to
9650fda
Compare
| * @return HoodieSchema containing record key and partition path fields | ||
| */ | ||
| public static HoodieSchema getRecordKeyPartitionPathSchema() { | ||
| List<HoodieSchemaField> toBeAddedFields = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
super nitpick: initialize this with size 2
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I keep forgetting about this, will try to keep this in mind in the future!
Addressed.
| * @return the Avro schema of the data file. | ||
| */ | ||
| public abstract Schema readAvroSchema(HoodieStorage storage, StoragePath filePath); | ||
| public abstract HoodieSchema readHoodieSchema(HoodieStorage storage, StoragePath filePath); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets name this readSchema?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tried doing this initially, but it conflicts with ParquetUtils#readSchema which returns a MessageType.
I will rename ParquetUtils#readSchema to ParquetUtils#readMessageType so that this abstract readHoodieSchema can be renamed to readSchema.
Addressed.
| * @param type ORC schema of the value Object. | ||
| * @param colVector The column vector to store the value Object. | ||
| * @param avroSchema Avro schema of the value Object. | ||
| * @param schema Avro schema of the value Object. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Update the comment to fix alignment and remove Avro
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
| throw new IllegalStateException( | ||
| String.format("Unrecognized TimePrecision for: %s for Time type: %s", timeSchema.getPrecision(), timeSchema)); | ||
| } | ||
| case TIMESTAMP: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we previously did not support translating local timestamps, should that be preserved here? They likely just get treated as longs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeap, we should preserve as this is a refactoring. Let's try not to change any behaviour first.
I will add a comment to this explicit handling too.
Will add a test for this too.
| .map(fieldName -> avroSchema.getField(fieldName).schema()) | ||
| .toArray(Schema[]::new); | ||
| this.fieldSchemas = fieldNames.stream() | ||
| .map(fieldName -> this.schema.getField(fieldName).get().schema()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's use orElseThrow if the field is missing instead of directly using get
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
| fileSchema = Option.ofNullable(parquetUtils.readHoodieSchema(storage, path)); | ||
| } | ||
| return HoodieSchema.fromAvroSchema(fileSchema.get()); | ||
| return fileSchema.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this could be simplified to fileSchema.orElseGet(...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fileSchema is assigned to if it is empty.
Will modify it to:
fileSchema = fileSchema.or(() -> Option.ofNullable(parquetUtils.readSchema(storage, path)));
return fileSchema.get();There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you just use orElseGet?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, updated it to use orElseGet
.
| HoodieSchemaType type = schema.getType(); | ||
| if (type == HoodieSchemaType.BOOLEAN) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's update this to a switch statement?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| List<HoodieSchemaField> toBeAddedFields = new ArrayList<>(); | ||
| for (String field: fields) { | ||
| Schema.Field schemaField = | ||
| new Schema.Field(field, METADATA_FIELD_SCHEMA, "", JsonProperties.NULL_VALUE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we continue to use a constant here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeap, addressed.
|
|
||
| Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); | ||
| HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(new AvroSchemaConverter().convert(schema), | ||
| HoodieSchema schema = HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be HoodieTestDataGenerator.HOODIE_SCHEMA
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
| reader = HoodieIOFactory.getIOFactory(HoodieStorageUtils.getStorage(path, storageConf)).getReaderFactory(HoodieRecord.HoodieRecordType.AVRO) | ||
| .getFileReader(hoodieConfig, path, HoodieFileFormat.HFILE, Option.empty()); | ||
|
|
||
| //TODO boundary for now to revisit in later pr to use HoodieSchema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove todo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| Schema repairedFileSchema = AvroSchemaRepair.repairLogicalTypes(getSchema().toAvroSchema(), schema.toAvroSchema()); | ||
| Option<Schema> promotedSchema = Option.empty(); | ||
| if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema)) { | ||
| if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(repairedFileSchema, schema.toAvroSchema())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will recordNeedsRewriteForExtendedAvroTypePromotion be handled in another PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not handled in any of the current PRs now. I am still systematically trying to migrate what is easily migrated first.
Whatever that is left, will then evaluate if we should migrate them over. I haven't really analysed what this method entails.
| if (field.schema().getType().equals(Schema.Type.NULL)) { | ||
| for (HoodieSchemaField field : fields) { | ||
| if (field.schema().getType() == HoodieSchemaType.NULL) { | ||
| continue; // Avro nulls are not encoded, unless they are null unions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: update comment to remove Avro
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
|
||
| private List<Type> convertFields(List<Schema.Field> fields, String schemaPath) { | ||
| private List<Type> convertFields(List<HoodieSchemaField> fields, String schemaPath) { | ||
| List<Type> types = new ArrayList<Type>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: size this array to the incoming fields size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| private Schema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) { | ||
| List<Schema.Field> fields = new ArrayList<Schema.Field>(); | ||
| private HoodieSchema convertFields(String name, List<Type> parquetFields, Map<String, Integer> names) { | ||
| List<HoodieSchemaField> fields = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similarly, size this list to the incoming list size
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<HoodieSchema>() { | ||
| @Override | ||
| public java.util.Optional<Schema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) { | ||
| public java.util.Optional<HoodieSchema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While we're here, let's import Optional to shorten the line lengths in this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can't change this as java.util.Optional is a banned import according to our checkstyle rules.
We cannot change this to use our hudi common Option too as the method we are overriding calls for a java.util.Optional return type in: org.apache.parquet.schema.LogicalTypeAnnotation#accept
/**
* Visits this logical type with the given visitor
*
* @param logicalTypeAnnotationVisitor the visitor to visit this type
*/
public abstract <T> Optional<T> accept(LogicalTypeAnnotationVisitor<T> logicalTypeAnnotationVisitor);| Schema schema = Schema.createRecord("record1", null, null, false); | ||
| Schema optionalIntArray = Schema.createArray(optional(Schema.create(INT))); | ||
| schema.setFields(Arrays.asList(new Schema.Field("myintarray", optionalIntArray, null, null))); | ||
| HoodieSchema optionalIntArray = HoodieSchema.createArray(HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.INT))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a tip, you can directly use HoodieSchema.createNullable(HoodieSchemaType.INT) to shorten the setup in the tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addresed.
| Arrays.asList(new Schema.Field("myarray", Schema.createArray(optional(innerRecord)), null, null))); | ||
| HoodieSchema optionalString = HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING)); | ||
| List<HoodieSchemaField> innerRecordFields = Arrays.asList( | ||
| HoodieSchemaField.of("s1", optionalString, null, JsonProperties.NULL_VALUE), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we update JsonProperties.NULL_VALUE to HoodieSchema.NULL_VALUE?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeap. Removed the import and updated the other references to JsonProperties.NULL_VALUE.
| HoodieSchema listOfLists = HoodieSchema.createNullable(HoodieSchema.createArray(HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.INT)))); | ||
| HoodieSchema schema = HoodieSchema.createRecord("UnknownTwoLevelListInList", null, null, false, | ||
| Collections.singletonList(HoodieSchemaField.of("listOfLists", listOfLists, null, HoodieSchema.NULL_VALUE))); | ||
| System.err.println("Avro schema: " + schema.toString(true)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this println and any others in the file while we're updating this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed all:
System.err.println("Avro schema: " + schema.toString(true));| assertEquals( | ||
| COMPATIBLE, checkReaderWriterCompatibility(fromAvro, toAvro).getType()); | ||
| COMPATIBLE, checkReaderWriterCompatibility(fromSchema.toAvroSchema(), toSchema.toAvroSchema()).getType()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can update this to assertTrue(HoodieSchemaCompatibility.areSchemasCompatible(fromSchema, toSchema)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed.
594dcdf to
2bc369b
Compare
| * @param unionVector The vector to store value. | ||
| * @param unionChildTypes All possible types for the value Object. | ||
| * @param avroSchema Avro union schema for the value Object. | ||
| * @param schema Avro union schema for the value Object. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove Avro from the comment here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
|
|
||
| @Override | ||
| public ClosableIterator<IndexedRecord> getIndexedRecordIterator(HoodieSchema readerSchema, HoodieSchema requestedSchema) throws IOException { | ||
| //TODO boundary for now to revisit in later pr to use HoodieSchema |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove the TODO comments in this file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| fileSchema = Option.ofNullable(parquetUtils.readHoodieSchema(storage, path)); | ||
| } | ||
| return HoodieSchema.fromAvroSchema(fileSchema.get()); | ||
| return fileSchema.get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you just use orElseGet?
bff2d71 to
8ddd0ef
Compare
Describe the issue this Pull Request addresses
Reference issue: #14283
Remove methods that were migrated to
HoodieSchemaUtils, consolidate remaining Avro-specific utilities, update documentation. The scope here only covers:hudi-hadoop-commonNOTE: Merge this after #17599
Key Changes:
HoodieAvroUtilsstatic functions have been fully qualified. This explicitly marks technical debt and makes these usages easily searchable for future refactoring.TODOs:
Will need some suggestions on how to rename:
These will also need to be renamed:
I'll leave this as an exercise before actually performing the renames.
Summary and Changelog
Swap out HoodieAvroUtils to HoodieSchema equivalent.
Impact
None
Risk Level
Low
Documentation Update
Contributor's checklist