Commit c7f97ae

shardulm94 authored and rzhang10 committed
Spark: Allow reading timestamp without time zone
Cherry-picked PR 48: Spark: Allow reading timestamp without time zone.

Fix style and refactor the read-timestamp-without-zone option to a constant after rebase.
1 parent 7e57487 commit c7f97ae
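
As context for the change, a reader opts in per job through the new read option; a minimal usage sketch (the session setup and table location are hypothetical, while the option key and format name come from the tests below):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadWithoutZoneSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
    // Opt in to mapping Iceberg's timestamp-without-zone to Spark's TimestampType;
    // without this option the read fails with
    // "Spark does not support timestamp without time zone fields".
    Dataset<Row> df = spark.read().format("iceberg")
        .option("read-timestamp-without-zone", "true")
        .load("/tmp/warehouse/db/events"); // hypothetical table location
    df.show();
  }
}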

11 files changed: +344 −15 lines changed

spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java

Lines changed: 0 additions & 5 deletions

@@ -202,11 +202,6 @@ public Type primitive(Type.PrimitiveType primitive) {
             "Cannot project decimal with incompatible precision: %s < %s",
             requestedDecimal.precision(), decimal.precision());
         break;
-      case TIMESTAMP:
-        Types.TimestampType timestamp = (Types.TimestampType) primitive;
-        Preconditions.checkArgument(timestamp.shouldAdjustToUTC(),
-            "Cannot project timestamp (without time zone) as timestamptz (with time zone)");
-        break;
       default:
     }

spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java

Lines changed: 2 additions & 0 deletions

@@ -47,4 +47,6 @@ private SparkReadOptions() {
 
   // Overrides the table's read.parquet.vectorization.batch-size
   public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
+
+  public static final String READ_TIMESTAMP_WITHOUT_ZONE = "read-timestamp-without-zone";
 }

spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java

Lines changed: 1 addition & 6 deletions

@@ -104,12 +104,7 @@ public DataType primitive(Type.PrimitiveType primitive) {
         throw new UnsupportedOperationException(
             "Spark does not support time fields");
       case TIMESTAMP:
-        Types.TimestampType timestamp = (Types.TimestampType) primitive;
-        if (timestamp.shouldAdjustToUTC()) {
-          return TimestampType$.MODULE$;
-        }
-        throw new UnsupportedOperationException(
-            "Spark does not support timestamp without time zone fields");
+        return TimestampType$.MODULE$;
       case STRING:
         return StringType$.MODULE$;
       case UUID:
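
After this change, SparkSchemaUtil.convert maps both Iceberg timestamp flavors to Spark's TimestampType instead of rejecting the without-zone variant. A minimal sketch of the new behavior (hypothetical schema; assumes the Iceberg Spark module on the classpath):

import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.StructType;

public class ConvertSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "ts_tz", Types.TimestampType.withZone()),
        Types.NestedField.optional(2, "ts", Types.TimestampType.withoutZone()));
    // Before this commit, converting "ts" threw
    // "Spark does not support timestamp without time zone fields";
    // now both fields convert to Spark's TimestampType.
    StructType sparkType = SparkSchemaUtil.convert(schema);
    System.out.println(sparkType.treeString());
  }
}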

spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java

Lines changed: 1 addition & 0 deletions

@@ -104,6 +104,7 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
         return OrcValueReaders.floats();
       case DOUBLE:
         return OrcValueReaders.doubles();
+      case TIMESTAMP:
       case TIMESTAMP_INSTANT:
         return SparkOrcValueReaders.timestampTzs();
       case DECIMAL:

spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java

Lines changed: 1 addition & 0 deletions

@@ -127,6 +127,7 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primit
       case DOUBLE:
         primitiveValueReader = OrcValueReaders.doubles();
         break;
+      case TIMESTAMP:
       case TIMESTAMP_INSTANT:
         primitiveValueReader = SparkOrcValueReaders.timestampTzs();
         break;
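
In ORC, TIMESTAMP carries wall-clock (local) semantics while TIMESTAMP_INSTANT is UTC-adjusted; both readers above now fall through to the same SparkOrcValueReaders.timestampTzs() decoder, so without-zone values are decoded as if they were UTC instants. That matches the all-UTC assumption spelled out in the Reader.java comment below.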

spark/src/test/java/org/apache/iceberg/spark/data/GenericsHelpers.java

Lines changed: 9 additions & 2 deletions

@@ -24,6 +24,7 @@
 import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
@@ -122,13 +123,19 @@ private static void assertEqualsSafe(Type type, Object expected, Object actual)
         Assert.assertEquals("ISO-8601 date should be equal", expected.toString(), actual.toString());
         break;
       case TIMESTAMP:
-        Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
         Assert.assertTrue("Should be a Timestamp", actual instanceof Timestamp);
         Timestamp ts = (Timestamp) actual;
         // milliseconds from nanos has already been added by getTime
         OffsetDateTime actualTs = EPOCH.plusNanos(
             (ts.getTime() * 1_000_000) + (ts.getNanos() % 1_000_000));
-        Assert.assertEquals("Timestamp should be equal", expected, actualTs);
+        Types.TimestampType timestampType = (Types.TimestampType) type;
+        if (timestampType.shouldAdjustToUTC()) {
+          Assert.assertTrue("Should expect an OffsetDateTime", expected instanceof OffsetDateTime);
+          Assert.assertEquals("Timestamp should be equal", expected, actualTs);
+        } else {
+          Assert.assertTrue("Should expect an LocalDateTime", expected instanceof LocalDateTime);
+          Assert.assertEquals("Timestamp should be equal", expected, actualTs.toLocalDateTime());
+        }
         break;
       case STRING:
         Assert.assertTrue("Should be a String", actual instanceof String);
spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java

Lines changed: 227 additions & 0 deletions

@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.GenericsHelpers;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.Files.localOutput;
+
+@RunWith(Parameterized.class)
+public abstract class TestTimestampWithoutZone {
+  private static final Configuration CONF = new Configuration();
+  private static final HadoopTables TABLES = new HadoopTables(CONF);
+
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.LongType.get()),
+      Types.NestedField.optional(2, "ts", Types.TimestampType.withoutZone()),
+      Types.NestedField.optional(3, "data", Types.StringType.get())
+  );
+
+  private static SparkSession spark = null;
+
+  @BeforeClass
+  public static void startSpark() {
+    TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestTimestampWithoutZone.spark;
+    TestTimestampWithoutZone.spark = null;
+    currentSpark.stop();
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private final String format;
+  private final boolean vectorized;
+
+  @Parameterized.Parameters(name = "format = {0}, vectorized = {1}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { "parquet", false },
+        { "parquet", true },
+        { "avro", false },
+        { "orc", false },
+        { "orc", true }
+    };
+  }
+
+  public TestTimestampWithoutZone(String format, boolean vectorized) {
+    this.format = format;
+    this.vectorized = vectorized;
+  }
+
+  private File parent = null;
+  private File unpartitioned = null;
+  private List<Record> records = null;
+
+  @Before
+  public void writeUnpartitionedTable() throws IOException {
+    this.parent = temp.newFolder("TestTimestampWithoutZone");
+    this.unpartitioned = new File(parent, "unpartitioned");
+    File dataFolder = new File(unpartitioned, "data");
+    Assert.assertTrue("Mkdir should succeed", dataFolder.mkdirs());
+
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), unpartitioned.toString());
+    Schema tableSchema = table.schema(); // use the table schema because ids are reassigned
+
+    FileFormat fileFormat = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+
+    File testFile = new File(dataFolder, fileFormat.addExtension(UUID.randomUUID().toString()));
+
+    // create records using the table's schema
+    this.records = testRecords(tableSchema);
+
+    try (FileAppender<Record> writer = new GenericAppenderFactory(tableSchema).newAppender(
+        localOutput(testFile), fileFormat)) {
+      writer.addAll(records);
+    }
+
+    DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withRecordCount(records.size())
+        .withFileSizeInBytes(testFile.length())
+        .withPath(testFile.toString())
+        .build();
+
+    table.newAppend().appendFile(file).commit();
+  }
+
+  @Test
+  public void testUnpartitionedTimestampWithoutZone() {
+    assertEqualsSafe(SCHEMA.asStruct(), records, read(unpartitioned.toString(), vectorized));
+  }
+
+  @Test
+  public void testUnpartitionedTimestampWithoutZoneProjection() {
+    Schema projection = SCHEMA.select("id", "ts");
+    assertEqualsSafe(projection.asStruct(),
+        records.stream().map(r -> projectFlat(projection, r)).collect(Collectors.toList()),
+        read(unpartitioned.toString(), vectorized, "id", "ts"));
+  }
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Test
+  public void testUnpartitionedTimestampWithoutZoneError() {
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Spark does not support timestamp without time zone fields");
+
+    spark.read().format("iceberg")
+        .option("vectorization-enabled", String.valueOf(vectorized))
+        .option("read-timestamp-without-zone", "false")
+        .load(unpartitioned.toString())
+        .collectAsList();
+  }
+
+  private static Record projectFlat(Schema projection, Record record) {
+    Record result = GenericRecord.create(projection);
+    List<Types.NestedField> fields = projection.asStruct().fields();
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+      result.set(i, record.getField(field.name()));
+    }
+    return result;
+  }
+
+  public static void assertEqualsSafe(Types.StructType struct,
+      List<Record> expected, List<Row> actual) {
+    Assert.assertEquals("Number of results should match expected", expected.size(), actual.size());
+    for (int i = 0; i < expected.size(); i += 1) {
+      GenericsHelpers.assertEqualsSafe(struct, expected.get(i), actual.get(i));
+    }
+  }
+
+  private List<Record> testRecords(Schema schema) {
+    return Lists.newArrayList(
+        record(schema, 0L, parseToLocal("2017-12-22T09:20:44.294658"), "junction"),
+        record(schema, 1L, parseToLocal("2017-12-22T07:15:34.582910"), "alligator"),
+        record(schema, 2L, parseToLocal("2017-12-22T06:02:09.243857"), "forrest"),
+        record(schema, 3L, parseToLocal("2017-12-22T03:10:11.134509"), "clapping"),
+        record(schema, 4L, parseToLocal("2017-12-22T00:34:00.184671"), "brush"),
+        record(schema, 5L, parseToLocal("2017-12-21T22:20:08.935889"), "trap"),
+        record(schema, 6L, parseToLocal("2017-12-21T21:55:30.589712"), "element"),
+        record(schema, 7L, parseToLocal("2017-12-21T17:31:14.532797"), "limited"),
+        record(schema, 8L, parseToLocal("2017-12-21T15:21:51.237521"), "global"),
+        record(schema, 9L, parseToLocal("2017-12-21T15:02:15.230570"), "goldfish")
+    );
+  }
+
+  private static List<Row> read(String table, boolean vectorized) {
+    return read(table, vectorized, "*");
+  }
+
+  private static List<Row> read(String table, boolean vectorized, String select0, String... selectN) {
+    Dataset<Row> dataset = spark.read().format("iceberg")
+        .option("vectorization-enabled", String.valueOf(vectorized))
+        .option("read-timestamp-without-zone", "true")
+        .load(table)
+        .select(select0, selectN);
+    return dataset.collectAsList();
+  }
+
+  private static LocalDateTime parseToLocal(String timestamp) {
+    return LocalDateTime.parse(timestamp);
+  }
+
+  private static Record record(Schema schema, Object... values) {
+    Record rec = GenericRecord.create(schema);
+    for (int i = 0; i < values.length; i += 1) {
+      rec.set(i, values[i]);
+    }
+    return rec;
+  }
+}

spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java

Lines changed: 25 additions & 1 deletion

@@ -54,6 +54,9 @@
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.broadcast.Broadcast;
@@ -101,6 +104,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private final boolean localityPreferred;
   private final boolean batchReadsEnabled;
   private final int batchSize;
+  private final boolean readTimestampWithoutZone;
 
   // lazy variables
   private Schema schema = null;
@@ -174,6 +178,16 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     this.batchSize = options.get(SparkReadOptions.VECTORIZATION_BATCH_SIZE).map(Integer::parseInt).orElseGet(() ->
         PropertyUtil.propertyAsInt(table.properties(),
             TableProperties.PARQUET_BATCH_SIZE, TableProperties.PARQUET_BATCH_SIZE_DEFAULT));
+    // Allow reading timestamp without time zone as timestamp with time zone. Generally, this is not safe as timestamp
+    // without time zone is supposed to represent wall clock time semantics, i.e. no matter the reader/writer timezone
+    // 3PM should always be read as 3PM, but timestamp with time zone represents instant semantics, i.e the timestamp
+    // is adjusted so that the corresponding time in the reader timezone is displayed. However, at LinkedIn, all readers
+    // and writers are in the UTC timezone as our production machines are set to UTC. So, timestamp with/without time
+    // zone is the same.
+    // When set to false (default), we throw an exception at runtime
+    // "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields
+    this.readTimestampWithoutZone = options.get(SparkReadOptions.READ_TIMESTAMP_WITHOUT_ZONE)
+        .map(Boolean::parseBoolean).orElse(false);
   }
 
   private Schema lazySchema() {
@@ -197,6 +211,8 @@ private Expression filterExpression() {
 
   private StructType lazyType() {
     if (type == null) {
+      Preconditions.checkArgument(readTimestampWithoutZone || !hasTimestampWithoutZone(lazySchema()),
+          "Spark does not support timestamp without time zone fields");
       this.type = SparkSchemaUtil.convert(lazySchema());
     }
     return type;
@@ -365,12 +381,20 @@ public boolean enableBatchRead() {
 
       boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);
 
+      boolean hasTimestampWithoutZone = hasTimestampWithoutZone(lazySchema());
+
      this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && ((allOrcFileScanTasks && hasNoRowFilters) ||
-          (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
+          (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives && !hasTimestampWithoutZone));
     }
     return readUsingBatch;
   }
 
+  private static boolean hasTimestampWithoutZone(Schema schema) {
+    return TypeUtil.find(schema, t ->
+        t.typeId().equals(Type.TypeID.TIMESTAMP) && !((Types.TimestampType) t).shouldAdjustToUTC()
+    ) != null;
+  }
+
   private static void mergeIcebergHadoopConfs(
       Configuration baseConf, Map<String, String> options) {
     options.keySet().stream()
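
The constructor comment above captures the core distinction: timestamp without zone stores a wall-clock reading that every reader should see verbatim, while timestamp with zone stores an instant that is rendered in each reader's zone. A JDK-only sketch of the two semantics (example values are hypothetical):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class SemanticsSketch {
  public static void main(String[] args) {
    // Wall-clock semantics: "3PM" stays "3PM" for every reader.
    LocalDateTime wallClock = LocalDateTime.parse("2017-12-22T15:00:00");

    // Instant semantics: the same stored instant renders differently per reader zone.
    Instant instant = Instant.parse("2017-12-22T15:00:00Z");
    System.out.println(instant.atZone(ZoneId.of("UTC")).toLocalDateTime());                 // 2017-12-22T15:00
    System.out.println(instant.atZone(ZoneId.of("America/Los_Angeles")).toLocalDateTime()); // 2017-12-22T07:00

    // In an all-UTC deployment the two coincide, which is why the option is safe there.
    System.out.println(wallClock.equals(instant.atZone(ZoneId.of("UTC")).toLocalDateTime())); // true
  }
}
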
spark2/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone24.java

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+public class TestTimestampWithoutZone24 extends TestTimestampWithoutZone {
+  public TestTimestampWithoutZone24(String format, boolean vectorized) {
+    super(format, vectorized);
+  }
+}
