2 changes: 1 addition & 1 deletion python/pyspark/sql/catalog.py
@@ -193,7 +193,7 @@ def registerFunction(self, name, f, returnType=StringType()):

:param name: name of the UDF
:param f: python function
:param returnType: a :class:`DataType` object
:param returnType: a :class:`pyspark.sql.types.DataType` object

>>> spark.catalog.registerFunction("stringLengthString", lambda x: len(x))
>>> spark.sql("SELECT stringLengthString('test')").collect()
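
A minimal sketch of the ``returnType`` parameter documented above, assuming the same ``spark`` session fixture used by the surrounding doctests (output shown in the Row/``u''`` style used elsewhere on this page):

>>> from pyspark.sql.types import IntegerType
>>> spark.catalog.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
>>> spark.sql("SELECT stringLengthInt('test')").collect()
[Row(stringLengthInt(test)=4)]
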
44 changes: 25 additions & 19 deletions python/pyspark/sql/context.py
@@ -152,9 +152,9 @@ def udf(self):
@since(1.4)
def range(self, start, end=None, step=1, numPartitions=None):
"""
Create a :class:`DataFrame` with single LongType column named `id`,
containing elements in a range from `start` to `end` (exclusive) with
step value `step`.
Create a :class:`DataFrame` with single :class:`pyspark.sql.types.LongType` column named
``id``, containing elements in a range from ``start`` to ``end`` (exclusive) with
step value ``step``.

:param start: the start value
:param end: the end value (exclusive)
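
A quick illustration of the ``start`` / ``end`` / ``step`` semantics described above, a sketch assuming the ``sqlContext`` fixture used by the surrounding doctests:

>>> sqlContext.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]
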
@@ -184,7 +184,7 @@ def registerFunction(self, name, f, returnType=StringType()):

:param name: name of the UDF
:param f: python function
:param returnType: a :class:`DataType` object
:param returnType: a :class:`pyspark.sql.types.DataType` object

>>> sqlContext.registerFunction("stringLengthString", lambda x: len(x))
>>> sqlContext.sql("SELECT stringLengthString('test')").collect()
@@ -209,7 +209,7 @@ def _inferSchema(self, rdd, samplingRatio=None):

:param rdd: an RDD of Row or tuple
:param samplingRatio: sampling ratio, or no sampling (default)
:return: StructType
:return: :class:`pyspark.sql.types.StructType`
"""
return self.sparkSession._inferSchema(rdd, samplingRatio)

@@ -226,28 +226,34 @@ def createDataFrame(self, data, schema=None, samplingRatio=None):
from ``data``, which should be an RDD of :class:`Row`,
or :class:`namedtuple`, or :class:`dict`.

When ``schema`` is :class:`DataType` or datatype string, it must match the real data, or
exception will be thrown at runtime. If the given schema is not StructType, it will be
wrapped into a StructType as its only field, and the field name will be "value", each record
will also be wrapped into a tuple, which can be converted to row later.
When ``schema`` is :class:`pyspark.sql.types.DataType` or
:class:`pyspark.sql.types.StringType`, it must match the
real data, or an exception will be thrown at runtime. If the given schema is not
:class:`pyspark.sql.types.StructType`, it will be wrapped into a
:class:`pyspark.sql.types.StructType` as its only field, and the field name will be "value",
each record will also be wrapped into a tuple, which can be converted to row later.

[Inline review comment, Contributor Author]
I made a mistake here, thinking "datatype string" was actually meant to be StringType(). I understand now that a datatype string is actually a thing.
Correction incoming...

[Inline review comment, Contributor Author]
Correction here: #14408

If schema inference is needed, ``samplingRatio`` is used to determined the ratio of
rows used for schema inference. The first row will be used if ``samplingRatio`` is ``None``.

:param data: an RDD of any kind of SQL data representation(e.g. row, tuple, int, boolean,
etc.), or :class:`list`, or :class:`pandas.DataFrame`.
:param schema: a :class:`DataType` or a datatype string or a list of column names, default
is None. The data type string format equals to `DataType.simpleString`, except that
top level struct type can omit the `struct<>` and atomic types use `typeName()` as
their format, e.g. use `byte` instead of `tinyint` for ByteType. We can also use `int`
as a short name for IntegerType.
:param data: an RDD of any kind of SQL data representation(e.g. :class:`Row`,
:class:`tuple`, ``int``, ``boolean``, etc.), or :class:`list`, or
:class:`pandas.DataFrame`.
:param schema: a :class:`pyspark.sql.types.DataType` or a
:class:`pyspark.sql.types.StringType` or a list of
column names, default is None. The data type string format equals to
:class:`pyspark.sql.types.DataType.simpleString`, except that top level struct type can
omit the ``struct<>`` and atomic types use ``typeName()`` as their format, e.g. use
``byte`` instead of ``tinyint`` for :class:`pyspark.sql.types.ByteType`.
We can also use ``int`` as a short name for :class:`pyspark.sql.types.IntegerType`.
:param samplingRatio: the sample ratio of rows used for inferring
:return: :class:`DataFrame`

.. versionchanged:: 2.0
The schema parameter can be a DataType or a datatype string after 2.0. If it's not a
StructType, it will be wrapped into a StructType and each record will also be wrapped
into a tuple.
The ``schema`` parameter can be a :class:`pyspark.sql.types.DataType` or a
:class:`pyspark.sql.types.StringType` after 2.0.
If it's not a :class:`pyspark.sql.types.StructType`, it will be wrapped into a
:class:`pyspark.sql.types.StructType` and each record will also be wrapped into a tuple.

>>> l = [('Alice', 1)]
>>> sqlContext.createDataFrame(l).collect()
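
A sketch of the non-StructType case described above, where the given type is wrapped into a struct with a single "value" field and each record into a tuple (assuming the same ``sqlContext`` fixture; exact Row formatting may vary by Python version):

>>> from pyspark.sql.types import IntegerType
>>> sqlContext.createDataFrame([1, 2, 3], IntegerType()).collect()
[Row(value=1), Row(value=2), Row(value=3)]
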
2 changes: 1 addition & 1 deletion python/pyspark/sql/dataframe.py
@@ -196,7 +196,7 @@ def writeStream(self):
@property
@since(1.3)
def schema(self):
"""Returns the schema of this :class:`DataFrame` as a :class:`types.StructType`.
"""Returns the schema of this :class:`DataFrame` as a :class:`pyspark.sql.types.StructType`.

>>> df.schema
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))
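
Since the returned object is an ordinary :class:`pyspark.sql.types.StructType`, its fields can be inspected programmatically; a small sketch reusing the ``df`` fixture above:

>>> [f.name for f in df.schema.fields]
['age', 'name']
>>> df.schema.fields[0].dataType
IntegerType
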
21 changes: 13 additions & 8 deletions python/pyspark/sql/functions.py
@@ -142,7 +142,7 @@ def _():
_binary_mathfunctions = {
'atan2': 'Returns the angle theta from the conversion of rectangular coordinates (x, y) to' +
'polar coordinates (r, theta).',
'hypot': 'Computes `sqrt(a^2 + b^2)` without intermediate overflow or underflow.',
'hypot': 'Computes ``sqrt(a^2 + b^2)`` without intermediate overflow or underflow.',
'pow': 'Returns the value of the first argument raised to the power of the second argument.',
}
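
A hedged sketch of the ``hypot`` behaviour documented above, assuming an active ``spark`` session; the result is read positionally because the generated column name may differ across versions:

>>> from pyspark.sql.functions import hypot
>>> spark.createDataFrame([(3.0, 4.0)], ['x', 'y']).select(hypot('x', 'y')).first()[0]
5.0
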

@@ -958,7 +958,8 @@ def months_between(date1, date2):
@since(1.5)
def to_date(col):
"""
Converts the column of StringType or TimestampType into DateType.
Converts the column of :class:`pyspark.sql.types.StringType` or
:class:`pyspark.sql.types.TimestampType` into :class:`pyspark.sql.types.DateType`.

>>> df = spark.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_date(df.t).alias('date')).collect()
@@ -1074,18 +1075,18 @@ def window(timeColumn, windowDuration, slideDuration=None, startTime=None):
[12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in
the order of months are not supported.

The time column must be of TimestampType.
The time column must be of :class:`pyspark.sql.types.TimestampType`.

Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid
interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'.
If the `slideDuration` is not provided, the windows will be tumbling windows.
If the ``slideDuration`` is not provided, the windows will be tumbling windows.

The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start
window intervals. For example, in order to have hourly tumbling windows that start 15 minutes
past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`.

The output column will be a struct called 'window' by default with the nested columns 'start'
and 'end', where 'start' and 'end' will be of `TimestampType`.
and 'end', where 'start' and 'end' will be of :class:`pyspark.sql.types.TimestampType`.

>>> df = spark.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
>>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum"))
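
A sketch of the ``startTime`` offset described above, reusing the same ``df``; the four positional arguments follow the signature shown in the hunk header (time column, window duration, slide duration, start time), so hourly tumbling windows that start 15 minutes past the hour look roughly like this:

>>> hourly = df.groupBy(window("date", "1 hour", "1 hour", "15 minutes")).agg(sum("val").alias("sum"))
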
@@ -1367,7 +1368,7 @@ def locate(substr, str, pos=1):
could not be found in str.

:param substr: a string
:param str: a Column of StringType
:param str: a Column of :class:`pyspark.sql.types.StringType`
:param pos: start position (zero based)

>>> df = spark.createDataFrame([('abcd',)], ['s',])
@@ -1506,8 +1507,9 @@ def bin(col):
@ignore_unicode_prefix
@since(1.5)
def hex(col):
"""Computes hex value of the given column, which could be StringType,
BinaryType, IntegerType or LongType.
"""Computes hex value of the given column, which could be :class:`pyspark.sql.types.StringType`,
:class:`pyspark.sql.types.BinaryType`, :class:`pyspark.sql.types.IntegerType` or
:class:`pyspark.sql.types.LongType`.

>>> spark.createDataFrame([('ABC', 3)], ['a', 'b']).select(hex('a'), hex('b')).collect()
[Row(hex(a)=u'414243', hex(b)=u'3')]
@@ -1781,6 +1783,9 @@ def udf(f, returnType=StringType()):
duplicate invocations may be eliminated or the function may even be invoked more times than
it is present in the query.

:param f: python function
:param returnType: a :class:`pyspark.sql.types.DataType` object

>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> df.select(slen(df.name).alias('slen')).collect()
8 changes: 4 additions & 4 deletions python/pyspark/sql/readwriter.py
@@ -96,7 +96,7 @@ def schema(self, schema):
By specifying the schema here, the underlying data source can skip the schema
inference step, and thus speed up data loading.

:param schema: a StructType object
:param schema: a :class:`pyspark.sql.types.StructType` object
"""
if not isinstance(schema, StructType):
raise TypeError("schema should be StructType")
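
A minimal sketch of passing an explicit :class:`pyspark.sql.types.StructType` so the schema inference step is skipped (the JSON path below is hypothetical, and an active ``spark`` session is assumed):

>>> from pyspark.sql.types import StructType, StructField, StringType, IntegerType
>>> people_schema = StructType([
...     StructField("name", StringType(), True),
...     StructField("age", IntegerType(), True)])
>>> df = spark.read.schema(people_schema).json("path/to/people.json")  # hypothetical path
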
@@ -125,7 +125,7 @@ def load(self, path=None, format=None, schema=None, **options):

:param path: optional string or a list of string for file-system backed data sources.
:param format: optional string for format of the data source. Default to 'parquet'.
:param schema: optional :class:`StructType` for the input schema.
:param schema: optional :class:`pyspark.sql.types.StructType` for the input schema.
:param options: all other string options

>>> df = spark.read.load('python/test_support/sql/parquet_partitioned', opt1=True,
@@ -166,7 +166,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,

:param path: string represents path to the JSON dataset,
or RDD of Strings storing JSON objects.
:param schema: an optional :class:`StructType` for the input schema.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
:param primitivesAsString: infers all primitive values as a string type. If None is set,
it uses the default value, ``false``.
:param prefersDecimal: infers all floating-point values as a decimal type. If the values
@@ -294,7 +294,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
``inferSchema`` option or specify the schema explicitly using ``schema``.

:param path: string, or list of strings, for input path(s).
:param schema: an optional :class:`StructType` for the input schema.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
:param sep: sets the single character as a separator for each field and value.
If None is set, it uses the default value, ``,``.
:param encoding: decodes the CSV files by the given encoding type. If None is set,
41 changes: 23 additions & 18 deletions python/pyspark/sql/session.py
@@ -47,7 +47,7 @@ def toDF(self, schema=None, sampleRatio=None):

This is a shorthand for ``spark.createDataFrame(rdd, schema, sampleRatio)``

:param schema: a StructType or list of names of columns
:param schema: a :class:`pyspark.sql.types.StructType` or list of names of columns
:param samplingRatio: the sample ratio of rows used for inferring
:return: a DataFrame

@@ -274,9 +274,9 @@ def udf(self):
@since(2.0)
def range(self, start, end=None, step=1, numPartitions=None):
"""
Create a :class:`DataFrame` with single LongType column named `id`,
containing elements in a range from `start` to `end` (exclusive) with
step value `step`.
Create a :class:`DataFrame` with single :class:`pyspark.sql.types.LongType` column named
``id``, containing elements in a range from ``start`` to ``end`` (exclusive) with
step value ``step``.

:param start: the start value
:param end: the end value (exclusive)
@@ -307,7 +307,7 @@ def _inferSchemaFromList(self, data):
Infer schema from list of Row or tuple.

:param data: list of Row or tuple
:return: StructType
:return: :class:`pyspark.sql.types.StructType`
"""
if not data:
raise ValueError("can not infer schema from empty dataset")
@@ -326,7 +326,7 @@ def _inferSchema(self, rdd, samplingRatio=None):

:param rdd: an RDD of Row or tuple
:param samplingRatio: sampling ratio, or no sampling (default)
:return: StructType
:return: :class:`pyspark.sql.types.StructType`
"""
first = rdd.first()
if not first:
@@ -414,28 +414,33 @@ def createDataFrame(self, data, schema=None, samplingRatio=None):
from ``data``, which should be an RDD of :class:`Row`,
or :class:`namedtuple`, or :class:`dict`.

When ``schema`` is :class:`DataType` or datatype string, it must match the real data, or
exception will be thrown at runtime. If the given schema is not StructType, it will be
wrapped into a StructType as its only field, and the field name will be "value", each record
will also be wrapped into a tuple, which can be converted to row later.
When ``schema`` is :class:`pyspark.sql.types.DataType` or
:class:`pyspark.sql.types.StringType`, it must match the
real data, or an exception will be thrown at runtime. If the given schema is not
:class:`pyspark.sql.types.StructType`, it will be wrapped into a
:class:`pyspark.sql.types.StructType` as its only field, and the field name will be "value",
each record will also be wrapped into a tuple, which can be converted to row later.

If schema inference is needed, ``samplingRatio`` is used to determined the ratio of
rows used for schema inference. The first row will be used if ``samplingRatio`` is ``None``.

:param data: an RDD of any kind of SQL data representation(e.g. row, tuple, int, boolean,
etc.), or :class:`list`, or :class:`pandas.DataFrame`.
:param schema: a :class:`DataType` or a datatype string or a list of column names, default
is None. The data type string format equals to `DataType.simpleString`, except that
top level struct type can omit the `struct<>` and atomic types use `typeName()` as
their format, e.g. use `byte` instead of `tinyint` for ByteType. We can also use `int`
as a short name for IntegerType.
:param schema: a :class:`pyspark.sql.types.DataType` or a
:class:`pyspark.sql.types.StringType` or a list of
column names, default is ``None``. The data type string format equals to
:class:`pyspark.sql.types.DataType.simpleString`, except that top level struct type can
omit the ``struct<>`` and atomic types use ``typeName()`` as their format, e.g. use
``byte`` instead of ``tinyint`` for :class:`pyspark.sql.types.ByteType`. We can also use
``int`` as a short name for ``IntegerType``.
:param samplingRatio: the sample ratio of rows used for inferring
:return: :class:`DataFrame`

.. versionchanged:: 2.0
The schema parameter can be a DataType or a datatype string after 2.0. If it's not a
StructType, it will be wrapped into a StructType and each record will also be wrapped
into a tuple.
The ``schema`` parameter can be a :class:`pyspark.sql.types.DataType` or a
:class:`pyspark.sql.types.StringType` after 2.0. If it's not a
:class:`pyspark.sql.types.StructType`, it will be wrapped into a
:class:`pyspark.sql.types.StructType` and each record will also be wrapped into a tuple.

>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
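
A hedged sketch of the schema-string form discussed in the inline review comments earlier on this page (the :class:`pyspark.sql.types.StringType` wording in this hunk was revisited in #14408; a data type string such as the one below is what the original text meant), reusing the ``l`` defined just above:

>>> spark.createDataFrame(l, "name: string, age: int").collect()
[Row(name=u'Alice', age=1)]
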
8 changes: 4 additions & 4 deletions python/pyspark/sql/streaming.py
@@ -269,7 +269,7 @@ def schema(self, schema):

.. note:: Experimental.

:param schema: a StructType object
:param schema: a :class:`pyspark.sql.types.StructType` object

>>> s = spark.readStream.schema(sdf_schema)
"""
@@ -310,7 +310,7 @@ def load(self, path=None, format=None, schema=None, **options):

:param path: optional string for file-system backed data sources.
:param format: optional string for format of the data source. Default to 'parquet'.
:param schema: optional :class:`StructType` for the input schema.
:param schema: optional :class:`pyspark.sql.types.StructType` for the input schema.
:param options: all other string options

>>> json_sdf = spark.readStream.format("json")\
@@ -349,7 +349,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,

:param path: string represents path to the JSON dataset,
or RDD of Strings storing JSON objects.
:param schema: an optional :class:`StructType` for the input schema.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
:param primitivesAsString: infers all primitive values as a string type. If None is set,
it uses the default value, ``false``.
:param prefersDecimal: infers all floating-point values as a decimal type. If the values
@@ -461,7 +461,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
.. note:: Experimental.

:param path: string, or list of strings, for input path(s).
:param schema: an optional :class:`StructType` for the input schema.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
:param sep: sets the single character as a separator for each field and value.
If None is set, it uses the default value, ``,``.
:param encoding: decodes the CSV files by the given encoding type. If None is set,
7 changes: 4 additions & 3 deletions python/pyspark/sql/types.py
@@ -786,9 +786,10 @@ def _parse_struct_fields_string(s):
def _parse_datatype_string(s):
"""
Parses the given data type string to a :class:`DataType`. The data type string format equals
to `DataType.simpleString`, except that top level struct type can omit the `struct<>` and
atomic types use `typeName()` as their format, e.g. use `byte` instead of `tinyint` for
ByteType. We can also use `int` as a short name for IntegerType.
to :class:`DataType.simpleString`, except that top level struct type can omit
the ``struct<>`` and atomic types use ``typeName()`` as their format, e.g. use ``byte`` instead
of ``tinyint`` for :class:`ByteType`. We can also use ``int`` as a short name
for :class:`IntegerType`.

>>> _parse_datatype_string("int ")
IntegerType
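
The top level struct shorthand described above (omitting ``struct<>``) can be sketched the same way; the expected repr follows the pattern of the other doctests in this file:

>>> _parse_datatype_string("a: byte, b: decimal(16, 8)")
StructType(List(StructField(a,ByteType,true),StructField(b,DecimalType(16,8),true)))
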