Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.filter2.predicate;

import java.util.HashMap;
import java.util.Map;

import static org.apache.parquet.Preconditions.checkArgument;

/**
* Converts a {@code Class<primitive>} to it's corresponding {@code Class<Boxed>}, eg
* {@code Class<int>} to {@code Class<Integer>}
*/
public class PrimitiveToBoxedClass {
private static final Map<Class<?>, Class<?>> primitiveToBoxed = new HashMap<Class<?>, Class<?>>();

static {
primitiveToBoxed.put(boolean.class, Boolean.class);
primitiveToBoxed.put(byte.class, Byte.class);
primitiveToBoxed.put(short.class, Short.class);
primitiveToBoxed.put(char.class, Character.class);
primitiveToBoxed.put(int.class, Integer.class);
primitiveToBoxed.put(long.class, Long.class);
primitiveToBoxed.put(float.class, Float.class);
primitiveToBoxed.put(double.class, Double.class);
}

public static Class<?> get(Class<?> c) {
checkArgument(c.isPrimitive(), "Class " + c + " is not primitive!");
return primitiveToBoxed.get(c);
}

private PrimitiveToBoxedClass() { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Map;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Column;
import org.apache.parquet.filter2.predicate.Operators.ColumnFilterPredicate;
Expand All @@ -36,8 +35,8 @@
import org.apache.parquet.filter2.predicate.Operators.NotEq;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;

import static org.apache.parquet.Preconditions.checkArgument;
import static org.apache.parquet.Preconditions.checkNotNull;
Expand Down Expand Up @@ -73,19 +72,11 @@ public static void validate(FilterPredicate predicate, MessageType schema) {
// we are validating that what the user provided agrees with these.
private final Map<ColumnPath, ColumnDescriptor> columnsAccordingToSchema = new HashMap<ColumnPath, ColumnDescriptor>();

// the original type of a column, keyed by path
private final Map<ColumnPath, OriginalType> originalTypes = new HashMap<ColumnPath, OriginalType>();

private SchemaCompatibilityValidator(MessageType schema) {

for (ColumnDescriptor cd : schema.getColumns()) {
ColumnPath columnPath = ColumnPath.get(cd.getPath());
columnsAccordingToSchema.put(columnPath, cd);

OriginalType ot = schema.getType(cd.getPath()).getOriginalType();
if (ot != null) {
originalTypes.put(columnPath, ot);
}
}
}

Expand Down Expand Up @@ -182,7 +173,7 @@ private <T extends Comparable<T>> void validateColumn(Column<T> column) {
+ "Column " + path.toDotString() + " is repeated.");
}

ValidTypeMap.assertTypeValid(column, descriptor.getType(), originalTypes.get(path));
ValidTypeMap.assertTypeValid(column, descriptor.getType());
}

private ColumnDescriptor getColumnDescriptor(ColumnPath columnPath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@
import java.util.Map;
import java.util.Set;

import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.filter2.predicate.Operators.Column;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

Expand All @@ -43,40 +42,36 @@ public class ValidTypeMap {
private ValidTypeMap() { }

// classToParquetType and parquetTypeToClass are used as a bi-directional map
private static final Map<Class<?>, Set<FullTypeDescriptor>> classToParquetType = new HashMap<Class<?>, Set<FullTypeDescriptor>>();
private static final Map<FullTypeDescriptor, Set<Class<?>>> parquetTypeToClass = new HashMap<FullTypeDescriptor, Set<Class<?>>>();
private static final Map<Class<?>, Set<PrimitiveTypeName>> classToParquetType = new HashMap<Class<?>, Set<PrimitiveTypeName>>();
private static final Map<PrimitiveTypeName, Set<Class<?>>> parquetTypeToClass = new HashMap<PrimitiveTypeName, Set<Class<?>>>();

// set up the mapping in both directions
private static void add(Class<?> c, FullTypeDescriptor f) {
Set<FullTypeDescriptor> descriptors = classToParquetType.get(c);
private static void add(Class<?> c, PrimitiveTypeName p) {
Set<PrimitiveTypeName> descriptors = classToParquetType.get(c);
if (descriptors == null) {
descriptors = new HashSet<FullTypeDescriptor>();
descriptors = new HashSet<PrimitiveTypeName>();
classToParquetType.put(c, descriptors);
}
descriptors.add(f);
descriptors.add(p);

Set<Class<?>> classes = parquetTypeToClass.get(f);
Set<Class<?>> classes = parquetTypeToClass.get(p);
if (classes == null) {
classes = new HashSet<Class<?>>();
parquetTypeToClass.put(f, classes);
parquetTypeToClass.put(p, classes);
}
classes.add(c);
}

static {
// basic primitive columns
add(Integer.class, new FullTypeDescriptor(PrimitiveTypeName.INT32, null));
add(Long.class, new FullTypeDescriptor(PrimitiveTypeName.INT64, null));
add(Float.class, new FullTypeDescriptor(PrimitiveTypeName.FLOAT, null));
add(Double.class, new FullTypeDescriptor(PrimitiveTypeName.DOUBLE, null));
add(Boolean.class, new FullTypeDescriptor(PrimitiveTypeName.BOOLEAN, null));
for (PrimitiveTypeName t: PrimitiveTypeName.values()) {
Class<?> c = t.javaType;

// Both of these binary types are valid
add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, null));
add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null));
if (c.isPrimitive()) {
c = PrimitiveToBoxedClass.get(c);
}

add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.BINARY, OriginalType.UTF8));
add(Binary.class, new FullTypeDescriptor(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8));
add(c, t);
}
}

/**
Expand All @@ -87,14 +82,12 @@ private static void add(Class<?> c, FullTypeDescriptor f) {
*
* @param foundColumn the column as declared by the user
* @param primitiveType the primitive type according to the schema
* @param originalType the original type according to the schema
*/
public static <T extends Comparable<T>> void assertTypeValid(Column<T> foundColumn, PrimitiveTypeName primitiveType, OriginalType originalType) {
public static <T extends Comparable<T>> void assertTypeValid(Column<T> foundColumn, PrimitiveTypeName primitiveType) {
Class<T> foundColumnType = foundColumn.getColumnType();
ColumnPath columnPath = foundColumn.getColumnPath();

Set<FullTypeDescriptor> validTypeDescriptors = classToParquetType.get(foundColumnType);
FullTypeDescriptor typeInFileMetaData = new FullTypeDescriptor(primitiveType, originalType);
Set<PrimitiveTypeName> validTypeDescriptors = classToParquetType.get(foundColumnType);

if (validTypeDescriptors == null) {
StringBuilder message = new StringBuilder();
Expand All @@ -105,18 +98,18 @@ public static <T extends Comparable<T>> void assertTypeValid(Column<T> foundColu
.append(foundColumnType.getName())
.append(" which is not supported in FilterPredicates.");

Set<Class<?>> supportedTypes = parquetTypeToClass.get(typeInFileMetaData);
Set<Class<?>> supportedTypes = parquetTypeToClass.get(primitiveType);
if (supportedTypes != null) {
message
.append(" Supported types for this column are: ")
.append(supportedTypes);
} else {
message.append(" There are no supported types for columns of " + typeInFileMetaData);
message.append(" There are no supported types for columns of " + primitiveType);
}
throw new IllegalArgumentException(message.toString());
}

if (!validTypeDescriptors.contains(typeInFileMetaData)) {
if (!validTypeDescriptors.contains(primitiveType)) {
StringBuilder message = new StringBuilder();
message
.append("FilterPredicate column: ")
Expand All @@ -126,53 +119,10 @@ public static <T extends Comparable<T>> void assertTypeValid(Column<T> foundColu
.append(") does not match the schema found in file metadata. Column ")
.append(columnPath.toDotString())
.append(" is of type: ")
.append(typeInFileMetaData)
.append(primitiveType)
.append("\nValid types for this column are: ")
.append(parquetTypeToClass.get(typeInFileMetaData));
.append(parquetTypeToClass.get(primitiveType));
throw new IllegalArgumentException(message.toString());
}
}

private static final class FullTypeDescriptor {
private final PrimitiveTypeName primitiveType;
private final OriginalType originalType;

private FullTypeDescriptor(PrimitiveTypeName primitiveType, OriginalType originalType) {
this.primitiveType = primitiveType;
this.originalType = originalType;
}

public PrimitiveTypeName getPrimitiveType() {
return primitiveType;
}

public OriginalType getOriginalType() {
return originalType;
}

@Override
public String toString() {
return "FullTypeDescriptor(" + "PrimitiveType: " + primitiveType + ", OriginalType: " + originalType + ')';
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

FullTypeDescriptor that = (FullTypeDescriptor) o;

if (originalType != that.originalType) return false;
if (primitiveType != that.primitiveType) return false;

return true;
}

@Override
public int hashCode() {
int result = primitiveType != null ? primitiveType.hashCode() : 0;
result = 31 * result + (originalType != null ? originalType.hashCode() : 0);
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void testFindsInvalidTypes() {
fail("this should throw");
} catch (IllegalArgumentException e) {
assertEquals("FilterPredicate column: x.bar's declared type (java.lang.Long) does not match the schema found in file metadata. "
+ "Column x.bar is of type: FullTypeDescriptor(PrimitiveType: INT32, OriginalType: null)\n"
+ "Column x.bar is of type: INT32\n"
+ "Valid types for this column are: [class java.lang.Integer]", e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,51 +61,39 @@ public int compareTo(InvalidColumnType o) {

@Test
public void testValidTypes() {
assertTypeValid(intColumn, PrimitiveTypeName.INT32, null);
assertTypeValid(longColumn, PrimitiveTypeName.INT64, null);
assertTypeValid(floatColumn, PrimitiveTypeName.FLOAT, null);
assertTypeValid(doubleColumn, PrimitiveTypeName.DOUBLE, null);
assertTypeValid(booleanColumn, PrimitiveTypeName.BOOLEAN, null);
assertTypeValid(binaryColumn, PrimitiveTypeName.BINARY, null);
assertTypeValid(binaryColumn, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, null);
assertTypeValid(binaryColumn, PrimitiveTypeName.BINARY, OriginalType.UTF8);
assertTypeValid(binaryColumn, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, OriginalType.UTF8);
assertTypeValid(intColumn, PrimitiveTypeName.INT32);
assertTypeValid(longColumn, PrimitiveTypeName.INT64);
assertTypeValid(floatColumn, PrimitiveTypeName.FLOAT);
assertTypeValid(doubleColumn, PrimitiveTypeName.DOUBLE);
assertTypeValid(booleanColumn, PrimitiveTypeName.BOOLEAN);
assertTypeValid(binaryColumn, PrimitiveTypeName.BINARY);
assertTypeValid(binaryColumn, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);
assertTypeValid(binaryColumn, PrimitiveTypeName.INT96);
}

@Test
public void testMismatchedTypes() {
try {
assertTypeValid(intColumn, PrimitiveTypeName.DOUBLE, null);
assertTypeValid(intColumn, PrimitiveTypeName.DOUBLE);
fail("This should throw!");
} catch (IllegalArgumentException e) {
assertEquals("FilterPredicate column: int.column's declared type (java.lang.Integer) does not match the "
+ "schema found in file metadata. Column int.column is of type: "
+ "FullTypeDescriptor(PrimitiveType: DOUBLE, OriginalType: null)\n"
+ "DOUBLE\n"
+ "Valid types for this column are: [class java.lang.Double]", e.getMessage());
}
}

@Test
public void testUnsupportedType() {
try {
assertTypeValid(invalidColumn, PrimitiveTypeName.INT32, null);
assertTypeValid(invalidColumn, PrimitiveTypeName.INT32);
fail("This should throw!");
} catch (IllegalArgumentException e) {
assertEquals("Column invalid.column was declared as type: "
+ "org.apache.parquet.filter2.predicate.TestValidTypeMap$InvalidColumnType which is not supported "
+ "in FilterPredicates. Supported types for this column are: [class java.lang.Integer]", e.getMessage());
}

try {
assertTypeValid(invalidColumn, PrimitiveTypeName.INT32, OriginalType.UTF8);
fail("This should throw!");
} catch (IllegalArgumentException e) {
assertEquals("Column invalid.column was declared as type: "
+ "org.apache.parquet.filter2.predicate.TestValidTypeMap$InvalidColumnType which is not supported "
+ "in FilterPredicates. There are no supported types for columns of FullTypeDescriptor(PrimitiveType: INT32, OriginalType: UTF8)",
e.getMessage());
}

}

}