Skip to content

Commit 9a0d154

Browse files
sshkvar and bkahloon authored
Add support for reading/writing timestamps without timezone. (#2757)
Previously Spark could not handle Iceberg tables which contained Timestamp.withoutTimeZone. New parameters are introduced to allow Timestamp without TimeZone to be treated as Timestamp with TimeZone. Co-authored-by: bkahloon <kahlonbakht@gmail.com> Co-authored-by: shardulm94
1 parent b3fb81a commit 9a0d154

File tree

20 files changed

+798
-22
lines changed

20 files changed

+798
-22
lines changed

spark/src/main/java/org/apache/iceberg/spark/PruneColumnsWithoutReordering.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -202,11 +202,6 @@ public Type primitive(Type.PrimitiveType primitive) {
202202
"Cannot project decimal with incompatible precision: %s < %s",
203203
requestedDecimal.precision(), decimal.precision());
204204
break;
205-
case TIMESTAMP:
206-
Types.TimestampType timestamp = (Types.TimestampType) primitive;
207-
Preconditions.checkArgument(timestamp.shouldAdjustToUTC(),
208-
"Cannot project timestamp (without time zone) as timestamptz (with time zone)");
209-
break;
210205
default:
211206
}
212207

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.spark;
21+
22+
import org.apache.iceberg.Schema;
23+
import org.apache.iceberg.types.FixupTypes;
24+
import org.apache.iceberg.types.Type;
25+
import org.apache.iceberg.types.TypeUtil;
26+
import org.apache.iceberg.types.Types;
27+
28+
/**
29+
* By default Spark type {@link org.apache.iceberg.types.Types.TimestampType} should be converted to
30+
* {@link Types.TimestampType#withZone()} iceberg type. But we also can convert
31+
* {@link org.apache.iceberg.types.Types.TimestampType} to {@link Types.TimestampType#withoutZone()} iceberg type
32+
* by setting {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} to 'true'
33+
*/
34+
class SparkFixupTimestampType extends FixupTypes {
35+
36+
private SparkFixupTimestampType(Schema referenceSchema) {
37+
super(referenceSchema);
38+
}
39+
40+
static Schema fixup(Schema schema) {
41+
return new Schema(TypeUtil.visit(schema,
42+
new SparkFixupTimestampType(schema)).asStructType().fields());
43+
}
44+
45+
@Override
46+
public Type primitive(Type.PrimitiveType primitive) {
47+
if (primitive.typeId() == Type.TypeID.TIMESTAMP) {
48+
return Types.TimestampType.withoutZone();
49+
}
50+
return primitive;
51+
}
52+
53+
@Override
54+
protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) {
55+
return Type.TypeID.TIMESTAMP.equals(type.typeId());
56+
}
57+
}

spark/src/main/java/org/apache/iceberg/spark/SparkFixupTypes.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ protected boolean fixupPrimitive(Type.PrimitiveType type, Type source) {
5252
return true;
5353
}
5454
break;
55+
case TIMESTAMP:
56+
if (source.typeId() == Type.TypeID.TIMESTAMP) {
57+
return true;
58+
}
59+
break;
5560
default:
5661
}
5762
return false;

spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,31 @@ public static DataType convert(Type type) {
122122
* @throws IllegalArgumentException if the type cannot be converted
123123
*/
124124
public static Schema convert(StructType sparkType) {
125+
return convert(sparkType, false);
126+
}
127+
128+
/**
129+
* Convert a Spark {@link StructType struct} to a {@link Schema} with new field ids.
130+
* <p>
131+
* This conversion assigns fresh ids.
132+
* <p>
133+
* Some data types are represented as the same Spark type. These are converted to a default type.
134+
* <p>
135+
* To convert using a reference schema for field ids and ambiguous types, use
136+
* {@link #convert(Schema, StructType)}.
137+
*
138+
* @param sparkType a Spark StructType
139+
* @param useTimestampWithoutZone boolean flag indicates that timestamp should be stored without timezone
140+
* @return the equivalent Schema
141+
* @throws IllegalArgumentException if the type cannot be converted
142+
*/
143+
public static Schema convert(StructType sparkType, boolean useTimestampWithoutZone) {
125144
Type converted = SparkTypeVisitor.visit(sparkType, new SparkTypeToType(sparkType));
126-
return new Schema(converted.asNestedType().asStructType().fields());
145+
Schema schema = new Schema(converted.asNestedType().asStructType().fields());
146+
if (useTimestampWithoutZone) {
147+
schema = SparkFixupTimestampType.fixup(schema);
148+
}
149+
return schema;
127150
}
128151

129152
/**

spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,38 @@
2020
package org.apache.iceberg.spark;
2121

2222
import java.util.List;
23+
import java.util.Map;
2324
import java.util.function.BiFunction;
2425
import java.util.function.Function;
2526
import java.util.stream.Collectors;
2627
import org.apache.iceberg.PartitionField;
2728
import org.apache.iceberg.PartitionSpec;
29+
import org.apache.iceberg.Schema;
2830
import org.apache.iceberg.Table;
2931
import org.apache.iceberg.hadoop.HadoopConfigurable;
3032
import org.apache.iceberg.io.FileIO;
3133
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
3234
import org.apache.iceberg.transforms.Transform;
3335
import org.apache.iceberg.transforms.UnknownTransform;
36+
import org.apache.iceberg.types.TypeUtil;
37+
import org.apache.iceberg.types.Types;
3438
import org.apache.iceberg.util.Pair;
39+
import org.apache.spark.sql.RuntimeConfig;
3540
import org.apache.spark.util.SerializableConfiguration;
3641

3742
public class SparkUtil {
43+
44+
public static final String HANDLE_TIMESTAMP_WITHOUT_TIMEZONE =
45+
"spark.sql.iceberg.handle-timestamp-without-timezone";
46+
public static final String TIMESTAMP_WITHOUT_TIMEZONE_ERROR = String.format("Cannot handle timestamp without" +
47+
" timezone fields in Spark. Spark does not natively support this type but if you would like to handle all" +
48+
" timestamps as timestamp with timezone set '%s' to true. This will not change the underlying values stored" +
49+
" but will change their displayed values in Spark. For more information please see" +
50+
" https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html#ansi-sql-and" +
51+
"-spark-sql-timestamps", HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
52+
public static final String USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES =
53+
"spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables";
54+
3855
private SparkUtil() {
3956
}
4057

@@ -99,4 +116,58 @@ public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
99116
}
100117
}
101118
}
119+
120+
/**
121+
* Responsible for checking if the table schema has a timestamp without timezone column
122+
* @param schema table schema to check if it contains a timestamp without timezone column
123+
* @return boolean indicating if the schema passed in has a timestamp field without a timezone
124+
*/
125+
public static boolean hasTimestampWithoutZone(Schema schema) {
126+
return TypeUtil.find(schema, t -> Types.TimestampType.withoutZone().equals(t)) != null;
127+
}
128+
129+
/**
130+
* Allow reading/writing timestamp without time zone as timestamp with time zone. Generally,
131+
* this is not safe as timestamp without time zone is supposed to represent wall clock time semantics,
132+
* i.e. no matter the reader/writer timezone 3PM should always be read as 3PM,
133+
* but timestamp with time zone represents instant semantics, i.e the timestamp
134+
* is adjusted so that the corresponding time in the reader timezone is displayed.
135+
* When set to false (default), we throw an exception at runtime
136+
* "Spark does not support timestamp without time zone fields" if reading timestamp without time zone fields
137+
*
138+
* @param readerConfig table read options
139+
* @param sessionConf spark session configurations
140+
* @return boolean indicating if reading timestamps without timezone is allowed
141+
*/
142+
public static boolean canHandleTimestampWithoutZone(Map<String, String> readerConfig, RuntimeConfig sessionConf) {
143+
String readerOption = readerConfig.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
144+
if (readerOption != null) {
145+
return Boolean.parseBoolean(readerOption);
146+
}
147+
String sessionConfValue = sessionConf.get(HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, null);
148+
if (sessionConfValue != null) {
149+
return Boolean.parseBoolean(sessionConfValue);
150+
}
151+
return false;
152+
}
153+
154+
/**
155+
* Check whether the spark session config contains a {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES}
156+
* property.
157+
* Default value - false
158+
* If true in new table all timestamp fields will be stored as {@link Types.TimestampType#withoutZone()},
159+
* otherwise {@link Types.TimestampType#withZone()} will be used
160+
*
161+
* @param sessionConf a spark runtime config
162+
* @return true if the session config has {@link SparkUtil#USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES} property
163+
* and this property is set to true
164+
*/
165+
public static boolean useTimestampWithoutZoneInNewTables(RuntimeConfig sessionConf) {
166+
String sessionConfValue = sessionConf.get(USE_TIMESTAMP_WITHOUT_TIME_ZONE_IN_NEW_TABLES, null);
167+
if (sessionConfValue != null) {
168+
return Boolean.parseBoolean(sessionConfValue);
169+
}
170+
return false;
171+
}
172+
102173
}

spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,12 +104,7 @@ public DataType primitive(Type.PrimitiveType primitive) {
104104
throw new UnsupportedOperationException(
105105
"Spark does not support time fields");
106106
case TIMESTAMP:
107-
Types.TimestampType timestamp = (Types.TimestampType) primitive;
108-
if (timestamp.shouldAdjustToUTC()) {
109-
return TimestampType$.MODULE$;
110-
}
111-
throw new UnsupportedOperationException(
112-
"Spark does not support timestamp without time zone fields");
107+
return TimestampType$.MODULE$;
113108
case STRING:
114109
return StringType$.MODULE$;
115110
case UUID:

spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
105105
case DOUBLE:
106106
return OrcValueReaders.doubles();
107107
case TIMESTAMP_INSTANT:
108+
case TIMESTAMP:
108109
return SparkOrcValueReaders.timestampTzs();
109110
case DECIMAL:
110111
return SparkOrcValueReaders.decimals(primitive.getPrecision(), primitive.getScale());

spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript
118118
case DECIMAL:
119119
return SparkOrcValueWriters.decimal(primitive.getPrecision(), primitive.getScale());
120120
case TIMESTAMP_INSTANT:
121+
case TIMESTAMP:
121122
return SparkOrcValueWriters.timestampTz();
122123
default:
123124
throw new IllegalArgumentException("Unhandled type " + primitive);

spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkOrcReaders.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ public Converter primitive(Type.PrimitiveType iPrimitive, TypeDescription primit
128128
primitiveValueReader = OrcValueReaders.doubles();
129129
break;
130130
case TIMESTAMP_INSTANT:
131+
case TIMESTAMP:
131132
primitiveValueReader = SparkOrcValueReaders.timestampTzs();
132133
break;
133134
case DECIMAL:

spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,20 @@
2020
package org.apache.iceberg.spark.data;
2121

2222
import java.io.IOException;
23+
import java.util.Map;
2324
import java.util.concurrent.atomic.AtomicInteger;
2425
import org.apache.iceberg.Schema;
26+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
2527
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
28+
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
29+
import org.apache.iceberg.spark.SparkUtil;
2630
import org.apache.iceberg.types.TypeUtil;
2731
import org.apache.iceberg.types.Types;
2832
import org.apache.iceberg.types.Types.ListType;
2933
import org.apache.iceberg.types.Types.LongType;
3034
import org.apache.iceberg.types.Types.MapType;
3135
import org.apache.iceberg.types.Types.StructType;
36+
import org.apache.spark.sql.internal.SQLConf;
3237
import org.junit.Rule;
3338
import org.junit.Test;
3439
import org.junit.rules.TemporaryFolder;
@@ -185,4 +190,51 @@ public void testMixedTypes() throws IOException {
185190

186191
writeAndValidate(schema);
187192
}
193+
194+
@Test
195+
public void testTimestampWithoutZone() throws IOException {
196+
withSQLConf(ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"), () -> {
197+
Schema schema = TypeUtil.assignIncreasingFreshIds(new Schema(
198+
required(0, "id", LongType.get()),
199+
optional(1, "ts_without_zone", Types.TimestampType.withoutZone())));
200+
201+
writeAndValidate(schema);
202+
});
203+
}
204+
205+
protected void withSQLConf(Map<String, String> conf, Action action) throws IOException {
206+
SQLConf sqlConf = SQLConf.get();
207+
208+
Map<String, String> currentConfValues = Maps.newHashMap();
209+
conf.keySet().forEach(confKey -> {
210+
if (sqlConf.contains(confKey)) {
211+
String currentConfValue = sqlConf.getConfString(confKey);
212+
currentConfValues.put(confKey, currentConfValue);
213+
}
214+
});
215+
216+
conf.forEach((confKey, confValue) -> {
217+
if (SQLConf.staticConfKeys().contains(confKey)) {
218+
throw new RuntimeException("Cannot modify the value of a static config: " + confKey);
219+
}
220+
sqlConf.setConfString(confKey, confValue);
221+
});
222+
223+
try {
224+
action.invoke();
225+
} finally {
226+
conf.forEach((confKey, confValue) -> {
227+
if (currentConfValues.containsKey(confKey)) {
228+
sqlConf.setConfString(confKey, currentConfValues.get(confKey));
229+
} else {
230+
sqlConf.unsetConf(confKey);
231+
}
232+
});
233+
}
234+
}
235+
236+
@FunctionalInterface
237+
protected interface Action {
238+
void invoke() throws IOException;
239+
}
188240
}

0 commit comments

Comments
 (0)