7 changes: 7 additions & 0 deletions python/pyspark/sql/__init__.py
@@ -41,6 +41,13 @@
"""
from __future__ import absolute_import


def since(version):
Contributor:
Quick question: Should this be in the root pyspark package instead of SQL? We might want to also use this for core APIs as well.

Contributor (author):
I've planned to move this into pyspark/__init__.py later; let's focus on the SQL API in this PR.

Contributor:
Ah, gotcha. Just asking because #6275 adds a missing startTime property to PySpark's context and I was thinking about suggesting that we add the @since tag there.

def deco(f):
f.__doc__ = f.__doc__.rstrip() + "\n\n.. versionadded:: %s" % version
return f
return deco

# fix the module name conflict for Python 3+
import sys
from . import _types as types
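For context, a minimal sketch of how the decorator behaves once applied; the function name and version below are hypothetical, not part of this PR, and it assumes the `since` defined above is importable:

    from pyspark.sql import since

    @since(1.3)
    def collect(self):
        """Returns all the records as a list of Row."""

    # The decorator rstrips the docstring and appends a Sphinx directive:
    print(collect.__doc__)
    # Returns all the records as a list of Row.
    #
    # .. versionadded:: 1.3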
12 changes: 12 additions & 0 deletions python/pyspark/sql/column.py
@@ -23,6 +23,7 @@

from pyspark.context import SparkContext
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql import since
from pyspark.sql.types import *

__all__ = ["DataFrame", "Column", "SchemaRDD", "DataFrameNaFunctions",
@@ -114,6 +115,8 @@ class Column(object):
# 2. Create from an expression
df.colName + 1
1 / df.colName

.. versionadded:: 1.3
"""

def __init__(self, jc):
@@ -159,6 +162,7 @@ def __init__(self, jc):
bitwiseAND = _bin_op("bitwiseAND")
bitwiseXOR = _bin_op("bitwiseXOR")

@since(1.3)
def getItem(self, key):
"""An expression that gets an item at position `ordinal` out of a list,
or gets an item by key out of a dict.
@@ -179,6 +183,7 @@ def getItem(self, key):
"""
return self[key]
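A usage sketch of getItem, assuming a sqlContext is in scope; the sample data and the rendered output are illustrative, not taken from this PR:

    # Select the first element of an array column and a value by key
    # from a map column.
    df = sqlContext.createDataFrame([([1, 2], {"key": "value"})], ["l", "d"])
    df.select(df.l.getItem(0), df.d.getItem("key")).show()
    # +----+------+
    # |l[0]|d[key]|
    # +----+------+
    # |   1| value|
    # +----+------+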

@since(1.3)
def getField(self, name):
"""An expression that gets a field by name in a StructField.

@@ -211,6 +216,7 @@ def __getattr__(self, item):
endswith = _bin_op("endsWith")

@ignore_unicode_prefix
@since(1.3)
def substr(self, startPos, length):
"""
Return a :class:`Column` which is a substring of the column
@@ -234,6 +240,7 @@ def substr(self, startPos, length):
__getslice__ = substr

@ignore_unicode_prefix
@since(1.3)
def inSet(self, *cols):
""" A boolean expression that is evaluated to true if the value of this
expression is contained by the evaluated values of the arguments.
@@ -259,6 +266,7 @@ def inSet(self, *cols):
isNull = _unary_op("isNull", "True if the current expression is null.")
isNotNull = _unary_op("isNotNull", "True if the current expression is not null.")

@since(1.3)
def alias(self, *alias):
"""Returns this column aliased with a new name or names (in the case of expressions that
return more than one column, such as explode).
@@ -274,6 +282,7 @@ def alias(self, *alias):
return Column(getattr(self._jc, "as")(_to_seq(sc, list(alias))))

@ignore_unicode_prefix
@since(1.3)
def cast(self, dataType):
""" Convert the column into type `dataType`

@@ -294,13 +303,15 @@ def cast(self, dataType):
return Column(jc)

@ignore_unicode_prefix
@since(1.3)
def between(self, lowerBound, upperBound):
""" A boolean expression that is evaluated to true if the value of this
expression is between the given columns.
"""
return (self >= lowerBound) & (self <= upperBound)
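Since between simply expands to two comparisons joined with &, a quick illustrative call; the column name and bounds are made up:

    # Equivalent to (df.age >= 2) & (df.age <= 4)
    df.select(df.name, df.age.between(2, 4)).show()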

@ignore_unicode_prefix
@since(1.4)
def when(self, condition, value):
"""Evaluates a list of conditions and returns one of multiple possible result expressions.
If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
Expand All @@ -319,6 +330,7 @@ def when(self, condition, value):
return Column(jc)

@ignore_unicode_prefix
@since(1.4)
def otherwise(self, value):
"""Evaluates a list of conditions and returns one of multiple possible result expressions.
If :func:`Column.otherwise` is not invoked, None is returned for unmatched conditions.
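To illustrate when and otherwise together, a hedged sketch of a CASE-style expression; the DataFrame and thresholds are assumed, and the chain starts from pyspark.sql.functions.when since Column.when continues an existing CASE expression:

    from pyspark.sql import functions as F

    # age > 4 -> 1, age < 3 -> -1, everything else -> 0
    df.select(df.name,
              F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()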
29 changes: 26 additions & 3 deletions python/pyspark/sql/context.py
@@ -28,6 +28,7 @@

from pyspark.rdd import RDD, _prepare_for_python_RDD, ignore_unicode_prefix
from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
from pyspark.sql import since
from pyspark.sql.types import Row, StringType, StructType, _verify_type, \
_infer_schema, _has_nulltype, _merge_type, _create_converter, _python_to_sql_converter
from pyspark.sql.dataframe import DataFrame
@@ -106,11 +107,13 @@ def _ssql_ctx(self):
self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
return self._scala_SQLContext

@since(1.3)
def setConf(self, key, value):
"""Sets the given Spark SQL configuration property.
"""
self._ssql_ctx.setConf(key, value)

@since(1.3)
def getConf(self, key, defaultValue):
"""Returns the value of Spark SQL configuration property for the given key.

@@ -119,10 +122,12 @@ def getConf(self, key, defaultValue):
return self._ssql_ctx.getConf(key, defaultValue)

@property
@since("1.3.1")
def udf(self):
"""Returns a :class:`UDFRegistration` for UDF registration."""
return UDFRegistration(self)

@since(1.4)
def range(self, start, end, step=1, numPartitions=None):
"""
Create a :class:`DataFrame` with single LongType column named `id`,
@@ -144,6 +149,7 @@ def range(self, start, end, step=1, numPartitions=None):
return DataFrame(jdf, self)
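A short doctest-style sketch of the new range method; the output shown is what the semantics above imply, not copied from this PR:

    >>> sqlContext.range(1, 7, 2).collect()
    [Row(id=1), Row(id=3), Row(id=5)]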

@ignore_unicode_prefix
@since(1.2)
def registerFunction(self, name, f, returnType=StringType()):
"""Registers a lambda function as a UDF so it can be used in SQL statements.

@@ -210,7 +216,8 @@ def _inferSchema(self, rdd, samplingRatio=None):

@ignore_unicode_prefix
def inferSchema(self, rdd, samplingRatio=None):
"""::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
.. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
warnings.warn("inferSchema is deprecated, please use createDataFrame instead")

@@ -221,7 +228,8 @@ def inferSchema(self, rdd, samplingRatio=None):

@ignore_unicode_prefix
def applySchema(self, rdd, schema):
"""::note: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
.. note:: Deprecated in 1.3, use :func:`createDataFrame` instead.
"""
warnings.warn("applySchema is deprecated, please use createDataFrame instead")

@@ -233,6 +241,7 @@ def applySchema(self, rdd, schema):

return self.createDataFrame(rdd, schema)

@since(1.3)
@ignore_unicode_prefix
def createDataFrame(self, data, schema=None, samplingRatio=None):
"""
@@ -337,6 +346,7 @@ def createDataFrame(self, data, schema=None, samplingRatio=None):
df = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.json())
return DataFrame(df, self)

@since(1.3)
def registerDataFrameAsTable(self, df, tableName):
"""Registers the given :class:`DataFrame` as a temporary table in the catalog.

@@ -349,6 +359,7 @@ def registerDataFrameAsTable(self, df, tableName):
else:
raise ValueError("Can only register DataFrame as table")

@since(1.0)
def parquetFile(self, *paths):
"""Loads a Parquet file, returning the result as a :class:`DataFrame`.

@@ -367,6 +378,7 @@ def parquetFile(self, *paths):
jdf = self._ssql_ctx.parquetFile(jpaths)
return DataFrame(jdf, self)

@since(1.0)
def jsonFile(self, path, schema=None, samplingRatio=1.0):
"""Loads a text file storing one JSON object per line as a :class:`DataFrame`.

@@ -407,6 +419,7 @@ def jsonFile(self, path, schema=None, samplingRatio=1.0):
return DataFrame(df, self)

@ignore_unicode_prefix
@since(1.0)
def jsonRDD(self, rdd, schema=None, samplingRatio=1.0):
"""Loads an RDD storing one JSON object per string as a :class:`DataFrame`.

@@ -449,6 +462,7 @@ def func(iterator):
df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
return DataFrame(df, self)

@since(1.3)
def load(self, path=None, source=None, schema=None, **options):
"""Returns the dataset in a data source as a :class:`DataFrame`.

@@ -460,6 +474,7 @@ def load(self, path=None, source=None, schema=None, **options):
"""
return self.read.load(path, source, schema, **options)

@since(1.3)
def createExternalTable(self, tableName, path=None, source=None,
schema=None, **options):
"""Creates an external table based on the dataset in a data source.
@@ -489,6 +504,7 @@ def createExternalTable(self, tableName, path=None, source=None,
return DataFrame(df, self)

@ignore_unicode_prefix
@since(1.0)
def sql(self, sqlQuery):
"""Returns a :class:`DataFrame` representing the result of the given query.

@@ -499,6 +515,7 @@ def sql(self, sqlQuery):
"""
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)

@since(1.0)
def table(self, tableName):
"""Returns the specified table as a :class:`DataFrame`.

@@ -510,6 +527,7 @@ def table(self, tableName):
return DataFrame(self._ssql_ctx.table(tableName), self)

@ignore_unicode_prefix
@since(1.3)
def tables(self, dbName=None):
"""Returns a :class:`DataFrame` containing names of tables in the given database.

@@ -528,6 +546,7 @@ def tables(self, dbName=None):
else:
return DataFrame(self._ssql_ctx.tables(dbName), self)

@since(1.3)
def tableNames(self, dbName=None):
"""Returns a list of names of tables in the database ``dbName``.

@@ -544,25 +563,29 @@ def tableNames(self, dbName=None):
else:
return [name for name in self._ssql_ctx.tableNames(dbName)]

@since(1.0)
def cacheTable(self, tableName):
"""Caches the specified table in-memory."""
self._ssql_ctx.cacheTable(tableName)

@since(1.0)
def uncacheTable(self, tableName):
"""Removes the specified table from the in-memory cache."""
self._ssql_ctx.uncacheTable(tableName)

@since(1.3)
def clearCache(self):
"""Removes all cached tables from the in-memory cache. """
self._ssql_ctx.clearCache()

@property
@since(1.4)
def read(self):
"""
Returns a :class:`DataFrameReader` that can be used to read data
in as a :class:`DataFrame`.

::note: Experimental
.. note:: Experimental

>>> sqlContext.read
<pyspark.sql.readwriter.DataFrameReader object at ...>