Conversation

@dingsl-giser

What changes were proposed in this pull request?

Fix problems with PySpark on Windows:

  1. Fixed conversion of pre-1970 datetimes to timestamps;
  2. Fixed conversion back from negative timestamps to datetimes;
  3. Added a test script.

Why are the changes needed?

PySpark fails to serialize pre-1970 datetimes on Windows.

An exception occurs when executing the following code on Windows:

from datetime import datetime

rdd = sc.parallelize([('a', datetime(1957, 1, 9, 0, 0)),
                      ('b', datetime(2014, 1, 27, 0, 0))])
df = spark.createDataFrame(rdd, ["id", "date"])

df.show()
df.printSchema()

print(df.collect())

This fails with:
  File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 195, in toInternal
    else time.mktime(dt.timetuple()))
OverflowError: mktime argument out of range

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
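
For context, the overflow comes from `time.mktime`, which on Windows cannot represent pre-epoch times. A minimal standalone sketch (plain Python, no Spark required) that reproduces the behavior on a Windows machine:

import time
from datetime import datetime

# On Windows the C runtime rejects pre-epoch times, so this raises
# OverflowError; on Linux it returns a negative float instead.
try:
    print(time.mktime(datetime(1957, 1, 9).timetuple()))
except OverflowError as e:
    print("mktime failed:", e)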

and the deserialization path fails similarly when the timestamp is negative:

  File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 207, in fromInternal
    return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)

OSError: [Errno 22] Invalid argument
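
`datetime.fromtimestamp` has the matching limitation on the read path: on Windows it raises `OSError` for negative (pre-epoch) timestamps. One portable way around it is to add the offset to the epoch explicitly; a minimal sketch of that idea (an illustration, not the exact patch in this PR):

from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def from_micros(ts):
    # Unlike datetime.fromtimestamp, which raises OSError on Windows
    # for negative values, this handles pre-epoch timestamps fine.
    # Caveat: it interprets ts as UTC, while fromtimestamp uses local time.
    return EPOCH + timedelta(microseconds=ts)

print(from_micros(-409536000 * 1000000))  # 1957-01-09 00:00:00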

After the fix, the same code runs successfully:

+---+-------------------+
| id|               date|
+---+-------------------+
|  a|1957-01-08 16:00:00|
|  b|2014-01-26 16:00:00|
+---+-------------------+

root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)

[Row(id='a', date=datetime.datetime(1957, 1, 9, 0, 0)), Row(id='b', date=datetime.datetime(2014, 1, 27, 0, 0))] 

Does this PR introduce any user-facing change?

No

How was this patch tested?

New and existing test suites

Fix problems with PySpark on Windows:
1. Fixed datetime conversion to timestamp before 1970;
2. Fixed datetime conversion when the timestamp is negative;
3. Added a datetime case to the RDD test code.
               else time.mktime(dt.timetuple()))
except:
    # On Windows, fall back to computing the timestamp manually
    # when the datetime is before 1970, since time.mktime overflows there.
    seconds = (dt - datetime.datetime.fromtimestamp(int(time.localtime(0).tm_sec) / 1000)).total_seconds()

Member:

Is this a Windows-specific issue?

Author:

Yes, Linux does not have this problem; it seems to be a Python 3 bug, but this method works around it.

Member:

IIRC the 1970 handling issue is not an OS-specific problem. It would be great if you could link some reported issues related to that.

@AmplabJenkins

Can one of the admins verify this patch?

@dingsl-giser dingsl-giser requested a review from HyukjinKwon May 13, 2022 14:49
@dingsl-giser
Author

Is there a maintainer who can approve this?

try:
    seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo
               else time.mktime(dt.timetuple()))
except:

Member:

I think we'd better not rely on exception handling for the regular data-parsing path.

Member:

Can we do this with an if-else using an OS and negative-value check?
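
A minimal sketch of what that suggestion might look like (a hypothetical helper, not the committed patch):

import calendar
import sys
import time
from datetime import datetime

EPOCH = datetime(1970, 1, 1)

def to_seconds(dt):
    # Hypothetical helper: branch on timezone, platform, and range
    # up front rather than relying on exception handling.
    if dt.tzinfo:
        return calendar.timegm(dt.utctimetuple())
    if sys.platform != "win32" or dt >= EPOCH:
        return time.mktime(dt.timetuple())
    # Pre-epoch on Windows: compute the offset from the epoch directly.
    # Caveat: this treats the naive datetime as UTC, whereas mktime
    # uses local time.
    return (dt - EPOCH).total_seconds()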

Author:

Sure, I'll change the test again.

        wr_s21 = rdd.sample(True, 0.4, 21).collect()
        self.assertNotEqual(set(wr_s11), set(wr_s21))

    def test_datetime(self):

Member:

Should probably add a comment like:

SPARK-39176: ... 

See also https://spark.apache.org/contributing.html
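
For example, the test body might look roughly like this (a sketch only; the `self.spark` session fixture is an assumption, not the actual committed test):

    def test_datetime(self):
        # SPARK-39176: pre-1970 datetimes failed to serialize on Windows.
        # Assumes a SparkSession fixture available as self.spark.
        from datetime import datetime
        data = [("a", datetime(1957, 1, 9)), ("b", datetime(2014, 1, 27))]
        df = self.spark.createDataFrame(data, ["id", "date"])
        rows = [(row.id, row.date) for row in df.collect()]
        self.assertEqual(rows, data)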

Author:

The comment has been added and the code updated; please review it again.

@HyukjinKwon
Member

@AnywalkerGiser mind creating a PR against the master branch?

@dingsl-giser
Author

@HyukjinKwon It hasn't been tested on master; I found the problem in 3.0.1, and I can test it on master later.

@dingsl-giser dingsl-giser reopened this May 16, 2022
@dingsl-giser dingsl-giser requested a review from HyukjinKwon May 16, 2022 06:23
@dingsl-giser dingsl-giser changed the title [SPARK-39176][PYSPARK][WINDOWS] Fixed a problem with pyspark serializing pre-1970 datetime in windows [SPARK-39176][PYSPARK] Fixed a problem with pyspark serializing pre-1970 datetime in windows May 16, 2022