[SPARK-22417][PYTHON] Fix for createDataFrame from pandas.DataFrame with timestamp #19646
Conversation
@ueshin and @HyukjinKwon this allows Spark to read non-arrow Pandas timestamps as TimestampType instead of long values, but there are a couple of things to note. I did the conversion with numpy because we can not make changes to the input pandas.DataFrame and making a copy is too expensive. When

retest this please

Test build #83361 has finished for PR 19646 at commit
python/pyspark/sql/session.py
Outdated
        has_rec_fix = True
    record_type_list.append((str(col_names[i]), curr_type))
if not has_rec_fix:
    record_type_list = None
Shall we put this conversion into an internal method?
Yeah, probably a good idea. I'll see if I can clean it up some. Thanks @viirya !
python/pyspark/sql/session.py
Outdated
col_names = cur_dtypes.names
record_type_list = []
has_rec_fix = False
for i in xrange(len(cur_dtypes)):
oh, session.py doesn't define xrange for Python 3.
Ooops, I forgot about that. thx!
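For context, the usual Python 2/3 compatibility shim used in other PySpark modules looks roughly like this (shown for illustration only, not quoting session.py):

import sys

# Python 3 has no xrange builtin; alias it to range so code written against
# the Python 2 name keeps working on both versions.
if sys.version >= '3':
    xrange = range

for i in xrange(3):
    print(i)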
retest this please
python/pyspark/sql/session.py
Outdated
# If type is a datetime64 timestamp, convert to microseconds
# NOTE: if dtype is M8[ns] then np.record.tolist() will output values as longs,
# this conversion will lead to an output of py datetime objects, see SPARK-22417
if curr_type == np.dtype('M8[ns]'):
Isn't this datetime64[ns]? What's the difference between M8[ns] and datetime64[ns]?
There shouldn't be any difference for the most part. I only used M8 here because when debugging these types, that is what was being output for the record types by numpy.record.dtype. Would you prefer datetime64 if that works?
Yes, I'd prefer it if that works, otherwise I'd like you to add some comments saying we should use M8[ns] instead of datetime64[ns].
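For reference, the two spellings name the same dtype in numpy ('M8' is just the character code for datetime64); a quick standalone check, shown only for illustration:

import numpy as np

# The shorthand 'M8[ns]' and the full name 'datetime64[ns]' compare equal.
assert np.dtype('M8[ns]') == np.dtype('datetime64[ns]')
print(np.dtype('M8[ns]'))  # prints: datetime64[ns]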
retest this please
from datetime import datetime
pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
                    "d": [pd.Timestamp.now().date()]})
df = self.spark.createDataFrame(pdf)
What if we specify the schema? For example:
df = self.spark.createDataFrame(pdf, "ts timestamp, d date")
I was checking this PR and ran this out of curiosity. I got:

import pandas as pd
from datetime import datetime
pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)], "d": [pd.Timestamp.now().date()]})
spark.createDataFrame(pdf, "d date, ts timestamp")

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 587, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 401, in _createFromLocal
    data = list(data)
  File "/.../spark/python/pyspark/sql/session.py", line 567, in prepare
    verify_func(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1392, in verify_struct
    verifier(v)
  File "/.../spark/python/pyspark/sql/types.py", line 1411, in verify
    verify_value(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1405, in verify_default
    verify_acceptable_types(obj)
  File "/.../spark/python/pyspark/sql/types.py", line 1300, in verify_acceptable_types
    % (dataType, obj, type(obj))))
TypeError: field ts: TimestampType can not accept object 1509411661000000000L in type <type 'long'>
Seems we should fix this one too.
Yes, looks like that needs to be fixed also. I thought it was working when schema was supplied, but I'll double-check and add that into the tests.
Test build #83380 has finished for PR 19646 at commit
python/pyspark/sql/session.py
Outdated
:return tuple of list of records and schema
"""
# Convert pandas.DataFrame to list of numpy records
np_records = pdf.to_records(index=False)
after toRecords, what's the type of timestamp value? python datetime?
I got:

>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0].tolist()[0]
1509411661000000000L
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0][0]
numpy.datetime64('2017-10-31T01:01:01.000000000')

whereas:

>>> pd.DataFrame({"d": [pd.Timestamp.now().date()]}).to_records(index=False)[0].tolist()[0]
datetime.date(2017, 11, 3)
>>> pd.DataFrame({"d": [pd.Timestamp.now().date()]}).to_records(index=False)[0][0]
datetime.date(2017, 11, 3)
thanks! I also tried the data type:
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).dtypes
ts datetime64[ns]
dtype: object
>>> pd.DataFrame({"d": [pd.Timestamp.now().date()]}).dtypes
d object
dtype: object
to_records makes a numpy array of numpy records, and the timestamp dtype is datetime64. Calling tolist() on a record converts everything to a list of python objects.
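To make the long-vs-datetime behavior concrete, here is a small standalone check (illustrative only, not code from this PR) of what tolist() returns at nanosecond versus microsecond resolution:

import numpy as np

ts = np.datetime64('2017-10-31T01:01:01')

# At nanosecond resolution, tolist() falls back to a plain integer of
# nanoseconds since the epoch, which Spark then infers as LongType.
print(np.datetime64(ts, 'ns').tolist())  # 1509411661000000000

# At microsecond resolution, tolist() returns a datetime.datetime,
# which Spark infers as TimestampType.
print(np.datetime64(ts, 'us').tolist())  # datetime.datetime(2017, 10, 31, 1, 1, 1)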
@BryanCutler is this tagged to the right JIRA?
python/pyspark/sql/session.py
Outdated
    data = [schema.toInternal(row) for row in data]
    return self._sc.parallelize(data), schema

def _getNumpyRecordDtypes(self, rec):
nit: _getNumpyRecordDtypes -> _get_numpy_record_dtypes.
I know it's confusing .. but I usually use thisNamingRule mainly for API consistency and otherwise use this_naming_rule. I actually checked and read the documentation and other code a few times to clarify this for myself .. I believe this_naming_rule is preferred by PEP 8.
But I know that the doc says:
mixedCase is allowed only in contexts where that's already the prevailing style (e.g. threading.py), to retain backwards compatibility.
but .. I believe we should avoid thisNamingRule if it's for internal use in particular and/or unrelated to compatibility. To my knowledge, threading.py is a similar case ...
yeah, I agree we should be using lowercase with underscores which is more of the convention for python. I was only using this format to stay consistent with the rest of the file, but I can change it. Just for the new methods right?
python/pyspark/sql/session.py
Outdated
    record_type_list.append((str(col_names[i]), curr_type))
    return record_type_list if has_rec_fix else None

def _convertFromPandas(self, pdf, schema):
ditto for naming
I think this should be linked to SPARK-22417.
if has_pandas and isinstance(data, pandas.DataFrame):
    if schema is None:
        schema = [str(x) for x in data.columns]
    data = [r.tolist() for r in data.to_records(index=False)]
seems r.tolist is the problem, how about [r[i] for i in xrange(r.size)]? Then we can get numpy.datetime64:

>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0].tolist()[0]
1509411661000000000L
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0][0]
numpy.datetime64('2017-10-31T02:01:01.000000000+0100')
but ... numpy.datetime64 is not supported in createDataFrame IIUC:

import pandas as pd
from datetime import datetime
pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})
print [[v for v in r] for r in pdf.to_records(index=False)]
spark.createDataFrame([[v for v in r] for r in pdf.to_records(index=False)])

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 591, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 404, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/.../spark/python/pyspark/sql/session.py", line 336, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/.../spark/python/pyspark/sql/types.py", line 1095, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/.../spark/python/pyspark/sql/types.py", line 1072, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.datetime64'>
(It reminds me of SPARK-6857 BTW)
according to the ticket, seems we need to convert numpy.datetime64 to python datetime manually.
the problem is that nanosecond values can not be represented by a python datetime object, which only has microsecond resolution, so numpy converts them to longs. Numpy will convert microseconds and above to python datetime objects, which Spark will correctly infer.
according to the ticket, seems we need to convert numpy.datetime64 to python datetime manually.
This fix is just meant to convert nanosecond timestamps to microseconds so that calling tolist() can fit them in a python object. Does it seem ok to you guys to leave it at that scope for now?
# Check if any columns need to be fixed for Spark to infer properly
if len(np_records) > 0:
    record_type_list = self._get_numpy_record_dtypes(np_records[0])
can't we just use pdf.dtypes?

>>> pdf.dtypes[0]
dtype('int64')
>>> pdf.to_records(index=False)[0].dtype[0]
dtype('int64')

I think they are the same.
The dtype for a numpy record is in a different format:

In [16]: r
Out[16]: (0, datetime.date(2017, 11, 6), 1509411661000000000L)
In [17]: r.dtype
Out[17]: dtype((numpy.record, [(u'index', '<i8'), (u'd', 'O'), (u'ts', '<M8[ns]')]))

so when using numpy.record.astype() it has to be specified in the same format and include dtypes for all fields. If we try to do this with pandas dtypes from the DataFrame, there might be some differences that could cause errors, so I think it's safer to use the dtypes output from numpy and only change the timestamp resolution.
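As a rough sketch of that approach (simplified, with illustrative variable names; not the exact PR code), the nanosecond fields can be downcast to microseconds on the numpy side before calling tolist():

import numpy as np
import pandas as pd
from datetime import datetime

pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
                    "d": [pd.Timestamp.now().date()]})
np_records = pdf.to_records(index=False)

# Build a corrected record dtype, replacing any datetime64[ns] field with
# datetime64[us] so that tolist() yields datetime.datetime instead of long.
fixed_dtypes = [(name, 'datetime64[us]'
                 if np_records.dtype[name] == np.dtype('datetime64[ns]')
                 else np_records.dtype[name])
                for name in np_records.dtype.names]

# Cast each record with the corrected dtype and convert to plain Python objects.
rows = [r.astype(fixed_dtypes).tolist() for r in np_records]
print(rows)  # the timestamp is now a datetime.datetime at microsecond resolution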
if len(np_records) > 0:
    record_type_list = self._get_numpy_record_dtypes(np_records[0])
    if record_type_list is not None:
        return [r.astype(record_type_list).tolist() for r in np_records], schema
instead of doing this, we should call DataFrame.astype, which accepts a python dict. Then we can create a dict that maps column name to corrected dtype (only including columns that need the cast). We can also specify copy=False for better performance.
Then that would modify the input pandas.DataFrame from the user, which would be bad if they use it after this call. Making a copy of the DataFrame might not be good either if it is large.
ok let's copy it. Is it a valid idea to use DataFrame.astype?
I don't think this will increase performance; we will still have to iterate over each record and convert it to a list, in addition to making a copy of the timestamp data. Another issue is that using DataFrame.astype will truncate the resolution to microseconds, but Pandas will continue to store it as datetime64[ns]; see https://stackoverflow.com/a/32827472
This means that we have to change the conversion routine to separate all columns in the DataFrame and manually convert to rows of records instead of using to_records() and tolist().
I think it would be best to keep the casting on the numpy side, it's safer and keeps things simpler.
Test build #83508 has finished for PR 19646 at commit
    record_type_list.append((str(col_names[i]), curr_type))
    return record_type_list if has_rec_fix else None

def _convert_from_pandas(self, pdf, schema):
I guess we can remove the schema parameter from here because the schema doesn't affect the conversion now.
yes, it isn't used in the conversion any more, but I think it should still be here for the case when it's None and then assigned to a list of the pdf column names. That way we can keep all pandas related code in this method.
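A minimal sketch of that shape (illustrative only; the helper name and body are simplified, not the exact PR code):

import pandas as pd

def convert_from_pandas_sketch(pdf, schema=None):
    # Default the schema to the DataFrame's column names when none is given,
    # keeping the pandas-specific handling in one place.
    if schema is None:
        schema = [str(x) for x in pdf.columns]
    records = [r.tolist() for r in pdf.to_records(index=False)]
    return records, schema

pdf = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
print(convert_from_pandas_sketch(pdf))  # ([[1, 'x'], [2, 'y']], ['a', 'b'])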
LGTM, merging to master!
…ith timestamp

Currently, a pandas.DataFrame that contains a timestamp of type 'datetime64[ns]' when converted to a Spark DataFrame with `createDataFrame` will interpret the values as LongType. This fix will check for a timestamp type and convert it to microseconds which will allow Spark to read as TimestampType.

Added unit test to verify Spark schema is expected for TimestampType and DateType when created from pandas

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19646 from BryanCutler/pyspark-non-arrow-createDataFrame-ts-fix-SPARK-22417.

(cherry picked from commit 1d34104)

Signed-off-by: Wenchen Fan <wenchen@databricks.com>
also backport to 2.2 since we consider it a bug.

Thanks @cloud-fan,
    df = self.spark.createDataFrame(data)
    self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))

@unittest.skipIf(not _have_pandas, "Pandas not installed")
Hi, @cloud-fan and @BryanCutler .
This seems to break branch-2.2, but has been hidden behind another SQL error. (cc @gatorsmile , @henryr)
Please see this.
cc @felixcheung since he is RM for 2.2.1.
Thanks @dongjoon-hyun for tracking this down. It looks like sql/tests.py for branch-2.2 is just missing the following:
_have_pandas = False
try:
import pandas
_have_pandas = True
except:
# No Pandas, but that's okay, we'll skip those tests
pass
This was probably added from an earlier PR in master and wasn't included when this one was cherry-picked.
I can add a patch a little bit later tonight unless someone is able to get to it first.
I can take it over. I'll submit a pr soon.
Thank you, @BryanCutler !
Great, @ueshin ! :)
Ah, in that case, maybe we need to revert one of the two original patches and fix one by one, or merge the two follow-ups into one as a hot-fix pr. cc @gatorsmile @cloud-fan
…rom pandas.DataFrame with timestamp

## What changes were proposed in this pull request?

This is a follow-up of #19646 for branch-2.2. The original pr breaks branch-2.2 because the cherry-picked patch doesn't include some code which exists in master.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19704 from ueshin/issues/SPARK-22417_2.2/fup1.
What changes were proposed in this pull request?

Currently, a pandas.DataFrame that contains a timestamp of type 'datetime64[ns]', when converted to a Spark DataFrame with createDataFrame, will interpret the values as LongType. This fix will check for a timestamp type and convert it to microseconds, which will allow Spark to read it as TimestampType.

How was this patch tested?

Added unit test to verify the Spark schema is as expected for TimestampType and DateType when created from pandas.