First commit on supporting parquet #650

Open · wants to merge 27 commits into main

Changes from 1 commit (of 27)
74cbc83  First commit on supporting parquet (Feb 15, 2025)
79bd222  catch file not found exception (Feb 17, 2025)
2143c99  executed mvn spotless:apply (Feb 17, 2025)
4f1ea77  added byte_array data type (Feb 18, 2025)
f71610b  added ParquetStatsExtractor (Feb 20, 2025)
c57a42f  added InternalDataFile population from parquet metadata (Feb 22, 2025)
1557ea3  added col stats for parquet (Feb 22, 2025)
24c474a  set todos (Feb 22, 2025)
e1a3f35  integrated ParquetPartitionExtractor.java (Feb 23, 2025)
fbbd1eb  added partitionValues to StatsExtractor builder (Feb 23, 2025)
40c5e67  added the parquet conversion source provider (Feb 23, 2025)
ec222de  run mvn spotless:apply (Feb 24, 2025)
e0fbca8  edited ParquetSchemaExtractor to include some other LogicalTypes and … (Mar 5, 2025)
6e2fc66  ParquetSchemaExtractor few fixes (Mar 5, 2025)
b4c49b7  ParquetSchemaExtractor NULL type added (Mar 5, 2025)
cac552a  ParquetSchemaExtractor Numeric and time types OK, TODO: Arrays and Maps (Mar 5, 2025)
004d763  ParquetSchemaExtractor added groupTypes Map and List: TODO: tests (Mar 5, 2025)
4b4593b  added -write parquet- to test Parquet types (Mar 6, 2025)
9d56c21  added first test for primitive types (Mar 8, 2025)
18ef037  cleanups (Mar 9, 2025)
bd11c67  added timestamp metadata (millis, micros, nanos) (Mar 11, 2025)
0dbedb0  added else type for each switch case (Mar 11, 2025)
0233d54  added string type (Mar 11, 2025)
8fc6a95  added Time type (Mar 11, 2025)
c88fb25  added metadata for ENUM and FIXED (Mar 11, 2025)
6c04cc7  adjusted primitive type detection (Mar 11, 2025)
9bdd972  adjusted primitive types for fromInternalSchema sync, TODO: ENUM, LIS… (Mar 11, 2025)
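Taken together, the commits sketch a Parquet read path for XTable: open a file's footer, derive an InternalSchema from the Parquet MessageType, collect column statistics and partition values, and expose the result through a conversion source. As a reference point for the diff below, here is a minimal, self-contained sketch of the footer-only read those extractors build on, using the standard parquet-hadoop API (the file path is a placeholder; none of the XTable classes from this PR are involved yet):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class ParquetFooterProbe {
  public static void main(String[] args) throws Exception {
    // Placeholder path; only the footer is read, no row data.
    Path path = new Path("/tmp/sample.parquet");
    Configuration conf = new Configuration();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
      ParquetMetadata footer = reader.getFooter();
      MessageType schema = footer.getFileMetaData().getSchema();
      System.out.println(schema);
      // Per-row-group, per-column statistics: the raw material for column stats extraction.
      footer.getBlocks().forEach(block ->
          block.getColumns().forEach(column ->
              System.out.println(column.getPath() + " -> " + column.getStatistics())));
    }
  }
}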
edited ParquetSchemaExtractor to include some other LogicalTypes and added YAML reader for partition fields user configuration
Selim Soufargi committed Mar 5, 2025
commit e0fbca892c3d262a54b3c3d2379508925440beff
@@ -52,11 +52,7 @@ public enum InternalType {
   DECIMAL,
   TIMESTAMP,
   TIMESTAMP_NTZ,
-  JSON,
-  BSON,
-  VARIANT,
-  GEOMETRY,
-  GEOGRAPHY;
+  JSON;
   private final String name;

   InternalType() {
@@ -31,12 +31,35 @@
 import org.apache.xtable.model.stat.Range;
 import org.apache.xtable.schema.SchemaFieldFinder;

+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;

 public class ParquetPartitionExtractor {
   private static final ParquetPartitionExtractor INSTANCE = new ParquetPartitionExtractor();

   public static ParquetPartitionExtractor getInstance() {
     return INSTANCE;
   }
+  public static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
+
+
+  public PartitionConfiguration getPartitionsFromUserConfiguration(String configPath) throws IOException {
+    PartitionConfiguration partitionConfiguration = new PartitionConfiguration();
+    try (InputStream inputStream = Files.newInputStream(Paths.get(configPath))) {
+      ObjectReader objectReader = YAML_MAPPER.readerForUpdating(partitionConfiguration);
+      objectReader.readValue(inputStream);
+      return partitionConfiguration;
+    }
+  }
+
+
   public List<InternalPartitionField> getInternalPartitionField(
       Set<String> partitionList, InternalSchema schema) {
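For context on the new method: Jackson's readerForUpdating fills the fields of the PartitionConfiguration instance passed to it, so a YAML file with a single partition key is all the configuration needs right now. A small usage sketch, assuming a one-line config of the form partition: "event_date" (the column name and temp-file location are made up for illustration):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class PartitionConfigDemo {
  public static void main(String[] args) throws IOException {
    // Write a throwaway YAML config matching the PartitionConfiguration POJO added in this PR.
    Path configFile = Files.createTempFile("xtable-partition", ".yaml");
    Files.write(configFile, "partition: \"event_date\"\n".getBytes(StandardCharsets.UTF_8));

    ParquetPartitionExtractor extractor = ParquetPartitionExtractor.getInstance();
    PartitionConfiguration config =
        extractor.getPartitionsFromUserConfiguration(configFile.toString());
    System.out.println(config.getPartition()); // event_date
  }
}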
@@ -34,6 +34,12 @@
 import org.apache.parquet.LogicalType;
 import org.apache.parquet.LogicalTypes;
 import org.apache.parquet.Schema;
+import org.apache.parquet.Schema.Type;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.format.NullType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+
+

 import org.apache.xtable.collectors.CustomCollectors;
 import org.apache.xtable.exception.SchemaExtractorException;
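A note on the import mix above: in parquet-java the schema model lives in org.apache.parquet.schema (MessageType, PrimitiveType, Types, LogicalTypeAnnotation); there is no org.apache.parquet.Schema or org.apache.parquet.LogicalTypes, and those names look carried over from the Avro converter this file appears to be adapted from. A short sketch of building and inspecting a Parquet schema with the actual API, independent of this PR:

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class ParquetSchemaDemo {
  public static void main(String[] args) {
    // Physical types carry optional logical annotations (DATE on INT32, TIMESTAMP on INT64, ...).
    MessageType message =
        Types.buildMessage()
            .required(PrimitiveTypeName.INT32)
            .as(LogicalTypeAnnotation.dateType())
            .named("event_date")
            .required(PrimitiveTypeName.INT64)
            .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
            .named("created_at")
            .named("record");

    message.getFields().forEach(field ->
        System.out.println(field.getName() + " -> " + field.getLogicalTypeAnnotation()));
  }
}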
@@ -96,38 +102,49 @@ public InternalSchema toInternalSchema(Schema schema) {
   * @return a converted schema
   */
  private InternalSchema toInternalSchema(
      Schema schema, String parentPath, Map<String, IdMapping> fieldNameToIdMapping) {
      Type schema, String parentPath, Map<String, IdMapping> fieldNameToIdMapping) {
    // TODO - Does not handle recursion in parquet schema
    InternalType newDataType;
    Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>();
    switch (schema.getType()) {
      case INT:
        LogicalType logicalType = schema.getLogicalType();
        if (logicalType instanceof LogicalTypes.Date) {
    switch (schema.getName()) {
      case INT64:
        LogicalType logicalType = schema.getLogicalTypeAnnotation();
        if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
          newDataType = InternalType.TIMESTAMP;
        }
        break;
      case INT32:
        LogicalType logicalType = schema.getLogicalTypeAnnotation();
        if (logicalType instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
          newDataType = InternalType.DATE;
        } else {
          newDataType = InternalType.INT;
        }
        // is also a TIME
        break;
      case STRING:
        newDataType = InternalType.STRING;
      case FLOAT:
        LogicalType logicalType = schema.getLogicalTypeAnnotation();
        if (logicalType instanceof LogicalTypeAnnotation.Float16LogicalTypeAnnotation) {
          newDataType = InternalType.FLOAT;
        }
        break;
      case BOOLEAN:
        newDataType = InternalType.BOOLEAN;
      case BYTES:
        logicalType = schema.getLogicalType();
        break;
      case FIXED_LEN_BYTE_ARRAY:
        logicalType = schema.getLogicalTypeAnnotation();
        if (logicalType instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
          newDataType = InternalType.UUID;
        }
        // TODO how to add INTERVAL ?
      case BYTE_ARRAY:
        logicalType = schema.getLogicalType();
        // TODO: any metadata to add ?
        // includes string json, BSON, Variant, GEOMETRY, geography
        logicalType = schema.getLogicalTypeAnnotation();
        if (logicalType instanceof LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
          newDataType = InternalType.ENUM;
        }
        if (logicalType == LogicalTypes.JSON) {
          newDataType = InternalType.JSON;
        } else if (logicalType instanceof LogicalTypes.BSON) {
          newDataType = InternalType.BSON;
        } else if (logicalType instanceof LogicalTypes.VARIANT) {
          newDataType = InternalType.VARIANT;
        } else if (logicalType instanceof LogicalTypes.GEOMETRY) {
          newDataType = InternalType.GEOMETRY;
        } else if (logicalType instanceof LogicalTypes.GEOGRAPHY) {
          newDataType = InternalType.GEOGRAPHY;
          newDataType = InternalType.BYTES;
        } else if (logicalType instanceof LogicalTypeAnnotation.BsonLogicalTypeAnnotation) {
          newDataType = InternalType.BYTES;
        }
        break;
      case FIXED:
@@ -157,12 +174,6 @@ private InternalSchema toInternalSchema(
           newDataType = InternalType.BYTES;
         }
         break;
-      case DOUBLE:
-        newDataType = InternalType.DOUBLE;
-        break;
-      case FLOAT:
-        newDataType = InternalType.FLOAT;
-        break;
       case LONG:
         logicalType = schema.getLogicalType();
         if (logicalType instanceof LogicalTypes.TimestampMillis) {
@@ -185,10 +196,6 @@ private InternalSchema toInternalSchema(
           newDataType = InternalType.LONG;
         }
         break;
-      case ENUM:
-        metadata.put(InternalSchema.MetadataKey.ENUM_VALUES, schema.getEnumSymbols());
-        newDataType = InternalType.ENUM;
-        break;
       case NULL:
         newDataType = InternalType.NULL;
         break;
@@ -473,8 +480,9 @@ private String buildCurrentPath(InternalField field, String parentPath) {
         .orElse(field.getName());
   }

-  private static Schema finalizeSchema(Schema targetSchema, InternalSchema inputSchema) {
+  private static Type finalizeSchema(Type targetSchema, InternalSchema inputSchema) {
     if (inputSchema.isNullable()) {
+      return targetSchema.union(Type("NULL","optional"))
       return Schema.createUnion(Schema.create(Schema.Type.NULL), targetSchema);
     }
     return targetSchema;
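The instanceof chains above work, but LogicalTypeAnnotation also provides a visitor that parquet-java itself uses for this kind of dispatch, which avoids re-declaring logicalType in every case. A minimal sketch of the INT32/INT64 mapping in that style; the returned names mirror the InternalType constants used in this PR, but the visitor approach is only an alternative, not what the commit does:

import java.util.Optional;

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

public class AnnotationDispatchSketch {

  /** Maps an INT32/INT64 primitive to a coarse internal type name. */
  static String internalTypeOf(PrimitiveType primitive) {
    LogicalTypeAnnotation annotation = primitive.getLogicalTypeAnnotation();
    boolean isLong = primitive.getPrimitiveTypeName() == PrimitiveTypeName.INT64;
    if (annotation == null) {
      // No logical annotation: fall back to the physical type alone.
      return isLong ? "LONG" : "INT";
    }
    return annotation
        .accept(
            new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<String>() {
              @Override
              public Optional<String> visit(
                  LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
                return Optional.of("DATE");
              }

              @Override
              public Optional<String> visit(
                  LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
                return Optional.of("TIMESTAMP");
              }
            })
        .orElse(isLong ? "LONG" : "INT");
  }
}

Any visit method that is not overridden returns Optional.empty(), so the physical-type fallback stays in one place instead of being repeated in every branch.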
@@ -54,6 +54,7 @@ public InternalTable table(String tableName, Long version) {
     MessageType schema = parquetMetadataExtractor.getSchema(footer);
     InternalSchema schema = schemaExtractor.toInternalSchema(schema);
     Set<String> partitionKeys = initPartitionInfo().keySet();
+    // TODO getInternalPartitionField() to be replaced with the new YAML based partition fields
     List<InternalPartitionField> partitionFields =
         partitionExtractor.getInternalPartitionField(partitionKeys, schema);
     DataLayoutStrategy dataLayoutStrategy =
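Parquet files carry no partition spec of their own, which is why the TODO above points to the YAML-based configuration; the only other signal is the directory layout. As a self-contained illustration of that fallback idea (not code from this PR), hive-style col=value path segments can be turned into candidate partition values like this:

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;

public class HivePartitionPathParser {

  /** Extracts col=value pairs from a hive-style partition path. */
  static Map<String, String> partitionValues(Path file) {
    Map<String, String> values = new LinkedHashMap<>();
    for (Path segment : file.getParent()) {
      String name = segment.toString();
      int eq = name.indexOf('=');
      if (eq > 0) {
        values.put(name.substring(0, eq), name.substring(eq + 1));
      }
    }
    return values;
  }

  public static void main(String[] args) {
    Path file = Paths.get("warehouse/events/event_date=2025-03-05/part-0000.parquet");
    System.out.println(partitionValues(file)); // {event_date=2025-03-05}
  }
}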
@@ -0,0 +1,21 @@
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class PartitionConfiguration {
+  public String partition;
+
+
+  public PartitionConfiguration(String partition) {
+    this.partition = partition;
+  }
+
+  public PartitionConfiguration() {
+
+  }
+
+  public String getPartition() {
+    return this.partition;
+  }
+
+}
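The POJO needs no annotations: Jackson's default visibility picks up the public partition field, and the no-argument constructor lets readerForUpdating or a plain readValue instantiate and populate it. A minimal sketch binding a YAML snippet straight to this class (the event_date value is a made-up example):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

public class YamlBindingDemo {
  public static void main(String[] args) throws Exception {
    ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
    // Creates a new instance via the no-arg constructor and writes the public field directly.
    PartitionConfiguration config =
        yamlMapper.readValue("partition: \"event_date\"", PartitionConfiguration.class);
    System.out.println(config.getPartition()); // event_date
  }
}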