8 changes: 4 additions & 4 deletions docs/streaming-programming-guide.md
@@ -1534,10 +1534,10 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
globals()["sparkSessionSingletonInstance"] = (SparkSession
Contributor Author: Replace the backslash syntax with the more elegant (and PEP 8-recommended) parenthesis syntax.

Member: OK. I don't feel qualified to judge that, but I'll take your word for it. However, do you really want to indent this so much?

Contributor Author: My point was about the use of parentheses instead of the backslash, which is recommended by PEP 8. I can keep the indentation.

Member (@HyukjinKwon, Feb 15, 2017): Maybe I am wrong; could you provide the reference?

"recommended by pep8"

Do you mean this passage?

"The preferred way of wrapping long lines is by using Python's implied line continuation inside parentheses, brackets and braces. Long lines can be broken over multiple lines by wrapping expressions in parentheses. These should be used in preference to using a backslash for line continuation."

I know the rule about binary operators follows this, but I guess this case is not disallowed. I am not sure it is worth sweeping through all of them. The parenthesized form looks preferred, but the old one does not break PEP 8; I mean, PEP 8 does not seem to discourage this line break.

.builder
.config(conf=sparkConf)
.getOrCreate())
return globals()["sparkSessionSingletonInstance"]

...
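For readers following the thread above, here is a small standalone sketch (illustrative only, not part of this diff) contrasting the two continuation styles; the app name is just a placeholder.

# Standalone sketch: backslash continuation vs. implied continuation in parentheses.
from pyspark.sql import SparkSession

# Backslash continuation -- the style this change replaces:
spark = SparkSession \
    .builder \
    .appName("ContinuationStyleDemo") \
    .getOrCreate()

# Implied line continuation inside parentheses -- the style PEP 8 prefers
# for wrapping long lines:
spark = (SparkSession
         .builder
         .appName("ContinuationStyleDemo")
         .getOrCreate())

spark.stop()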
24 changes: 13 additions & 11 deletions examples/src/main/python/als.py
@@ -26,8 +26,8 @@
import sys

import numpy as np
from numpy.random import rand
from numpy import matrix
from numpy.random import rand
from pyspark.sql import SparkSession

LAMBDA = 0.01 # regularization
@@ -62,10 +62,10 @@ def update(i, mat, ratings):
example. Please use pyspark.ml.recommendation.ALS for more
conventional use.""", file=sys.stderr)

spark = SparkSession\
.builder\
.appName("PythonALS")\
.getOrCreate()
spark = (SparkSession
Contributor Author: I have not changed all of these initialization lines, since most of the time they do not appear in the documentation.

.builder
.appName("PythonALS")
.getOrCreate())

sc = spark.sparkContext

@@ -87,17 +87,19 @@ def update(i, mat, ratings):
usb = sc.broadcast(us)

for i in range(ITERATIONS):
ms = sc.parallelize(range(M), partitions) \
.map(lambda x: update(x, usb.value, Rb.value)) \
.collect()
ms = (sc
.parallelize(range(M), partitions)
.map(lambda x: update(x, usb.value, Rb.value))
.collect())
# collect() returns a list, so array ends up being
# a 3-d array, we take the first 2 dims for the matrix
ms = matrix(np.array(ms)[:, :, 0])
msb = sc.broadcast(ms)

us = sc.parallelize(range(U), partitions) \
.map(lambda x: update(x, msb.value, Rb.value.T)) \
.collect()
us = (sc
.parallelize(range(U), partitions)
.map(lambda x: update(x, msb.value, Rb.value.T))
.collect())
us = matrix(np.array(us)[:, :, 0])
usb = sc.broadcast(us)

11 changes: 6 additions & 5 deletions examples/src/main/python/avro_inputformat.py
@@ -18,10 +18,11 @@
from __future__ import print_function

import sys

from functools import reduce

from pyspark.sql import SparkSession


"""
Read data file users.avro in local Spark distro:

Expand Down Expand Up @@ -65,10 +66,10 @@

path = sys.argv[1]

spark = SparkSession\
.builder\
.appName("AvroKeyInputFormat")\
.getOrCreate()
spark = (SparkSession
.builder
.appName("AvroKeyInputFormat")
.getOrCreate())

sc = spark.sparkContext

8 changes: 4 additions & 4 deletions examples/src/main/python/kmeans.py
@@ -55,10 +55,10 @@ def closestPoint(p, centers):
as an example! Please refer to examples/src/main/python/ml/kmeans_example.py for an
example on how to use ML's KMeans implementation.""", file=sys.stderr)

spark = SparkSession\
.builder\
.appName("PythonKMeans")\
.getOrCreate()
spark = (SparkSession
.builder
.appName("PythonKMeans")
.getOrCreate())

lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
data = lines.map(parseVector).cache()
16 changes: 9 additions & 7 deletions examples/src/main/python/logistic_regression.py
@@ -55,13 +55,15 @@ def readPointBatch(iterator):
Please refer to examples/src/main/python/ml/logistic_regression_with_elastic_net.py
to see how ML's implementation is used.""", file=sys.stderr)

spark = SparkSession\
.builder\
.appName("PythonLR")\
.getOrCreate()

points = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])\
.mapPartitions(readPointBatch).cache()
spark = (SparkSession
.builder
.appName("PythonLR")
.getOrCreate())
points = (spark.read
.text(sys.argv[1])
.rdd.map(lambda r: r[0])
.mapPartitions(readPointBatch)
.cache())
iterations = int(sys.argv[2])

# Initialize w to a random value
10 changes: 5 additions & 5 deletions examples/src/main/python/ml/aft_survival_regression.py
@@ -18,8 +18,8 @@
from __future__ import print_function

# $example on$
from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import AFTSurvivalRegression
# $example off$
from pyspark.sql import SparkSession

@@ -30,10 +30,10 @@
"""

if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("AFTSurvivalRegressionExample") \
.getOrCreate()
spark = (SparkSession
.builder
.appName("AFTSurvivalRegressionExample")
.getOrCreate())

# $example on$
training = spark.createDataFrame([
17 changes: 9 additions & 8 deletions examples/src/main/python/ml/als_example.py
@@ -18,22 +18,23 @@
from __future__ import print_function

import sys
if sys.version >= '3':
long = int

from pyspark.sql import SparkSession

# $example on$
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
# $example off$
from pyspark.sql import SparkSession

if sys.version >= '3':
long = int


if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("ALSExample")\
.getOrCreate()
spark = (SparkSession
.builder
.appName("ALSExample")
.getOrCreate())

# $example on$
lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
11 changes: 6 additions & 5 deletions examples/src/main/python/ml/binarizer_example.py
@@ -17,16 +17,17 @@

from __future__ import print_function

from pyspark.sql import SparkSession
# $example on$
from pyspark.ml.feature import Binarizer
# $example off$
from pyspark.sql import SparkSession


if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("BinarizerExample")\
.getOrCreate()
spark = (SparkSession
.builder
.appName("BinarizerExample")
.getOrCreate())

# $example on$
continuousDataFrame = spark.createDataFrame([
8 changes: 4 additions & 4 deletions examples/src/main/python/ml/bisecting_k_means_example.py
@@ -29,10 +29,10 @@
"""

if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("BisectingKMeansExample")\
.getOrCreate()
spark = (SparkSession
.builder
.appName("BisectingKMeansExample")
.getOrCreate())

# $example on$
# Loads data.
13 changes: 7 additions & 6 deletions examples/src/main/python/ml/bucketizer_example.py
@@ -17,16 +17,17 @@

from __future__ import print_function

from pyspark.sql import SparkSession
# $example on$
from pyspark.ml.feature import Bucketizer
# $example off$
from pyspark.sql import SparkSession


if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("BucketizerExample")\
.getOrCreate()
spark = (SparkSession
.builder
.appName("BucketizerExample")
.getOrCreate())

# $example on$
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
@@ -39,7 +40,7 @@
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
# $example off$

11 changes: 6 additions & 5 deletions examples/src/main/python/ml/chisq_selector_example.py
@@ -17,17 +17,18 @@

from __future__ import print_function

from pyspark.sql import SparkSession
# $example on$
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
# $example off$
from pyspark.sql import SparkSession


if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("ChiSqSelectorExample")\
.getOrCreate()
spark = (SparkSession
.builder
.appName("ChiSqSelectorExample")
.getOrCreate())

# $example on$
df = spark.createDataFrame([
21 changes: 12 additions & 9 deletions examples/src/main/python/ml/count_vectorizer_example.py
@@ -17,23 +17,26 @@

from __future__ import print_function

from pyspark.sql import SparkSession
# $example on$
from pyspark.ml.feature import CountVectorizer
# $example off$
from pyspark.sql import SparkSession


if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("CountVectorizerExample")\
.getOrCreate()
spark = (SparkSession
.builder
.appName("CountVectorizerExample")
.getOrCreate())

# $example on$
# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])
df = spark.createDataFrame(
[
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
],
Member: Could you double-check whether it really does not follow PEP 8? I have seen the removed syntax more often (e.g., in numpy).

Contributor Author: Indeed, this is a recommendation, not an obligation. I find it looks more like Scala multi-line code, and I prefer it. It is a personal opinion, and I don't think there is a pylint/pep8 check that prevents using it.

["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
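As a reference point for the thread above: PEP 8 accepts both argument layouts shown in this hunk, so the choice is stylistic. A small standalone sketch (illustrative only; it assumes an already-created SparkSession named spark):

# Standalone sketch (not from the diff); assumes `spark` is an active SparkSession.
# Layout 1: open the list on the same line as the call (the original style).
df1 = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# Layout 2: each argument wrapped onto its own indented block (the style proposed here).
df2 = spark.createDataFrame(
    [
        (0, "a b c".split(" ")),
        (1, "a b b c a".split(" "))
    ],
    ["id", "words"])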
38 changes: 20 additions & 18 deletions examples/src/main/python/ml/cross_validator.py
@@ -35,27 +35,29 @@
"""

if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("CrossValidatorExample")\
.getOrCreate()
spark = (SparkSession
.builder
.appName("CrossValidatorExample")
.getOrCreate())

# $example on$
# Prepare training documents, which are labeled.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
(8, "e spark program", 1.0),
(9, "a e c l", 0.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 0.0)
], ["id", "text", "label"])
training = spark.createDataFrame(
[
Member: Is this really a PEP 8 recommendation or just a preference? I think the way it was is more consistent with other Spark code.

Contributor Author: I cannot remember if I did that manually or if it was done by the pep8 tool. I cannot work on it until next week.

Member: It'd be great if we had some references or quotes.

(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0),
(4, "b spark who", 1.0),
(5, "g d a y", 0.0),
(6, "spark fly", 1.0),
(7, "was mapreduce", 0.0),
(8, "e spark program", 1.0),
(9, "a e c l", 0.0),
(10, "spark compile", 1.0),
(11, "hadoop software", 0.0)
],
["id", "text", "label"])

# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")