50 changes: 36 additions & 14 deletions python/pyspark/sql/readwriter.py
@@ -171,7 +171,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, samplingRatio=None,
dropFieldIfAllNull=None, encoding=None, locale=None, recursiveFileLookup=None):
dropFieldIfAllNull=None, encoding=None, locale=None, pathGlobFilter=None,
recursiveFileLookup=None):
"""
Loads JSON files and returns the results as a :class:`DataFrame`.

@@ -247,6 +248,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param locale: sets a locale as language tag in IETF BCP 47 format. If None is set,
it uses the default value, ``en-US``. For instance, ``locale`` is used while
parsing dates and timestamps.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

@@ -270,7 +274,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep,
samplingRatio=samplingRatio, dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
locale=locale, recursiveFileLookup=recursiveFileLookup)
locale=locale, pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -309,8 +313,12 @@ def parquet(self, *paths, **options):
Loads Parquet files, returning the result as a :class:`DataFrame`.

:param mergeSchema: sets whether we should merge schemas collected from all
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
The default value is specified in ``spark.sql.parquet.mergeSchema``.
Parquet part-files. This will override
``spark.sql.parquet.mergeSchema``. The default value is specified in
``spark.sql.parquet.mergeSchema``.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

@@ -319,13 +327,16 @@ def parquet(self, *paths, **options):
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
"""
mergeSchema = options.get('mergeSchema', None)
pathGlobFilter = options.get('pathGlobFilter', None)
recursiveFileLookup = options.get('recursiveFileLookup', None)
self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
return self._df(self._jreader.parquet(_to_seq(self._spark._sc, paths)))

@ignore_unicode_prefix
@since(1.6)
def text(self, paths, wholetext=False, lineSep=None, recursiveFileLookup=None):
def text(self, paths, wholetext=False, lineSep=None, pathGlobFilter=None,
recursiveFileLookup=None):
"""
Loads text files and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -338,6 +349,9 @@ def text(self, paths, wholetext=False, lineSep=None, recursiveFileLookup=None):
:param wholetext: if true, read each file from input path(s) as a single row.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

@@ -349,7 +363,8 @@ def text(self, paths, wholetext=False, lineSep=None, recursiveFileLookup=None):
[Row(value=u'hello\\nthis')]
"""
self._set_opts(
wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
if isinstance(paths, basestring):
paths = [paths]
return self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(paths)))
@@ -362,7 +377,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
samplingRatio=None, enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
recursiveFileLookup=None):
pathGlobFilter=None, recursiveFileLookup=None):
r"""Loads a CSV file and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -470,6 +485,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
Maximum length is 1 character.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

@@ -492,7 +510,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, samplingRatio=samplingRatio,
enforceSchema=enforceSchema, emptyValue=emptyValue, locale=locale, lineSep=lineSep,
recursiveFileLookup=recursiveFileLookup)
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -520,20 +538,24 @@ def func(iterator):
raise TypeError("path can be only string, list or RDD")

@since(1.5)
def orc(self, path, mergeSchema=None, recursiveFileLookup=None):
def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
"""Loads ORC files, returning the result as a :class:`DataFrame`.

:param mergeSchema: sets whether we should merge schemas collected from all
ORC part-files. This will override ``spark.sql.orc.mergeSchema``.
The default value is specified in ``spark.sql.orc.mergeSchema``.
ORC part-files. This will override ``spark.sql.orc.mergeSchema``.
The default value is specified in ``spark.sql.orc.mergeSchema``.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.
disables `partition discovery`_.

>>> df = spark.read.orc('python/test_support/sql/orc_partitioned')
>>> df.dtypes
[('a', 'bigint'), ('b', 'int'), ('c', 'int')]
"""
self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
path = [path]
return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
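For context, a brief sketch of how the `pathGlobFilter` keyword added to the batch readers above could be used. The input directories and glob patterns here are illustrative assumptions, not part of this patch.

```python
# Illustrative sketch only: paths and globs are assumptions, not part of this patch.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("path-glob-filter-batch").getOrCreate()

# Read only files whose names match *.json under the input directory; files that
# do not match the glob are skipped, while directory names such as year=2019/
# are still used for partition discovery.
json_df = spark.read.json("/tmp/events", pathGlobFilter="*.json")

# The same keyword is accepted by the other batch readers touched in this file,
# for example parquet(), where it is forwarded through **options.
parquet_df = spark.read.parquet("/tmp/events_parquet", pathGlobFilter="*.parquet")
```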
55 changes: 35 additions & 20 deletions python/pyspark/sql/streaming.py
@@ -341,9 +341,6 @@ def option(self, key, value):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.

Expand All @@ -360,9 +357,6 @@ def options(self, **options):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.

@@ -411,7 +405,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None, lineSep=None, locale=None,
dropFieldIfAllNull=None, encoding=None, recursiveFileLookup=None):
dropFieldIfAllNull=None, encoding=None, pathGlobFilter=None,
recursiveFileLookup=None):
"""
Loads a JSON file stream and returns the results as a :class:`DataFrame`.

@@ -487,6 +482,9 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
the JSON files. For example UTF-16BE, UTF-32LE. If None is set,
the encoding of input JSON will be detected automatically
when the multiLine option is set to ``true``.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

@@ -507,21 +505,24 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars, lineSep=lineSep, locale=locale,
dropFieldIfAllNull=dropFieldIfAllNull, encoding=encoding,
recursiveFileLookup=recursiveFileLookup)
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.json(path))
else:
raise TypeError("path can be only a single string")

@since(2.3)
def orc(self, path, mergeSchema=None, recursiveFileLookup=None):
def orc(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
"""Loads a ORC file stream, returning the result as a :class:`DataFrame`.

.. note:: Evolving.

:param mergeSchema: sets whether we should merge schemas collected from all
ORC part-files. This will override ``spark.sql.orc.mergeSchema``.
The default value is specified in ``spark.sql.orc.mergeSchema``.
ORC part-files. This will override ``spark.sql.orc.mergeSchema``.
The default value is specified in ``spark.sql.orc.mergeSchema``.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

@@ -531,22 +532,27 @@ def orc(self, path, mergeSchema=None, recursiveFileLookup=None):
>>> orc_sdf.schema == sdf_schema
True
"""
self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.orc(path))
else:
raise TypeError("path can be only a single string")

@since(2.0)
def parquet(self, path, mergeSchema=None, recursiveFileLookup=None):
def parquet(self, path, mergeSchema=None, pathGlobFilter=None, recursiveFileLookup=None):
"""
Loads a Parquet file stream, returning the result as a :class:`DataFrame`.

.. note:: Evolving.

:param mergeSchema: sets whether we should merge schemas collected from all
Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``.
The default value is specified in ``spark.sql.parquet.mergeSchema``.
Parquet part-files. This will override
``spark.sql.parquet.mergeSchema``. The default value is specified in
``spark.sql.parquet.mergeSchema``.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

@@ -556,15 +562,17 @@ def parquet(self, path, mergeSchema=None, recursiveFileLookup=None):
>>> parquet_sdf.schema == sdf_schema
True
"""
self._set_opts(mergeSchema=mergeSchema, recursiveFileLookup=recursiveFileLookup)
self._set_opts(mergeSchema=mergeSchema, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.parquet(path))
else:
raise TypeError("path can be only a single string")

@ignore_unicode_prefix
@since(2.0)
def text(self, path, wholetext=False, lineSep=None, recursiveFileLookup=None):
def text(self, path, wholetext=False, lineSep=None, pathGlobFilter=None,
recursiveFileLookup=None):
"""
Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a
string column named "value", and followed by partitioned columns if there
@@ -579,6 +587,9 @@ def text(self, path, wholetext=False, lineSep=None, recursiveFileLookup=None):
:param wholetext: if true, read each file from input path(s) as a single row.
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

@@ -589,7 +600,8 @@ def text(self, path, wholetext=False, lineSep=None, recursiveFileLookup=None):
True
"""
self._set_opts(
wholetext=wholetext, lineSep=lineSep, recursiveFileLookup=recursiveFileLookup)
wholetext=wholetext, lineSep=lineSep, pathGlobFilter=pathGlobFilter,
recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.text(path))
else:
@@ -603,7 +615,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
enforceSchema=None, emptyValue=None, locale=None, lineSep=None,
recursiveFileLookup=None):
pathGlobFilter=None, recursiveFileLookup=None):
r"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.

This function will go through the input once to determine the input schema if
@@ -706,6 +718,9 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param lineSep: defines the line separator that should be used for parsing. If None is
set, it covers all ``\\r``, ``\\r\\n`` and ``\\n``.
Maximum length is 1 character.
:param pathGlobFilter: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows `org.apache.hadoop.fs.GlobFilter`.
It does not change the behavior of `partition discovery`_.
:param recursiveFileLookup: recursively scan a directory for files. Using this option
disables `partition discovery`_.

@@ -726,7 +741,7 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping, enforceSchema=enforceSchema,
emptyValue=emptyValue, locale=locale, lineSep=lineSep,
recursiveFileLookup=recursiveFileLookup)
pathGlobFilter=pathGlobFilter, recursiveFileLookup=recursiveFileLookup)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:
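A corresponding sketch for the Structured Streaming readers changed in this file; the schema, input directory, and glob below are illustrative assumptions, not part of this patch.

```python
# Illustrative sketch only: schema, directory, and glob are assumptions, not part of this patch.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("path-glob-filter-streaming").getOrCreate()

# File-based streaming sources need an explicit schema (or schema inference enabled).
schema = StructType([StructField("line", StringType())])

# Only *.csv files dropped into the monitored directory are picked up by the stream;
# non-matching files are ignored, and partition discovery behaves as before.
stream_df = (spark.readStream
             .schema(schema)
             .csv("/tmp/incoming", pathGlobFilter="*.csv"))
```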