diff --git a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java index 9f1e26ebcc06..13a75dbb17cc 100644 --- a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java @@ -42,6 +42,10 @@ public static int daysFromDate(LocalDate date) { return (int) ChronoUnit.DAYS.between(EPOCH_DAY, date); } + public static int daysFromInstant(Instant instant) { + // NOTE(review): ChronoUnit.DAYS.between truncates toward zero, so a pre-epoch instant that is not exactly on a UTC day boundary maps one day too late (e.g. 1969-12-31T12:00Z -> day 0). Confirm pre-1970 dates are intended to behave this way, or convert via toLocalDate() first. + return (int) ChronoUnit.DAYS.between(EPOCH, instant.atOffset(ZoneOffset.UTC)); + } + public static LocalTime timeFromMicros(long microFromMidnight) { return LocalTime.ofNanoOfDay(microFromMidnight * 1000); } @@ -54,6 +58,10 @@ public static LocalDateTime timestampFromMicros(long microsFromEpoch) { return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch).toLocalDateTime(); } + public static long microsFromInstant(Instant instant) { + return ChronoUnit.MICROS.between(EPOCH, instant.atOffset(ZoneOffset.UTC)); + } + public static long microsFromTimestamp(LocalDateTime dateTime) { return ChronoUnit.MICROS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC)); } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java new file mode 100644 index 000000000000..63e823c65815 --- /dev/null +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.mr.hive; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.iceberg.common.DynFields; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.util.DateTimeUtil; + +import static org.apache.iceberg.expressions.Expressions.and; +import static org.apache.iceberg.expressions.Expressions.equal; +import static org.apache.iceberg.expressions.Expressions.greaterThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.in; +import static org.apache.iceberg.expressions.Expressions.isNull; +import static org.apache.iceberg.expressions.Expressions.lessThan; +import static org.apache.iceberg.expressions.Expressions.lessThanOrEqual; +import static org.apache.iceberg.expressions.Expressions.not; +import static org.apache.iceberg.expressions.Expressions.or; + + +public class HiveIcebergFilterFactory { + + private HiveIcebergFilterFactory() { + } + + public static Expression generateFilterExpression(SearchArgument sarg) { + return translate(sarg.getExpression(), sarg.getLeaves()); + } + + /** + * Recursive method to traverse down the 
ExpressionTree to evaluate each expression and its leaf nodes. + * @param tree Current ExpressionTree where the 'top' node is being evaluated. + * @param leaves List of all leaf nodes within the tree. + * @return Expression that is translated from the Hive SearchArgument. + */ + private static Expression translate(ExpressionTree tree, List leaves) { + List childNodes = tree.getChildren(); + switch (tree.getOperator()) { + case OR: + Expression orResult = Expressions.alwaysFalse(); + for (ExpressionTree child : childNodes) { + orResult = or(orResult, translate(child, leaves)); + } + return orResult; + case AND: + Expression result = Expressions.alwaysTrue(); + for (ExpressionTree child : childNodes) { + result = and(result, translate(child, leaves)); + } + return result; + case NOT: + return not(translate(childNodes.get(0), leaves)); + case LEAF: + return translateLeaf(leaves.get(tree.getLeaf())); + case CONSTANT: + throw new UnsupportedOperationException("CONSTANT operator is not supported"); + default: + throw new UnsupportedOperationException("Unknown operator: " + tree.getOperator()); + } + } + + /** + * Translate leaf nodes from Hive operator to Iceberg operator. 
+ * @param leaf Leaf node + * @return Expression fully translated from Hive PredicateLeaf + */ + private static Expression translateLeaf(PredicateLeaf leaf) { + String column = leaf.getColumnName(); + switch (leaf.getOperator()) { + case EQUALS: + return equal(column, leafToLiteral(leaf)); + case LESS_THAN: + return lessThan(column, leafToLiteral(leaf)); + case LESS_THAN_EQUALS: + return lessThanOrEqual(column, leafToLiteral(leaf)); + case IN: + return in(column, leafToLiteralList(leaf)); + case BETWEEN: + List icebergLiterals = leafToLiteralList(leaf); + return and(greaterThanOrEqual(column, icebergLiterals.get(0)), + lessThanOrEqual(column, icebergLiterals.get(1))); + case IS_NULL: + return isNull(column); + default: + throw new UnsupportedOperationException("Unknown operator: " + leaf.getOperator()); + } + } + + // PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to + // Timestamp using Date#getTime. This conversion discards microseconds, so this is necessary to avoid it. 
+ private static final DynFields.UnboundField LITERAL_FIELD = DynFields.builder() + .hiddenImpl(SearchArgumentImpl.PredicateLeafImpl.class, "literal") + .build(); + + private static Object leafToLiteral(PredicateLeaf leaf) { + switch (leaf.getType()) { + case LONG: + case BOOLEAN: + case STRING: + case FLOAT: + return leaf.getLiteral(); + case DATE: + return daysFromTimestamp((Timestamp) leaf.getLiteral()); + case TIMESTAMP: + return microsFromTimestamp((Timestamp) LITERAL_FIELD.get(leaf)); + case DECIMAL: + return hiveDecimalToBigDecimal((HiveDecimalWritable) leaf.getLiteral()); + + default: + throw new UnsupportedOperationException("Unknown type: " + leaf.getType()); + } + } + + private static List leafToLiteralList(PredicateLeaf leaf) { + switch (leaf.getType()) { + case LONG: + case BOOLEAN: + case FLOAT: + case STRING: + return leaf.getLiteralList(); + case DATE: + return leaf.getLiteralList().stream().map(value -> daysFromDate((Date) value)) + .collect(Collectors.toList()); + case DECIMAL: + return leaf.getLiteralList().stream() + .map(value -> hiveDecimalToBigDecimal((HiveDecimalWritable) value)) + .collect(Collectors.toList()); + case TIMESTAMP: + // NOTE(review): unlike leafToLiteral above, this path reads getLiteralList() directly rather than the LITERAL_FIELD work-around — if PredicateLeafImpl applies the same java.util.Date -> Timestamp conversion to list literals, microseconds are silently dropped for IN/BETWEEN timestamp predicates; verify against PredicateLeafImpl. + return leaf.getLiteralList().stream() + .map(value -> microsFromTimestamp((Timestamp) value)) + .collect(Collectors.toList()); + default: + throw new UnsupportedOperationException("Unknown type: " + leaf.getType()); + } + } + + private static BigDecimal hiveDecimalToBigDecimal(HiveDecimalWritable hiveDecimalWritable) { + return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale()); + } + + private static int daysFromDate(Date date) { + return DateTimeUtil.daysFromDate(date.toLocalDate()); + } + + private static int daysFromTimestamp(Timestamp timestamp) { + return DateTimeUtil.daysFromInstant(timestamp.toInstant()); + } + + private static long microsFromTimestamp(Timestamp timestamp) { + return DateTimeUtil.microsFromInstant(timestamp.toInstant()); + } +} diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java index a7da368332de..a8a31d2bcd96 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java @@ -23,19 +23,44 @@ import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; +import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; +import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.iceberg.data.Record; +import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.mr.InputFormatConfig; +import org.apache.iceberg.mr.SerializationUtil; import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat; import org.apache.iceberg.mr.mapreduce.IcebergSplit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HiveIcebergInputFormat extends MapredIcebergInputFormat implements CombineHiveInputFormat.AvoidSplitCombination { + private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class); + @Override public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + // Convert Hive filter to Iceberg filter + String hiveFilter = job.get(TableScanDesc.FILTER_EXPR_CONF_STR); + if (hiveFilter != null) { + ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities + .deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class); + SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc); + try { + Expression filter = 
HiveIcebergFilterFactory.generateFilterExpression(sarg); + job.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter)); + } catch (UnsupportedOperationException e) { + LOG.warn("Unable to create Iceberg filter, continuing without filter (will be applied by Hive later): ", e); + } + } + String location = job.get(InputFormatConfig.TABLE_LOCATION); return Arrays.stream(super.getSplits(job, numSplits)) .map(split -> new HiveIcebergSplit((IcebergSplit) split, location)) diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java new file mode 100644 index 000000000000..5dc327358df6 --- /dev/null +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.mr.hive; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.ZoneOffset; +import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.iceberg.expressions.And; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.expressions.Not; +import org.apache.iceberg.expressions.Or; +import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestHiveIcebergFilterFactory { + + @Test + public void testEqualsOperand() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().equals("salary", PredicateLeaf.Type.LONG, 3000L).end().build(); + + UnboundPredicate expected = Expressions.equal("salary", 3000L); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertPredicatesMatch(expected, actual); + } + + @Test + public void testNotEqualsOperand() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startNot().equals("salary", PredicateLeaf.Type.LONG, 3000L).end().build(); + + Not expected = (Not) Expressions.not(Expressions.equal("salary", 3000L)); + Not actual = (Not) HiveIcebergFilterFactory.generateFilterExpression(arg); + + UnboundPredicate childExpressionActual = (UnboundPredicate) actual.child(); + UnboundPredicate childExpressionExpected = Expressions.equal("salary", 3000L); + + assertEquals(actual.op(), expected.op()); + assertEquals(actual.child().op(), expected.child().op()); + 
assertEquals(childExpressionActual.ref().name(), childExpressionExpected.ref().name()); + assertEquals(childExpressionActual.literal(), childExpressionExpected.literal()); + } + + @Test + public void testLessThanOperand() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().lessThan("salary", PredicateLeaf.Type.LONG, 3000L).end().build(); + + UnboundPredicate expected = Expressions.lessThan("salary", 3000L); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertEquals(actual.op(), expected.op()); + assertEquals(actual.literal(), expected.literal()); + assertEquals(actual.ref().name(), expected.ref().name()); + } + + @Test + public void testLessThanEqualsOperand() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().lessThanEquals("salary", PredicateLeaf.Type.LONG, 3000L).end().build(); + + UnboundPredicate expected = Expressions.lessThanOrEqual("salary", 3000L); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertPredicatesMatch(expected, actual); + } + + @Test + public void testInOperand() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().in("salary", PredicateLeaf.Type.LONG, 3000L, 4000L).end().build(); + + UnboundPredicate expected = Expressions.in("salary", 3000L, 4000L); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertEquals(actual.op(), expected.op()); + assertEquals(actual.literals(), expected.literals()); + assertEquals(actual.ref().name(), expected.ref().name()); + } + + @Test + public void testBetweenOperand() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder + .startAnd() + .between("salary", PredicateLeaf.Type.LONG, 3000L, 
4000L).end().build(); + + // BETWEEN (3000, 4000) translates to >= lower AND <= upper, so the upper bound must be 4000L (was mistakenly 3000L). + And expected = (And) Expressions.and(Expressions.greaterThanOrEqual("salary", 3000L), + Expressions.lessThanOrEqual("salary", 4000L)); + And actual = (And) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertEquals(actual.op(), expected.op()); + assertEquals(actual.left().op(), expected.left().op()); + assertEquals(actual.right().op(), expected.right().op()); + } + + @Test + public void testIsNullOperand() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().isNull("salary", PredicateLeaf.Type.LONG).end().build(); + + UnboundPredicate expected = Expressions.isNull("salary"); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertEquals(actual.op(), expected.op()); + assertEquals(actual.ref().name(), expected.ref().name()); + } + + @Test + public void testAndOperand() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder + .startAnd() + .equals("salary", PredicateLeaf.Type.LONG, 3000L) + .equals("salary", PredicateLeaf.Type.LONG, 4000L) + .end().build(); + + And expected = (And) Expressions + .and(Expressions.equal("salary", 3000L), Expressions.equal("salary", 4000L)); + And actual = (And) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertEquals(actual.op(), expected.op()); + assertEquals(actual.left().op(), expected.left().op()); + assertEquals(actual.right().op(), expected.right().op()); + } + + @Test + public void testOrOperand() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder + .startOr() + .equals("salary", PredicateLeaf.Type.LONG, 3000L) + .equals("salary", PredicateLeaf.Type.LONG, 4000L) + .end().build(); + + Or expected = (Or) Expressions + .or(Expressions.equal("salary", 3000L), Expressions.equal("salary", 4000L)); + Or actual = (Or) 
HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertEquals(actual.op(), expected.op()); + assertEquals(actual.left().op(), expected.left().op()); + assertEquals(actual.right().op(), expected.right().op()); + } + + @Test + public void testStringType() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().equals("string", PredicateLeaf.Type.STRING, "Joe").end().build(); + + UnboundPredicate expected = Expressions.equal("string", "Joe"); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertPredicatesMatch(expected, actual); + } + + @Test + public void testFloatType() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().equals("float", PredicateLeaf.Type.FLOAT, 1200D).end().build(); + + UnboundPredicate expected = Expressions.equal("float", 1200D); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertPredicatesMatch(expected, actual); + } + + @Test + public void testBooleanType() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().equals("boolean", PredicateLeaf.Type.BOOLEAN, true).end().build(); + + UnboundPredicate expected = Expressions.equal("boolean", true); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertPredicatesMatch(expected, actual); + } + + @Test + public void testDateType() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE, + Date.valueOf("2015-11-12")).end().build(); + + UnboundPredicate expected = Expressions.equal("date", Literal.of("2015-11-12").to(Types.DateType.get()).value()); + UnboundPredicate actual = (UnboundPredicate) 
HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertPredicatesMatch(expected, actual); + } + + @Test + public void testTimestampType() { + Literal timestampLiteral = Literal.of("2012-10-02T05:16:17.123456").to(Types.TimestampType.withoutZone()); + long timestampMicros = timestampLiteral.value(); + Timestamp ts = Timestamp.from(DateTimeUtil.timestampFromMicros(timestampMicros).toInstant(ZoneOffset.UTC)); + + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().equals("timestamp", PredicateLeaf.Type.TIMESTAMP, ts).end().build(); + + UnboundPredicate expected = Expressions.equal("timestamp", timestampMicros); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertPredicatesMatch(expected, actual); + } + + @Test + public void testDecimalType() { + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); + SearchArgument arg = builder.startAnd().equals("decimal", PredicateLeaf.Type.DECIMAL, + new HiveDecimalWritable("20.12")).end().build(); + + UnboundPredicate expected = Expressions.equal("decimal", new BigDecimal("20.12")); + UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); + + assertPredicatesMatch(expected, actual); + } + + private void assertPredicatesMatch(UnboundPredicate expected, UnboundPredicate actual) { + assertEquals(expected.op(), actual.op()); + assertEquals(expected.literal(), actual.literal()); + assertEquals(expected.ref().name(), actual.ref().name()); + } +}