Add support for reading/writing timestamps without timezone. #2757
Conversation
Resolved review threads (outdated) on:
spark/src/main/java/org/apache/iceberg/spark/SparkFixupTimestampType.java
spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
spark/src/test/java/org/apache/iceberg/spark/data/AvroDataTest.java
spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java
public void testWriteTimestampWithoutZoneError() {
  String errorMessage = String.format("Write operation performed on a timestamp without timezone field while " +
      "'%s' set to false should throw exception", SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE);
  Runnable insert = () -> sql("INSERT INTO %s VALUES %s", tableName, rowToSqlValues(values));
You can just put this right into the assertThrows, we usually do this:

AssertThrows(errorMessage, IllegalArgumentException.class, SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR,
    () -> code);
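Applied to the snippet above, that might look roughly like the following sketch (assuming Iceberg's AssertHelpers.assertThrows(message, expectedException, containedInMessage, runnable) test helper, with tableName, values, and rowToSqlValues taken from the test under review):

// Sketch: inline the message and the action directly in the assertion.
AssertHelpers.assertThrows(
    String.format("Write operation performed on a timestamp without timezone field while " +
        "'%s' set to false should throw exception", SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE),
    IllegalArgumentException.class,
    SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR,
    () -> sql("INSERT INTO %s VALUES %s", tableName, rowToSqlValues(values)));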
Instead of using the rowToSqlValues function, it may be simpler to just do a spark.createDataset(values).write command. That way we don't have to worry about string building
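A sketch of that alternative, writing the values directly rather than building an INSERT string (the encoder and the "ts" column name are assumptions here, since the element type of values is not shown):

// Hypothetical: append the test values as a dataset instead of via SQL text.
spark.createDataset(values, Encoders.TIMESTAMP())  // pick the encoder matching the row type
    .toDF("ts")                                    // hypothetical column name
    .write()
    .format("iceberg")
    .mode("append")
    .save(tableName);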
I have moved the insert Runnable and errorMessage into the assertThrows.
I would like to keep rowToSqlValues if you do not mind; from my point of view, it is better to have pure SQL in this test.
RussellSpitzer left a comment
I just have a few little clean-up nits. I think once these are done we should be good to go!
@RussellSpitzer I have pushed code changes for the requested clean-up nits.
    SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE),
IllegalArgumentException.class,
SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR,
() -> spark.read().format("iceberg")
Wrong indentation here: .option should be indented as it's a continuation of spark.read().format() and not another arg to assertThrows.
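Concretely, the layout being asked for would look something like this sketch (the option key here is hypothetical; the point is the 4-space continuation indent under spark.read()):

() -> spark.read().format("iceberg")
    .option("some-read-option", "true")  // hypothetical option; continuation of the read chain
    .load(tableName)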
you are right, fixed
Tests running now!
Added fix for …

Added additional formatting fix for …
@sshkvar for future reference you can use …
Option<SparkSession> sparkSession = SparkSession.getActiveSession();
if (sparkSession.isDefined()) {
  sparkSession.get().conf().set(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true");
}
This is in AvroDataTest, but there is some code for handling additional Spark SQL conf options here that might be something worth copying:
iceberg/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java, lines 171 to 206 at 25eaeba:
protected void withSQLConf(Map<String, String> conf, Action action) {
  SQLConf sqlConf = SQLConf.get();

  Map<String, String> currentConfValues = Maps.newHashMap();
  conf.keySet().forEach(confKey -> {
    if (sqlConf.contains(confKey)) {
      String currentConfValue = sqlConf.getConfString(confKey);
      currentConfValues.put(confKey, currentConfValue);
    }
  });

  conf.forEach((confKey, confValue) -> {
    if (SQLConf.staticConfKeys().contains(confKey)) {
      throw new RuntimeException("Cannot modify the value of a static config: " + confKey);
    }
    sqlConf.setConfString(confKey, confValue);
  });

  try {
    action.invoke();
  } finally {
    conf.forEach((confKey, confValue) -> {
      if (currentConfValues.containsKey(confKey)) {
        sqlConf.setConfString(confKey, currentConfValues.get(confKey));
      } else {
        sqlConf.unsetConf(confKey);
      }
    });
  }
}

@FunctionalInterface
protected interface Action {
  void invoke();
}
Additionally, it might be good to unset this property after its use (so the configuration doesn't get mixed up with other tests now or in the future).
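For example, a test could wrap the read in that helper so the flag is restored (or unset) afterwards; a minimal sketch, assuming the quoted withSQLConf and an ImmutableMap import:

// The conf value only applies inside the action and is reverted in the finally block.
withSQLConf(
    ImmutableMap.of(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true"),
    () -> sql("SELECT * FROM %s", tableName));  // hypothetical query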
good point, I changed the code based on your recommendations
ok, thanks
Add `spark.sql.iceberg.store-timestamp-without-zone` Spark config to indicate which Iceberg type (Types.TimestampType.withZone() or Types.TimestampType.withoutZone()) will be used for the Spark `TimestampType` (a usage sketch follows the commit list below)
…ted level should be one of the following: 6, 8.
…comments. Fixed code formatting
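To illustrate how the two flags discussed in this PR fit together, here is a minimal usage sketch (the read flag is set via its SparkUtil constant as in the snippets above; the store flag uses the property name from the commit message; the table name is hypothetical):

// Read side: surface Iceberg timestamp-without-zone columns as Spark
// TimestampType instead of throwing.
spark.conf().set(SparkUtil.HANDLE_TIMESTAMP_WITHOUT_TIMEZONE, "true");

// Write side: map Spark TimestampType to Types.TimestampType.withoutZone()
// rather than withZone() when writing.
spark.conf().set("spark.sql.iceberg.store-timestamp-without-zone", "true");

spark.read().format("iceberg").load("db.table").show();  // hypothetical table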
Force-pushed e869a15 to bc316c4
Check failed. Locally this test passed successfully. @RussellSpitzer Can we try to rerun this step?

I can trigger a re-run, just pinged @karuppayya to take a look
static Schema fixup(Schema schema) {
  return new Schema(TypeUtil.visit(schema,
          new SparkFixupTimestampType(schema)).asStructType().fields());
Nit: I think this is overly indented.
When continuing an expression on a second line, we usually indent 4 spaces (as opposed to the normal 2 spaces for changes in scope etc).
So I think that new SparkFixup… on this line should align the n of new with the second r from return above it (4 spaces in from the start of the word return).
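With that convention applied, the fixup snippet above would read:

static Schema fixup(Schema schema) {
  return new Schema(TypeUtil.visit(schema,
      new SparkFixupTimestampType(schema)).asStructType().fields());
}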
I will fix it ASAP. Where can I find the code style guide?
I have pushed the fix.
If you use IntelliJ, you can have it set up to point to the style guide and auto-format for you: http://iceberg.apache.org/community/#setting-up-ide-and-code-style
Otherwise, in general I believe the rules come from here: https://github.com/apache/iceberg/blob/master/.baseline/idea/intellij-java-palantir-style.xml
There are admittedly a number of these cases, so auto-formatting seems like the best idea. If you don't use IntelliJ, I think this can be run from the command line somehow, but I'm not 100% sure.
Thanks for all the work on this so far. It's very close!
For your info, this is the plug-in that is used in general. Specifically, this one would be the checkstyle one, but we also use error-prone as well: https://github.com/palantir/gradle-baseline
Thanks a lot, I am using IntelliJ so it is really helpful
private StructType lazyType() {
  if (type == null) {
    Preconditions.checkArgument(readTimestampWithoutZone || !SparkUtil.hasTimestampWithoutZone(lazySchema()),
        SparkUtil.TIMESTAMP_WITHOUT_TIMEZONE_ERROR);
Would it make sense to add this check into SparkSchemaUtil.convert?
I think no, because we need this check only in a few places, while SparkSchemaUtil.convert is used in a lot of places.
For reference, this is something just to consider and not a strong mandate or requirement to merge.
@FunctionalInterface
protected interface Action {
  void invoke() throws IOException;
}
As a follow-up (in a separate PR, either before or after this one is merged), particularly if you're looking for more work to do to contribute to the project, you might explore whether this combination of withSQLConf and the corresponding @FunctionalInterface can be abstracted into its own interface that one could mix into tests.
I'm not 100% sure how that would look, maybe an interface like ConfigurableTestSQLConf or something?
Again, just copying it for now is fine, but it would be nice to reduce the code duplication and make this easier for others to use in the future. Your exploration might find that it’s better to not do that (I’m more of a Scala developer myself and so to me it feels like a mixin). Something to think about for later though!
I fully agree with you. It can be moved to a separate interface with a static method and placed in some general package, like:
@FunctionalInterface
public interface ConfigurableTestSQLConf {

  void invoke() throws IOException;

  static void withSQLConf(Map<String, String> conf, ConfigurableTestSQLConf action) throws IOException {
    SQLConf sqlConf = SQLConf.get();

    Map<String, String> currentConfValues = Maps.newHashMap();
    conf.keySet().forEach(confKey -> {
      if (sqlConf.contains(confKey)) {
        String currentConfValue = sqlConf.getConfString(confKey);
        currentConfValues.put(confKey, currentConfValue);
      }
    });

    conf.forEach((confKey, confValue) -> {
      if (SQLConf.staticConfKeys().contains(confKey)) {
        throw new RuntimeException("Cannot modify the value of a static config: " + confKey);
      }
      sqlConf.setConfString(confKey, confValue);
    });

    try {
      action.invoke();
    } finally {
      conf.forEach((confKey, confValue) -> {
        if (currentConfValues.containsKey(confKey)) {
          sqlConf.setConfString(confKey, currentConfValues.get(confKey));
        } else {
          sqlConf.unsetConf(confKey);
        }
      });
    }
  }
}

But this part is better to do in a separate PR, because other packages will be affected.
kbendick left a comment
This looks good to me. Thank you @sshkvar!
…2757)

Previously Spark could not handle Iceberg tables which contained Timestamp.withoutTimeZone. New parameters are introduced to allow Timestamp without TimeZone to be treated as Timestamp with Timezone.

Co-authored-by: bkahloon <kahlonbakht@gmail.com>
Co-authored-by: shardulm94
Merge remote-tracking branch 'upstream/merge-master-20210816' into master

What does this MR mainly solve? Merges upstream/master to pull in recent bug fixes and optimizations.

What does this MR change? The key PRs to note:
> Predicate pushdown support: https://github.com/apache/iceberg/pull/2358, https://github.com/apache/iceberg/pull/2926, https://github.com/apache/iceberg/pull/2777/files
> Spark: writing an empty dataset raised an error; it is now simply skipped, apache#2960
> Flink UI: add uidPrefix to operators to make it easier to track multiple Iceberg sink jobs, apache#288
> Spark: fix nested struct pruning, apache#2877
> Support creating v2 format tables via table properties, apache#2887
> Add the SortRewriteStrategy framework to incrementally support different rewrite strategies, apache#2609 (WIP: apache#2829)
> Spark: support configuring Hadoop properties per catalog, apache#2792
> Spark: read/write support for timestamps without timezone, apache#2757
> Spark MicroBatch: support configuring the skip-delete-snapshots property, apache#2752
> Spark: V2 RewriteDatafilesAction support
> Core: add validation for row-level deletes with rewrites, apache#2865
> Schema time travel: add schema-id to snapshots (Core: add schema id to snapshot)
> Spark extensions: support identifier fields operations, apache#2560
> Parquet: update to 1.12.0, apache#2441
> Hive: vectorized ORC reads for Hive, apache#2613
> Spark: add an action to remove all referenced files, apache#2415

How was this MR tested? UT
Creating PR from my forked repo as discussed #2282