diff --git a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java index 9f1e26ebcc06..13a75dbb17cc 100644 --- a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java +++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java @@ -42,6 +42,10 @@ public static int daysFromDate(LocalDate date) { return (int) ChronoUnit.DAYS.between(EPOCH_DAY, date); } + public static int daysFromInstant(Instant instant) { + return (int) ChronoUnit.DAYS.between(EPOCH, instant.atOffset(ZoneOffset.UTC)); + } + public static LocalTime timeFromMicros(long microFromMidnight) { return LocalTime.ofNanoOfDay(microFromMidnight * 1000); } @@ -54,6 +58,10 @@ public static LocalDateTime timestampFromMicros(long microsFromEpoch) { return ChronoUnit.MICROS.addTo(EPOCH, microsFromEpoch).toLocalDateTime(); } + public static long microsFromInstant(Instant instant) { + return ChronoUnit.MICROS.between(EPOCH, instant.atOffset(ZoneOffset.UTC)); + } + public static long microsFromTimestamp(LocalDateTime dateTime) { return ChronoUnit.MICROS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC)); } diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java index 314832155588..63e823c65815 100644 --- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java +++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java @@ -23,12 +23,13 @@ import java.sql.Date; import java.sql.Timestamp; import java.util.List; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import 
org.apache.iceberg.common.DynFields; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.util.DateTimeUtil; @@ -112,6 +113,12 @@ private static Expression translateLeaf(PredicateLeaf leaf) { } } + // PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to + // Timestamp using Date#getTime. This conversion discards microseconds, so this is necessary to avoid it. + private static final DynFields.UnboundField LITERAL_FIELD = DynFields.builder() + .hiddenImpl(SearchArgumentImpl.PredicateLeafImpl.class, "literal") + .build(); + private static Object leafToLiteral(PredicateLeaf leaf) { switch (leaf.getType()) { case LONG: @@ -120,9 +127,9 @@ private static Object leafToLiteral(PredicateLeaf leaf) { case FLOAT: return leaf.getLiteral(); case DATE: + return daysFromTimestamp((Timestamp) leaf.getLiteral()); case TIMESTAMP: - // Hive converts a Date type to a Timestamp internally when retrieving literal - return timestampToUnixEpoch((Timestamp) leaf.getLiteral()); + return microsFromTimestamp((Timestamp) LITERAL_FIELD.get(leaf)); case DECIMAL: return hiveDecimalToBigDecimal((HiveDecimalWritable) leaf.getLiteral()); @@ -139,7 +146,7 @@ private static List leafToLiteralList(PredicateLeaf leaf) { case STRING: return leaf.getLiteralList(); case DATE: - return leaf.getLiteralList().stream().map(value -> dateToMicros((Date) value)) + return leaf.getLiteralList().stream().map(value -> daysFromDate((Date) value)) .collect(Collectors.toList()); case DECIMAL: return leaf.getLiteralList().stream() @@ -147,22 +154,26 @@ private static List leafToLiteralList(PredicateLeaf leaf) { case TIMESTAMP: return leaf.getLiteralList().stream() - .map(value -> timestampToUnixEpoch((Timestamp) value)) + .map(value -> microsFromTimestamp((Timestamp) value)) .collect(Collectors.toList()); default: throw new 
UnsupportedOperationException("Unknown type: " + leaf.getType()); } } - private static long dateToMicros(Date date) { - return TimeUnit.MILLISECONDS.toMicros(date.toInstant().toEpochMilli()); - } - private static BigDecimal hiveDecimalToBigDecimal(HiveDecimalWritable hiveDecimalWritable) { return hiveDecimalWritable.getHiveDecimal().bigDecimalValue().setScale(hiveDecimalWritable.scale()); } - private static long timestampToUnixEpoch(Timestamp timestamp) { - return DateTimeUtil.microsFromTimestamp(timestamp.toLocalDateTime()); + private static int daysFromDate(Date date) { + return DateTimeUtil.daysFromDate(date.toLocalDate()); + } + + private static int daysFromTimestamp(Timestamp timestamp) { + return DateTimeUtil.daysFromInstant(timestamp.toInstant()); + } + + private static long microsFromTimestamp(Timestamp timestamp) { + return DateTimeUtil.microsFromInstant(timestamp.toInstant()); } } diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java index 5ae23cd07ea1..17f48626aebd 100644 --- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java +++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergFilterFactory.java @@ -23,15 +23,18 @@ import java.sql.Date; import java.sql.Timestamp; import java.time.LocalDateTime; +import java.time.ZoneOffset; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.iceberg.expressions.And; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.expressions.Not; import org.apache.iceberg.expressions.Or; import org.apache.iceberg.expressions.UnboundPredicate; +import org.apache.iceberg.types.Types; import 
org.apache.iceberg.util.DateTimeUtil; import org.junit.Test; @@ -207,7 +210,7 @@ public void testDateType() { SearchArgument arg = builder.startAnd().equals("date", PredicateLeaf.Type.DATE, Date.valueOf("2015-11-12")).end().build(); - UnboundPredicate expected = Expressions.equal("date", 1447286400000000L); + UnboundPredicate expected = Expressions.equal("date", Literal.of("2015-11-12").to(Types.DateType.get()).value()); UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); assertPredicatesMatch(expected, actual); @@ -215,12 +218,14 @@ public void testDateType() { @Test public void testTimestampType() { + Literal timestampLiteral = Literal.of("2012-10-02T05:16:17.123456").to(Types.TimestampType.withoutZone()); + long timestampMicros = timestampLiteral.value(); + Timestamp ts = Timestamp.from(DateTimeUtil.timestampFromMicros(timestampMicros).toInstant(ZoneOffset.UTC)); + SearchArgument.Builder builder = SearchArgumentFactory.newBuilder(); - SearchArgument arg = builder.startAnd().equals("timestamp", PredicateLeaf.Type.TIMESTAMP, - Timestamp.valueOf("2012-10-02 05:16:17.123")).end().build(); - LocalDateTime dateTime = LocalDateTime.of(2012, 10, 2, 5, 16, 17, 123000000); + SearchArgument arg = builder.startAnd().equals("timestamp", PredicateLeaf.Type.TIMESTAMP, ts).end().build(); - UnboundPredicate expected = Expressions.equal("timestamp", DateTimeUtil.microsFromTimestamp(dateTime)); + UnboundPredicate expected = Expressions.equal("timestamp", timestampMicros); UnboundPredicate actual = (UnboundPredicate) HiveIcebergFilterFactory.generateFilterExpression(arg); assertPredicatesMatch(expected, actual);