From 4238533f8699dfff75d3b5fe437668cc0ce4d55c Mon Sep 17 00:00:00 2001
From: Jason White
Date: Sat, 11 Feb 2017 16:39:19 -0500
Subject: [PATCH 1/5] cast TimestampType.toInternal output to long

---
 python/pyspark/sql/tests.py | 6 ++++++
 python/pyspark/sql/types.py | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 710585cbe291..5aa9c0701cd6 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1435,6 +1435,12 @@ def test_time_with_timezone(self):
         self.assertEqual(now, now1)
         self.assertEqual(now, utcnow1)
 
+    # regression test for SPARK-19561
+    def test_datetime_at_epoch(self):
+        epoch = datetime.datetime.fromtimestamp(0)
+        df = self.spark.createDataFrame([Row(date=epoch)])
+        self.assertEqual(df.first()['date'], epoch)
+
     def test_decimal(self):
         from decimal import Decimal
         schema = StructType([StructField("decimal", DecimalType(10, 5))])
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 26b54a7fb370..0c7991e5d040 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -189,7 +189,7 @@ def toInternal(self, dt):
         if dt is not None:
             seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                        else time.mktime(dt.timetuple()))
-            return int(seconds) * 1000000 + dt.microsecond
+            return long(int(seconds) * 1000000 + dt.microsecond)
 
     def fromInternal(self, ts):
         if ts is not None:

From 5b1dd6785014bc8159e5c8b2c22a18ecb673afde Mon Sep 17 00:00:00 2001
From: Jason White
Date: Wed, 15 Feb 2017 22:09:19 -0500
Subject: [PATCH 2/5] replace int function call with long

---
 python/pyspark/sql/types.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 0c7991e5d040..1d31f25efad5 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -189,7 +189,7 @@ def toInternal(self, dt):
         if dt is not None:
             seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                        else time.mktime(dt.timetuple()))
-            return long(int(seconds) * 1000000 + dt.microsecond)
+            return long(seconds) * 1000000 + dt.microsecond
 
     def fromInternal(self, ts):
         if ts is not None:

From a1936afddb5e16f4b8f8954cd07ce78ad3e58ea3 Mon Sep 17 00:00:00 2001
From: Jason White
Date: Thu, 9 Mar 2017 01:19:36 -0500
Subject: [PATCH 3/5] add TimestampType int handling in EvaluatePython, remove
 casting to long in Python

---
 python/pyspark/sql/tests.py                                | 6 ++++--
 python/pyspark/sql/types.py                                | 2 +-
 .../apache/spark/sql/execution/python/EvaluatePython.scala | 1 +
 3 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 5aa9c0701cd6..93f3f83d15bb 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1438,8 +1438,10 @@ def test_time_with_timezone(self):
     # regression test for SPARK-19561
     def test_datetime_at_epoch(self):
         epoch = datetime.datetime.fromtimestamp(0)
-        df = self.spark.createDataFrame([Row(date=epoch)])
-        self.assertEqual(df.first()['date'], epoch)
+        df = self.spark.createDataFrame([Row(date=epoch)]).select('date', lit(epoch).alias('lit_date'))
+        first = df.first()
+        self.assertEqual(first['date'], epoch)
+        self.assertEqual(first['lit_date'], epoch)
 
     def test_decimal(self):
         from decimal import Decimal
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 1d31f25efad5..26b54a7fb370 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -189,7 +189,7 @@ def toInternal(self, dt):
         if dt is not None:
             seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
                        else time.mktime(dt.timetuple()))
-            return long(seconds) * 1000000 + dt.microsecond
+            return int(seconds) * 1000000 + dt.microsecond
 
     def fromInternal(self, ts):
         if ts is not None:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 46fd54e5c742..3a03d538b620 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -112,6 +112,7 @@ object EvaluatePython {
     case (c: Int, DateType) => c
 
     case (c: Long, TimestampType) => c
+    case (c: Int, TimestampType) => c.toLong
 
     case (c, StringType) => UTF8String.fromString(c.toString)
 

From a198d494c24e062f80713b90de77c230d6d34b29 Mon Sep 17 00:00:00 2001
From: Jason White
Date: Thu, 9 Mar 2017 01:26:25 -0500
Subject: [PATCH 4/5] linter

---
 python/pyspark/sql/tests.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 93f3f83d15bb..36db825a6340 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1438,8 +1438,8 @@ def test_time_with_timezone(self):
     # regression test for SPARK-19561
     def test_datetime_at_epoch(self):
         epoch = datetime.datetime.fromtimestamp(0)
-        df = self.spark.createDataFrame([Row(date=epoch)]).select('date', lit(epoch).alias('lit_date'))
-        first = df.first()
+        df = self.spark.createDataFrame([Row(date=epoch)])
+        first = df.select('date', lit(epoch).alias('lit_date')).first()
         self.assertEqual(first['date'], epoch)
         self.assertEqual(first['lit_date'], epoch)
 
     def test_decimal(self):

From bee635a3ebd96fe28e04b0fd95290b6960ea8d22 Mon Sep 17 00:00:00 2001
From: Jason White
Date: Thu, 9 Mar 2017 01:30:41 -0500
Subject: [PATCH 5/5] add comment explaining adding Int case handling

---
 .../org/apache/spark/sql/execution/python/EvaluatePython.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
index 3a03d538b620..fcd84705f7e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala
@@ -112,6 +112,7 @@ object EvaluatePython {
     case (c: Int, DateType) => c
 
     case (c: Long, TimestampType) => c
+    // Py4J serializes values between MIN_INT and MAX_INT as Ints, not Longs
    case (c: Int, TimestampType) => c.toLong
 
     case (c, StringType) => UTF8String.fromString(c.toString)
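
Why the series ends up fixing this on the JVM side: TimestampType.toInternal returns
microseconds since the Unix epoch, and for datetimes within roughly 35 minutes of the
epoch that value fits in a signed 32-bit integer. As the comment added in PATCH 5/5
notes, values in that range cross the Py4J boundary as Ints, so the existing
`case (c: Long, TimestampType)` in EvaluatePython never matched them. Below is a
minimal sketch of the arithmetic in plain Python; it mirrors toInternal but runs
outside Spark, and the `internal` name is illustrative, not Spark's:

    import calendar
    import datetime
    import time

    # Mirror TimestampType.toInternal's arithmetic for the epoch and show that
    # the result fits in a signed 32-bit integer, which is why it crosses the
    # Python/JVM boundary as an Integer rather than a Long.
    epoch = datetime.datetime.fromtimestamp(0)
    seconds = (calendar.timegm(epoch.utctimetuple()) if epoch.tzinfo
               else time.mktime(epoch.timetuple()))
    internal = int(seconds) * 1000000 + epoch.microsecond

    assert internal == 0  # 0 on systems where mktime round-trips the epoch
    # In Int range, so only the Int case added in PATCH 3/5 can match it.
    assert -2**31 <= internal <= 2**31 - 1

This also explains the arc of the series: the Python-side `long(...)` casts tried in
PATCH 1/5 and 2/5 were not sufficient, since per the PATCH 5/5 comment anything between
MIN_INT and MAX_INT still arrives on the JVM as an Int regardless of its Python type
(and Python 3 has no separate `long` in any case), so PATCH 3/5 reverts them and widens
the JVM-side match instead.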