8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -42,6 +42,10 @@ public static int daysFromDate(LocalDate date) {
return (int) ChronoUnit.DAYS.between(EPOCH_DAY, date);
}

public static int daysFromInstant(Instant instant) {
return (int) ChronoUnit.DAYS.between(EPOCH, instant.atOffset(ZoneOffset.UTC));
}

public static LocalTime timeFromMicros(long microFromMidnight) {
return LocalTime.ofNanoOfDay(microFromMidnight * 1000);
}
@@ -54,6 +58,10 @@ public static LocalDateTime timestampFromMicros(long microsFromEpoch) {
return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch).toLocalDateTime();
}

public static long microsFromInstant(Instant instant) {
return ChronoUnit.MICROS.between(EPOCH, instant.atOffset(ZoneOffset.UTC));
}

public static long microsFromTimestamp(LocalDateTime dateTime) {
return ChronoUnit.MICROS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
}
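A quick usage sketch of the two new Instant-based helpers (values chosen for illustration, not part of the diff); like the existing LocalDate/LocalDateTime variants, both measure from the 1970-01-01T00:00:00Z epoch:

import java.time.Instant;
import org.apache.iceberg.util.DateTimeUtil;

// 1 day plus 01:02:03.000004 past the epoch
Instant instant = Instant.parse("1970-01-02T01:02:03.000004Z");
int days = DateTimeUtil.daysFromInstant(instant);      // 1 (whole days only)
long micros = DateTimeUtil.microsFromInstant(instant); // 90_123_000_004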
@@ -0,0 +1,179 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.iceberg.common.DynFields;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.util.DateTimeUtil;

import static org.apache.iceberg.expressions.Expressions.and;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual;
import static org.apache.iceberg.expressions.Expressions.in;
import static org.apache.iceberg.expressions.Expressions.isNull;
import static org.apache.iceberg.expressions.Expressions.lessThan;
import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual;
import static org.apache.iceberg.expressions.Expressions.not;
import static org.apache.iceberg.expressions.Expressions.or;


public class HiveIcebergFilterFactory {

private HiveIcebergFilterFactory() {
}

public static Expression generateFilterExpression(SearchArgument sarg) {
return translate(sarg.getExpression(), sarg.getLeaves());
Contributor:
Maybe logging would be nice here, minimally on DEBUG level, but maybe on INFO level, like:

LOG.info("Translated sarg=[{}] to expression=[{}]", sarg, expression);

Not sure about the toString implementations, but the general idea would be to see what went in and what came out.
Also we can add this later, just noting here so we do not forget :D

Contributor:
Pushed filters are logged in the scan, so the translated expression is already logged. I assume that Hive also logs the filters that it is pushing, so I don't think this is necessary.
}

/**
* Recursive method to traverse down the ExpressionTree to evaluate each expression and its leaf nodes.
* @param tree Current ExpressionTree where the 'top' node is being evaluated.
* @param leaves List of all leaf nodes within the tree.
* @return Expression that is translated from the Hive SearchArgument.
*/
private static Expression translate(ExpressionTree tree, List<PredicateLeaf> leaves) {
List<ExpressionTree> childNodes = tree.getChildren();
switch (tree.getOperator()) {
case OR:
Expression orResult = Expressions.alwaysFalse();
for (ExpressionTree child : childNodes) {
orResult = or(orResult, translate(child, leaves));
}
return orResult;
case AND:
Expression result = Expressions.alwaysTrue();
for (ExpressionTree child : childNodes) {
result = and(result, translate(child, leaves));
}
return result;
case NOT:
return not(translate(childNodes.get(0), leaves));
case LEAF:
return translateLeaf(leaves.get(tree.getLeaf()));
case CONSTANT:
throw new UnsupportedOperationException("CONSTANT operator is not supported");
default:
throw new UnsupportedOperationException("Unknown operator: " + tree.getOperator());
}
}
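To make the recursion concrete, here is a hedged sketch (not part of the PR) of translating a simple conjunctive predicate, assuming Hive's SearchArgumentFactory builder API:

import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.iceberg.expressions.Expression;

// WHERE id = 42 AND salary < 100
SearchArgument sarg = SearchArgumentFactory.newBuilder()
    .startAnd()
      .equals("id", PredicateLeaf.Type.LONG, 42L)
      .lessThan("salary", PredicateLeaf.Type.LONG, 100L)
    .end()
    .build();

// Roughly equivalent to and(equal("id", 42L), lessThan("salary", 100L))
Expression expr = HiveIcebergFilterFactory.generateFilterExpression(sarg);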

/**
* Translate leaf nodes from Hive operator to Iceberg operator.
* @param leaf Leaf node
* @return Expression fully translated from Hive PredicateLeaf
*/
private static Expression translateLeaf(PredicateLeaf leaf) {
String column = leaf.getColumnName();
switch (leaf.getOperator()) {
case EQUALS:
return equal(column, leafToLiteral(leaf));
case LESS_THAN:
return lessThan(column, leafToLiteral(leaf));
case LESS_THAN_EQUALS:
return lessThanOrEqual(column, leafToLiteral(leaf));
case IN:
return in(column, leafToLiteralList(leaf));
case BETWEEN:
List<Object> icebergLiterals = leafToLiteralList(leaf);
Contributor:
Do we need to validate that there are only two literals here, or is this reliable?

Contributor (Author):
I believe that we can expect there to be 2, as we're using the BETWEEN operator and Hive wouldn't accept more than 2 arguments.

return and(greaterThanOrEqual(column, icebergLiterals.get(0)),
lessThanOrEqual(column, icebergLiterals.get(1)));
case IS_NULL:
return isNull(column);
default:
throw new UnsupportedOperationException("Unknown operator: " + leaf.getOperator());
}
}
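For example, a BETWEEN predicate has no direct Iceberg operator, so it is rewritten as a conjunction of two bounds (column name and values below are made up):

// WHERE salary BETWEEN 3000 AND 5000 becomes:
Expression between = and(
    greaterThanOrEqual("salary", 3000L),
    lessThanOrEqual("salary", 5000L));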

// PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to
// Timestamp using Date#getTime. This conversion discards microseconds, so reading the literal field directly is
// necessary to avoid it.
private static final DynFields.UnboundField<?> LITERAL_FIELD = DynFields.builder()
.hiddenImpl(SearchArgumentImpl.PredicateLeafImpl.class, "literal")
.build();
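A small illustrative sketch (not from the diff) of why the reflective read matters: routing a literal through Date#getTime keeps only millisecond precision.

import java.sql.Timestamp;

Timestamp ts = Timestamp.valueOf("2020-01-01 00:00:00.123456");
long millis = ts.getTime();               // millisecond precision only
Timestamp lossy = new Timestamp(millis);  // 2020-01-01 00:00:00.123 (micros lost)
int nanos = ts.getNanos();                // 123456000 (still intact on the original)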

private static Object leafToLiteral(PredicateLeaf leaf) {
switch (leaf.getType()) {
case LONG:
case BOOLEAN:
case STRING:
case FLOAT:
return leaf.getLiteral();
case DATE:
return daysFromTimestamp((Timestamp) leaf.getLiteral());
case TIMESTAMP:
return microsFromTimestamp((Timestamp) LITERAL_FIELD.get(leaf));
case DECIMAL:
return hiveDecimalToBigDecimal((HiveDecimalWritable) leaf.getLiteral());
default:
throw new UnsupportedOperationException("Unknown type: " + leaf.getType());
}
}

private static List<Object> leafToLiteralList(PredicateLeaf leaf) {
switch (leaf.getType()) {
case LONG:
case BOOLEAN:
case FLOAT:
case STRING:
return leaf.getLiteralList();
case DATE:
return leaf.getLiteralList().stream().map(value -> daysFromDate((Date) value))
.collect(Collectors.toList());
case DECIMAL:
return leaf.getLiteralList().stream()
.map(value -> hiveDecimalToBigDecimal((HiveDecimalWritable) value))
.collect(Collectors.toList());
case TIMESTAMP:
return leaf.getLiteralList().stream()
.map(value -> microsFromTimestamp((Timestamp) value))
.collect(Collectors.toList());
default:
throw new UnsupportedOperationException("Unknown type: " + leaf.getType());
}
}

private static BigDecimal hiveDecimalToBigDecimal(HiveDecimalWritable hiveDecimalWritable) {
return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale());
}

private static int daysFromDate(Date date) {
return DateTimeUtil.daysFromDate(date.toLocalDate());
}

private static int daysFromTimestamp(Timestamp timestamp) {
return DateTimeUtil.daysFromInstant(timestamp.toInstant());
}

private static long microsFromTimestamp(Timestamp timestamp) {
return DateTimeUtil.microsFromInstant(timestamp.toInstant());
}
}
@@ -23,19 +23,44 @@
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.SerializationUtil;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.mr.mapreduce.IcebergSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
implements CombineHiveInputFormat.AvoidSplitCombination {

private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class);

@Override
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
// Convert Hive filter to Iceberg filter
String hiveFilter = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
if (hiveFilter != null) {
ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
.deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc);
try {
Expression filter = HiveIcebergFilterFactory.generateFilterExpression(sarg);
job.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));
} catch (UnsupportedOperationException e) {
LOG.warn("Unable to create Iceberg filter, continuing without filter (will be applied by Hive later): ", e);
}
}

String location = job.get(InputFormatConfig.TABLE_LOCATION);
return Arrays.stream(super.getSplits(job, numSplits))
.map(split -> new HiveIcebergSplit((IcebergSplit) split, location))
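For context, a hedged sketch of the matching read side (not shown in this diff): the Iceberg input format would recover the pushed filter from the job conf, assuming SerializationUtil exposes the symmetric deserializeFromBase64:

String serialized = job.get(InputFormatConfig.FILTER_EXPRESSION);
if (serialized != null) {
  Expression filter = SerializationUtil.deserializeFromBase64(serialized);
  // The scan would then be planned with this residual filter applied.
}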