49 changes: 46 additions & 3 deletions python/pyspark/sql/session.py
@@ -23,6 +23,7 @@

if sys.version >= '3':
    basestring = unicode = str
    xrange = range
else:
    from itertools import imap as map

@@ -416,6 +417,50 @@ def _createFromLocal(self, data, schema):
            data = [schema.toInternal(row) for row in data]
        return self._sc.parallelize(data), schema

    def _get_numpy_record_dtypes(self, rec):
        """
        Used when converting a pandas.DataFrame to Spark using to_records(), this will correct
        the dtypes of records so they can be properly loaded into Spark.
        :param rec: a numpy record to check dtypes
        :return corrected dtypes for a numpy.record or None if no correction needed
        """
        import numpy as np
        cur_dtypes = rec.dtype
        col_names = cur_dtypes.names
        record_type_list = []
        has_rec_fix = False
        for i in xrange(len(cur_dtypes)):
            curr_type = cur_dtypes[i]
            # If type is a datetime64 timestamp, convert to microseconds
            # NOTE: if dtype is datetime64[ns] then np.record.tolist() will output values as longs,
            # conversion from [us] or lower will lead to py datetime objects, see SPARK-22417
            if curr_type == np.dtype('datetime64[ns]'):
                curr_type = 'datetime64[us]'
                has_rec_fix = True
            record_type_list.append((str(col_names[i]), curr_type))
        return record_type_list if has_rec_fix else None

    def _convert_from_pandas(self, pdf, schema):

Member: I guess we can remove the schema parameter from here because the schema doesn't affect the conversion now.

Member Author: Yes, it isn't used in the conversion any more, but I think it should still be here for the case when it's None and is then assigned to a list of the pdf column names. That way we can keep all pandas-related code in this method.

"""
Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
:return tuple of list of records and schema
"""
# If no schema supplied by user then get the names of columns only
if schema is None:
schema = [str(x) for x in pdf.columns]

# Convert pandas.DataFrame to list of numpy records
np_records = pdf.to_records(index=False)

# Check if any columns need to be fixed for Spark to infer properly
if len(np_records) > 0:
record_type_list = self._get_numpy_record_dtypes(np_records[0])

Contributor: Can't we just use pdf.dtypes?

>>> pdf.dtypes[0]
dtype('int64')
>>> pdf.to_records(index=False)[0].dtype[0]
dtype('int64')

I think they are the same.

Member Author: The dtype for a numpy record is in a different format:

In [16]: r
Out[16]: (0, datetime.date(2017, 11, 6), 1509411661000000000L)

In [17]: r.dtype
Out[17]: dtype((numpy.record, [(u'index', '<i8'), (u'd', 'O'), (u'ts', '<M8[ns]')]))

so when using numpy.record.astype() it has to be specified in the same format and include dtypes for all fields. If we try to do this with the pandas dtypes from the DataFrame, there might be some differences that could cause errors, so I think it's safer to use the dtypes output from numpy and only change the timestamp resolution.
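
(For illustration only, not part of the diff: a minimal sketch of the record-level cast being discussed, assuming pandas and numpy are installed. The single-column frame and the "ts" name are made up, and the inferred datetime64[ns] dtype reflects the 2017-era pandas behavior discussed in this thread; newer pandas may infer a different resolution.)

import numpy as np
import pandas as pd
from datetime import datetime

# Single timestamp column; pandas of this era stores it as datetime64[ns].
pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})
rec = pdf.to_records(index=False)[0]

# With datetime64[ns], tolist() falls back to a long integer (ns since epoch).
print(rec.tolist())

# Re-specify the full record dtype with the timestamp downcast to microseconds;
# tolist() then yields a python datetime, which Spark infers as TimestampType.
fixed = rec.astype([('ts', 'datetime64[us]')])
print(fixed.tolist())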

            if record_type_list is not None:
                return [r.astype(record_type_list).tolist() for r in np_records], schema

Contributor: Instead of doing this, we should call DataFrame.astype, which accepts a python dict. Then we can create a dict that maps column name to corrected dtype (only including the columns that need a cast). We can also specify copy=False for better performance.

Member Author: Then that would modify the input pandas.DataFrame from the user, which would be bad if they use it after this call. Making a copy of the DataFrame might not be good either if it is large.

Contributor: OK, let's copy it. Is it a valid idea to use DataFrame.astype?

Member Author: I don't think this will increase performance; we will still have to iterate over each record and convert it to a list, in addition to making a copy of the timestamp data. Another issue is that using DataFrame.astype will truncate the resolution to microseconds, but pandas will continue to store the column as datetime64[ns], see https://stackoverflow.com/a/32827472

This means we would have to change the conversion routine to separate all columns in the DataFrame and manually convert them to rows of records instead of using to_records() and tolist().

I think it would be best to keep the casting on the numpy side; it's safer and keeps things simpler.


        # Convert list of numpy records to python lists
        return [r.tolist() for r in np_records], schema

    @since(2.0)
    @ignore_unicode_prefix
    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
@@ -512,9 +557,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
        except Exception:
            has_pandas = False
        if has_pandas and isinstance(data, pandas.DataFrame):
            if schema is None:
                schema = [str(x) for x in data.columns]
            data = [r.tolist() for r in data.to_records(index=False)]

Contributor: Seems r.tolist() is the problem; how about r[i] for i in xrange(r.size)? Then we can get numpy.datetime64:

>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0].tolist()[0]
1509411661000000000L
>>> pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]}).to_records(index=False)[0][0]
numpy.datetime64('2017-10-31T02:01:01.000000000+0100')

Member: But ... numpy.datetime64 is not supported in createDataFrame IIUC:

import pandas as pd
from datetime import datetime
pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)]})
print [[v for v in r] for r in pdf.to_records(index=False)]
spark.createDataFrame([[v for v in r] for r in pdf.to_records(index=False)])
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 591, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/.../spark/python/pyspark/sql/session.py", line 404, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/.../spark/python/pyspark/sql/session.py", line 336, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/.../spark/python/pyspark/sql/types.py", line 1095, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/.../spark/python/pyspark/sql/types.py", line 1072, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.datetime64'>

Member: (It reminds me of SPARK-6857, BTW.)

Contributor: According to the ticket, it seems we need to convert numpy.datetime64 to python datetime manually.

Member Author: The problem is that nanosecond values cannot be converted to a python datetime object, which only has microsecond resolution, so numpy converts them to long. Numpy will convert microseconds and above to python datetime objects, which Spark will correctly infer.

> according to the ticket, seems we need to convert numpy.datetime64 to python datetime manually.

This fix is just meant to convert nanosecond timestamps to microseconds so that calling tolist() can fit them in a python object. Does it seem ok to you guys to leave it at that scope for now?
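
(For reference, a small illustrative snippet, not part of the PR, showing the numpy resolution behavior described above; the example timestamp mirrors the one used earlier in this thread.)

import numpy as np

ns_val = np.datetime64('2017-10-31T01:01:01.000000000', 'ns')
us_val = np.datetime64('2017-10-31T01:01:01.000000', 'us')

# Nanosecond resolution cannot be represented by datetime.datetime (microsecond
# precision at most), so numpy falls back to an integer of nanoseconds since epoch.
print(ns_val.item())    # 1509411661000000000

# Microsecond resolution (and coarser) converts to datetime.datetime,
# which Spark's schema inference maps to TimestampType.
print(us_val.item())    # datetime.datetime(2017, 10, 31, 1, 1, 1)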

            data, schema = self._convert_from_pandas(data, schema)

        if isinstance(schema, StructType):
            verify_func = _make_type_verifier(schema) if verifySchema else lambda _: True
15 changes: 15 additions & 0 deletions python/pyspark/sql/tests.py
@@ -2592,6 +2592,21 @@ def test_create_dataframe_from_array_of_long(self):
        df = self.spark.createDataFrame(data)
        self.assertEqual(df.first(), Row(longarray=[-9223372036854775808, 0, 9223372036854775807]))

    @unittest.skipIf(not _have_pandas, "Pandas not installed")

Member: Hi, @cloud-fan and @BryanCutler.
This seems to break branch-2.2, but it has been hidden behind another SQL error. (cc @gatorsmile, @henryr)
Please see this.

cc @felixcheung since he is RM for 2.2.1.

Member Author (@BryanCutler, Nov 9, 2017): Thanks @dongjoon-hyun for tracking this down. It looks like sql/tests.py for branch-2.2 is just missing the following:

_have_pandas = False
try:
    import pandas
    _have_pandas = True
except:
    # No Pandas, but that's okay, we'll skip those tests
    pass

This was probably added in an earlier PR to master and wasn't included when this one was cherry-picked.

Member Author: I can add a patch a little bit later tonight unless someone is able to get to it first.

Member: I can take it over. I'll submit a PR soon.

Member: Thank you, @BryanCutler!

Member: Great, @ueshin! :)

Member (@dongjoon-hyun, Nov 9, 2017): BTW, @ueshin, branch-2.2 Jenkins will fail due to #19701.
Could you review and merge #19701 first?

Member: Ah, in that case, maybe we need to revert one of the two original patches and fix them one by one, or merge the two follow-ups into one as a hot-fix PR. cc @gatorsmile @cloud-fan

    def test_create_dataframe_from_pandas_with_timestamp(self):
        import pandas as pd
        from datetime import datetime
        pdf = pd.DataFrame({"ts": [datetime(2017, 10, 31, 1, 1, 1)],
                            "d": [pd.Timestamp.now().date()]})
        # test types are inferred correctly without specifying schema
        df = self.spark.createDataFrame(pdf)
        self.assertTrue(isinstance(df.schema['ts'].dataType, TimestampType))
        self.assertTrue(isinstance(df.schema['d'].dataType, DateType))
        # test with schema will accept pdf as input
        df = self.spark.createDataFrame(pdf, schema="d date, ts timestamp")
        self.assertTrue(isinstance(df.schema['ts'].dataType, TimestampType))
        self.assertTrue(isinstance(df.schema['d'].dataType, DateType))


class HiveSparkSubmitTests(SparkSubmitTests):
