Conversation

@dingsl-giser
What changes were proposed in this pull request?

Fix datetime/timestamp conversion problems in PySpark on Windows:

  1. Fixed conversion of datetimes before 1970 to internal timestamps (see the sketch after this list);
  2. Fixed conversion back to datetime when the internal timestamp is negative;
  3. Added a test script.
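For context, conversions of this kind can in principle be done with plain datetime/timedelta arithmetic rather than time.mktime. The sketch below only illustrates that idea and is not the patch in this PR; to_internal_microseconds is a hypothetical name, and the local UTC offset is approximated from time.timezone/time.altzone, so historical DST rules are ignored.

import time
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def to_internal_microseconds(dt):
    # Hypothetical helper: convert a naive local datetime to microseconds since
    # the epoch using timedelta arithmetic instead of time.mktime, which raises
    # OverflowError on Windows for dates before 1970.
    # Simplification: the current (non-)DST offset is applied to all dates.
    offset = time.altzone if time.daylight and time.localtime().tm_isdst else time.timezone
    aware = dt.replace(tzinfo=timezone(timedelta(seconds=-offset)))
    return (aware - EPOCH) // timedelta(microseconds=1)

print(to_internal_microseconds(datetime(1957, 1, 9)))   # negative value, no OverflowError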

Why are the changes needed?

PySpark fails to serialize pre-1970 datetimes on Windows.

An exception occurs when executing the following code on Windows:

from datetime import datetime

rdd = sc.parallelize([('a', datetime(1957, 1, 9, 0, 0)),
                      ('b', datetime(2014, 1, 27, 0, 0))])
df = spark.createDataFrame(rdd, ["id", "date"])

df.show()
df.printSchema()

print(df.collect())

The job fails with:
  File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 195, in toInternal
    else time.mktime(dt.timetuple()))
OverflowError: mktime argument out of range

at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more

A second error occurs in fromInternal when the rows are deserialized:

  File "...\spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 207, in fromInternal
    return datetime.datetime.fromtimestamp(ts // 1000000).replace(microsecond=ts % 1000000)
OSError: [Errno 22] Invalid argument
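The read path can be handled the same way, with a timedelta offset from the epoch instead of datetime.fromtimestamp. Again, this is only an illustrative sketch under the same simplified-offset assumption, not the code added by this PR; from_internal_microseconds is a hypothetical name.

import time
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def from_internal_microseconds(ts):
    # Hypothetical helper: convert microseconds since the epoch back to a naive
    # local datetime without datetime.fromtimestamp, which raises
    # OSError [Errno 22] on Windows when the timestamp is negative.
    # Same simplification as above: the current UTC offset is used throughout.
    offset = time.altzone if time.daylight and time.localtime().tm_isdst else time.timezone
    utc = EPOCH + timedelta(microseconds=ts)
    return utc.astimezone(timezone(timedelta(seconds=-offset))).replace(tzinfo=None)

print(from_internal_microseconds(-409622400 * 1000**2))  # a pre-1970 instant, no OSError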

With this change, the example above runs successfully:

+---+-------------------+
| id|               date|
+---+-------------------+
|  a|1957-01-08 16:00:00|
|  b|2014-01-26 16:00:00|
+---+-------------------+

root
 |-- id: string (nullable = true)
 |-- date: timestamp (nullable = true)

[Row(id='a', date=datetime.datetime(1957, 1, 9, 0, 0)), Row(id='b', date=datetime.datetime(2014, 1, 27, 0, 0))] 

Does this PR introduce any user-facing change?

No

How was this patch tested?

New and existing test suites
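For illustration, a regression check along these lines would cover the failing case (the test name is hypothetical; the tests actually added by this PR may look different):

from datetime import datetime
from pyspark.sql.types import TimestampType

def test_pre_1970_roundtrip():
    # A pre-1970 naive datetime should survive the toInternal/fromInternal
    # round trip on all platforms, including Windows.
    tt = TimestampType()
    dt = datetime(1957, 1, 9, 0, 0)
    assert tt.fromInternal(tt.toInternal(dt)) == dt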
