From 78b66d89dbdd1b0d2a52d844eefd104f25590eb6 Mon Sep 17 00:00:00 2001
From: Gaetan Semet
Date: Mon, 9 Jan 2017 12:43:17 +0100
Subject: [PATCH 1/4] execute isort/pep8 on example files + manual editing (replace '\' with parentheses for multiline syntax)

---
 docs/streaming-programming-guide.md | 8 ++--
 examples/src/main/python/als.py | 24 ++++++------
 examples/src/main/python/avro_inputformat.py | 11 +++---
 examples/src/main/python/kmeans.py | 8 ++--
 .../src/main/python/logistic_regression.py | 17 +++++----
 .../main/python/ml/aft_survival_regression.py | 10 ++---
 examples/src/main/python/ml/als_example.py | 17 +++++----
 .../src/main/python/ml/binarizer_example.py | 11 +++---
 .../python/ml/bisecting_k_means_example.py | 8 ++--
 .../src/main/python/ml/bucketizer_example.py | 13 ++++---
 .../main/python/ml/chisq_selector_example.py | 11 +++---
 .../python/ml/count_vectorizer_example.py | 21 +++++-----
 .../src/main/python/ml/cross_validator.py | 38 ++++++++++---------
 .../src/main/python/ml/dataframe_example.py | 18 +++++----
 examples/src/main/python/ml/dct_example.py | 9 +++--
 .../decision_tree_classification_example.py | 21 +++++-----
 .../ml/decision_tree_regression_example.py | 18 +++++----
 .../python/ml/elementwise_product_example.py | 9 +++--
 .../ml/estimator_transformer_param_example.py | 16 ++++----
 .../python/ml/gaussian_mixture_example.py | 8 ++--
 .../generalized_linear_regression_example.py | 16 ++++----
 ...radient_boosted_tree_classifier_example.py | 20 +++++-----
 ...gradient_boosted_tree_regressor_example.py | 22 ++++++-----
 .../main/python/ml/index_to_string_example.py | 8 ++--
 .../python/ml/isotonic_regression_example.py | 13 ++++---
 examples/src/main/python/ml/kmeans_example.py | 10 ++---
 examples/src/main/python/ml/lda_example.py | 13 ++++---
 .../ml/linear_regression_with_elastic_net.py | 15 +++++---
 .../ml/logistic_regression_summary_example.py | 8 ++--
 .../logistic_regression_with_elastic_net.py | 13 ++++---
 .../main/python/ml/max_abs_scaler_example.py | 8 ++--
 .../main/python/ml/min_max_scaler_example.py | 8 ++--
 ...ss_logistic_regression_with_elastic_net.py | 16 ++++----
 .../multilayer_perceptron_classification.py | 10 +++--
 examples/src/main/python/ml/n_gram_example.py | 8 ++--
 .../src/main/python/ml/naive_bayes_example.py | 17 +++++----
 .../src/main/python/ml/normalizer_example.py | 9 +++--
 .../src/main/python/ml/one_vs_rest_example.py | 13 ++++---
 .../main/python/ml/onehot_encoder_example.py | 9 +++--
 examples/src/main/python/ml/pca_example.py | 9 +++--
 .../src/main/python/ml/pipeline_example.py | 9 +++--
 .../python/ml/polynomial_expansion_example.py | 1 +
 .../python/ml/quantile_discretizer_example.py | 8 ++--
 .../ml/random_forest_classifier_example.py | 28 ++++++++------
 .../ml/random_forest_regressor_example.py | 27 +++++++------
 .../src/main/python/ml/rformula_example.py | 9 +++--
 .../src/main/python/ml/sql_transformer.py | 9 +++--
 .../main/python/ml/standard_scaler_example.py | 9 +++--
 .../python/ml/stopwords_remover_example.py | 9 +++--
 .../main/python/ml/string_indexer_example.py | 9 +++--
 examples/src/main/python/ml/tf_idf_example.py | 10 ++---
 .../src/main/python/ml/tokenizer_example.py | 15 +++++---
 .../main/python/ml/train_validation_split.py | 23 +++++------
 .../python/ml/vector_assembler_example.py | 11 +++---
 .../main/python/ml/vector_indexer_example.py | 9 +++--
 .../main/python/ml/vector_slicer_example.py | 13 ++++---
 .../src/main/python/ml/word2vec_example.py | 9 +++--
 .../binary_classification_metrics_example.py | 11 ++++--
.../python/mllib/bisecting_k_means_example.py | 2 +- .../src/main/python/mllib/correlations.py | 5 ++- .../main/python/mllib/correlations_example.py | 4 +- .../decision_tree_classification_example.py | 1 + .../mllib/decision_tree_regression_example.py | 1 + .../mllib/elementwise_product_example.py | 1 + .../src/main/python/mllib/fpgrowth_example.py | 3 +- .../python/mllib/gaussian_mixture_example.py | 6 +-- .../python/mllib/gaussian_mixture_model.py | 10 ++--- ...radient_boosting_classification_example.py | 1 + .../gradient_boosting_regression_example.py | 1 + .../mllib/hypothesis_testing_example.py | 1 + ...testing_kolmogorov_smirnov_test_example.py | 1 + .../mllib/isotonic_regression_example.py | 9 ++++- .../src/main/python/mllib/k_means_example.py | 5 ++- .../kernel_density_estimation_example.py | 1 + .../latent_dirichlet_allocation_example.py | 1 + .../linear_regression_with_sgd_example.py | 4 +- .../main/python/mllib/logistic_regression.py | 2 +- .../logistic_regression_with_lbfgs_example.py | 4 +- .../mllib/multi_class_metrics_example.py | 4 +- .../mllib/multi_label_metrics_example.py | 3 +- .../main/python/mllib/naive_bayes_example.py | 5 ++- .../main/python/mllib/normalizer_example.py | 1 + .../power_iteration_clustering_example.py | 10 +++-- .../random_forest_classification_example.py | 1 + .../mllib/random_forest_regression_example.py | 1 + .../python/mllib/ranking_metrics_example.py | 5 ++- .../python/mllib/recommendation_example.py | 7 ++-- .../mllib/regression_metrics_example.py | 4 +- .../python/mllib/standard_scaler_example.py | 1 + .../python/mllib/streaming_k_means_example.py | 10 +++-- .../streaming_linear_regression_example.py | 7 ++-- .../mllib/summary_statistics_example.py | 6 ++- .../main/python/mllib/svm_with_sgd_example.py | 3 +- .../src/main/python/mllib/tf_idf_example.py | 3 +- examples/src/main/python/mllib/word2vec.py | 1 + .../src/main/python/mllib/word2vec_example.py | 1 + examples/src/main/python/pagerank.py | 8 ++-- .../src/main/python/parquet_inputformat.py | 12 +++--- examples/src/main/python/pi.py | 14 ++++--- examples/src/main/python/sort.py | 15 ++++---- examples/src/main/python/sql/basic.py | 18 ++++----- examples/src/main/python/sql/datasource.py | 27 ++++++------- examples/src/main/python/sql/hive.py | 16 ++++---- .../streaming/structured_network_wordcount.py | 34 ++++++++--------- .../structured_network_wordcount_windowed.py | 5 +-- examples/src/main/python/status_api_demo.py | 4 +- .../streaming/direct_kafka_wordcount.py | 8 ++-- .../main/python/streaming/flume_wordcount.py | 8 ++-- .../main/python/streaming/hdfs_wordcount.py | 8 ++-- .../main/python/streaming/kafka_wordcount.py | 8 ++-- .../python/streaming/network_wordcount.py | 8 ++-- .../streaming/network_wordjoinsentiments.py | 21 +++++----- .../src/main/python/streaming/queue_stream.py | 1 + .../python/streaming/sql_network_wordcount.py | 6 +-- .../streaming/stateful_network_wordcount.py | 8 ++-- .../src/main/python/transitive_closure.py | 8 ++-- examples/src/main/python/wordcount.py | 16 ++++---- 117 files changed, 646 insertions(+), 503 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 38b4f7817713..4fad3359e4e6 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1534,10 +1534,10 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_ # Lazily instantiated global instance of SparkSession def getSparkSessionInstance(sparkConf): if ("sparkSessionSingletonInstance" not 
in globals()): - globals()["sparkSessionSingletonInstance"] = SparkSession \ - .builder \ - .config(conf=sparkConf) \ - .getOrCreate() + globals()["sparkSessionSingletonInstance"] = (SparkSession + .builder + .config(conf=sparkConf) + .getOrCreate()) return globals()["sparkSessionSingletonInstance"] ... diff --git a/examples/src/main/python/als.py b/examples/src/main/python/als.py index 6d3241876ad5..16393bb32c99 100755 --- a/examples/src/main/python/als.py +++ b/examples/src/main/python/als.py @@ -26,8 +26,8 @@ import sys import numpy as np -from numpy.random import rand from numpy import matrix +from numpy.random import rand from pyspark.sql import SparkSession LAMBDA = 0.01 # regularization @@ -62,10 +62,10 @@ def update(i, mat, ratings): example. Please use pyspark.ml.recommendation.ALS for more conventional use.""", file=sys.stderr) - spark = SparkSession\ - .builder\ - .appName("PythonALS")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("PythonALS") + .getOrCreate()) sc = spark.sparkContext @@ -87,17 +87,19 @@ def update(i, mat, ratings): usb = sc.broadcast(us) for i in range(ITERATIONS): - ms = sc.parallelize(range(M), partitions) \ - .map(lambda x: update(x, usb.value, Rb.value)) \ - .collect() + ms = (sc + .parallelize(range(M), partitions) + .map(lambda x: update(x, usb.value, Rb.value)) + .collect()) # collect() returns a list, so array ends up being # a 3-d array, we take the first 2 dims for the matrix ms = matrix(np.array(ms)[:, :, 0]) msb = sc.broadcast(ms) - us = sc.parallelize(range(U), partitions) \ - .map(lambda x: update(x, msb.value, Rb.value.T)) \ - .collect() + us = (sc + .parallelize(range(U), partitions) + .map(lambda x: update(x, msb.value, Rb.value.T)) + .collect()) us = matrix(np.array(us)[:, :, 0]) usb = sc.broadcast(us) diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py index 4422f9e7a958..378226934bac 100644 --- a/examples/src/main/python/avro_inputformat.py +++ b/examples/src/main/python/avro_inputformat.py @@ -18,10 +18,11 @@ from __future__ import print_function import sys - from functools import reduce + from pyspark.sql import SparkSession + """ Read data file users.avro in local Spark distro: @@ -65,10 +66,10 @@ path = sys.argv[1] - spark = SparkSession\ - .builder\ - .appName("AvroKeyInputFormat")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("AvroKeyInputFormat") + .getOrCreate()) sc = spark.sparkContext diff --git a/examples/src/main/python/kmeans.py b/examples/src/main/python/kmeans.py index 92e0a3ae2ee6..0a4d4fdbed8f 100755 --- a/examples/src/main/python/kmeans.py +++ b/examples/src/main/python/kmeans.py @@ -55,10 +55,10 @@ def closestPoint(p, centers): as an example! 
Please refer to examples/src/main/python/ml/kmeans_example.py for an example on how to use ML's KMeans implementation.""", file=sys.stderr) - spark = SparkSession\ - .builder\ - .appName("PythonKMeans")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("PythonKMeans") + .getOrCreate()) lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0]) data = lines.map(parseVector).cache() diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py index 01c938454b10..b380f3ba69e7 100755 --- a/examples/src/main/python/logistic_regression.py +++ b/examples/src/main/python/logistic_regression.py @@ -29,7 +29,6 @@ import numpy as np from pyspark.sql import SparkSession - D = 10 # Number of dimensions @@ -55,13 +54,15 @@ def readPointBatch(iterator): Please refer to examples/src/main/python/ml/logistic_regression_with_elastic_net.py to see how ML's implementation is used.""", file=sys.stderr) - spark = SparkSession\ - .builder\ - .appName("PythonLR")\ - .getOrCreate() - - points = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])\ - .mapPartitions(readPointBatch).cache() + spark = (SparkSession + .builder + .appName("PythonLR") + .getOrCreate()) + points = (spark.read + .text(sys.argv[1]) + .rdd.map(lambda r: r[0]) + .mapPartitions(readPointBatch) + .cache()) iterations = int(sys.argv[2]) # Initialize w to a random value diff --git a/examples/src/main/python/ml/aft_survival_regression.py b/examples/src/main/python/ml/aft_survival_regression.py index 2f0ca995e55c..a859367d486c 100644 --- a/examples/src/main/python/ml/aft_survival_regression.py +++ b/examples/src/main/python/ml/aft_survival_regression.py @@ -18,8 +18,8 @@ from __future__ import print_function # $example on$ -from pyspark.ml.regression import AFTSurvivalRegression from pyspark.ml.linalg import Vectors +from pyspark.ml.regression import AFTSurvivalRegression # $example off$ from pyspark.sql import SparkSession @@ -30,10 +30,10 @@ """ if __name__ == "__main__": - spark = SparkSession \ - .builder \ - .appName("AFTSurvivalRegressionExample") \ - .getOrCreate() + spark = (SparkSession + .builder + .appName("AFTSurvivalRegressionExample") + .getOrCreate()) # $example on$ training = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/als_example.py b/examples/src/main/python/ml/als_example.py index 1a979ff5b5be..1cb3950e3c84 100644 --- a/examples/src/main/python/ml/als_example.py +++ b/examples/src/main/python/ml/als_example.py @@ -18,22 +18,23 @@ from __future__ import print_function import sys -if sys.version >= '3': - long = int - -from pyspark.sql import SparkSession # $example on$ from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.recommendation import ALS from pyspark.sql import Row # $example off$ +from pyspark.sql import SparkSession + +if sys.version >= '3': + long = int + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("ALSExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("ALSExample") + .getOrCreate()) # $example on$ lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd diff --git a/examples/src/main/python/ml/binarizer_example.py b/examples/src/main/python/ml/binarizer_example.py index 669bb2aeabec..93b27a3c602f 100644 --- a/examples/src/main/python/ml/binarizer_example.py +++ b/examples/src/main/python/ml/binarizer_example.py @@ -17,16 +17,17 @@ from __future__ import print_function -from pyspark.sql import SparkSession # $example on$ from pyspark.ml.feature 
import Binarizer # $example off$ +from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("BinarizerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("BinarizerExample") + .getOrCreate()) # $example on$ continuousDataFrame = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/bisecting_k_means_example.py b/examples/src/main/python/ml/bisecting_k_means_example.py index 1263cb5d177a..7aae252d647b 100644 --- a/examples/src/main/python/ml/bisecting_k_means_example.py +++ b/examples/src/main/python/ml/bisecting_k_means_example.py @@ -29,10 +29,10 @@ """ if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("BisectingKMeansExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("BisectingKMeansExample") + .getOrCreate()) # $example on$ # Loads data. diff --git a/examples/src/main/python/ml/bucketizer_example.py b/examples/src/main/python/ml/bucketizer_example.py index 742f35093b9d..37070923cd6e 100644 --- a/examples/src/main/python/ml/bucketizer_example.py +++ b/examples/src/main/python/ml/bucketizer_example.py @@ -17,16 +17,17 @@ from __future__ import print_function -from pyspark.sql import SparkSession # $example on$ from pyspark.ml.feature import Bucketizer # $example off$ +from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("BucketizerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("BucketizerExample") + .getOrCreate()) # $example on$ splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")] @@ -39,7 +40,7 @@ # Transform original data into its bucket index. bucketedData = bucketizer.transform(dataFrame) - print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1)) + print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1)) bucketedData.show() # $example off$ diff --git a/examples/src/main/python/ml/chisq_selector_example.py b/examples/src/main/python/ml/chisq_selector_example.py index 028a9ea9d67b..17077b3e615a 100644 --- a/examples/src/main/python/ml/chisq_selector_example.py +++ b/examples/src/main/python/ml/chisq_selector_example.py @@ -17,17 +17,18 @@ from __future__ import print_function -from pyspark.sql import SparkSession # $example on$ from pyspark.ml.feature import ChiSqSelector from pyspark.ml.linalg import Vectors # $example off$ +from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("ChiSqSelectorExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("ChiSqSelectorExample") + .getOrCreate()) # $example on$ df = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/count_vectorizer_example.py b/examples/src/main/python/ml/count_vectorizer_example.py index f2e41db77d89..fb282d648dea 100644 --- a/examples/src/main/python/ml/count_vectorizer_example.py +++ b/examples/src/main/python/ml/count_vectorizer_example.py @@ -17,23 +17,26 @@ from __future__ import print_function -from pyspark.sql import SparkSession # $example on$ from pyspark.ml.feature import CountVectorizer # $example off$ +from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("CountVectorizerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("CountVectorizerExample") + .getOrCreate()) # $example on$ # Input data: Each row is a bag of words with a ID. 
- df = spark.createDataFrame([ - (0, "a b c".split(" ")), - (1, "a b b c a".split(" ")) - ], ["id", "words"]) + df = spark.createDataFrame( + [ + (0, "a b c".split(" ")), + (1, "a b b c a".split(" ")) + ], + ["id", "words"]) # fit a CountVectorizerModel from the corpus. cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0) diff --git a/examples/src/main/python/ml/cross_validator.py b/examples/src/main/python/ml/cross_validator.py index db7054307c2e..a6c236a7d318 100644 --- a/examples/src/main/python/ml/cross_validator.py +++ b/examples/src/main/python/ml/cross_validator.py @@ -35,27 +35,29 @@ """ if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("CrossValidatorExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("CrossValidatorExample") + .getOrCreate()) # $example on$ # Prepare training documents, which are labeled. - training = spark.createDataFrame([ - (0, "a b c d e spark", 1.0), - (1, "b d", 0.0), - (2, "spark f g h", 1.0), - (3, "hadoop mapreduce", 0.0), - (4, "b spark who", 1.0), - (5, "g d a y", 0.0), - (6, "spark fly", 1.0), - (7, "was mapreduce", 0.0), - (8, "e spark program", 1.0), - (9, "a e c l", 0.0), - (10, "spark compile", 1.0), - (11, "hadoop software", 0.0) - ], ["id", "text", "label"]) + training = spark.createDataFrame( + [ + (0, "a b c d e spark", 1.0), + (1, "b d", 0.0), + (2, "spark f g h", 1.0), + (3, "hadoop mapreduce", 0.0), + (4, "b spark who", 1.0), + (5, "g d a y", 0.0), + (6, "spark fly", 1.0), + (7, "was mapreduce", 0.0), + (8, "e spark program", 1.0), + (9, "a e c l", 0.0), + (10, "spark compile", 1.0), + (11, "hadoop software", 0.0) + ], + ["id", "text", "label"]) # Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr. tokenizer = Tokenizer(inputCol="text", outputCol="words") diff --git a/examples/src/main/python/ml/dataframe_example.py b/examples/src/main/python/ml/dataframe_example.py index 109f901012c9..109a293dd8df 100644 --- a/examples/src/main/python/ml/dataframe_example.py +++ b/examples/src/main/python/ml/dataframe_example.py @@ -22,13 +22,13 @@ from __future__ import print_function import os +import shutil import sys import tempfile -import shutil -from pyspark.sql import SparkSession from pyspark.mllib.stat import Statistics from pyspark.mllib.util import MLUtils +from pyspark.sql import SparkSession if __name__ == "__main__": if len(sys.argv) > 2: @@ -39,10 +39,10 @@ else: input = "data/mllib/sample_libsvm_data.txt" - spark = SparkSession \ - .builder \ - .appName("DataFrameExample") \ - .getOrCreate() + spark = (SparkSession + .builder + .appName("DataFrameExample") + .getOrCreate()) # Load input data print("Loading LIBSVM file with UDT from " + input + ".") @@ -57,8 +57,10 @@ labelSummary.show() # Convert features column to an RDD of vectors. 
- features = MLUtils.convertVectorColumnsFromML(df, "features") \ - .select("features").rdd.map(lambda r: r.features) + features = (MLUtils + .convertVectorColumnsFromML(df, "features") + .select("features") + .rdd.map(lambda r: r.features)) summary = Statistics.colStats(features) print("Selected features column with average values:\n" + str(summary.mean())) diff --git a/examples/src/main/python/ml/dct_example.py b/examples/src/main/python/ml/dct_example.py index c0457f8d0f43..8ed00a62cf6b 100644 --- a/examples/src/main/python/ml/dct_example.py +++ b/examples/src/main/python/ml/dct_example.py @@ -23,11 +23,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("DCTExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("DCTExample") + .getOrCreate()) # $example on$ df = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/decision_tree_classification_example.py b/examples/src/main/python/ml/decision_tree_classification_example.py index d6e2977de008..280661df75d5 100644 --- a/examples/src/main/python/ml/decision_tree_classification_example.py +++ b/examples/src/main/python/ml/decision_tree_classification_example.py @@ -23,16 +23,17 @@ # $example on$ from pyspark.ml import Pipeline from pyspark.ml.classification import DecisionTreeClassifier -from pyspark.ml.feature import StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator +from pyspark.ml.feature import StringIndexer, VectorIndexer # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("DecisionTreeClassificationExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("DecisionTreeClassificationExample") + .getOrCreate()) # $example on$ # Load the data stored in LIBSVM format as a DataFrame. @@ -43,8 +44,9 @@ labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data) # Automatically identify categorical features, and index them. # We specify maxCategories so features with > 4 distinct values are treated as continuous. 
- featureIndexer =\ - VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data) + featureIndexer = VectorIndexer(inputCol="features", + outputCol="indexedFeatures", + maxCategories=4).fit(data) # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) @@ -65,8 +67,9 @@ predictions.select("prediction", "indexedLabel", "features").show(5) # Select (prediction, true label) and compute test error - evaluator = MulticlassClassificationEvaluator( - labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy") + evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", + predictionCol="prediction", + metricName="accuracy") accuracy = evaluator.evaluate(predictions) print("Test Error = %g " % (1.0 - accuracy)) diff --git a/examples/src/main/python/ml/decision_tree_regression_example.py b/examples/src/main/python/ml/decision_tree_regression_example.py index 58d7ad921d8e..eabae21ef1cc 100644 --- a/examples/src/main/python/ml/decision_tree_regression_example.py +++ b/examples/src/main/python/ml/decision_tree_regression_example.py @@ -22,17 +22,18 @@ # $example on$ from pyspark.ml import Pipeline -from pyspark.ml.regression import DecisionTreeRegressor -from pyspark.ml.feature import VectorIndexer from pyspark.ml.evaluation import RegressionEvaluator +from pyspark.ml.feature import VectorIndexer +from pyspark.ml.regression import DecisionTreeRegressor # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("DecisionTreeRegressionExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("DecisionTreeRegressionExample") + .getOrCreate()) # $example on$ # Load the data stored in LIBSVM format as a DataFrame. @@ -40,8 +41,9 @@ # Automatically identify categorical features, and index them. # We specify maxCategories so features with > 4 distinct values are treated as continuous. 
- featureIndexer =\ - VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data) + featureIndexer = VectorIndexer(inputCol="features", + outputCol="indexedFeatures", + maxCategories=4).fit(data) # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) diff --git a/examples/src/main/python/ml/elementwise_product_example.py b/examples/src/main/python/ml/elementwise_product_example.py index 590053998bcc..a1374fc9e635 100644 --- a/examples/src/main/python/ml/elementwise_product_example.py +++ b/examples/src/main/python/ml/elementwise_product_example.py @@ -23,11 +23,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("ElementwiseProductExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("ElementwiseProductExample") + .getOrCreate()) # $example on$ # Create some vector data; also works for sparse vectors diff --git a/examples/src/main/python/ml/estimator_transformer_param_example.py b/examples/src/main/python/ml/estimator_transformer_param_example.py index eb2105143539..08e476f26b53 100644 --- a/examples/src/main/python/ml/estimator_transformer_param_example.py +++ b/examples/src/main/python/ml/estimator_transformer_param_example.py @@ -21,16 +21,17 @@ from __future__ import print_function # $example on$ -from pyspark.ml.linalg import Vectors from pyspark.ml.classification import LogisticRegression +from pyspark.ml.linalg import Vectors # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("EstimatorTransformerParamExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("EstimatorTransformerParamExample") + .getOrCreate()) # $example on$ # Prepare training data from a list of (label, features) tuples. @@ -82,8 +83,9 @@ # Note that model2.transform() outputs a "myProbability" column instead of the usual # 'probability' column since we renamed the lr.probabilityCol parameter previously. 
prediction = model2.transform(test) - result = prediction.select("features", "label", "myProbability", "prediction") \ - .collect() + result = (prediction + .select("features", "label", "myProbability", "prediction") + .collect()) for row in result: print("features=%s, label=%s -> prob=%s, prediction=%s" diff --git a/examples/src/main/python/ml/gaussian_mixture_example.py b/examples/src/main/python/ml/gaussian_mixture_example.py index e4a0d314e9d9..3e3bb8c2603a 100644 --- a/examples/src/main/python/ml/gaussian_mixture_example.py +++ b/examples/src/main/python/ml/gaussian_mixture_example.py @@ -29,10 +29,10 @@ """ if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("GaussianMixtureExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("GaussianMixtureExample") + .getOrCreate()) # $example on$ # loads data diff --git a/examples/src/main/python/ml/generalized_linear_regression_example.py b/examples/src/main/python/ml/generalized_linear_regression_example.py index 796752a60f3a..fa4cf0562199 100644 --- a/examples/src/main/python/ml/generalized_linear_regression_example.py +++ b/examples/src/main/python/ml/generalized_linear_regression_example.py @@ -17,11 +17,12 @@ from __future__ import print_function -from pyspark.sql import SparkSession # $example on$ from pyspark.ml.regression import GeneralizedLinearRegression +from pyspark.sql import SparkSession # $example off$ + """ An example demonstrating generalized linear regression. Run with: @@ -29,15 +30,16 @@ """ if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("GeneralizedLinearRegressionExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("GeneralizedLinearRegressionExample") + .getOrCreate()) # $example on$ # Load training data - dataset = spark.read.format("libsvm")\ - .load("data/mllib/sample_linear_regression_data.txt") + dataset = (spark + .read.format("libsvm") + .load("data/mllib/sample_linear_regression_data.txt")) glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3) diff --git a/examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py b/examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py index c2042fd7b7b0..8732c8de510b 100644 --- a/examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py +++ b/examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py @@ -23,16 +23,16 @@ # $example on$ from pyspark.ml import Pipeline from pyspark.ml.classification import GBTClassifier -from pyspark.ml.feature import StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator +from pyspark.ml.feature import StringIndexer, VectorIndexer # $example off$ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("GradientBoostedTreeClassifierExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("GradientBoostedTreeClassifierExample") + .getOrCreate()) # $example on$ # Load and parse the data file, converting it to a DataFrame. @@ -43,8 +43,9 @@ labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data) # Automatically identify categorical features, and index them. # Set maxCategories so features with > 4 distinct values are treated as continuous. 
- featureIndexer =\ - VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data) + featureIndexer = VectorIndexer(inputCol="features", + outputCol="indexedFeatures", + maxCategories=4).fit(data) # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) @@ -65,8 +66,9 @@ predictions.select("prediction", "indexedLabel", "features").show(5) # Select (prediction, true label) and compute test error - evaluator = MulticlassClassificationEvaluator( - labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy") + evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", + predictionCol="prediction", + metricName="accuracy") accuracy = evaluator.evaluate(predictions) print("Test Error = %g" % (1.0 - accuracy)) diff --git a/examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py b/examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py index cc96c973e4b2..bbfcc21a281f 100644 --- a/examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py +++ b/examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py @@ -22,17 +22,17 @@ # $example on$ from pyspark.ml import Pipeline -from pyspark.ml.regression import GBTRegressor -from pyspark.ml.feature import VectorIndexer from pyspark.ml.evaluation import RegressionEvaluator +from pyspark.ml.feature import VectorIndexer +from pyspark.ml.regression import GBTRegressor # $example off$ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("GradientBoostedTreeRegressorExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("GradientBoostedTreeRegressorExample") + .getOrCreate()) # $example on$ # Load and parse the data file, converting it to a DataFrame. @@ -40,8 +40,9 @@ # Automatically identify categorical features, and index them. # Set maxCategories so features with > 4 distinct values are treated as continuous. 
- featureIndexer =\ - VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data) + featureIndexer = VectorIndexer(inputCol="features", + outputCol="indexedFeatures", + maxCategories=4).fit(data) # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) @@ -62,8 +63,9 @@ predictions.select("prediction", "label", "features").show(5) # Select (prediction, true label) and compute test error - evaluator = RegressionEvaluator( - labelCol="label", predictionCol="prediction", metricName="rmse") + evaluator = RegressionEvaluator(labelCol="label", + predictionCol="prediction", + metricName="rmse") rmse = evaluator.evaluate(predictions) print("Root Mean Squared Error (RMSE) on test data = %g" % rmse) diff --git a/examples/src/main/python/ml/index_to_string_example.py b/examples/src/main/python/ml/index_to_string_example.py index 33d104e8e3f4..f05a48beaedf 100644 --- a/examples/src/main/python/ml/index_to_string_example.py +++ b/examples/src/main/python/ml/index_to_string_example.py @@ -23,10 +23,10 @@ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("IndexToStringExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("IndexToStringExample") + .getOrCreate()) # $example on$ df = spark.createDataFrame( diff --git a/examples/src/main/python/ml/isotonic_regression_example.py b/examples/src/main/python/ml/isotonic_regression_example.py index 6ae15f1b4b0d..265d240c7de1 100644 --- a/examples/src/main/python/ml/isotonic_regression_example.py +++ b/examples/src/main/python/ml/isotonic_regression_example.py @@ -32,15 +32,16 @@ """ if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("IsotonicRegressionExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("IsotonicRegressionExample") + .getOrCreate()) # $example on$ # Loads data. - dataset = spark.read.format("libsvm")\ - .load("data/mllib/sample_isotonic_regression_libsvm_data.txt") + dataset = (spark.read + .format("libsvm") + .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")) # Trains an isotonic regression model. model = IsotonicRegression().fit(dataset) diff --git a/examples/src/main/python/ml/kmeans_example.py b/examples/src/main/python/ml/kmeans_example.py index 6846ec459971..7541cc06dbf7 100644 --- a/examples/src/main/python/ml/kmeans_example.py +++ b/examples/src/main/python/ml/kmeans_example.py @@ -20,9 +20,9 @@ # $example on$ from pyspark.ml.clustering import KMeans # $example off$ - from pyspark.sql import SparkSession + """ An example demonstrating k-means clustering. Run with: @@ -32,10 +32,10 @@ """ if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("KMeansExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("KMeansExample") + .getOrCreate()) # $example on$ # Loads data. diff --git a/examples/src/main/python/ml/lda_example.py b/examples/src/main/python/ml/lda_example.py index a8b346f72cd6..45981be3423d 100644 --- a/examples/src/main/python/ml/lda_example.py +++ b/examples/src/main/python/ml/lda_example.py @@ -30,14 +30,17 @@ """ if __name__ == "__main__": - spark = SparkSession \ - .builder \ - .appName("LDAExample") \ - .getOrCreate() + spark = (SparkSession + .builder + .appName("LDAExample") + .getOrCreate()) # $example on$ # Loads data. 
- dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt") + dataset = (spark + .read + .format("libsvm") + .load("data/mllib/sample_lda_libsvm_data.txt")) # Trains a LDA model. lda = LDA(k=10, maxIter=10) diff --git a/examples/src/main/python/ml/linear_regression_with_elastic_net.py b/examples/src/main/python/ml/linear_regression_with_elastic_net.py index 6639e9160ab7..f92f4556a89e 100644 --- a/examples/src/main/python/ml/linear_regression_with_elastic_net.py +++ b/examples/src/main/python/ml/linear_regression_with_elastic_net.py @@ -22,16 +22,19 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("LinearRegressionWithElasticNet")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("LinearRegressionWithElasticNet") + .getOrCreate()) # $example on$ # Load training data - training = spark.read.format("libsvm")\ - .load("data/mllib/sample_linear_regression_data.txt") + training = (spark + .read + .format("libsvm") + .load("data/mllib/sample_linear_regression_data.txt")) lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) diff --git a/examples/src/main/python/ml/logistic_regression_summary_example.py b/examples/src/main/python/ml/logistic_regression_summary_example.py index bd440a1fbe8d..3fb009e67d16 100644 --- a/examples/src/main/python/ml/logistic_regression_summary_example.py +++ b/examples/src/main/python/ml/logistic_regression_summary_example.py @@ -29,10 +29,10 @@ """ if __name__ == "__main__": - spark = SparkSession \ - .builder \ - .appName("LogisticRegressionSummary") \ - .getOrCreate() + spark = (SparkSession + .builder + .appName("LogisticRegressionSummary") + .getOrCreate()) # Load training data training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") diff --git a/examples/src/main/python/ml/logistic_regression_with_elastic_net.py b/examples/src/main/python/ml/logistic_regression_with_elastic_net.py index d095fbd37340..41f02cd31f55 100644 --- a/examples/src/main/python/ml/logistic_regression_with_elastic_net.py +++ b/examples/src/main/python/ml/logistic_regression_with_elastic_net.py @@ -22,15 +22,18 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("LogisticRegressionWithElasticNet")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("LogisticRegressionWithElasticNet") + .getOrCreate()) # $example on$ # Load training data - training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + training = (spark.read + .format("libsvm") + .load("data/mllib/sample_libsvm_data.txt")) lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) diff --git a/examples/src/main/python/ml/max_abs_scaler_example.py b/examples/src/main/python/ml/max_abs_scaler_example.py index 45eda3cdadde..a5043af56512 100644 --- a/examples/src/main/python/ml/max_abs_scaler_example.py +++ b/examples/src/main/python/ml/max_abs_scaler_example.py @@ -24,10 +24,10 @@ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("MaxAbsScalerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("MaxAbsScalerExample") + .getOrCreate()) # $example on$ dataFrame = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/min_max_scaler_example.py b/examples/src/main/python/ml/min_max_scaler_example.py index b5f272e59bc3..3814691c7620 100644 --- 
a/examples/src/main/python/ml/min_max_scaler_example.py +++ b/examples/src/main/python/ml/min_max_scaler_example.py @@ -24,10 +24,10 @@ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("MinMaxScalerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("MinMaxScalerExample") + .getOrCreate()) # $example on$ dataFrame = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/multiclass_logistic_regression_with_elastic_net.py b/examples/src/main/python/ml/multiclass_logistic_regression_with_elastic_net.py index bb9cd82d6ba2..9a6074d1050f 100644 --- a/examples/src/main/python/ml/multiclass_logistic_regression_with_elastic_net.py +++ b/examples/src/main/python/ml/multiclass_logistic_regression_with_elastic_net.py @@ -23,17 +23,17 @@ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession \ - .builder \ - .appName("MulticlassLogisticRegressionWithElasticNet") \ - .getOrCreate() + spark = (SparkSession + .builder + .appName("MulticlassLogisticRegressionWithElasticNet") + .getOrCreate()) # $example on$ # Load training data - training = spark \ - .read \ - .format("libsvm") \ - .load("data/mllib/sample_multiclass_classification_data.txt") + training = (spark + .read + .format("libsvm") + .load("data/mllib/sample_multiclass_classification_data.txt")) lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) diff --git a/examples/src/main/python/ml/multilayer_perceptron_classification.py b/examples/src/main/python/ml/multilayer_perceptron_classification.py index 88fc69f75395..8c83fcef8ae0 100644 --- a/examples/src/main/python/ml/multilayer_perceptron_classification.py +++ b/examples/src/main/python/ml/multilayer_perceptron_classification.py @@ -24,13 +24,15 @@ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession\ - .builder.appName("multilayer_perceptron_classification_example").getOrCreate() + spark = (SparkSession + .builder + .appName("multilayer_perceptron_classification_example") + .getOrCreate()) # $example on$ # Load training data - data = spark.read.format("libsvm")\ - .load("data/mllib/sample_multiclass_classification_data.txt") + data = (spark.read.format("libsvm") + .load("data/mllib/sample_multiclass_classification_data.txt")) # Split the data into train and test splits = data.randomSplit([0.6, 0.4], 1234) diff --git a/examples/src/main/python/ml/n_gram_example.py b/examples/src/main/python/ml/n_gram_example.py index 31676e076a11..26862731a435 100644 --- a/examples/src/main/python/ml/n_gram_example.py +++ b/examples/src/main/python/ml/n_gram_example.py @@ -23,10 +23,10 @@ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("NGramExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("NGramExample") + .getOrCreate()) # $example on$ wordDataFrame = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/naive_bayes_example.py b/examples/src/main/python/ml/naive_bayes_example.py index 7290ab81cd0e..a8aa67555096 100644 --- a/examples/src/main/python/ml/naive_bayes_example.py +++ b/examples/src/main/python/ml/naive_bayes_example.py @@ -23,16 +23,18 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("NaiveBayesExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("NaiveBayesExample") + .getOrCreate()) # $example on$ # Load training data - data 
= spark.read.format("libsvm") \ - .load("data/mllib/sample_libsvm_data.txt") + data = (spark.read + .format("libsvm") + .load("data/mllib/sample_libsvm_data.txt")) # Split the data into train and test splits = data.randomSplit([0.6, 0.4], 1234) @@ -50,7 +52,8 @@ predictions.show() # compute accuracy on the test set - evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", + evaluator = MulticlassClassificationEvaluator(labelCol="label", + predictionCol="prediction", metricName="accuracy") accuracy = evaluator.evaluate(predictions) print("Test set accuracy = " + str(accuracy)) diff --git a/examples/src/main/python/ml/normalizer_example.py b/examples/src/main/python/ml/normalizer_example.py index 510bd825fd28..fc6a8cd81cf3 100644 --- a/examples/src/main/python/ml/normalizer_example.py +++ b/examples/src/main/python/ml/normalizer_example.py @@ -23,11 +23,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("NormalizerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("NormalizerExample") + .getOrCreate()) # $example on$ dataFrame = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/one_vs_rest_example.py b/examples/src/main/python/ml/one_vs_rest_example.py index 8e00c25d9342..5746a446a9e6 100644 --- a/examples/src/main/python/ml/one_vs_rest_example.py +++ b/examples/src/main/python/ml/one_vs_rest_example.py @@ -31,15 +31,16 @@ """ if __name__ == "__main__": - spark = SparkSession \ - .builder \ - .appName("OneVsRestExample") \ - .getOrCreate() + spark = (SparkSession + .builder + .appName("OneVsRestExample") + .getOrCreate()) # $example on$ # load data file. - inputData = spark.read.format("libsvm") \ - .load("data/mllib/sample_multiclass_classification_data.txt") + inputData = (spark.read + .format("libsvm") + .load("data/mllib/sample_multiclass_classification_data.txt")) # generate the train/test split. 
(train, test) = inputData.randomSplit([0.8, 0.2]) diff --git a/examples/src/main/python/ml/onehot_encoder_example.py b/examples/src/main/python/ml/onehot_encoder_example.py index e1996c7f0a55..2d77070e76fa 100644 --- a/examples/src/main/python/ml/onehot_encoder_example.py +++ b/examples/src/main/python/ml/onehot_encoder_example.py @@ -22,11 +22,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("OneHotEncoderExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("OneHotEncoderExample") + .getOrCreate()) # $example on$ df = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/pca_example.py b/examples/src/main/python/ml/pca_example.py index 38746aced096..5f2b2a38301e 100644 --- a/examples/src/main/python/ml/pca_example.py +++ b/examples/src/main/python/ml/pca_example.py @@ -23,11 +23,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("PCAExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("PCAExample") + .getOrCreate()) # $example on$ data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),), diff --git a/examples/src/main/python/ml/pipeline_example.py b/examples/src/main/python/ml/pipeline_example.py index e1fab7cbe6d8..8a5b227287fd 100644 --- a/examples/src/main/python/ml/pipeline_example.py +++ b/examples/src/main/python/ml/pipeline_example.py @@ -26,11 +26,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("PipelineExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("PipelineExample") + .getOrCreate()) # $example on$ # Prepare training documents from a list of (id, text, label) tuples. 
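
The example-file hunks before and after this point all apply the same mechanical rewrite announced in the commit subject: a method chain that was held together by trailing backslashes becomes one expression wrapped in parentheses, the implicit-continuation style PEP 8 prefers, and long constructor calls are broken one keyword argument per line. A minimal before/after sketch of that pattern, assuming only that pyspark is installed; the appName string is illustrative and not taken from any example file:

from pyspark.sql import SparkSession

# Before: explicit line continuations with trailing backslashes.
spark = SparkSession \
    .builder \
    .appName("ContinuationStyleSketch") \
    .getOrCreate()

# After: the whole chain sits inside parentheses, so no backslashes are needed
# and an accidental trailing space can no longer silently break the continuation.
spark = (SparkSession
         .builder
         .appName("ContinuationStyleSketch")
         .getOrCreate())

# The same rewrite applies to the reader chains used throughout the examples.
df = (spark.read
      .format("libsvm")
      .load("data/mllib/sample_libsvm_data.txt"))

Either form is valid Python; the parenthesized one is what these hunks standardize on.
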
diff --git a/examples/src/main/python/ml/polynomial_expansion_example.py b/examples/src/main/python/ml/polynomial_expansion_example.py index 40bcb7b13a3d..34caa1d8264d 100644 --- a/examples/src/main/python/ml/polynomial_expansion_example.py +++ b/examples/src/main/python/ml/polynomial_expansion_example.py @@ -23,6 +23,7 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": spark = SparkSession\ .builder\ diff --git a/examples/src/main/python/ml/quantile_discretizer_example.py b/examples/src/main/python/ml/quantile_discretizer_example.py index 0fc1d1949a77..400b43ab5ee9 100644 --- a/examples/src/main/python/ml/quantile_discretizer_example.py +++ b/examples/src/main/python/ml/quantile_discretizer_example.py @@ -23,10 +23,10 @@ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("QuantileDiscretizerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("QuantileDiscretizerExample") + .getOrCreate()) # $example on$ data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)] diff --git a/examples/src/main/python/ml/random_forest_classifier_example.py b/examples/src/main/python/ml/random_forest_classifier_example.py index 4eaa94dd7f48..dcdaa44e6da4 100644 --- a/examples/src/main/python/ml/random_forest_classifier_example.py +++ b/examples/src/main/python/ml/random_forest_classifier_example.py @@ -23,16 +23,17 @@ # $example on$ from pyspark.ml import Pipeline from pyspark.ml.classification import RandomForestClassifier -from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator +from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("RandomForestClassifierExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("RandomForestClassifierExample") + .getOrCreate()) # $example on$ # Load and parse the data file, converting it to a DataFrame. @@ -44,17 +45,21 @@ # Automatically identify categorical features, and index them. # Set maxCategories so features with > 4 distinct values are treated as continuous. - featureIndexer =\ - VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data) + featureIndexer = VectorIndexer(inputCol="features", + outputCol="indexedFeatures", + maxCategories=4).fit(data) # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) # Train a RandomForest model. - rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10) + rf = RandomForestClassifier(labelCol="indexedLabel", + featuresCol="indexedFeatures", + numTrees=10) # Convert indexed labels back to original labels. 
- labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", + labelConverter = IndexToString(inputCol="prediction", + outputCol="predictedLabel", labels=labelIndexer.labels) # Chain indexers and forest in a Pipeline @@ -70,8 +75,9 @@ predictions.select("predictedLabel", "label", "features").show(5) # Select (prediction, true label) and compute test error - evaluator = MulticlassClassificationEvaluator( - labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy") + evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", + predictionCol="prediction", + metricName="accuracy") accuracy = evaluator.evaluate(predictions) print("Test Error = %g" % (1.0 - accuracy)) diff --git a/examples/src/main/python/ml/random_forest_regressor_example.py b/examples/src/main/python/ml/random_forest_regressor_example.py index a34edff2ecaa..81aafdfc8a8d 100644 --- a/examples/src/main/python/ml/random_forest_regressor_example.py +++ b/examples/src/main/python/ml/random_forest_regressor_example.py @@ -22,26 +22,30 @@ # $example on$ from pyspark.ml import Pipeline -from pyspark.ml.regression import RandomForestRegressor -from pyspark.ml.feature import VectorIndexer from pyspark.ml.evaluation import RegressionEvaluator +from pyspark.ml.feature import VectorIndexer +from pyspark.ml.regression import RandomForestRegressor # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("RandomForestRegressorExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("RandomForestRegressorExample") + .getOrCreate()) # $example on$ # Load and parse the data file, converting it to a DataFrame. - data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") + data = (spark.read + .format("libsvm") + .load("data/mllib/sample_libsvm_data.txt")) # Automatically identify categorical features, and index them. # Set maxCategories so features with > 4 distinct values are treated as continuous. 
- featureIndexer =\ - VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data) + featureIndexer = VectorIndexer(inputCol="features", + outputCol="indexedFeatures", + maxCategories=4).fit(data) # Split the data into training and test sets (30% held out for testing) (trainingData, testData) = data.randomSplit([0.7, 0.3]) @@ -62,8 +66,9 @@ predictions.select("prediction", "label", "features").show(5) # Select (prediction, true label) and compute test error - evaluator = RegressionEvaluator( - labelCol="label", predictionCol="prediction", metricName="rmse") + evaluator = RegressionEvaluator(labelCol="label", + predictionCol="prediction", + metricName="rmse") rmse = evaluator.evaluate(predictions) print("Root Mean Squared Error (RMSE) on test data = %g" % rmse) diff --git a/examples/src/main/python/ml/rformula_example.py b/examples/src/main/python/ml/rformula_example.py index 6629239db29e..bfbbe6f8ec06 100644 --- a/examples/src/main/python/ml/rformula_example.py +++ b/examples/src/main/python/ml/rformula_example.py @@ -22,11 +22,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("RFormulaExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("RFormulaExample") + .getOrCreate()) # $example on$ dataset = spark.createDataFrame( diff --git a/examples/src/main/python/ml/sql_transformer.py b/examples/src/main/python/ml/sql_transformer.py index 0bf8f35720c9..85507d4a1ee7 100644 --- a/examples/src/main/python/ml/sql_transformer.py +++ b/examples/src/main/python/ml/sql_transformer.py @@ -22,11 +22,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("SQLTransformerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("SQLTransformerExample") + .getOrCreate()) # $example on$ df = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/standard_scaler_example.py b/examples/src/main/python/ml/standard_scaler_example.py index c0027480e69b..bd8140fcc2a7 100644 --- a/examples/src/main/python/ml/standard_scaler_example.py +++ b/examples/src/main/python/ml/standard_scaler_example.py @@ -22,11 +22,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("StandardScalerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("StandardScalerExample") + .getOrCreate()) # $example on$ dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") diff --git a/examples/src/main/python/ml/stopwords_remover_example.py b/examples/src/main/python/ml/stopwords_remover_example.py index 3b8e7855e3e7..99f32acbe4e8 100644 --- a/examples/src/main/python/ml/stopwords_remover_example.py +++ b/examples/src/main/python/ml/stopwords_remover_example.py @@ -22,11 +22,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("StopWordsRemoverExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("StopWordsRemoverExample") + .getOrCreate()) # $example on$ sentenceData = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/string_indexer_example.py b/examples/src/main/python/ml/string_indexer_example.py index 2255bfb9c1a6..f64850dfcab9 100644 --- a/examples/src/main/python/ml/string_indexer_example.py +++ b/examples/src/main/python/ml/string_indexer_example.py @@ -22,11 
+22,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("StringIndexerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("StringIndexerExample") + .getOrCreate()) # $example on$ df = spark.createDataFrame( diff --git a/examples/src/main/python/ml/tf_idf_example.py b/examples/src/main/python/ml/tf_idf_example.py index d43244fa68e9..e66d9b1ba4ea 100644 --- a/examples/src/main/python/ml/tf_idf_example.py +++ b/examples/src/main/python/ml/tf_idf_example.py @@ -18,15 +18,15 @@ from __future__ import print_function # $example on$ -from pyspark.ml.feature import HashingTF, IDF, Tokenizer +from pyspark.ml.feature import IDF, HashingTF, Tokenizer # $example off$ from pyspark.sql import SparkSession if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("TfIdfExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("TfIdfExample") + .getOrCreate()) # $example on$ sentenceData = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/tokenizer_example.py b/examples/src/main/python/ml/tokenizer_example.py index 5c65c5c9f826..73c8f4bfba66 100644 --- a/examples/src/main/python/ml/tokenizer_example.py +++ b/examples/src/main/python/ml/tokenizer_example.py @@ -18,17 +18,20 @@ from __future__ import print_function # $example on$ -from pyspark.ml.feature import Tokenizer, RegexTokenizer +from pyspark.ml.feature import RegexTokenizer, Tokenizer +# $example off$ +from pyspark.sql import SparkSession +# $example on$ from pyspark.sql.functions import col, udf from pyspark.sql.types import IntegerType # $example off$ -from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("TokenizerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("TokenizerExample") + .getOrCreate()) # $example on$ sentenceDataFrame = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/train_validation_split.py b/examples/src/main/python/ml/train_validation_split.py index d104f7d30a1b..51ebd39f087e 100644 --- a/examples/src/main/python/ml/train_validation_split.py +++ b/examples/src/main/python/ml/train_validation_split.py @@ -31,15 +31,16 @@ """ if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("TrainValidationSplit")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("TrainValidationSplit") + .getOrCreate()) # $example on$ # Prepare training and test data. - data = spark.read.format("libsvm")\ - .load("data/mllib/sample_linear_regression_data.txt") + data = (spark.read + .format("libsvm") + .load("data/mllib/sample_linear_regression_data.txt")) train, test = data.randomSplit([0.9, 0.1], seed=12345) lr = LinearRegression(maxIter=10) @@ -47,11 +48,11 @@ # We use a ParamGridBuilder to construct a grid of parameters to search over. # TrainValidationSplit will try all combinations of values and determine best model using # the evaluator. - paramGrid = ParamGridBuilder()\ - .addGrid(lr.regParam, [0.1, 0.01]) \ - .addGrid(lr.fitIntercept, [False, True])\ - .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\ - .build() + paramGrid = (ParamGridBuilder() + .addGrid(lr.regParam, [0.1, 0.01]) + .addGrid(lr.fitIntercept, [False, True]) + .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) + .build()) # In this case the estimator is simply the linear regression. # A TrainValidationSplit requires an Estimator, a set of Estimator ParamMaps, and an Evaluator. 
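
Besides the continuation style, the import blocks in these hunks are being rewritten by isort: standard-library modules come first, third-party packages such as numpy and pyspark follow in their own group, and each group is sorted alphabetically. A small illustrative fragment of that reordering, using arbitrary imports rather than any particular example file:

# Before isort: standard-library and third-party imports interleaved.
from pyspark.sql import SparkSession
import sys
from pyspark.ml.feature import VectorAssembler
import os

# After isort: standard library first, then third-party,
# each group alphabetized and separated by a blank line.
import os
import sys

from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession

Where this reordering would pull an import between the Spark doc-generation markers ($example on$ / $example off$), the hunks add matching markers around the moved import so the extracted snippets stay intact, as the tokenizer_example and vector_slicer_example changes do.
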
diff --git a/examples/src/main/python/ml/vector_assembler_example.py b/examples/src/main/python/ml/vector_assembler_example.py index 98de1d5ea7da..58220c2cbe78 100644 --- a/examples/src/main/python/ml/vector_assembler_example.py +++ b/examples/src/main/python/ml/vector_assembler_example.py @@ -18,16 +18,17 @@ from __future__ import print_function # $example on$ -from pyspark.ml.linalg import Vectors from pyspark.ml.feature import VectorAssembler +from pyspark.ml.linalg import Vectors # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("VectorAssemblerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("VectorAssemblerExample") + .getOrCreate()) # $example on$ dataset = spark.createDataFrame( diff --git a/examples/src/main/python/ml/vector_indexer_example.py b/examples/src/main/python/ml/vector_indexer_example.py index 5c2956077d6c..9ed68f2cddf4 100644 --- a/examples/src/main/python/ml/vector_indexer_example.py +++ b/examples/src/main/python/ml/vector_indexer_example.py @@ -22,11 +22,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("VectorIndexerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("VectorIndexerExample") + .getOrCreate()) # $example on$ data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt") diff --git a/examples/src/main/python/ml/vector_slicer_example.py b/examples/src/main/python/ml/vector_slicer_example.py index 68c8cfe27e37..8ca2c7fd5dd0 100644 --- a/examples/src/main/python/ml/vector_slicer_example.py +++ b/examples/src/main/python/ml/vector_slicer_example.py @@ -20,15 +20,18 @@ # $example on$ from pyspark.ml.feature import VectorSlicer from pyspark.ml.linalg import Vectors -from pyspark.sql.types import Row # $example off$ from pyspark.sql import SparkSession +# $example on$ +from pyspark.sql.types import Row +# $example off$ + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("VectorSlicerExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("VectorSlicerExample") + .getOrCreate()) # $example on$ df = spark.createDataFrame([ diff --git a/examples/src/main/python/ml/word2vec_example.py b/examples/src/main/python/ml/word2vec_example.py index 77f8951df088..cbc41807600d 100644 --- a/examples/src/main/python/ml/word2vec_example.py +++ b/examples/src/main/python/ml/word2vec_example.py @@ -22,11 +22,12 @@ # $example off$ from pyspark.sql import SparkSession + if __name__ == "__main__": - spark = SparkSession\ - .builder\ - .appName("Word2VecExample")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("Word2VecExample") + .getOrCreate()) # $example on$ # Input data: Each row is a bag of words from a sentence or document. diff --git a/examples/src/main/python/mllib/binary_classification_metrics_example.py b/examples/src/main/python/mllib/binary_classification_metrics_example.py index 91f8378f29c0..1b246df84600 100644 --- a/examples/src/main/python/mllib/binary_classification_metrics_example.py +++ b/examples/src/main/python/mllib/binary_classification_metrics_example.py @@ -18,12 +18,14 @@ Binary Classification Metrics Example. 
""" from __future__ import print_function -from pyspark.sql import SparkSession + # $example on$ from pyspark.mllib.classification import LogisticRegressionWithLBFGS from pyspark.mllib.evaluation import BinaryClassificationMetrics from pyspark.mllib.regression import LabeledPoint # $example off$ +from pyspark.sql import SparkSession + if __name__ == "__main__": spark = SparkSession\ @@ -34,9 +36,10 @@ # $example on$ # Several of the methods available in scala are currently missing from pyspark # Load training data in LIBSVM format - data = spark\ - .read.format("libsvm").load("data/mllib/sample_binary_classification_data.txt")\ - .rdd.map(lambda row: LabeledPoint(row[0], row[1])) + data = (spark + .read.format("libsvm") + .load("data/mllib/sample_binary_classification_data.txt") + .rdd.map(lambda row: LabeledPoint(row[0], row[1]))) # Split data into training (60%) and test (40%) training, test = data.randomSplit([0.6, 0.4], seed=11) diff --git a/examples/src/main/python/mllib/bisecting_k_means_example.py b/examples/src/main/python/mllib/bisecting_k_means_example.py index 7f4d0402d620..dff09f59c497 100644 --- a/examples/src/main/python/mllib/bisecting_k_means_example.py +++ b/examples/src/main/python/mllib/bisecting_k_means_example.py @@ -20,11 +20,11 @@ # $example on$ from numpy import array # $example off$ - from pyspark import SparkContext # $example on$ from pyspark.mllib.clustering import BisectingKMeans, BisectingKMeansModel # $example off$ +# if __name__ == "__main__": sc = SparkContext(appName="PythonBisectingKMeansExample") # SparkContext diff --git a/examples/src/main/python/mllib/correlations.py b/examples/src/main/python/mllib/correlations.py index 0e13546b88e6..835e393dc5c1 100755 --- a/examples/src/main/python/mllib/correlations.py +++ b/examples/src/main/python/mllib/correlations.py @@ -39,8 +39,9 @@ filepath = 'data/mllib/sample_linear_regression_data.txt' corrType = 'pearson' - points = MLUtils.loadLibSVMFile(sc, filepath)\ - .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray())) + points = (MLUtils + .loadLibSVMFile(sc, filepath) + .map(lambda lp: LabeledPoint(lp.label, lp.features.toArray()))) print() print('Summary of data file: ' + filepath) diff --git a/examples/src/main/python/mllib/correlations_example.py b/examples/src/main/python/mllib/correlations_example.py index 66d18f6e5df1..ee62215c4d4e 100644 --- a/examples/src/main/python/mllib/correlations_example.py +++ b/examples/src/main/python/mllib/correlations_example.py @@ -17,13 +17,15 @@ from __future__ import print_function +# $example on$ import numpy as np - +# $example off$ from pyspark import SparkContext # $example on$ from pyspark.mllib.stat import Statistics # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="CorrelationsExample") # SparkContext diff --git a/examples/src/main/python/mllib/decision_tree_classification_example.py b/examples/src/main/python/mllib/decision_tree_classification_example.py index 7eecf500584a..5d3023a12746 100644 --- a/examples/src/main/python/mllib/decision_tree_classification_example.py +++ b/examples/src/main/python/mllib/decision_tree_classification_example.py @@ -26,6 +26,7 @@ from pyspark.mllib.util import MLUtils # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="PythonDecisionTreeClassificationExample") diff --git a/examples/src/main/python/mllib/decision_tree_regression_example.py b/examples/src/main/python/mllib/decision_tree_regression_example.py index acf9e25fdf31..ebc7825f9b09 100644 --- 
a/examples/src/main/python/mllib/decision_tree_regression_example.py +++ b/examples/src/main/python/mllib/decision_tree_regression_example.py @@ -25,6 +25,7 @@ from pyspark.mllib.tree import DecisionTree, DecisionTreeModel from pyspark.mllib.util import MLUtils # $example off$ +# if __name__ == "__main__": diff --git a/examples/src/main/python/mllib/elementwise_product_example.py b/examples/src/main/python/mllib/elementwise_product_example.py index 6d8bf6d42e08..a2232e97ba2a 100644 --- a/examples/src/main/python/mllib/elementwise_product_example.py +++ b/examples/src/main/python/mllib/elementwise_product_example.py @@ -23,6 +23,7 @@ from pyspark.mllib.linalg import Vectors # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="ElementwiseProductExample") # SparkContext diff --git a/examples/src/main/python/mllib/fpgrowth_example.py b/examples/src/main/python/mllib/fpgrowth_example.py index 715f5268206c..06ac43e56320 100644 --- a/examples/src/main/python/mllib/fpgrowth_example.py +++ b/examples/src/main/python/mllib/fpgrowth_example.py @@ -15,10 +15,11 @@ # limitations under the License. # +from pyspark import SparkContext # $example on$ from pyspark.mllib.fpm import FPGrowth # $example off$ -from pyspark import SparkContext + if __name__ == "__main__": sc = SparkContext(appName="FPGrowth") diff --git a/examples/src/main/python/mllib/gaussian_mixture_example.py b/examples/src/main/python/mllib/gaussian_mixture_example.py index a60e799d62eb..b1f7c69ad6bc 100644 --- a/examples/src/main/python/mllib/gaussian_mixture_example.py +++ b/examples/src/main/python/mllib/gaussian_mixture_example.py @@ -20,12 +20,12 @@ # $example on$ from numpy import array # $example off$ - from pyspark import SparkContext # $example on$ from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="GaussianMixtureExample") # SparkContext @@ -39,8 +39,8 @@ # Save and load model gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel") - sameModel = GaussianMixtureModel\ - .load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel") + sameModel = GaussianMixtureModel.load( + sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel") # output parameters of model for i in range(2): diff --git a/examples/src/main/python/mllib/gaussian_mixture_model.py b/examples/src/main/python/mllib/gaussian_mixture_model.py index 6b46e27ddaaa..2128aa49dadf 100644 --- a/examples/src/main/python/mllib/gaussian_mixture_model.py +++ b/examples/src/main/python/mllib/gaussian_mixture_model.py @@ -20,17 +20,17 @@ """ from __future__ import print_function +import argparse +import random import sys -if sys.version >= '3': - long = int -import random -import argparse import numpy as np - from pyspark import SparkConf, SparkContext from pyspark.mllib.clustering import GaussianMixture +if sys.version >= '3': + long = int + def parseVector(line): return np.array([float(x) for x in line.split(' ')]) diff --git a/examples/src/main/python/mllib/gradient_boosting_classification_example.py b/examples/src/main/python/mllib/gradient_boosting_classification_example.py index 65a03572be9b..b5ae424ff731 100644 --- a/examples/src/main/python/mllib/gradient_boosting_classification_example.py +++ b/examples/src/main/python/mllib/gradient_boosting_classification_example.py @@ -26,6 +26,7 @@ from pyspark.mllib.util import MLUtils # $example off$ + if __name__ == "__main__": sc = 
SparkContext(appName="PythonGradientBoostedTreesClassificationExample") # $example on$ diff --git a/examples/src/main/python/mllib/gradient_boosting_regression_example.py b/examples/src/main/python/mllib/gradient_boosting_regression_example.py index 877f8ab461cc..8456ea8d3f11 100644 --- a/examples/src/main/python/mllib/gradient_boosting_regression_example.py +++ b/examples/src/main/python/mllib/gradient_boosting_regression_example.py @@ -26,6 +26,7 @@ from pyspark.mllib.util import MLUtils # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="PythonGradientBoostedTreesRegressionExample") # $example on$ diff --git a/examples/src/main/python/mllib/hypothesis_testing_example.py b/examples/src/main/python/mllib/hypothesis_testing_example.py index e566ead0d318..150eba022e6f 100644 --- a/examples/src/main/python/mllib/hypothesis_testing_example.py +++ b/examples/src/main/python/mllib/hypothesis_testing_example.py @@ -24,6 +24,7 @@ from pyspark.mllib.stat import Statistics # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="HypothesisTestingExample") diff --git a/examples/src/main/python/mllib/hypothesis_testing_kolmogorov_smirnov_test_example.py b/examples/src/main/python/mllib/hypothesis_testing_kolmogorov_smirnov_test_example.py index ef380dee79d3..8b4463795e63 100644 --- a/examples/src/main/python/mllib/hypothesis_testing_kolmogorov_smirnov_test_example.py +++ b/examples/src/main/python/mllib/hypothesis_testing_kolmogorov_smirnov_test_example.py @@ -22,6 +22,7 @@ from pyspark.mllib.stat import Statistics # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="HypothesisTestingKolmogorovSmirnovTestExample") diff --git a/examples/src/main/python/mllib/isotonic_regression_example.py b/examples/src/main/python/mllib/isotonic_regression_example.py index 33d618ab48ea..6e9478405ad9 100644 --- a/examples/src/main/python/mllib/isotonic_regression_example.py +++ b/examples/src/main/python/mllib/isotonic_regression_example.py @@ -20,13 +20,18 @@ """ from __future__ import print_function -from pyspark import SparkContext # $example on$ import math -from pyspark.mllib.regression import LabeledPoint, IsotonicRegression, IsotonicRegressionModel +# $example off$ + +from pyspark import SparkContext +# $example on$ +from pyspark.mllib.regression import (IsotonicRegression, + IsotonicRegressionModel, LabeledPoint) from pyspark.mllib.util import MLUtils # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="PythonIsotonicRegressionExample") diff --git a/examples/src/main/python/mllib/k_means_example.py b/examples/src/main/python/mllib/k_means_example.py index d6058f45020c..31b26d7497af 100644 --- a/examples/src/main/python/mllib/k_means_example.py +++ b/examples/src/main/python/mllib/k_means_example.py @@ -18,15 +18,16 @@ from __future__ import print_function # $example on$ -from numpy import array from math import sqrt -# $example off$ +from numpy import array +# $example off$ from pyspark import SparkContext # $example on$ from pyspark.mllib.clustering import KMeans, KMeansModel # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="KMeansExample") # SparkContext diff --git a/examples/src/main/python/mllib/kernel_density_estimation_example.py b/examples/src/main/python/mllib/kernel_density_estimation_example.py index 3e8f7241a4a1..0c607325ae19 100644 --- a/examples/src/main/python/mllib/kernel_density_estimation_example.py +++ b/examples/src/main/python/mllib/kernel_density_estimation_example.py @@ -22,6 +22,7 @@ from 
pyspark.mllib.stat import KernelDensity # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="KernelDensityEstimationExample") # SparkContext diff --git a/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py b/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py index 2a1bef5f207b..2fd34b04fdf5 100644 --- a/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py +++ b/examples/src/main/python/mllib/latent_dirichlet_allocation_example.py @@ -23,6 +23,7 @@ from pyspark.mllib.linalg import Vectors # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="LatentDirichletAllocationExample") # SparkContext diff --git a/examples/src/main/python/mllib/linear_regression_with_sgd_example.py b/examples/src/main/python/mllib/linear_regression_with_sgd_example.py index 6744463d40ef..31a33c72af26 100644 --- a/examples/src/main/python/mllib/linear_regression_with_sgd_example.py +++ b/examples/src/main/python/mllib/linear_regression_with_sgd_example.py @@ -22,9 +22,11 @@ from pyspark import SparkContext # $example on$ -from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel +from pyspark.mllib.regression import (LabeledPoint, LinearRegressionModel, + LinearRegressionWithSGD) # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="PythonLinearRegressionWithSGDExample") diff --git a/examples/src/main/python/mllib/logistic_regression.py b/examples/src/main/python/mllib/logistic_regression.py index d4f1d34e2d8c..0c91e0a9b613 100755 --- a/examples/src/main/python/mllib/logistic_regression.py +++ b/examples/src/main/python/mllib/logistic_regression.py @@ -25,8 +25,8 @@ import sys from pyspark import SparkContext -from pyspark.mllib.regression import LabeledPoint from pyspark.mllib.classification import LogisticRegressionWithSGD +from pyspark.mllib.regression import LabeledPoint def parsePoint(line): diff --git a/examples/src/main/python/mllib/logistic_regression_with_lbfgs_example.py b/examples/src/main/python/mllib/logistic_regression_with_lbfgs_example.py index c9b768b3147d..0f81108039a4 100644 --- a/examples/src/main/python/mllib/logistic_regression_with_lbfgs_example.py +++ b/examples/src/main/python/mllib/logistic_regression_with_lbfgs_example.py @@ -22,9 +22,11 @@ from pyspark import SparkContext # $example on$ -from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel +from pyspark.mllib.classification import (LogisticRegressionModel, + LogisticRegressionWithLBFGS) from pyspark.mllib.regression import LabeledPoint # $example off$ +# if __name__ == "__main__": diff --git a/examples/src/main/python/mllib/multi_class_metrics_example.py b/examples/src/main/python/mllib/multi_class_metrics_example.py index 7dc5fb4f9127..7447e3bfd771 100644 --- a/examples/src/main/python/mllib/multi_class_metrics_example.py +++ b/examples/src/main/python/mllib/multi_class_metrics_example.py @@ -15,13 +15,13 @@ # limitations under the License. 
# +from pyspark import SparkContext # $example on$ from pyspark.mllib.classification import LogisticRegressionWithLBFGS -from pyspark.mllib.util import MLUtils from pyspark.mllib.evaluation import MulticlassMetrics +from pyspark.mllib.util import MLUtils # $example off$ -from pyspark import SparkContext if __name__ == "__main__": sc = SparkContext(appName="MultiClassMetricsExample") diff --git a/examples/src/main/python/mllib/multi_label_metrics_example.py b/examples/src/main/python/mllib/multi_label_metrics_example.py index 960ade659737..d4970efeb2e4 100644 --- a/examples/src/main/python/mllib/multi_label_metrics_example.py +++ b/examples/src/main/python/mllib/multi_label_metrics_example.py @@ -15,10 +15,11 @@ # limitations under the License. # +from pyspark import SparkContext # $example on$ from pyspark.mllib.evaluation import MultilabelMetrics # $example off$ -from pyspark import SparkContext + if __name__ == "__main__": sc = SparkContext(appName="MultiLabelMetricsExample") diff --git a/examples/src/main/python/mllib/naive_bayes_example.py b/examples/src/main/python/mllib/naive_bayes_example.py index a29fcccac5bf..0d4d0d36bd9c 100644 --- a/examples/src/main/python/mllib/naive_bayes_example.py +++ b/examples/src/main/python/mllib/naive_bayes_example.py @@ -24,16 +24,17 @@ from __future__ import print_function +# $example on$ import shutil +# $example off$ from pyspark import SparkContext # $example on$ from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel from pyspark.mllib.util import MLUtils - - # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="PythonNaiveBayesExample") diff --git a/examples/src/main/python/mllib/normalizer_example.py b/examples/src/main/python/mllib/normalizer_example.py index a4e028ca9af8..3e211a1b276f 100644 --- a/examples/src/main/python/mllib/normalizer_example.py +++ b/examples/src/main/python/mllib/normalizer_example.py @@ -23,6 +23,7 @@ from pyspark.mllib.util import MLUtils # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="NormalizerExample") # SparkContext diff --git a/examples/src/main/python/mllib/power_iteration_clustering_example.py b/examples/src/main/python/mllib/power_iteration_clustering_example.py index ca19c0ccb60c..fe132a0e8733 100644 --- a/examples/src/main/python/mllib/power_iteration_clustering_example.py +++ b/examples/src/main/python/mllib/power_iteration_clustering_example.py @@ -19,7 +19,9 @@ from pyspark import SparkContext # $example on$ -from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel +from pyspark.mllib.clustering import (PowerIterationClustering, + PowerIterationClusteringModel) + # $example off$ if __name__ == "__main__": @@ -36,9 +38,9 @@ model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster))) # Save and load model - model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel") - sameModel = PowerIterationClusteringModel\ - .load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel") + pic_model = "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel" + model.save(sc, pic_model) + sameModel = PowerIterationClusteringModel.load(sc, pic_model) # $example off$ sc.stop() diff --git a/examples/src/main/python/mllib/random_forest_classification_example.py b/examples/src/main/python/mllib/random_forest_classification_example.py index 5ac67520daee..6b861d2c88b5 100644 --- a/examples/src/main/python/mllib/random_forest_classification_example.py +++ 
b/examples/src/main/python/mllib/random_forest_classification_example.py @@ -26,6 +26,7 @@ from pyspark.mllib.util import MLUtils # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="PythonRandomForestClassificationExample") # $example on$ diff --git a/examples/src/main/python/mllib/random_forest_regression_example.py b/examples/src/main/python/mllib/random_forest_regression_example.py index 7e986a0d307f..3068714c6f2c 100644 --- a/examples/src/main/python/mllib/random_forest_regression_example.py +++ b/examples/src/main/python/mllib/random_forest_regression_example.py @@ -26,6 +26,7 @@ from pyspark.mllib.util import MLUtils # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="PythonRandomForestRegressionExample") # $example on$ diff --git a/examples/src/main/python/mllib/ranking_metrics_example.py b/examples/src/main/python/mllib/ranking_metrics_example.py index 21333deded35..6c2870bd9bff 100644 --- a/examples/src/main/python/mllib/ranking_metrics_example.py +++ b/examples/src/main/python/mllib/ranking_metrics_example.py @@ -15,11 +15,12 @@ # limitations under the License. # +from pyspark import SparkContext # $example on$ +from pyspark.mllib.evaluation import RankingMetrics, RegressionMetrics from pyspark.mllib.recommendation import ALS, Rating -from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics # $example off$ -from pyspark import SparkContext + if __name__ == "__main__": sc = SparkContext(appName="Ranking Metrics Example") diff --git a/examples/src/main/python/mllib/recommendation_example.py b/examples/src/main/python/mllib/recommendation_example.py index 00e683c3ae93..3af9f0cfc021 100644 --- a/examples/src/main/python/mllib/recommendation_example.py +++ b/examples/src/main/python/mllib/recommendation_example.py @@ -21,18 +21,19 @@ from __future__ import print_function from pyspark import SparkContext - # $example on$ from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="PythonCollaborativeFilteringExample") # $example on$ # Load and parse the data data = sc.textFile("data/mllib/als/test.data") - ratings = data.map(lambda l: l.split(','))\ - .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))) + ratings = (data + .map(lambda l: l.split(',')) + .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))) # Build the recommendation model using Alternating Least Squares rank = 10 diff --git a/examples/src/main/python/mllib/regression_metrics_example.py b/examples/src/main/python/mllib/regression_metrics_example.py index a3a83aafd7a1..a795059b3289 100644 --- a/examples/src/main/python/mllib/regression_metrics_example.py +++ b/examples/src/main/python/mllib/regression_metrics_example.py @@ -14,13 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# +from pyspark import SparkContext # $example on$ -from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD from pyspark.mllib.evaluation import RegressionMetrics from pyspark.mllib.linalg import DenseVector +from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD # $example off$ -from pyspark import SparkContext if __name__ == "__main__": sc = SparkContext(appName="Regression Metrics Example") diff --git a/examples/src/main/python/mllib/standard_scaler_example.py b/examples/src/main/python/mllib/standard_scaler_example.py index 442094e1bf36..88a39e624541 100644 --- a/examples/src/main/python/mllib/standard_scaler_example.py +++ b/examples/src/main/python/mllib/standard_scaler_example.py @@ -24,6 +24,7 @@ from pyspark.mllib.util import MLUtils # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="StandardScalerExample") # SparkContext diff --git a/examples/src/main/python/mllib/streaming_k_means_example.py b/examples/src/main/python/mllib/streaming_k_means_example.py index e82509ad3ffb..d12e2e04f401 100644 --- a/examples/src/main/python/mllib/streaming_k_means_example.py +++ b/examples/src/main/python/mllib/streaming_k_means_example.py @@ -18,13 +18,14 @@ from __future__ import print_function from pyspark import SparkContext -from pyspark.streaming import StreamingContext # $example on$ +from pyspark.mllib.clustering import StreamingKMeans from pyspark.mllib.linalg import Vectors from pyspark.mllib.regression import LabeledPoint -from pyspark.mllib.clustering import StreamingKMeans +from pyspark.streaming import StreamingContext # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="StreamingKMeansExample") # SparkContext ssc = StreamingContext(sc, 1) @@ -38,8 +39,9 @@ def parse(lp): return LabeledPoint(label, vec) - trainingData = sc.textFile("data/mllib/kmeans_data.txt")\ - .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')])) + trainingData = (sc + .textFile("data/mllib/kmeans_data.txt") + .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))) testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse) diff --git a/examples/src/main/python/mllib/streaming_linear_regression_example.py b/examples/src/main/python/mllib/streaming_linear_regression_example.py index f600496867c1..2045a99452c6 100644 --- a/examples/src/main/python/mllib/streaming_linear_regression_example.py +++ b/examples/src/main/python/mllib/streaming_linear_regression_example.py @@ -25,13 +25,14 @@ # $example off$ from pyspark import SparkContext -from pyspark.streaming import StreamingContext # $example on$ from pyspark.mllib.linalg import Vectors -from pyspark.mllib.regression import LabeledPoint -from pyspark.mllib.regression import StreamingLinearRegressionWithSGD +from pyspark.mllib.regression import (LabeledPoint, + StreamingLinearRegressionWithSGD) +from pyspark.streaming import StreamingContext # $example off$ + if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: streaming_linear_regression_example.py ", diff --git a/examples/src/main/python/mllib/summary_statistics_example.py b/examples/src/main/python/mllib/summary_statistics_example.py index d55d1a2c2d0e..e153bd5d19e7 100644 --- a/examples/src/main/python/mllib/summary_statistics_example.py +++ b/examples/src/main/python/mllib/summary_statistics_example.py @@ -17,13 +17,15 @@ from __future__ import print_function -from pyspark import SparkContext # $example on$ import numpy as np - +# $example off$ +from 
pyspark import SparkContext +# $example on$ from pyspark.mllib.stat import Statistics # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="SummaryStatisticsExample") # SparkContext diff --git a/examples/src/main/python/mllib/svm_with_sgd_example.py b/examples/src/main/python/mllib/svm_with_sgd_example.py index 24b8f431e059..374544c0497e 100644 --- a/examples/src/main/python/mllib/svm_with_sgd_example.py +++ b/examples/src/main/python/mllib/svm_with_sgd_example.py @@ -17,10 +17,11 @@ from pyspark import SparkContext # $example on$ -from pyspark.mllib.classification import SVMWithSGD, SVMModel +from pyspark.mllib.classification import SVMModel, SVMWithSGD from pyspark.mllib.regression import LabeledPoint # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="PythonSVMWithSGDExample") diff --git a/examples/src/main/python/mllib/tf_idf_example.py b/examples/src/main/python/mllib/tf_idf_example.py index b66412b2334e..5c00ee109072 100644 --- a/examples/src/main/python/mllib/tf_idf_example.py +++ b/examples/src/main/python/mllib/tf_idf_example.py @@ -19,9 +19,10 @@ from pyspark import SparkContext # $example on$ -from pyspark.mllib.feature import HashingTF, IDF +from pyspark.mllib.feature import IDF, HashingTF # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="TFIDFExample") # SparkContext diff --git a/examples/src/main/python/mllib/word2vec.py b/examples/src/main/python/mllib/word2vec.py index 4e7d4f7610c2..96f7e3831e62 100644 --- a/examples/src/main/python/mllib/word2vec.py +++ b/examples/src/main/python/mllib/word2vec.py @@ -30,6 +30,7 @@ from pyspark import SparkContext from pyspark.mllib.feature import Word2Vec + USAGE = ("bin/spark-submit --driver-memory 4g " "examples/src/main/python/mllib/word2vec.py text8_lines") diff --git a/examples/src/main/python/mllib/word2vec_example.py b/examples/src/main/python/mllib/word2vec_example.py index ad1090c77ee1..9c18bbed5e96 100644 --- a/examples/src/main/python/mllib/word2vec_example.py +++ b/examples/src/main/python/mllib/word2vec_example.py @@ -22,6 +22,7 @@ from pyspark.mllib.feature import Word2Vec # $example off$ + if __name__ == "__main__": sc = SparkContext(appName="Word2VecExample") # SparkContext diff --git a/examples/src/main/python/pagerank.py b/examples/src/main/python/pagerank.py index 0d6c253d397a..38f180d8ebe9 100755 --- a/examples/src/main/python/pagerank.py +++ b/examples/src/main/python/pagerank.py @@ -54,10 +54,10 @@ def parseNeighbors(urls): file=sys.stderr) # Initialize the spark context. - spark = SparkSession\ - .builder\ - .appName("PythonPageRank")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("PythonPageRank") + .getOrCreate()) # Loads in input file. It should be in format of: # URL neighbor URL diff --git a/examples/src/main/python/parquet_inputformat.py b/examples/src/main/python/parquet_inputformat.py index 29a1ac274ecc..221e314a4f76 100644 --- a/examples/src/main/python/parquet_inputformat.py +++ b/examples/src/main/python/parquet_inputformat.py @@ -1,4 +1,3 @@ -from __future__ import print_function # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -16,10 +15,13 @@ # limitations under the License. 
# +from __future__ import print_function + import sys from pyspark.sql import SparkSession + """ Read data file users.parquet in local Spark distro: @@ -48,10 +50,10 @@ path = sys.argv[1] - spark = SparkSession\ - .builder\ - .appName("ParquetInputFormat")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("ParquetInputFormat") + .getOrCreate()) sc = spark.sparkContext diff --git a/examples/src/main/python/pi.py b/examples/src/main/python/pi.py index 37029b76798f..18a07948d5ea 100755 --- a/examples/src/main/python/pi.py +++ b/examples/src/main/python/pi.py @@ -1,4 +1,3 @@ -from __future__ import print_function # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -16,9 +15,12 @@ # limitations under the License. # +from __future__ import print_function + import sys -from random import random + from operator import add +from random import random from pyspark.sql import SparkSession @@ -27,10 +29,10 @@ """ Usage: pi [partitions] """ - spark = SparkSession\ - .builder\ - .appName("PythonPi")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("PythonPi") + .getOrCreate()) partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 n = 100000 * partitions diff --git a/examples/src/main/python/sort.py b/examples/src/main/python/sort.py index 81898cf6d5ce..c0fcd009740b 100755 --- a/examples/src/main/python/sort.py +++ b/examples/src/main/python/sort.py @@ -27,15 +27,16 @@ print("Usage: sort ", file=sys.stderr) exit(-1) - spark = SparkSession\ - .builder\ - .appName("PythonSort")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("PythonSort") + .getOrCreate()) lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0]) - sortedCount = lines.flatMap(lambda x: x.split(' ')) \ - .map(lambda x: (int(x), 1)) \ - .sortByKey() + sortedCount = (lines + .flatMap(lambda x: x.split(' ')) + .map(lambda x: (int(x), 1)) + .sortByKey()) # This is just a demo on how to bring all the sorted data back to a single node. # In reality, we wouldn't want to collect all the data to the driver node. 
output = sortedCount.collect() diff --git a/examples/src/main/python/sql/basic.py b/examples/src/main/python/sql/basic.py index ebcf66995b47..3b790197f74b 100644 --- a/examples/src/main/python/sql/basic.py +++ b/examples/src/main/python/sql/basic.py @@ -17,13 +17,13 @@ from __future__ import print_function -# $example on:init_session$ -from pyspark.sql import SparkSession -# $example off:init_session$ - # $example on:schema_inferring$ from pyspark.sql import Row # $example off:schema_inferring$ +# +# $example on:init_session$ +from pyspark.sql import SparkSession +# $example off:init_session$ # $example on:programmatic_schema$ # Import data types @@ -205,11 +205,11 @@ def programmatic_schema_example(spark): if __name__ == "__main__": # $example on:init_session$ - spark = SparkSession \ - .builder \ - .appName("Python Spark SQL basic example") \ - .config("spark.some.config.option", "some-value") \ - .getOrCreate() + spark = (SparkSession + .builder + .appName("Python Spark SQL basic example") + .config("spark.some.config.option", "some-value") + .getOrCreate()) # $example off:init_session$ basic_df_example(spark) diff --git a/examples/src/main/python/sql/datasource.py b/examples/src/main/python/sql/datasource.py index e9aa9d9ac258..053655aa7215 100644 --- a/examples/src/main/python/sql/datasource.py +++ b/examples/src/main/python/sql/datasource.py @@ -75,13 +75,15 @@ def parquet_schema_merging_example(spark): # Create a simple DataFrame, stored into a partition directory sc = spark.sparkContext - squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6)) + squaresDF = spark.createDataFrame(sc + .parallelize(range(1, 6)) .map(lambda i: Row(single=i, double=i ** 2))) squaresDF.write.parquet("data/test_table/key=1") # Create another DataFrame in a new partition directory, # adding a new column and dropping an existing column - cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11)) + cubesDF = spark.createDataFrame(sc + .parallelize(range(6, 11)) .map(lambda i: Row(single=i, triple=i ** 3))) cubesDF.write.parquet("data/test_table/key=2") @@ -145,13 +147,12 @@ def jdbc_dataset_example(spark): # $example on:jdbc_dataset$ # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods # Loading data from a JDBC source - jdbcDF = spark.read \ - .format("jdbc") \ - .option("url", "jdbc:postgresql:dbserver") \ - .option("dbtable", "schema.tablename") \ - .option("user", "username") \ - .option("password", "password") \ - .load() + jdbcDF = (spark.read + .option("url", "jdbc:postgresql:dbserver") + .option("dbtable", "schema.tablename") + .option("user", "username") + .option("password", "password") + .load()) jdbcDF2 = spark.read \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", @@ -173,10 +174,10 @@ def jdbc_dataset_example(spark): if __name__ == "__main__": - spark = SparkSession \ - .builder \ - .appName("Python Spark SQL data source example") \ - .getOrCreate() + spark = (SparkSession + .builder + .appName("Python Spark SQL data source example") + .getOrCreate()) basic_datasource_example(spark) parquet_example(spark) diff --git a/examples/src/main/python/sql/hive.py b/examples/src/main/python/sql/hive.py index ba01544a5bd2..98e7b194fc89 100644 --- a/examples/src/main/python/sql/hive.py +++ b/examples/src/main/python/sql/hive.py @@ -19,10 +19,10 @@ # $example on:spark_hive$ from os.path import expanduser, join - -from pyspark.sql import SparkSession from pyspark.sql import Row # $example off:spark_hive$ +from pyspark.sql import SparkSession + """ A simple 
example demonstrating Spark SQL Hive integration. @@ -36,12 +36,12 @@ # warehouse_location points to the default location for managed databases and tables warehouse_location = 'spark-warehouse' - spark = SparkSession \ - .builder \ - .appName("Python Spark SQL Hive integration example") \ - .config("spark.sql.warehouse.dir", warehouse_location) \ - .enableHiveSupport() \ - .getOrCreate() + spark = (SparkSession + .builder + .appName("Python Spark SQL Hive integration example") + .config("spark.sql.warehouse.dir", warehouse_location) + .enableHiveSupport() + .getOrCreate()) # spark is an existing SparkSession spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive") diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount.py b/examples/src/main/python/sql/streaming/structured_network_wordcount.py index afde2550587c..36ec5a6a0a16 100644 --- a/examples/src/main/python/sql/streaming/structured_network_wordcount.py +++ b/examples/src/main/python/sql/streaming/structured_network_wordcount.py @@ -32,8 +32,8 @@ import sys from pyspark.sql import SparkSession -from pyspark.sql.functions import explode -from pyspark.sql.functions import split +from pyspark.sql.functions import explode, split + if __name__ == "__main__": if len(sys.argv) != 3: @@ -43,18 +43,18 @@ host = sys.argv[1] port = int(sys.argv[2]) - spark = SparkSession\ - .builder\ - .appName("StructuredNetworkWordCount")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("StructuredNetworkWordCount") + .getOrCreate()) # Create DataFrame representing the stream of input lines from connection to host:port - lines = spark\ - .readStream\ - .format('socket')\ - .option('host', host)\ - .option('port', port)\ - .load() + lines = (spark + .readStream + .format('socket') + .option('host', host) + .option('port', port) + .load()) # Split the lines into words words = lines.select( @@ -68,10 +68,10 @@ wordCounts = words.groupBy('word').count() # Start running the query that prints the running counts to the console - query = wordCounts\ - .writeStream\ - .outputMode('complete')\ - .format('console')\ - .start() + query = (wordCounts + .writeStream + .outputMode('complete') + .format('console') + .start()) query.awaitTermination() diff --git a/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py index 02a7d3363d78..203332a5992b 100644 --- a/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py +++ b/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py @@ -44,9 +44,8 @@ import sys from pyspark.sql import SparkSession -from pyspark.sql.functions import explode -from pyspark.sql.functions import split -from pyspark.sql.functions import window +from pyspark.sql.functions import explode, split, window + if __name__ == "__main__": if len(sys.argv) != 5 and len(sys.argv) != 4: diff --git a/examples/src/main/python/status_api_demo.py b/examples/src/main/python/status_api_demo.py index 49b7902185aa..b980453e2ff2 100644 --- a/examples/src/main/python/status_api_demo.py +++ b/examples/src/main/python/status_api_demo.py @@ -17,9 +17,9 @@ from __future__ import print_function -import time -import threading import Queue +import threading +import time from pyspark import SparkConf, SparkContext diff --git a/examples/src/main/python/streaming/direct_kafka_wordcount.py b/examples/src/main/python/streaming/direct_kafka_wordcount.py index 
7097f7f4502b..a2d92dbec12b 100644 --- a/examples/src/main/python/streaming/direct_kafka_wordcount.py +++ b/examples/src/main/python/streaming/direct_kafka_wordcount.py @@ -36,6 +36,7 @@ from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils + if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: direct_kafka_wordcount.py ", file=sys.stderr) @@ -47,9 +48,10 @@ brokers, topic = sys.argv[1:] kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) lines = kvs.map(lambda x: x[1]) - counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a+b) + counts = (lines + .flatMap(lambda line: line.split(" ")) + .map(lambda word: (word, 1)) + .reduceByKey(lambda a, b: a + b)) counts.pprint() ssc.start() diff --git a/examples/src/main/python/streaming/flume_wordcount.py b/examples/src/main/python/streaming/flume_wordcount.py index d75bc6daac13..eef62d6b851a 100644 --- a/examples/src/main/python/streaming/flume_wordcount.py +++ b/examples/src/main/python/streaming/flume_wordcount.py @@ -36,6 +36,7 @@ from pyspark.streaming import StreamingContext from pyspark.streaming.flume import FlumeUtils + if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: flume_wordcount.py ", file=sys.stderr) @@ -47,9 +48,10 @@ hostname, port = sys.argv[1:] kvs = FlumeUtils.createStream(ssc, hostname, int(port)) lines = kvs.map(lambda x: x[1]) - counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a+b) + counts = (lines + .flatMap(lambda line: line.split(" ")) + .map(lambda word: (word, 1)) + .reduceByKey(lambda a, b: a + b)) counts.pprint() ssc.start() diff --git a/examples/src/main/python/streaming/hdfs_wordcount.py b/examples/src/main/python/streaming/hdfs_wordcount.py index f815dd26823d..bd6e8792fa46 100644 --- a/examples/src/main/python/streaming/hdfs_wordcount.py +++ b/examples/src/main/python/streaming/hdfs_wordcount.py @@ -32,6 +32,7 @@ from pyspark import SparkContext from pyspark.streaming import StreamingContext + if __name__ == "__main__": if len(sys.argv) != 2: print("Usage: hdfs_wordcount.py ", file=sys.stderr) @@ -41,9 +42,10 @@ ssc = StreamingContext(sc, 1) lines = ssc.textFileStream(sys.argv[1]) - counts = lines.flatMap(lambda line: line.split(" "))\ - .map(lambda x: (x, 1))\ - .reduceByKey(lambda a, b: a+b) + counts = (lines + .flatMap(lambda line: line.split(" ")) + .map(lambda x: (x, 1)) + .reduceByKey(lambda a, b: a + b)) counts.pprint() ssc.start() diff --git a/examples/src/main/python/streaming/kafka_wordcount.py b/examples/src/main/python/streaming/kafka_wordcount.py index 8d697f620f46..f9cb84733cf2 100644 --- a/examples/src/main/python/streaming/kafka_wordcount.py +++ b/examples/src/main/python/streaming/kafka_wordcount.py @@ -36,6 +36,7 @@ from pyspark.streaming import StreamingContext from pyspark.streaming.kafka import KafkaUtils + if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: kafka_wordcount.py ", file=sys.stderr) @@ -47,9 +48,10 @@ zkQuorum, topic = sys.argv[1:] kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1}) lines = kvs.map(lambda x: x[1]) - counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a+b) + counts = (lines. 
+ flatMap(lambda line: line.split(" ")) + .map(lambda word: (word, 1)) + .reduceByKey(lambda a, b: a + b)) counts.pprint() ssc.start() diff --git a/examples/src/main/python/streaming/network_wordcount.py b/examples/src/main/python/streaming/network_wordcount.py index 2b48bcfd55db..4bcc71f360a6 100644 --- a/examples/src/main/python/streaming/network_wordcount.py +++ b/examples/src/main/python/streaming/network_wordcount.py @@ -32,6 +32,7 @@ from pyspark import SparkContext from pyspark.streaming import StreamingContext + if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: network_wordcount.py ", file=sys.stderr) @@ -40,9 +41,10 @@ ssc = StreamingContext(sc, 1) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) - counts = lines.flatMap(lambda line: line.split(" "))\ - .map(lambda word: (word, 1))\ - .reduceByKey(lambda a, b: a+b) + counts = (lines + .flatMap(lambda line: line.split(" ")) + .map(lambda word: (word, 1)) + .reduceByKey(lambda a, b: a + b)) counts.pprint() ssc.start() diff --git a/examples/src/main/python/streaming/network_wordjoinsentiments.py b/examples/src/main/python/streaming/network_wordjoinsentiments.py index b309d9fad33f..6dcab5b12c29 100644 --- a/examples/src/main/python/streaming/network_wordjoinsentiments.py +++ b/examples/src/main/python/streaming/network_wordjoinsentiments.py @@ -54,22 +54,25 @@ def print_happiest_words(rdd): # Read in the word-sentiment list and create a static RDD from it word_sentiments_file_path = "data/streaming/AFINN-111.txt" - word_sentiments = ssc.sparkContext.textFile(word_sentiments_file_path) \ - .map(lambda line: tuple(line.split("\t"))) + word_sentiments = (ssc.sparkContext + .textFile(word_sentiments_file_path) + .map(lambda line: tuple(line.split("\t")))) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) - word_counts = lines.flatMap(lambda line: line.split(" ")) \ - .map(lambda word: (word, 1)) \ - .reduceByKey(lambda a, b: a + b) + word_counts = (lines + .flatMap(lambda line: line.split(" ")) + .map(lambda word: (word, 1)) + .reduceByKey(lambda a, b: a + b)) # Determine the words with the highest sentiment values by joining the streaming RDD # with the static RDD inside the transform() method and then multiplying # the frequency of the words by its sentiment value - happiest_words = word_counts.transform(lambda rdd: word_sentiments.join(rdd)) \ - .map(lambda word_tuples: (word_tuples[0], float(word_tuples[1][0]) * word_tuples[1][1])) \ - .map(lambda word_happiness: (word_happiness[1], word_happiness[0])) \ - .transform(lambda rdd: rdd.sortByKey(False)) + happiest_words = (word_counts + .map(lambda word_tuples: (word_tuples[0], + float(word_tuples[1][0]) * word_tuples[1][1])) + .map(lambda word_happiness: (word_happiness[1], word_happiness[0])) + .transform(lambda rdd: rdd.sortByKey(False))) happiest_words.foreachRDD(print_happiest_words) diff --git a/examples/src/main/python/streaming/queue_stream.py b/examples/src/main/python/streaming/queue_stream.py index bdd2d4851949..7776407558d0 100644 --- a/examples/src/main/python/streaming/queue_stream.py +++ b/examples/src/main/python/streaming/queue_stream.py @@ -27,6 +27,7 @@ from pyspark import SparkContext from pyspark.streaming import StreamingContext + if __name__ == "__main__": sc = SparkContext(appName="PythonStreamingQueueStream") diff --git a/examples/src/main/python/streaming/sql_network_wordcount.py b/examples/src/main/python/streaming/sql_network_wordcount.py index 398ac8d2d8f5..bb8d667b5370 100644 --- 
a/examples/src/main/python/streaming/sql_network_wordcount.py +++ b/examples/src/main/python/streaming/sql_network_wordcount.py @@ -32,8 +32,8 @@ import sys from pyspark import SparkContext -from pyspark.streaming import StreamingContext from pyspark.sql import Row, SparkSession +from pyspark.streaming import StreamingContext def getSparkSessionInstance(sparkConf): @@ -74,8 +74,8 @@ def process(time, rdd): wordsDataFrame.createOrReplaceTempView("words") # Do word count on table using SQL and print it - wordCountsDataFrame = \ - spark.sql("select word, count(*) as total from words group by word") + wordCountsDataFrame = spark.sql( + "select word, count(*) as total from words group by word") wordCountsDataFrame.show() except: pass diff --git a/examples/src/main/python/streaming/stateful_network_wordcount.py b/examples/src/main/python/streaming/stateful_network_wordcount.py index f8bbc659c2ea..f2412806fdba 100644 --- a/examples/src/main/python/streaming/stateful_network_wordcount.py +++ b/examples/src/main/python/streaming/stateful_network_wordcount.py @@ -36,6 +36,7 @@ from pyspark import SparkContext from pyspark.streaming import StreamingContext + if __name__ == "__main__": if len(sys.argv) != 3: print("Usage: stateful_network_wordcount.py ", file=sys.stderr) @@ -51,9 +52,10 @@ def updateFunc(new_values, last_sum): return sum(new_values) + (last_sum or 0) lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2])) - running_counts = lines.flatMap(lambda line: line.split(" "))\ - .map(lambda word: (word, 1))\ - .updateStateByKey(updateFunc, initialRDD=initialStateRDD) + running_counts = (lines + .flatMap(lambda line: line.split(" ")) + .map(lambda word: (word, 1)) + .updateStateByKey(updateFunc, initialRDD=initialStateRDD)) running_counts.pprint() diff --git a/examples/src/main/python/transitive_closure.py b/examples/src/main/python/transitive_closure.py index 49551d40851c..6c1be4d2a19f 100755 --- a/examples/src/main/python/transitive_closure.py +++ b/examples/src/main/python/transitive_closure.py @@ -41,10 +41,10 @@ def generateGraph(): """ Usage: transitive_closure [partitions] """ - spark = SparkSession\ - .builder\ - .appName("PythonTransitiveClosure")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("PythonTransitiveClosure") + .getOrCreate()) partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 tc = spark.sparkContext.parallelize(generateGraph(), partitions).cache() diff --git a/examples/src/main/python/wordcount.py b/examples/src/main/python/wordcount.py index 3d5e44d5b2df..abbfab72bea9 100755 --- a/examples/src/main/python/wordcount.py +++ b/examples/src/main/python/wordcount.py @@ -22,21 +22,21 @@ from pyspark.sql import SparkSession - if __name__ == "__main__": if len(sys.argv) != 2: print("Usage: wordcount ", file=sys.stderr) exit(-1) - spark = SparkSession\ - .builder\ - .appName("PythonWordCount")\ - .getOrCreate() + spark = (SparkSession + .builder + .appName("PythonWordCount") + .getOrCreate()) lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0]) - counts = lines.flatMap(lambda x: x.split(' ')) \ - .map(lambda x: (x, 1)) \ - .reduceByKey(add) + counts = (lines + .flatMap(lambda x: x.split(' ')) + .map(lambda x: (x, 1)) + .reduceByKey(add)) output = counts.collect() for (word, count) in output: print("%s: %i" % (word, count)) From 4af5966450c51f71309279f4dfb4e8f7de400fce Mon Sep 17 00:00:00 2001 From: Gaetan Semet Date: Tue, 14 Feb 2017 08:53:32 +0100 Subject: [PATCH 2/4] Undo add extra space --- examples/src/main/python/logistic_regression.py | 2 
+- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py index b380f3ba69e7..4a58140cb356 100755 --- a/examples/src/main/python/logistic_regression.py +++ b/examples/src/main/python/logistic_regression.py @@ -29,8 +29,8 @@ import numpy as np from pyspark.sql import SparkSession -D = 10 # Number of dimensions +D = 10 # Number of dimensions # Read a batch of points from the input file into a NumPy matrix object. We operate on batches to # make further computations faster. From ef1306e69b3f62b13819e5e0c5412a7a690977bb Mon Sep 17 00:00:00 2001 From: Gaetan Semet Date: Tue, 14 Feb 2017 08:54:26 +0100 Subject: [PATCH 3/4] Fix with 2 empty lines after import --- examples/src/main/python/mllib/bisecting_k_means_example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/python/mllib/bisecting_k_means_example.py b/examples/src/main/python/mllib/bisecting_k_means_example.py index dff09f59c497..4527edbbb21f 100644 --- a/examples/src/main/python/mllib/bisecting_k_means_example.py +++ b/examples/src/main/python/mllib/bisecting_k_means_example.py @@ -24,7 +24,7 @@ # $example on$ from pyspark.mllib.clustering import BisectingKMeans, BisectingKMeansModel # $example off$ -# + if __name__ == "__main__": sc = SparkContext(appName="PythonBisectingKMeansExample") # SparkContext From 582c8221998d6fe055d066106e9ca43204f96bb9 Mon Sep 17 00:00:00 2001 From: Gaetan Semet Date: Tue, 14 Feb 2017 09:06:39 +0100 Subject: [PATCH 4/4] Fix pep8 empty lines --- examples/src/main/python/logistic_regression.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/src/main/python/logistic_regression.py b/examples/src/main/python/logistic_regression.py index 4a58140cb356..d2d1695675fb 100755 --- a/examples/src/main/python/logistic_regression.py +++ b/examples/src/main/python/logistic_regression.py @@ -32,6 +32,7 @@ D = 10 # Number of dimensions + # Read a batch of points from the input file into a NumPy matrix object. We operate on batches to # make further computations faster. # The data file contains lines of the form
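
For reference, below is a minimal standalone sketch of the style conventions these patches apply: imports grouped and sorted isort-style, two blank lines after the import block, spaces around operators, and parenthesized method chains instead of trailing backslashes. It is not part of any patched file; the app name ("Pep8StyleSketch") and the sample data are illustrative only, and the sketch assumes a local PySpark installation.

    from __future__ import print_function

    from operator import add

    from pyspark.sql import SparkSession


    if __name__ == "__main__":
        # Builder chain wrapped in parentheses rather than continued with '\'.
        spark = (SparkSession
                 .builder
                 .appName("Pep8StyleSketch")
                 .getOrCreate())

        # Same parenthesized-continuation style for a transformation chain.
        counts = (spark.sparkContext
                  .parallelize(["a b", "b c"])
                  .flatMap(lambda line: line.split(" "))
                  .map(lambda word: (word, 1))
                  .reduceByKey(add))

        for word, count in counts.collect():
            print("%s: %i" % (word, count))

        spark.stop()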