
First commit on supporting parquet #650

Open · wants to merge 27 commits into base: main

Conversation

@unical1988 (Author)

Important Read

  • Please ensure the GitHub issue is mentioned at the beginning of the PR

What is the purpose of the pull request

This PR is a first attempt (building on a previous attempt) to add support for syncing Parquet files to XTable.

Brief change log

  • Added a schema extractor for Parquet (closely modeled on Avro's)
  • Added a table extractor using metadata
  • Added a conversion source script

@vinishjail97 (Contributor) left a comment

Thanks for your first contribution @unical1988, can you run mvn spotless:apply and push the PR?

import org.apache.xtable.spi.extractor.ConversionSource;

@Builder
public class ParquetConversionSource implements ConversionSource<Long> {
Contributor

A few clarification questions to ensure we are on the same page.

  1. Will this source assume partitioning based on the directory structure, or can the user choose partition columns from the parquet file schema?
  2. If a parquet file is removed from the source root path, will it be handled or ignored? Using file notifications makes this easier, but we can find a way to do this through listing as well.

Author

  1. I guess getPartitionFromDirectoryStructure() does get the partitions from the directory, but I can optionally add retrieving the partitioning from columns (as set by the user).
  2. I can add a catch for FileNotFoundException to handle the case where the source path is not found; see the sketch below.
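
For reference, a minimal sketch of that listing-plus-missing-path handling, using the Hadoop FileSystem API (the class and method names here are hypothetical, not the PR's actual code):

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ParquetFileLister {

  // Recursively lists parquet files under basePath; returns an empty list when the
  // source root no longer exists, instead of failing the whole sync.
  public static List<LocatedFileStatus> listParquetFiles(Configuration conf, String basePath) {
    List<LocatedFileStatus> files = new ArrayList<>();
    try {
      Path root = new Path(basePath);
      FileSystem fs = root.getFileSystem(conf);
      RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true /* recursive */);
      while (it.hasNext()) {
        LocatedFileStatus status = it.next();
        if (status.getPath().getName().endsWith(".parquet")) {
          files.add(status);
        }
      }
    } catch (FileNotFoundException e) {
      return Collections.emptyList();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return files;
  }
}

Note this only covers a missing root; detecting individual files removed since the last sync would still need a diff against previously synced state.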

@ashvina (Contributor) left a comment

Hi @unical1988,
Thank you for your contribution! To ensure we fully understand the scope and assumptions of the Parquet support feature, could you please submit an RFC? A high-level description will make it easier for us to review the PR. For instance, please include details about schema consistency and validation across files, and the different assumptions and error conditions.
Thanks again!

@@ -0,0 +1,249 @@
package org.apache.xtable.parquet;
Contributor

The Apache License header is missing. Please run the spotless plugin on the code.

Author

I just did. On the other hand, I'm not sure what an RFC is; are there docs that explain it?

Contributor

Here is an example: #634

Contributor

@unical1988 Here's the template; I can help if you have more clarifications, and we can discuss in Slack.
https://github.com/apache/incubator-xtable/blob/main/rfc/template.md


Comment on lines 118 to 122
case BYTES:
case JSON:
case BSON:
case FIXED:
logicalType = schema.getLogicalType();
Contributor

Any reason for combining all of them into a single classification? FIXED can be separate IMO; BYTES, JSON, and BSON are byte-array-like types.

Author

I added the BYTE_ARRAY type (is there any metadata to add?), following this: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md.

Comment on lines 48 to 61
@Builder.Default
private static final ParquetSchemaConverter schemaExtractor =
ParquetSchemaConverter.getInstance();

@Builder.Default
private static final ParquetMetadataExtractor parquetMetadataExtractor =
ParquetMetadataExtractor.getInstance();

@Builder.Default
private static final ParquetPartitionHelper parquetPartitionHelper =
ParquetPartitionHelper.getInstance();

private Map<String, List<String>> initPartitionInfo() {
  return getPartitionFromDirectoryStructure(hadoopConf, basePath, Collections.emptyMap());
Contributor

We need a ParquetStatsExtractor that computes column stats from the parquet footer. These are populated in InternalDataFile.
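
For illustration, a minimal sketch of what such an extractor could read from the footer using the parquet-hadoop API (the class shape is an assumption; in XTable these values would be mapped into ColumnStat entries on InternalDataFile rather than printed):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class ParquetStatsExtractor {

  // Reads per-column-chunk statistics (value counts, sizes, min/max/null counts)
  // from the footer of a single parquet file.
  public static void printColumnStats(Configuration conf, Path file) throws IOException {
    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
      ParquetMetadata footer = reader.getFooter();
      for (BlockMetaData block : footer.getBlocks()) {
        for (ColumnChunkMetaData column : block.getColumns()) {
          System.out.printf(
              "%s: values=%d, totalSize=%d, uncompressedSize=%d, stats=%s%n",
              column.getPath(),
              column.getValueCount(),
              column.getTotalSize(),
              column.getTotalUncompressedSize(),
              column.getStatistics());
        }
      }
    }
  }
}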

ParquetSchemaConverter.getInstance();

@Builder.Default
private static final ParquetMetadataExtractor parquetMetadataExtractor =
Contributor

Is this class for stats?

Author

@vinishjail97 no, but I just added a first method for that in a class named ParquetStatsExtractor

@vinishjail97 (Contributor) left a comment

Thanks for working on the PR @unical1988, added comments.

There seems to be some confusion about extracting partition values, let me know what you think of this.

basePath/
    p1/..        (can be recursive partitions for parquet files)
    p2/..
    p3/..
    .hoodie/     (Hudi metadata)
    metadata/    (Iceberg metadata)
    _delta_log/  (Delta metadata)

To extract the partition fields (emphasis on fields here, not the actual values) we can do it in two ways:

  1. Assume the table is not partitioned; this would just sync the parquet files to the target formats using the physical paths you have extracted in one of the classes. When you read those tables, partition pruning won't work.
  2. Ask for user input (from YAML configuration) specifying the partition fields from the parquet file schema. Many of these analytical datasets are partitioned by date, either through an actual date column in the parquet file or a timestamp field from which the date is extracted.

@@ -34,12 +34,16 @@
class ExternalTable {
/** The name of the table. */
protected final @NonNull String name;

Contributor

Are these changes coming from mvn spotless:apply? Wondering why the latest main branch doesn't reflect these.


Author

Regarding how to get partitionValues, I think it is best to discuss this in today's meeting, but in any case, can it be asked of the user via YAML configuration?


import org.apache.xtable.model.storage.TableFormat;

/**
* Extracts {@link InternalTable} canonical representation of a table at a point in time for Delta.
Contributor

"for Delta" may be confused with the table format; this should be enough, I guess?
Extracts {@link InternalTable} canonical representation of a table at a point in time

@Builder.Default
private static final ParquetMetadataExtractor parquetMetadataExtractor =
ParquetMetadataExtractor.getInstance();
private Map<String, List<String>> initPartitionInfo() {
Contributor

nit: add a new line between lines 45 and 46.

Comment on lines 55 to 58
Set<String> partitionKeys = initPartitionInfo().keySet();
List<InternalPartitionField> partitionFields =
    partitionExtractor.getInternalPartitionField(partitionKeys, schema);
Contributor

There seems to be some confusion here.

initPartitionInfo().keySet() will return all the unique partition paths for all parquet files combined.

InternalPartitionField is the partition field name for the table, not the values.

@unical1988 (Author) commented on Feb 24, 2025

Does this mean that

  1. getPartitionFromDirectoryStructure() (in ParquetConversionSource.java) should return the partitions accounting for the table name (only one file)?
  2. that same method should perhaps return partition values for the table rather than field names (e.g., in line 254, currentPartitionMap.computeIfAbsent(partitionKeyValue[1]/*instead of the actual partitionKeyValue[0]*/, k -> new ArrayList<>()).add(partitionKeyValue[1]);)?

Contributor

Inferring the partition fields from the directory structure without knowing how the user generated it is difficult, so we can support both of these options. getPartitionsFromUserConfiguration(..) is one name I could think of right now; feel free to change it (a sketch follows below).

#650 (review)
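
A rough sketch of what the suggested method might look like, resolving user-supplied column names against the canonical schema (SchemaFieldFinder and the exact InternalPartitionField builder fields are assumptions based on the surrounding XTable code, not part of this PR):

import java.util.List;
import java.util.stream.Collectors;

import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.schema.PartitionTransformType;
import org.apache.xtable.schema.SchemaFieldFinder;

public class ParquetPartitionExtractor {

  // Builds partition fields from user-configured column names instead of
  // inferring them from the directory layout.
  public List<InternalPartitionField> getPartitionsFromUserConfiguration(
      InternalSchema schema, List<String> configuredFieldNames) {
    return configuredFieldNames.stream()
        .map(
            fieldName ->
                InternalPartitionField.builder()
                    .sourceField(SchemaFieldFinder.getInstance().findFieldByPath(schema, fieldName))
                    .transformType(PartitionTransformType.VALUE) // no transform by default
                    .build())
        .collect(Collectors.toList());
  }
}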

Comment on lines 22 to 23
public List<InternalPartitionField> getInternalPartitionField(
Set<String> partitionList, InternalSchema schema) {
Contributor

I don't think this would work, because partitionList is a list of partition field values, and you won't find those in the schema.

Comment on lines 119 to 134
logicalType = schema.getLogicalType();
// TODO: any metadata to add ?
if (logicalType == LogicalTypes.JSON) {
  newDataType = InternalType.JSON;
} else if (logicalType instanceof LogicalTypes.BSON) {
  newDataType = InternalType.BSON;
} else if (logicalType instanceof LogicalTypes.VARIANT) {
  newDataType = InternalType.VARIANT;
} else if (logicalType instanceof LogicalTypes.GEOMETRY) {
  newDataType = InternalType.GEOMETRY;
} else if (logicalType instanceof LogicalTypes.GEOGRAPHY) {
  newDataType = InternalType.GEOGRAPHY;
Contributor

Would it be simpler to map them to BYTES?

Contributor

Let me know your thoughts as well; I was asking because the targets (Iceberg, Delta, and Hudi) don't seem to support these types and just map them to byte array or binary.

* parquet data types and canonical data types.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ParquetSchemaConverter {
Contributor

This class looks okay, and thanks for putting this up. Can you add unit tests for it as well?

Author

I will add unit tests; are there similar test examples I can start from?


Comment on lines 39 to 41
List<PartitionValue> partitionValues =
    partitionExtractor.getPartitionValue(
        parentPath, file.getPath().toString(), schema, partitionInfo);
Contributor

The partitionValues are not the actual partition values; it's a list of the partitioning fields in the schema with their range.
https://github.com/apache/incubator-xtable/blob/main/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java#L44
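
For context, a small sketch of the shape InternalDataFile expects, assuming XTable's PartitionValue and Range API (the field and value here are made up for illustration):

import java.util.Collections;
import java.util.List;

import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.stat.PartitionValue;
import org.apache.xtable.model.stat.Range;

public class PartitionValueSketch {

  // One PartitionValue pairs a partition *field* with the range of values
  // (here a single scalar) that a given file covers.
  static List<PartitionValue> partitionValuesFor(InternalPartitionField dateField, String date) {
    return Collections.singletonList(
        PartitionValue.builder()
            .partitionField(dateField)
            .range(Range.scalar(date))
            .build());
  }
}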

@unical1988 (Author)

> [quoting @vinishjail97's comment above on extracting partition fields]

We would want to read the configuration (or the partition fields) into a Java object (if I am not wrong). p1/ could then be date - year - month - day, p2/ could be location, and p3/ could be ID; given these fields, we could extract the partitionValues located in the related subdirectories for a specific parquet file. Is that correct? If yes, how could the Java object be defined?


Stats valueCountStats = new Stats();
Stats allStats = new Stats();
Stats uncStats = new Stats();
Contributor

What does unc mean here?

Author

uncompressed

Contributor

Let's use the full word; it will be clearer.

switch (schema.getName()) {
  // PrimitiveTypes
  case "INT64":
    logicalType = schema.getLogicalTypeAnnotation();
Contributor

Should newDataType be set to LONG when there is no logical type?

Similarly, it seems that wherever a logical type is possible, newDataType is not set at all when that logical type is absent.

Author

If 'no logical type' refers to the case where the logical type is null, then the UNKNOWN case is my answer.

What is meant by "there is a possibility of a logical type"? Do you mean all handled cases except the default one? How is it not set?

Contributor

You do not set newDataType unless there is a logical type. This means that for a plain long, newDataType is simply null, which is unexpected.

@unical1988 (Author) commented on Mar 11, 2025

What is the case of "no logical type" here? Is it the default case?
I would say that if a plain long is detected, then the UNKNOWN case applies. How do you suggest handling that case?

Contributor

When there is no logical type on the INT64, you can set newDataType to InternalType.LONG.
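
To illustrate, a sketch of how the INT64 branch could default (the timestamp mapping is just one example of an annotated INT64; the class and variable names are assumptions mirroring the snippet above):

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;

import org.apache.xtable.model.schema.InternalType;

public class Int64Mapping {

  // Maps a parquet INT64 primitive to the canonical type, defaulting to LONG
  // when no logical type annotation is present (the plain long case).
  static InternalType mapInt64(PrimitiveType primitiveType) {
    LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation();
    if (logicalType == null) {
      return InternalType.LONG;
    }
    if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
      return InternalType.TIMESTAMP;
    }
    // ... other INT64-backed logical types (TIME, INTEGER, etc.) go here
    return InternalType.LONG;
  }
}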

Author

Should INT64 be InternalType.INT or InternalType.LONG?

new org.apache.parquet.parquet.ParquetSchemaConverter()
    .convert(parquetMetadataExtractor.getSchema(parquetMetadata));

Set<String> partitionKeys = initPartitionInfo().keySet();
Contributor

This will be user input.

@vinishjail97 (Contributor)

> [quoting the comment above on extracting partition fields]
public class InputPartitionColumn {
  String fieldName;
  PartitionTransformType transformType;
}

InputPartitionKeyConfig should be part of the Table object in DatasetConfig.

1. No transform -> the values for the partition keys in the parquet file are concatenated and the partitionPath is generated; configure this in the InternalTable object.
2. Transformation -> timestamp -> transform(timestamp) -> year/date/month/xyz.parquet

A sketch of both cases follows.
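
A sketch of how the two cases might turn into partition paths (InputPartitionColumn is the proposed shape above; the concatenation order and date layout are assumptions):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PartitionPathGenerator {

  // Case 1: no transform -> concatenate the raw partition key values.
  static String noTransformPath(List<String> partitionFieldNames, Map<String, Object> record) {
    return partitionFieldNames.stream()
        .map(field -> String.valueOf(record.get(field)))
        .collect(Collectors.joining("/"));
  }

  // Case 2: a timestamp column transformed into year/month/day directories.
  static String timestampTransformPath(long epochMillis) {
    return DateTimeFormatter.ofPattern("yyyy/MM/dd")
        .withZone(ZoneOffset.UTC)
        .format(Instant.ofEpochMilli(epochMillis));
  }
}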
