Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public interface ContentFile<F> {
*/
Long pos();

/**
* Return id of the schema when write this file.
*/
int schemaId();

/**
* Returns id of the partition spec used for partition metadata.
*/
Expand Down
6 changes: 4 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,12 @@ public interface DataFile extends ContentFile<DataFile> {
"Equality comparison field IDs");
Types.NestedField SORT_ORDER_ID = optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
Types.NestedField SCHEMA_ID = optional(142, "schema_id", IntegerType.get(), "Schema ID");

int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
// NEXT ID TO ASSIGN: 142
// NEXT ID TO ASSIGN: 143

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
Expand All @@ -88,7 +89,8 @@ static StructType getType(StructType partitionType) {
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID
SORT_ORDER_ID,
SCHEMA_ID
);
}

Expand Down
11 changes: 9 additions & 2 deletions api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ public interface ManifestFile {
"Summary for each partition");
Types.NestedField KEY_METADATA = optional(519, "key_metadata", Types.BinaryType.get(),
"Encryption key metadata blob");
// next ID to assign: 520
Types.NestedField SCHEMA_ID = optional(520, "schema_id", Types.IntegerType.get(), "");
// next ID to assign: 521

Schema SCHEMA = new Schema(
PATH, LENGTH, SPEC_ID, MANIFEST_CONTENT,
SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID,
ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT,
ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT,
PARTITION_SUMMARIES, KEY_METADATA);
PARTITION_SUMMARIES, KEY_METADATA, SCHEMA_ID);

static Schema schema() {
return SCHEMA;
Expand Down Expand Up @@ -188,6 +189,12 @@ default ByteBuffer keyMetadata() {
return null;
}

/**
* Returns ID of the {@link Schema} used to write the manifest file. This could be -1 when not all the
* entries have the same schema ID.
*/
int schemaId();

/**
* Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
* this method to make defensive copies.
Expand Down
165 changes: 165 additions & 0 deletions api/src/main/java/org/apache/iceberg/expressions/SchemaEvaluator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.expressions;

import java.io.Serializable;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.expressions.Expressions.rewriteNot;

/**
* Evaluates an {@link Expression} on a {@link Schema} to test whether the file with the schema may match.
*/
public class SchemaEvaluator implements Serializable {
private final Expression expr;

public SchemaEvaluator(Types.StructType struct, Expression unbound) {
this(struct, unbound, true);
}

public SchemaEvaluator(Types.StructType struct, Expression unbound, boolean caseSensitive) {
this.expr = Binder.bind(struct, rewriteNot(unbound), caseSensitive);
}

public boolean eval(Schema schema) {
return new EvalVisitor().eval(schema);
}

private class EvalVisitor extends ExpressionVisitors.BoundVisitor<Boolean> {
private Schema schema;

private boolean eval(Schema evalSchema) {
this.schema = evalSchema;
return ExpressionVisitors.visitEvaluator(expr, this);
}

@Override
public Boolean alwaysTrue() {
return true;
}

@Override
public Boolean alwaysFalse() {
return false;
}

@Override
public Boolean not(Boolean result) {
return !result;
}

@Override
public Boolean and(Boolean leftResult, Boolean rightResult) {
return leftResult && rightResult;
}

@Override
public Boolean or(Boolean leftResult, Boolean rightResult) {
return leftResult || rightResult;
}

@Override
public <T> Boolean isNull(Bound<T> valueExpr) {
// column exists, it could be null
// column does not exist, it is null
return true;
}

@Override
public <T> Boolean notNull(Bound<T> valueExpr) {
return columnExists(valueExpr);
}

@Override
public <T> Boolean isNaN(Bound<T> valueExpr) {
return columnExists(valueExpr);
}

@Override
public <T> Boolean notNaN(Bound<T> valueExpr) {
// column exists, it could be NaN
// column does not exist, column value is null. Null could not be NaN.
return true;
}

@Override
public <T> Boolean lt(Bound<T> valueExpr, Literal<T> lit) {
return columnExists(valueExpr);
}

@Override
public <T> Boolean ltEq(Bound<T> valueExpr, Literal<T> lit) {
return columnExists(valueExpr);
}

@Override
public <T> Boolean gt(Bound<T> valueExpr, Literal<T> lit) {
return columnExists(valueExpr);
}

@Override
public <T> Boolean gtEq(Bound<T> valueExpr, Literal<T> lit) {
return columnExists(valueExpr);
}

@Override
public <T> Boolean eq(Bound<T> valueExpr, Literal<T> lit) {
// lit could not be null, so it should be return true when column exists
return columnExists(valueExpr);
}

@Override
public <T> Boolean notEq(Bound<T> valueExpr, Literal<T> lit) {
// column exists, it could be not equal to lit
// column does not exist, column value is null. Null not equal to lit because lit could not be null
return true;
}

@Override
public <T> Boolean in(Bound<T> valueExpr, Set<T> literalSet) {
return columnExists(valueExpr);
}

@Override
public <T> Boolean notIn(Bound<T> valueExpr, Set<T> literalSet) {
// column exists, it could be not in literalSet
// column does not exist, column values it null. Null could not be in literalSet
return true;
}

@Override
public <T> Boolean startsWith(Bound<T> valueExpr, Literal<T> lit) {
return columnExists(valueExpr);
}

@Override
public <T> Boolean notStartsWith(Bound<T> valueExpr, Literal<T> lit) {
// column exists, it could be not start with lit.
// column does not exist, it is not start with lit.
return true;
}

private <T> boolean columnExists(Bound<T> valueExpr) {
return schema.findField(valueExpr.ref().fieldId()) != null;
}
}
}
10 changes: 10 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ public ByteBuffer keyMetadata() {
return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata);
}

@Override
public int schemaId() {
return -1;
}

@Override
public ManifestFile copy() {
return this;
Expand Down Expand Up @@ -476,6 +481,11 @@ public ByteBuffer keyMetadata() {
return null;
}

@Override
public int schemaId() {
return -1;
}

@Override
public DataFile copy() {
return this;
Expand Down
Loading