diff --git a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java index 6c7475d35cc4..84a58c505ecc 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java +++ b/data/src/test/java/org/apache/iceberg/data/TestLocalScan.java @@ -456,6 +456,8 @@ private DataFile writeFile(String location, String filename, Schema schema, List @Test public void testFilterWithDateAndTimestamp() throws IOException { + // TODO: Add multiple timestamp tests - there's an issue with ORC caching TZ in ThreadLocal, so it's not possible + // to change TZ and test with ORC as they will produce incompatible values. Schema schema = new Schema( required(1, "timestamp_with_zone", Types.TimestampType.withZone()), required(2, "timestamp_without_zone", Types.TimestampType.withoutZone()), diff --git a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java index 826686ef7e96..a18ef5f9d427 100644 --- a/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/orc/TestGenericData.java @@ -65,57 +65,62 @@ public void writeAndValidateRepeatingRecords() throws IOException { @Test public void writeAndValidateTimestamps() throws IOException { - Schema timestampSchema = new Schema( - required(1, "tsTzCol", Types.TimestampType.withZone()), - required(2, "tsCol", Types.TimestampType.withoutZone()) - ); - - // Write using America/New_York timezone - TimeZone.setDefault(TimeZone.getTimeZone("America/New_York")); - GenericRecord record1 = GenericRecord.create(timestampSchema); - record1.setField("tsTzCol", OffsetDateTime.parse("2017-01-16T17:10:34-08:00")); - record1.setField("tsCol", LocalDateTime.parse("1970-01-01T00:01:00")); - GenericRecord record2 = GenericRecord.create(timestampSchema); - record2.setField("tsTzCol", OffsetDateTime.parse("2017-05-16T17:10:34-08:00")); - 
record2.setField("tsCol", LocalDateTime.parse("1970-05-01T00:01:00")); - GenericRecord record3 = GenericRecord.create(timestampSchema); - record3.setField("tsTzCol", OffsetDateTime.parse("1935-01-16T17:10:34-08:00")); - record3.setField("tsCol", LocalDateTime.parse("1935-01-01T00:01:00")); - GenericRecord record4 = GenericRecord.create(timestampSchema); - record4.setField("tsTzCol", OffsetDateTime.parse("1935-05-16T17:10:34-08:00")); - record4.setField("tsCol", LocalDateTime.parse("1935-05-01T00:01:00")); - - File testFile = temp.newFile(); - Assert.assertTrue("Delete should succeed", testFile.delete()); + TimeZone currentTz = TimeZone.getDefault(); + try { + Schema timestampSchema = new Schema( + required(1, "tsTzCol", Types.TimestampType.withZone()), + required(2, "tsCol", Types.TimestampType.withoutZone()) + ); + + // Write using America/New_York timezone + TimeZone.setDefault(TimeZone.getTimeZone("America/New_York")); + GenericRecord record1 = GenericRecord.create(timestampSchema); + record1.setField("tsTzCol", OffsetDateTime.parse("2017-01-16T17:10:34-08:00")); + record1.setField("tsCol", LocalDateTime.parse("1970-01-01T00:01:00")); + GenericRecord record2 = GenericRecord.create(timestampSchema); + record2.setField("tsTzCol", OffsetDateTime.parse("2017-05-16T17:10:34-08:00")); + record2.setField("tsCol", LocalDateTime.parse("1970-05-01T00:01:00")); + GenericRecord record3 = GenericRecord.create(timestampSchema); + record3.setField("tsTzCol", OffsetDateTime.parse("1935-01-16T17:10:34-08:00")); + record3.setField("tsCol", LocalDateTime.parse("1935-01-01T00:01:00")); + GenericRecord record4 = GenericRecord.create(timestampSchema); + record4.setField("tsTzCol", OffsetDateTime.parse("1935-05-16T17:10:34-08:00")); + record4.setField("tsCol", LocalDateTime.parse("1935-05-01T00:01:00")); + + File testFile = temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = ORC.write(Files.localOutput(testFile)) + 
.schema(timestampSchema) + .createWriterFunc(GenericOrcWriter::buildWriter) + .build()) { + writer.add(record1); + writer.add(record2); + writer.add(record3); + writer.add(record4); + } - try (FileAppender writer = ORC.write(Files.localOutput(testFile)) - .schema(timestampSchema) - .createWriterFunc(GenericOrcWriter::buildWriter) - .build()) { - writer.add(record1); - writer.add(record2); - writer.add(record3); - writer.add(record4); - } + // Read using Asia/Kolkata timezone + TimeZone.setDefault(TimeZone.getTimeZone("Asia/Kolkata")); + List rows; + try (CloseableIterable reader = ORC.read(Files.localInput(testFile)) + .project(timestampSchema) + .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(timestampSchema, fileSchema)) + .build()) { + rows = Lists.newArrayList(reader); + } - // Read using Asia/Kolkata timezone - TimeZone.setDefault(TimeZone.getTimeZone("Asia/Kolkata")); - List rows; - try (CloseableIterable reader = ORC.read(Files.localInput(testFile)) - .project(timestampSchema) - .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(timestampSchema, fileSchema)) - .build()) { - rows = Lists.newArrayList(reader); + Assert.assertEquals(OffsetDateTime.parse("2017-01-17T01:10:34Z"), rows.get(0).getField("tsTzCol")); + Assert.assertEquals(LocalDateTime.parse("1970-01-01T00:01:00"), rows.get(0).getField("tsCol")); + Assert.assertEquals(OffsetDateTime.parse("2017-05-17T01:10:34Z"), rows.get(1).getField("tsTzCol")); + Assert.assertEquals(LocalDateTime.parse("1970-05-01T00:01:00"), rows.get(1).getField("tsCol")); + Assert.assertEquals(OffsetDateTime.parse("1935-01-17T01:10:34Z"), rows.get(2).getField("tsTzCol")); + Assert.assertEquals(LocalDateTime.parse("1935-01-01T00:01:00"), rows.get(2).getField("tsCol")); + Assert.assertEquals(OffsetDateTime.parse("1935-05-17T01:10:34Z"), rows.get(3).getField("tsTzCol")); + Assert.assertEquals(LocalDateTime.parse("1935-05-01T00:01:00"), rows.get(3).getField("tsCol")); + } finally { + 
TimeZone.setDefault(currentTz); } - - Assert.assertEquals(OffsetDateTime.parse("2017-01-17T01:10:34Z"), rows.get(0).getField("tsTzCol")); - Assert.assertEquals(LocalDateTime.parse("1970-01-01T00:01:00"), rows.get(0).getField("tsCol")); - Assert.assertEquals(OffsetDateTime.parse("2017-05-17T01:10:34Z"), rows.get(1).getField("tsTzCol")); - Assert.assertEquals(LocalDateTime.parse("1970-05-01T00:01:00"), rows.get(1).getField("tsCol")); - Assert.assertEquals(OffsetDateTime.parse("1935-01-17T01:10:34Z"), rows.get(2).getField("tsTzCol")); - Assert.assertEquals(LocalDateTime.parse("1935-01-01T00:01:00"), rows.get(2).getField("tsCol")); - Assert.assertEquals(OffsetDateTime.parse("1935-05-17T01:10:34Z"), rows.get(3).getField("tsTzCol")); - Assert.assertEquals(LocalDateTime.parse("1935-05-01T00:01:00"), rows.get(3).getField("tsCol")); } private void writeAndValidateRecords(Schema schema, List expected) throws IOException { diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java index a5bcc2f8b161..86b76972a346 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java @@ -22,16 +22,17 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.sql.Timestamp; -import java.time.temporal.ChronoUnit; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Queue; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Metrics; import org.apache.iceberg.Schema; +import org.apache.iceberg.common.DynFields; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.hadoop.HadoopInputFile; import org.apache.iceberg.io.InputFile; @@ -43,7 +44,6 @@ import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; 
-import org.apache.iceberg.util.DateTimeUtil; import org.apache.orc.BooleanColumnStatistics; import org.apache.orc.ColumnStatistics; import org.apache.orc.DateColumnStatistics; @@ -55,6 +55,7 @@ import org.apache.orc.TimestampColumnStatistics; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; +import org.apache.orc.impl.ColumnStatisticsImpl; public class OrcMetrics { @@ -157,10 +158,7 @@ private static Optional fromOrcMin(Types.NestedField column, .setScale(((Types.DecimalType) column.type()).scale())) .orElse(null); } else if (columnStats instanceof DateColumnStatistics) { - min = Optional.ofNullable(((DateColumnStatistics) columnStats).getMinimum()) - .map(minStats -> DateTimeUtil.daysFromDate( - DateTimeUtil.EPOCH.plus(minStats.getTime(), ChronoUnit.MILLIS).toLocalDate())) - .orElse(null); + min = Optional.ofNullable(minDayFromEpoch((DateColumnStatistics) columnStats)).orElse(null); } else if (columnStats instanceof TimestampColumnStatistics) { TimestampColumnStatistics tColStats = (TimestampColumnStatistics) columnStats; Timestamp minValue = tColStats.getMinimumUTC(); @@ -196,10 +194,7 @@ private static Optional fromOrcMax(Types.NestedField column, .setScale(((Types.DecimalType) column.type()).scale())) .orElse(null); } else if (columnStats instanceof DateColumnStatistics) { - max = Optional.ofNullable(((DateColumnStatistics) columnStats).getMaximum()) - .map(maxStats -> DateTimeUtil.daysFromDate( - DateTimeUtil.EPOCH.plus(maxStats.getTime(), ChronoUnit.MILLIS).toLocalDate())) - .orElse(null); + max = Optional.ofNullable(maxDayFromEpoch((DateColumnStatistics) columnStats)).orElse(null); } else if (columnStats instanceof TimestampColumnStatistics) { TimestampColumnStatistics tColStats = (TimestampColumnStatistics) columnStats; Timestamp maxValue = tColStats.getMaximumUTC(); @@ -274,4 +269,27 @@ public TypeDescription primitive(Type.PrimitiveType iPrimitive, TypeDescription return primitive; } } + + private static final Class DATE_STATS_IMPL = 
Stream.of(ColumnStatisticsImpl.class.getDeclaredClasses()) + .filter(statsClass -> "DateStatisticsImpl".equals(statsClass.getSimpleName())) + .findFirst() + .orElse(null); + + private static final DynFields.UnboundField DATE_MINIMUM = DynFields.builder() + .hiddenImpl(DATE_STATS_IMPL, "minimum") + .defaultAlwaysNull() // if the minimum field isn't found, don't add a value + .build(); + + private static final DynFields.UnboundField DATE_MAXIMUM = DynFields.builder() + .hiddenImpl(DATE_STATS_IMPL, "maximum") + .defaultAlwaysNull() // if the maximum field isn't found, don't add a value + .build(); + + private static Integer minDayFromEpoch(DateColumnStatistics stats) { + return DATE_MINIMUM.get(stats); + } + + private static Integer maxDayFromEpoch(DateColumnStatistics stats) { + return DATE_MAXIMUM.get(stats); + } } diff --git a/orc/src/test/java/org/apache/iceberg/orc/TestExpressionToSearchArgument.java b/orc/src/test/java/org/apache/iceberg/orc/TestExpressionToSearchArgument.java index 9ddf8d02368b..9812668b76f6 100644 --- a/orc/src/test/java/org/apache/iceberg/orc/TestExpressionToSearchArgument.java +++ b/orc/src/test/java/org/apache/iceberg/orc/TestExpressionToSearchArgument.java @@ -105,34 +105,38 @@ public void testPrimitiveTypes() { @Test public void testTimezoneSensitiveTypes() { - for (String timezone : new String[]{"America/New_York", "Asia/Kolkata", "UTC/Greenwich"}) { - TimeZone.setDefault(TimeZone.getTimeZone(timezone)); - OffsetDateTime tsTzPredicate = OffsetDateTime.parse("2019-10-02T00:47:28.207366Z"); - OffsetDateTime tsPredicate = OffsetDateTime.parse("1968-01-16T13:07:59.048625Z"); - OffsetDateTime epoch = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); + TimeZone currentTz = TimeZone.getDefault(); + try { + for (String timezone : new String[]{"America/New_York", "Asia/Kolkata", "UTC/Greenwich"}) { + TimeZone.setDefault(TimeZone.getTimeZone(timezone)); + OffsetDateTime tsTzPredicate = OffsetDateTime.parse("2019-10-02T00:47:28.207366Z"); + 
OffsetDateTime tsPredicate = OffsetDateTime.parse("1968-01-16T13:07:59.048625Z"); + OffsetDateTime epoch = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); - Schema schema = new Schema( - required(1, "date", Types.DateType.get()), - required(2, "tsTz", Types.TimestampType.withZone()), - required(3, "ts", Types.TimestampType.withoutZone()) - ); + Schema schema = new Schema( + required(1, "date", Types.DateType.get()), + required(2, "tsTz", Types.TimestampType.withZone()), + required(3, "ts", Types.TimestampType.withoutZone()) + ); - Expression expr = and( - and(equal("date", 10L), equal("tsTz", ChronoUnit.MICROS.between(epoch, tsTzPredicate))), - equal("ts", ChronoUnit.MICROS.between(epoch, tsPredicate)) - ); - Expression boundFilter = Binder.bind(schema.asStruct(), expr, true); - SearchArgument expected = SearchArgumentFactory.newBuilder() - .startAnd() - .equals("`date`", Type.DATE, Date.valueOf(LocalDate.parse("1970-01-11", DateTimeFormatter.ISO_LOCAL_DATE))) - // Temporarily disable filters on Timestamp columns due to ORC-611 - // .equals("`tsTz`", Type.TIMESTAMP, Timestamp.from(tsTzPredicate.toInstant())) - // .equals("`ts`", Type.TIMESTAMP, Timestamp.from(tsPredicate.toInstant())) - .end() - .build(); + Expression expr = and( + and(equal("date", 10L), equal("tsTz", ChronoUnit.MICROS.between(epoch, tsTzPredicate))), + equal("ts", ChronoUnit.MICROS.between(epoch, tsPredicate)) + ); + Expression boundFilter = Binder.bind(schema.asStruct(), expr, true); + SearchArgument expected = SearchArgumentFactory.newBuilder() + .startAnd() + .equals("`date`", Type.DATE, Date.valueOf(LocalDate.parse("1970-01-11", DateTimeFormatter.ISO_LOCAL_DATE))) + // .equals("`tsTz`", Type.TIMESTAMP, Timestamp.from(tsTzPredicate.toInstant())) + // .equals("`ts`", Type.TIMESTAMP, Timestamp.from(tsPredicate.toInstant())) + .end() + .build(); - SearchArgument actual = ExpressionToSearchArgument.convert(boundFilter, ORCSchemaUtil.convert(schema)); - Assert.assertEquals(expected.toString(), 
actual.toString()); + SearchArgument actual = ExpressionToSearchArgument.convert(boundFilter, ORCSchemaUtil.convert(schema)); + Assert.assertEquals(expected.toString(), actual.toString()); + } + } finally { + TimeZone.setDefault(currentTz); } }