Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions python/pyspark/ml/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ def context(self, sqlContext):

.. note:: Deprecated in 2.1 and will be removed in 3.0, use session instead.
"""
warnings.warn("Deprecated in 2.1 and will be removed in 3.0, use session instead.")
warnings.warn(
"Deprecated in 2.1 and will be removed in 3.0, use session instead.",
DeprecationWarning)
self._jwrite.context(sqlContext._ssql_ctx)
return self

Expand Down Expand Up @@ -256,7 +258,9 @@ def context(self, sqlContext):

.. note:: Deprecated in 2.1 and will be removed in 3.0, use session instead.
"""
warnings.warn("Deprecated in 2.1 and will be removed in 3.0, use session instead.")
warnings.warn(
"Deprecated in 2.1 and will be removed in 3.0, use session instead.",
DeprecationWarning)
self._jread.context(sqlContext._ssql_ctx)
return self

Expand Down
2 changes: 1 addition & 1 deletion python/pyspark/mllib/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
"""
warnings.warn(
"Deprecated in 2.0.0. Use ml.classification.LogisticRegression or "
"LogisticRegressionWithLBFGS.")
"LogisticRegressionWithLBFGS.", DeprecationWarning)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another example:

2017-10-19 6 38 50


def train(rdd, i):
return callMLlibFunc("trainLogisticRegressionModelWithSGD", rdd, int(iterations),
Expand Down
6 changes: 3 additions & 3 deletions python/pyspark/mllib/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def precision(self, label=None):
"""
if label is None:
# note:: Deprecated in 2.0.0. Use accuracy.
warnings.warn("Deprecated in 2.0.0. Use accuracy.")
warnings.warn("Deprecated in 2.0.0. Use accuracy.", DeprecationWarning)
return self.call("precision")
else:
return self.call("precision", float(label))
Expand All @@ -246,7 +246,7 @@ def recall(self, label=None):
"""
if label is None:
# note:: Deprecated in 2.0.0. Use accuracy.
warnings.warn("Deprecated in 2.0.0. Use accuracy.")
warnings.warn("Deprecated in 2.0.0. Use accuracy.", DeprecationWarning)
return self.call("recall")
else:
return self.call("recall", float(label))
Expand All @@ -259,7 +259,7 @@ def fMeasure(self, label=None, beta=None):
if beta is None:
if label is None:
# note:: Deprecated in 2.0.0. Use accuracy.
warnings.warn("Deprecated in 2.0.0. Use accuracy.")
warnings.warn("Deprecated in 2.0.0. Use accuracy.", DeprecationWarning)
return self.call("fMeasure")
else:
return self.call("fMeasure", label)
Expand Down
8 changes: 5 additions & 3 deletions python/pyspark/mllib/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0,
A condition which decides iteration termination.
(default: 0.001)
"""
warnings.warn("Deprecated in 2.0.0. Use ml.regression.LinearRegression.")
warnings.warn(
"Deprecated in 2.0.0. Use ml.regression.LinearRegression.", DeprecationWarning)

def train(rdd, i):
return callMLlibFunc("trainLinearRegressionModelWithSGD", rdd, int(iterations),
Expand Down Expand Up @@ -421,7 +422,8 @@ def train(cls, data, iterations=100, step=1.0, regParam=0.01,
"""
warnings.warn(
"Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 1.0. "
"Note the default regParam is 0.01 for LassoWithSGD, but is 0.0 for LinearRegression.")
"Note the default regParam is 0.01 for LassoWithSGD, but is 0.0 for LinearRegression.",
DeprecationWarning)

def train(rdd, i):
return callMLlibFunc("trainLassoModelWithSGD", rdd, int(iterations), float(step),
Expand Down Expand Up @@ -566,7 +568,7 @@ def train(cls, data, iterations=100, step=1.0, regParam=0.01,
warnings.warn(
"Deprecated in 2.0.0. Use ml.regression.LinearRegression with elasticNetParam = 0.0. "
"Note the default regParam is 0.01 for RidgeRegressionWithSGD, but is 0.0 for "
"LinearRegression.")
"LinearRegression.", DeprecationWarning)

def train(rdd, i):
return callMLlibFunc("trainRidgeModelWithSGD", rdd, int(iterations), float(step),
Expand Down
3 changes: 3 additions & 0 deletions python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ def registerTempTable(self, name):

.. note:: Deprecated in 2.0, use createOrReplaceTempView instead.
"""
warnings.warn(
"Deprecated in 2.0, use createOrReplaceTempView instead.", DeprecationWarning)
self._jdf.createOrReplaceTempView(name)

@since(2.0)
Expand Down Expand Up @@ -1308,6 +1310,7 @@ def unionAll(self, other):

.. note:: Deprecated in 2.0, use :func:`union` instead.
"""
warnings.warn("Deprecated in 2.0, use union instead.", DeprecationWarning)
return self.union(other)

@since(2.3)
Expand Down
18 changes: 18 additions & 0 deletions python/pyspark/sql/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import math
import sys
import functools
import warnings

if sys.version < "3":
from itertools import imap as map
Expand All @@ -44,6 +45,14 @@ def _(col):
return _


def _wrap_deprecated_function(func, message):
""" Wrap the deprecated function to print out deprecation warnings"""
def _(col):
warnings.warn(message, DeprecationWarning)
return func(col)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, I intentionally avoided *args and **kwargs so that the argument signature is still printed by pydoc and help(...).

return functools.wraps(func)(_)


def _create_binary_mathfunction(name, doc=""):
""" Create a binary mathfunction by name"""
def _(col1, col2):
Expand Down Expand Up @@ -207,6 +216,12 @@ def _():
"""returns the relative rank (i.e. percentile) of rows within a window partition.""",
}

# Wraps deprecated functions (keys) with the messages (values).
_functions_deprecated = {
'toDegrees': 'Deprecated in 2.1, use degrees instead.',
'toRadians': 'Deprecated in 2.1, use radians instead.',
}

for _name, _doc in _functions.items():
globals()[_name] = since(1.3)(_create_function(_name, _doc))
for _name, _doc in _functions_1_4.items():
Expand All @@ -219,6 +234,8 @@ def _():
globals()[_name] = since(1.6)(_create_function(_name, _doc))
for _name, _doc in _functions_2_1.items():
globals()[_name] = since(2.1)(_create_function(_name, _doc))
for _name, _message in _functions_deprecated.items():
globals()[_name] = _wrap_deprecated_function(globals()[_name], _message)
del _name, _doc


Expand All @@ -227,6 +244,7 @@ def approxCountDistinct(col, rsd=None):
"""
.. note:: Deprecated in 2.1, use :func:`approx_count_distinct` instead.
"""
warnings.warn("Deprecated in 2.1, use approx_count_distinct instead.", DeprecationWarning)
return approx_count_distinct(col, rsd)


Expand Down
14 changes: 12 additions & 2 deletions python/pyspark/streaming/flume.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,13 @@ def createStream(ssc, hostname, port,
:param bodyDecoder: A function used to decode body (default is utf8_decoder)
:return: A DStream object

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0.
See SPARK-22142.
"""
warnings.warn(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that `warnings` is not imported in this file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is not. I will make a follow-up after double-checking the other files too. Thank you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you :) It would also be good to check why the master build does not fail, since Python should complain about it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. I took a quick look, and I think this code path is not actually being tested, which seems to be why the build does not fail. I will double-check and take a closer look tonight (KST).

I have seen a few mistakes like this so far. By the way, I am working on Python coverage: https://issues.apache.org/jira/browse/SPARK-7721

Anyway, it was my stupid mistake. Thanks ..

"Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. "
"See SPARK-22142.",
DeprecationWarning)
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
helper = FlumeUtils._get_helper(ssc._sc)
jstream = helper.createStream(ssc._jssc, hostname, port, jlevel, enableDecompression)
Expand All @@ -82,8 +87,13 @@ def createPollingStream(ssc, addresses,
:param bodyDecoder: A function used to decode body (default is utf8_decoder)
:return: A DStream object

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0.
See SPARK-22142.
"""
warnings.warn(
"Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. "
"See SPARK-22142.",
DeprecationWarning)
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
hosts = []
ports = []
Expand Down
72 changes: 62 additions & 10 deletions python/pyspark/streaming/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# limitations under the License.
#

import warnings

from py4j.protocol import Py4JJavaError

from pyspark.rdd import RDD
Expand Down Expand Up @@ -56,8 +58,13 @@ def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
:param valueDecoder: A function used to decode value (default is utf8_decoder)
:return: A DStream object

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
if kafkaParams is None:
kafkaParams = dict()
kafkaParams.update({
Expand Down Expand Up @@ -105,8 +112,13 @@ def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
:return: A DStream object

.. note:: Experimental
.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
if fromOffsets is None:
fromOffsets = dict()
if not isinstance(topics, list):
Expand Down Expand Up @@ -159,8 +171,13 @@ def createRDD(sc, kafkaParams, offsetRanges, leaders=None,
:return: An RDD object

.. note:: Experimental
.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
if leaders is None:
leaders = dict()
if not isinstance(kafkaParams, dict):
Expand Down Expand Up @@ -229,7 +246,8 @@ class OffsetRange(object):
"""
Represents a range of offsets from a single Kafka TopicAndPartition.

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""

def __init__(self, topic, partition, fromOffset, untilOffset):
Expand All @@ -240,6 +258,10 @@ def __init__(self, topic, partition, fromOffset, untilOffset):
:param fromOffset: Inclusive starting offset.
:param untilOffset: Exclusive ending offset.
"""
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
self.topic = topic
self.partition = partition
self.fromOffset = fromOffset
Expand Down Expand Up @@ -270,7 +292,8 @@ class TopicAndPartition(object):
"""
Represents a specific topic and partition for Kafka.

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""

def __init__(self, topic, partition):
Expand All @@ -279,6 +302,10 @@ def __init__(self, topic, partition):
:param topic: Kafka topic name.
:param partition: Kafka partition id.
"""
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
self._topic = topic
self._partition = partition

Expand All @@ -303,7 +330,8 @@ class Broker(object):
"""
Represent the host and port info for a Kafka broker.

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""

def __init__(self, host, port):
Expand All @@ -312,6 +340,10 @@ def __init__(self, host, port):
:param host: Broker's hostname.
:param port: Broker's port.
"""
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
self._host = host
self._port = port

Expand All @@ -323,10 +355,15 @@ class KafkaRDD(RDD):
"""
A Python wrapper of KafkaRDD, to provide additional information on normal RDD.

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""

def __init__(self, jrdd, ctx, jrdd_deserializer):
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
RDD.__init__(self, jrdd, ctx, jrdd_deserializer)

def offsetRanges(self):
Expand All @@ -345,10 +382,15 @@ class KafkaDStream(DStream):
"""
A Python wrapper of KafkaDStream

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""

def __init__(self, jdstream, ssc, jrdd_deserializer):
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
DStream.__init__(self, jdstream, ssc, jrdd_deserializer)

def foreachRDD(self, func):
Expand Down Expand Up @@ -383,10 +425,15 @@ class KafkaTransformedDStream(TransformedDStream):
"""
Kafka specific wrapper of TransformedDStream to transform on Kafka RDD.

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""

def __init__(self, prev, func):
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
TransformedDStream.__init__(self, prev, func)

@property
Expand All @@ -405,7 +452,8 @@ class KafkaMessageAndMetadata(object):
"""
Kafka message and metadata information. Including topic, partition, offset and message

.. note:: Deprecated in 2.3.0
.. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
See SPARK-21893.
"""

def __init__(self, topic, partition, offset, key, message):
Expand All @@ -419,6 +467,10 @@ def __init__(self, topic, partition, offset, key, message):
:param message: actual message payload of this Kafka message, the return data is
undecoded bytearray.
"""
warnings.warn(
"Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
"See SPARK-21893.",
DeprecationWarning)
self.topic = topic
self.partition = partition
self.offset = offset
Expand Down