Support microsecond precision during copy unload #492

Open · wants to merge 17 commits into base: master

Changes from 3 commits
@@ -710,111 +710,102 @@ class SnowflakeResultSetRDDSuite extends IntegrationSuiteBase {

test("testTimestamp") {
setupTimestampTable
-    // COPY UNLOAD can't be run because it only supports millisecond(0.001s).
-    if (!params.useCopyUnload) {
+    val result = sparkSession.sql("select * from test_table_timestamp")
Contributor

This test case failed.

Author

@Mingli-Rui Yea, sorry, haven't finished the change yet; should've mentioned this. Will push another commit this week.

Contributor

It would be great to add new test cases instead of changing the existing one, e.g. test("testTimestamp copy unload"); see the sketch below.

  1. With the new test, you can set the options below in the new test cases only:
     thisConnectorOptionsNoTable += ("timestamp_ntz_output_format" -> "YYYY-MM-DD HH24:MI:SS.FF6")
     thisConnectorOptionsNoTable += ("timestamp_ltz_output_format" -> "TZHTZM YYYY-MM-DD HH24:MI:SS.FF6")
     thisConnectorOptionsNoTable += ("timestamp_tz_output_format" -> "TZHTZM YYYY-MM-DD HH24:MI:SS.FF6")
  2. With the new test case, you can verify that the new internal parameter (suggested in another comment) can enable/disable the new behavior.
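
For illustration, a minimal sketch of such a separate test, assuming the suite's existing helpers (setupTimestampTable, testPushdown, thisConnectorOptionsNoTable) and running the new checks only when COPY UNLOAD is in use; the test name and option values follow the suggestion above:

    // Sketch only: exercise the COPY UNLOAD path with microsecond output formats.
    test("testTimestamp copy unload") {
      setupTimestampTable
      if (params.useCopyUnload) {
        // Request 6-digit fractional seconds when Snowflake unloads to CSV.
        thisConnectorOptionsNoTable += ("timestamp_ntz_output_format" -> "YYYY-MM-DD HH24:MI:SS.FF6")
        thisConnectorOptionsNoTable += ("timestamp_ltz_output_format" -> "TZHTZM YYYY-MM-DD HH24:MI:SS.FF6")
        thisConnectorOptionsNoTable += ("timestamp_tz_output_format" -> "TZHTZM YYYY-MM-DD HH24:MI:SS.FF6")

        val result = sparkSession.sql("select * from test_table_timestamp")
        testPushdown(
          s""" SELECT * FROM ( $test_table_timestamp ) AS "SF_CONNECTOR_QUERY_ALIAS" """.stripMargin,
          result,
          test_table_timestamp_rows
        )
      }
    }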


+    testPushdown(
+      s""" SELECT * FROM ( $test_table_timestamp ) AS "SF_CONNECTOR_QUERY_ALIAS" """.stripMargin,
+      result,
+      test_table_timestamp_rows
+    )
-    }
  }

// Most simple case for timestamp write
test("testTimestamp write") {
setupTimestampTable
-    // COPY UNLOAD can't be run because it only supports millisecond(0.001s).
-    if (!params.useCopyUnload) {
+    val createTableSql =
+      s"""create or replace table $test_table_write (
+         | int_c int,
+         | ts_ltz_c timestamp_ltz(9), ts_ltz_c0 timestamp_ltz(0),
+         | ts_ltz_c3 timestamp_ltz(3), ts_ltz_c6 timestamp_ltz(6),
+         |
+         | ts_ntz_c timestamp_ntz(9), ts_ntz_c0 timestamp_ntz(0),
+         | ts_ntz_c3 timestamp_ntz(3), ts_ntz_c6 timestamp_ntz(6),
+         |
+         | ts_tz_c timestamp_tz(9), ts_tz_c0 timestamp_tz(0),
+         | ts_tz_c3 timestamp_tz(3), ts_tz_c6 timestamp_tz(6)
+         | )""".stripMargin
+    writeAndCheckForOneTable(sparkSession, thisConnectorOptionsNoTable,
+      test_table_timestamp, "", test_table_write, Some(createTableSql), true)
-    }
  }

// test timestamp write with timezone
test("testTimestamp write with timezone") {
setupTimestampTable
-    // COPY UNLOAD can't be run because it only supports millisecond(0.001s).
-    if (!params.useCopyUnload) {
+    var oldValue: Option[String] = None
+    if (thisConnectorOptionsNoTable.contains("sftimezone")) {
+      oldValue = Some(thisConnectorOptionsNoTable("sftimezone"))
+      thisConnectorOptionsNoTable -= "sftimezone"
+    }
+    val oldTimezone = TimeZone.getDefault
+
+    val createTableSql =
+      s"""create or replace table $test_table_write (
+         | int_c int,
+         | ts_ltz_c timestamp_ltz(9), ts_ltz_c0 timestamp_ltz(0),
+         | ts_ltz_c3 timestamp_ltz(3), ts_ltz_c6 timestamp_ltz(6),
+         |
+         | ts_ntz_c timestamp_ntz(9), ts_ntz_c0 timestamp_ntz(0),
+         | ts_ntz_c3 timestamp_ntz(3), ts_ntz_c6 timestamp_ntz(6),
+         |
+         | ts_tz_c timestamp_tz(9), ts_tz_c0 timestamp_tz(0),
+         | ts_tz_c3 timestamp_tz(3), ts_tz_c6 timestamp_tz(6)
+         | )""".stripMargin
+
+    // Test conditions with (sfTimezone, sparkTimezone)
+    val testConditions: List[(String, String)] = List(
+      (null, "GMT")
+      , (null, "America/Los_Angeles")
+      , ("America/New_York", "America/Los_Angeles")
+    )
+
+    for ((sfTimezone, sparkTimezone) <- testConditions) {
+      // set spark timezone
+      val thisSparkSession = if (sparkTimezone != null) {
+        TimeZone.setDefault(TimeZone.getTimeZone(sparkTimezone))
+        SparkSession.builder
+          .master("local")
+          .appName("SnowflakeSourceSuite")
+          .config("spark.sql.shuffle.partitions", "6")
+          .config("spark.driver.extraJavaOptions", s"-Duser.timezone=$sparkTimezone")
+          .config("spark.executor.extraJavaOptions", s"-Duser.timezone=$sparkTimezone")
+          .config("spark.sql.session.timeZone", sparkTimezone)
+          .getOrCreate()
+      } else {
+        sparkSession
+      }
+
+      // Set timezone option
+      if (sfTimezone != null) {
+        if (thisConnectorOptionsNoTable.contains("sftimezone")) {
+          thisConnectorOptionsNoTable -= "sftimezone"
+        }
+        thisConnectorOptionsNoTable += ("sftimezone" -> sfTimezone)
+      } else {
+        if (thisConnectorOptionsNoTable.contains("sftimezone")) {
+          thisConnectorOptionsNoTable -= "sftimezone"
+        }
+      }
+
+      writeAndCheckForOneTable(thisSparkSession, thisConnectorOptionsNoTable,
+        test_table_timestamp, "", test_table_write, Some(createTableSql), true)
+    }
+
+    // restore options for further test
+    thisConnectorOptionsNoTable -= "sftimezone"
+    if (oldValue.isDefined) {
+      thisConnectorOptionsNoTable += ("sftimezone" -> oldValue.get)
+    }
+    TimeZone.setDefault(oldTimezone)
-    }
  }

test("testLargeResult") {
28 changes: 24 additions & 4 deletions src/main/scala/net/snowflake/spark/snowflake/Conversions.scala
@@ -41,15 +41,18 @@ private[snowflake] object Conversions {
  // Note - we use a pattern with timezone in the beginning, to make sure
  // parsing with PATTERN_NTZ fails for PATTERN_TZLTZ strings.
  // Note - for JDK 1.6, we use Z ipo XX for SimpleDateFormat
+ // Because SimpleDateFormat only supports milliseconds,
+ // we need to refactor this and handle the nanoseconds field separately
  private val PATTERN_TZLTZ =
    if (System.getProperty("java.version").startsWith("1.6.")) {
-     "Z yyyy-MM-dd HH:mm:ss.SSS"
+     "Z yyyy-MM-dd HH:mm:ss."
    } else {
-     "XX yyyy-MM-dd HH:mm:ss.SSS"
+     "XX yyyy-MM-dd HH:mm:ss."
    }

  // For NTZ, Snowflake serializes w/o timezone
- private val PATTERN_NTZ = "yyyy-MM-dd HH:mm:ss.SSS"
+ // and we handle the nanoseconds field separately during parsing
+ private val PATTERN_NTZ = "yyyy-MM-dd HH:mm:ss."

// For DATE, simple ISO format
private val PATTERN_DATE = "yyyy-MM-dd"
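
As a standalone illustration of why the trailing "SSS" is dropped from these patterns (not part of this PR's diff): SimpleDateFormat interprets the entire fractional field as a millisecond count, so a 6-digit fraction silently shifts the parsed instant by minutes.

    import java.text.SimpleDateFormat

    object FractionDemo extends App {
      val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      // "123456" is read as 123456 milliseconds (~2 minutes), not as .123456 seconds.
      val d = fmt.parse("2014-03-01 00:00:01.123456")
      println(fmt.format(d)) // prints 2014-03-01 00:02:04.456 -- not the intended instant
    }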
@@ -193,8 +196,25 @@ private[snowflake] object Conversions {
* Parse a string exported from a Snowflake TIMESTAMP column
*/
private def parseTimestamp(s: String, isInternalRow: Boolean): Any = {
+   // Need to handle the nanoseconds field separately
Contributor

Could you please add an internal parameter to enable/disable the change, enabled by default? If the change breaks some users, they can disable the fix as a workaround.
For example, https://github.com/snowflakedb/spark-snowflake/blob/master/src/main/scala/net/snowflake/spark/snowflake/Parameters.scala#LL163C14-L163C14
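
A hypothetical sketch of what such an internal parameter might look like; the constant, option name, and accessor below are illustrative only, not the connector's actual API:

    // Hypothetical: a boolean connector option, enabled by default, that lets users
    // fall back to the old millisecond-only timestamp parsing as a workaround.
    val PARAM_INTERNAL_SUPPORT_MICROSECONDS_DURING_UNLOAD =
      "internal_support_microseconds_during_unload"

    // Assumed accessor on MergedParameters: enabled unless explicitly set to "false".
    def supportMicrosecondsDuringUnload: Boolean =
      parameters.getOrElse(PARAM_INTERNAL_SUPPORT_MICROSECONDS_DURING_UNLOAD, "true").toBoolean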

Author

Hmm, do you mean a parameter such as internal_support_micro_second_during_unload? IMO, we should always try to support microsecond-level precision, since direct JDBC supports this precision by default. It would make this part of the code confusing if we had two code paths and two sets of timestamp patterns.

Contributor

I agree users need this fix; that's why we can set the internal parameter to true by default. It's our internal policy to introduce a parameter that can disable a fix when possible. The internal parameter can be removed later, and the code will be cleaned up accordingly.

Author

Thanks for the suggestion. Will push a new commit to add this parameter.

+   // valueOf only works with yyyy-[m]m-[d]d hh:mm:ss[.f...]
+   // so we need to do a little parsing
+   val timestampRegex = """\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3,9}""".r
+
+   val parsedTS = timestampRegex.findFirstMatchIn(s) match {
+     case Some(ts) => ts.toString()
+     case None => throw new IllegalArgumentException(s"Malformed timestamp $s")
+   }
+
+   val ts = java.sql.Timestamp.valueOf(parsedTS)
+   val nanoFraction = ts.getNanos
+
    val res = new Timestamp(snowflakeTimestampFormat.parse(s).getTime)
-   if (isInternalRow) DateTimeUtils.fromJavaTimestamp(res)
+   res.setNanos(nanoFraction)
+   // Since fromJavaTimestamp and Spark only support microsecond-level
+   // precision, we have to divide the nano field by 1000
+   if (isInternalRow) (DateTimeUtils.fromJavaTimestamp(res) + nanoFraction / 1000)
    else res
  }
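
For reference, a rough standalone sketch of the arithmetic above (not the connector's code): java.sql.Timestamp.valueOf keeps the full fractional part, and the microsecond epoch value Spark uses can be rebuilt from the millisecond time plus the sub-millisecond remainder of the nanos.

    import java.sql.Timestamp

    object MicrosDemo extends App {
      val s = "2014-03-01 00:00:01.123456"
      val ts = Timestamp.valueOf(s)              // getNanos == 123456000
      // The millisecond epoch value already includes the .123; add the remaining 456 micros.
      val micros = ts.getTime * 1000L + (ts.getNanos / 1000) % 1000
      println(micros % 1000000L)                 // 123456 -> microsecond precision preserved
    }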

@@ -193,4 +193,88 @@ class ConversionsSuite extends FunSuite {

assert(expect == result.toString())
}

test("Data with micro-seconds and nano-seconds precision should be correctly converted"){
Contributor (@sfc-gh-mrui, Mar 10, 2023)

This is a unit test; it proves the converter can parse timestamps with micro/nanosecond precision. Could you please add an integration test (AKA end-to-end test)?
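
A rough end-to-end sketch of what such an integration test might check, assuming helpers available elsewhere in this repo's IntegrationSuiteBase (jdbcUpdate, connectorOptionsNoTable, a $test_table name); all names and options here are illustrative, not a finished test:

    test("copy unload preserves microseconds end to end") {
      jdbcUpdate(s"create or replace table $test_table (ts timestamp_ntz(6))")
      jdbcUpdate(s"insert into $test_table values ('2014-03-01 00:00:01.123456')")

      // Read the table back through the connector with COPY UNLOAD forced on.
      val df = sparkSession.read
        .format("net.snowflake.spark.snowflake")
        .options(connectorOptionsNoTable)
        .option("dbtable", test_table)
        .option("use_copy_unload", "true")
        .load()

      val ts = df.collect().head.getTimestamp(0)
      assert(ts.getNanos == 123456000) // microseconds survive the round trip
    }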

val convertRow = Conversions.createRowConverter[Row](TestUtils.testSchema)
val doubleMin = Double.MinValue.toString
val longMax = Long.MaxValue.toString
// scalastyle:off
val unicodeString = "Unicode是樂趣"
// scalastyle:on

val timestampString = "2014-03-01 00:00:01.123456"

val expectedTimestampMicro: Timestamp = java.sql.Timestamp.valueOf(timestampString)

val dateString = "2015-07-01"
val expectedDate = TestUtils.toMillis(2015, 6, 1, 0, 0, 0)



val timestampString2 = "2014-03-01 00:00:01.123456789"

val expectedTimestampMicro2: Timestamp = java.sql.Timestamp.valueOf(timestampString2)

val dateString2 = "2015-07-01"
val expectedDate2 = TestUtils.toMillis(2015, 6, 1, 0, 0, 0)

val convertedRow = convertRow(
Array(
"1",
dateString,
"123.45",
doubleMin,
"1.0",
"42",
longMax,
"23",
unicodeString,
timestampString
)
)

val expectedRow = Row(
1.asInstanceOf[Byte],
new Date(expectedDate),
new java.math.BigDecimal("123.45"),
Double.MinValue,
1.0f,
42,
Long.MaxValue,
23.toShort,
unicodeString,
expectedTimestampMicro
)

val convertedRow2 = convertRow(
Array(
"1",
dateString2,
"123.45",
doubleMin,
"1.0",
"42",
longMax,
"23",
unicodeString,
timestampString2
)
)

val expectedRow2 = Row(
1.asInstanceOf[Byte],
new Date(expectedDate2),
new java.math.BigDecimal("123.45"),
Double.MinValue,
1.0f,
42,
Long.MaxValue,
23.toShort,
unicodeString,
expectedTimestampMicro2
)

assert(convertedRow == expectedRow)
assert(convertedRow2 == expectedRow2)
}
}