From b69f201bc51f8de87adc3869d4843e3df6750972 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Mon, 12 Jun 2017 14:46:28 -0700 Subject: [PATCH 01/16] Added tunable parallelism to the pyspark implementation of one vs. rest classification. Added a parallelism parameter to the scala implementation of one vs. rest for python persistence but have not yet used it to tune the scala parallelism implementation. --- .../spark/ml/classification/OneVsRest.scala | 14 +++++++- python/pyspark/ml/classification.py | 31 +++++++++++++++--- python/pyspark/ml/tests.py | 32 +++++++++++++++++-- 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 7cbcccf2720a..c942c1f6313a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -33,7 +33,7 @@ import org.apache.spark.annotation.Since import org.apache.spark.ml._ import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.Vector -import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} +import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamPair, Params, ParamValidators} import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -65,6 +65,12 @@ private[ml] trait OneVsRestParams extends PredictorParams with ClassifierTypeTra /** @group getParam */ def getClassifier: ClassifierType = $(classifier) + + val parallelism = new IntParam(this, "parallelism", + "parallelism parameter for tuning amount of parallelism", ParamValidators.gt(1)) + + /** @group getParam */ + def getParallelism: Int = $(parallelism) } private[ml] object OneVsRestParams extends ClassifierTypeTrait { @@ -282,6 +288,12 @@ final class OneVsRest @Since("1.4.0") ( set(classifier, value.asInstanceOf[ClassifierType]) } + /** @group setParam */ + @Since("1.4.0") + def setParallelism(value: Int): this.type = { + set(parallelism, value) + } + /** @group setParam */ @Since("1.5.0") def setLabelCol(value: String): this.type = set(labelCol, value) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 60bdeedd6a14..1b498c07f054 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -16,6 +16,7 @@ # import operator +from multiprocessing.pool import ThreadPool from pyspark import since, keyword_only from pyspark.ml import Estimator, Model @@ -1510,21 +1511,25 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable): .. 
versionadded:: 2.0.0 """ + parallelism = Param(Params._dummy(), "parallelism", + "Number of models to fit in parallel", + typeConverter=TypeConverters.toInt) @keyword_only def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", - classifier=None): + classifier=None, parallelism=8): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ classifier=None) """ super(OneVsRest, self).__init__() + self._setDefault(parallelism=8) kwargs = self._input_kwargs self._set(**kwargs) @keyword_only @since("2.0.0") - def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): + def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None, parallelism=None): """ setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): Sets params for OneVsRest. @@ -1561,13 +1566,28 @@ def trainSingleClass(index): return classifier.fit(trainingDataset, paramMap) # TODO: Parallel training for all classes. - models = [trainSingleClass(i) for i in range(numClasses)] + pool = ThreadPool(processes=self.getParallelism()) + + models = pool.map(trainSingleClass, range(numClasses)) + #models = [trainSingleClass(i) for i in range(numClasses)] if handlePersistence: multiclassLabeled.unpersist() return self._copyValues(OneVsRestModel(models=models)) + def setParallelism(self, value): + """ + Sets the value of :py:attr:`parallelism`. + """ + return self._set(parallelism=value) + + def getParallelism(self): + """ + Gets the value of parallelism or its default value. + """ + return self.getOrDefault(self.parallelism) + @since("2.0.0") def copy(self, extra=None): """ @@ -1611,8 +1631,9 @@ def _from_java(cls, java_stage): labelCol = java_stage.getLabelCol() predictionCol = java_stage.getPredictionCol() classifier = JavaParams._from_java(java_stage.getClassifier()) + parallelism = java_stage.getParallelism() py_stage = cls(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol, - classifier=classifier) + classifier=classifier, parallelism=parallelism) py_stage._resetUid(java_stage.uid()) return py_stage @@ -1625,12 +1646,12 @@ def _to_java(self): _java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest", self.uid) _java_obj.setClassifier(self.getClassifier()._to_java()) + _java_obj.setParallelism(self.getParallelism()) _java_obj.setFeaturesCol(self.getFeaturesCol()) _java_obj.setLabelCol(self.getLabelCol()) _java_obj.setPredictionCol(self.getPredictionCol()) return _java_obj - class OneVsRestModel(Model, OneVsRestParams, MLReadable, MLWritable): """ .. 
note:: Experimental diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 17a39472e1fe..468e218e69a7 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -951,7 +951,7 @@ def test_onevsrest(self): (2.0, Vectors.dense(0.5, 0.5))] * 10, ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr) + ovr = OneVsRest(classifier=lr, parallelism=8) model = ovr.fit(df) ovrPath = temp_path + "/ovr" ovr.save(ovrPath) @@ -1215,7 +1215,7 @@ def test_copy(self): (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr) + ovr = OneVsRest(classifier=lr, parallelism=1) ovr1 = ovr.copy({lr.maxIter: 10}) self.assertEqual(ovr.getClassifier().getMaxIter(), 5) self.assertEqual(ovr1.getClassifier().getMaxIter(), 10) @@ -1229,11 +1229,37 @@ def test_output_columns(self): (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr) + ovr = OneVsRest(classifier=lr, parallelism=1) model = ovr.fit(df) output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "prediction"]) +class ParOneVsRestTests(SparkSessionTestCase): + + def test_copy(self): + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], + ["label", "features"]) + lr = LogisticRegression(maxIter=5, regParam=0.01) + ovr = OneVsRest(classifier=lr, parallelism=8) + ovr1 = ovr.copy({lr.maxIter: 10}) + self.assertEqual(ovr.getClassifier().getMaxIter(), 5) + self.assertEqual(ovr1.getClassifier().getMaxIter(), 10) + model = ovr.fit(df) + model1 = model.copy({model.predictionCol: "indexed"}) + self.assertEqual(model1.getPredictionCol(), "indexed") + + def test_output_columns(self): + df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), + (1.0, Vectors.sparse(2, [], [])), + (2.0, Vectors.dense(0.5, 0.5))], + ["label", "features"]) + lr = LogisticRegression(maxIter=5, regParam=0.01) + ovr = OneVsRest(classifier=lr, parallelism=8) + model = ovr.fit(df) + output = model.transform(df) + self.assertEqual(output.columns, ["label", "features", "prediction"]) class HashingTFTest(SparkSessionTestCase): From e750d3ee8eaf75ed8732f5cf1a1904acb1666793 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Mon, 12 Jun 2017 15:35:20 -0700 Subject: [PATCH 02/16] Fixed python style. --- python/pyspark/ml/classification.py | 6 +++--- python/pyspark/ml/tests.py | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 1b498c07f054..0abe1a62227b 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1529,7 +1529,8 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred @keyword_only @since("2.0.0") - def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None, parallelism=None): + def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, + classifier=None, parallelism=None): """ setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): Sets params for OneVsRest. @@ -1565,11 +1566,9 @@ def trainSingleClass(index): (classifier.predictionCol, predictionCol)]) return classifier.fit(trainingDataset, paramMap) - # TODO: Parallel training for all classes. 
pool = ThreadPool(processes=self.getParallelism()) models = pool.map(trainSingleClass, range(numClasses)) - #models = [trainSingleClass(i) for i in range(numClasses)] if handlePersistence: multiclassLabeled.unpersist() @@ -1652,6 +1651,7 @@ def _to_java(self): _java_obj.setPredictionCol(self.getPredictionCol()) return _java_obj + class OneVsRestModel(Model, OneVsRestParams, MLReadable, MLWritable): """ .. note:: Experimental diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 468e218e69a7..d4f836f17747 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1234,6 +1234,7 @@ def test_output_columns(self): output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "prediction"]) + class ParOneVsRestTests(SparkSessionTestCase): def test_copy(self): @@ -1261,6 +1262,7 @@ def test_output_columns(self): output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "prediction"]) + class HashingTFTest(SparkSessionTestCase): def test_apply_binary_term_freqs(self): From 81d458be99cf4f195b761eaa9bcb48ea086cdf61 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Mon, 12 Jun 2017 17:10:23 -0700 Subject: [PATCH 03/16] Added functionality for tuning parellelism in the Scala implementation of the one vs. rest algorithm. --- .../spark/ml/classification/OneVsRest.scala | 15 ++++++-- .../ml/classification/OneVsRestSuite.scala | 34 +++++++++++++++++++ python/pyspark/ml/classification.py | 4 +-- 3 files changed, 49 insertions(+), 4 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index c942c1f6313a..8118081eae51 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -21,6 +21,8 @@ import java.util.{List => JList} import java.util.UUID import scala.collection.JavaConverters._ +import scala.collection.parallel.ForkJoinTaskSupport +import scala.concurrent.forkjoin.ForkJoinPool import scala.language.existentials import org.apache.hadoop.fs.Path @@ -67,7 +69,7 @@ private[ml] trait OneVsRestParams extends PredictorParams with ClassifierTypeTra def getClassifier: ClassifierType = $(classifier) val parallelism = new IntParam(this, "parallelism", - "parallelism parameter for tuning amount of parallelism", ParamValidators.gt(1)) + "parallelism parameter for tuning amount of parallelism", ParamValidators.gtEq(1)) /** @group getParam */ def getParallelism: Int = $(parallelism) @@ -279,6 +281,10 @@ final class OneVsRest @Since("1.4.0") ( @Since("1.4.0") override val uid: String) extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable { + setDefault( + parallelism -> 4 + ) + @Since("1.4.0") def this() = this(Identifiable.randomUID("oneVsRest")) @@ -337,8 +343,13 @@ final class OneVsRest @Since("1.4.0") ( multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) } + val iters = Range(0, numClasses).par + iters.tasksupport = new ForkJoinTaskSupport( + new ForkJoinPool(getParallelism) + ) + // create k columns, one for each binary classifier. - val models = Range(0, numClasses).par.map { index => + val models = iters.map { index => // generate new label metadata for the binary problem. 
val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata() val labelColName = "mc2b$" + index diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala index c02e38ad64e3..091296cc7028 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala @@ -101,6 +101,40 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400) } + test("one-vs-rest: tuning parallelism produces correct output") { + val numClasses = 3 + val ova = new OneVsRest() + .setClassifier(new LogisticRegression) + .setParallelism(8) + assert(ova.getLabelCol === "label") + assert(ova.getPredictionCol === "prediction") + val ovaModel = ova.fit(dataset) + + MLTestingUtils.checkCopyAndUids(ova, ovaModel) + + assert(ovaModel.models.length === numClasses) + val transformedDataset = ovaModel.transform(dataset) + + // check for label metadata in prediction col + val predictionColSchema = transformedDataset.schema(ovaModel.getPredictionCol) + assert(MetadataUtils.getNumClasses(predictionColSchema) === Some(3)) + + val ovaResults = transformedDataset.select("prediction", "label").rdd.map { + row => (row.getDouble(0), row.getDouble(1)) + } + + val lr = new LogisticRegressionWithLBFGS().setIntercept(true).setNumClasses(numClasses) + lr.optimizer.setRegParam(0.1).setNumIterations(100) + + val model = lr.run(rdd.map(OldLabeledPoint.fromML)) + val results = model.predict(rdd.map(p => OldVectors.fromML(p.features))).zip(rdd.map(_.label)) + // determine the #confusion matrix in each class. + // bound how much error we allow compared to multinomial logistic regression. + val expectedMetrics = new MulticlassMetrics(results) + val ovaMetrics = new MulticlassMetrics(ovaResults) + assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400) + } + test("one-vs-rest: pass label metadata correctly during train") { val numClasses = 3 val ova = new OneVsRest() diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 0abe1a62227b..7b4a242445b5 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1517,13 +1517,13 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable): @keyword_only def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", - classifier=None, parallelism=8): + classifier=None, parallelism=4): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ classifier=None) """ super(OneVsRest, self).__init__() - self._setDefault(parallelism=8) + self._setDefault(parallelism=4) kwargs = self._input_kwargs self._set(**kwargs) From 213337882a40c63c3a3ef5741c17a6eebd63df0b Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Tue, 13 Jun 2017 13:28:49 -0700 Subject: [PATCH 04/16] Fixed code according to comments. Added both annotations and unit tests for testing that parallelism doesn't affect the output. 
--- .../spark/ml/classification/OneVsRest.scala | 23 ++++++---- .../ml/classification/OneVsRestSuite.scala | 43 +++++++++---------- python/pyspark/ml/classification.py | 31 +++++++------ python/pyspark/ml/tests.py | 29 +++---------- 4 files changed, 58 insertions(+), 68 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 8118081eae51..1d41bcabd43c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -67,12 +67,6 @@ private[ml] trait OneVsRestParams extends PredictorParams with ClassifierTypeTra /** @group getParam */ def getClassifier: ClassifierType = $(classifier) - - val parallelism = new IntParam(this, "parallelism", - "parallelism parameter for tuning amount of parallelism", ParamValidators.gtEq(1)) - - /** @group getParam */ - def getParallelism: Int = $(parallelism) } private[ml] object OneVsRestParams extends ClassifierTypeTrait { @@ -281,6 +275,16 @@ final class OneVsRest @Since("1.4.0") ( @Since("1.4.0") override val uid: String) extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable { + /** + * param for the number of processes to use when running parallel one vs. rest + * The implementation of parallel one vs. rest runs the classification for + * each class in a separate process. + * @group param + */ + @Since("2.3.0") + val parallelism = new IntParam(this, "parallelism", + "the number of processes to use when running parallel one vs. rest", ParamValidators.gtEq(1)) + setDefault( parallelism -> 4 ) @@ -288,6 +292,9 @@ final class OneVsRest @Since("1.4.0") ( @Since("1.4.0") def this() = this(Identifiable.randomUID("oneVsRest")) + /** @group getParam */ + def getParallelism: Int = $(parallelism) + /** @group setParam */ @Since("1.4.0") def setClassifier(value: Classifier[_, _, _]): this.type = { @@ -295,7 +302,7 @@ final class OneVsRest @Since("1.4.0") ( } /** @group setParam */ - @Since("1.4.0") + @Since("2.3.0") def setParallelism(value: Int): this.type = { set(parallelism, value) } @@ -345,7 +352,7 @@ final class OneVsRest @Since("1.4.0") ( val iters = Range(0, numClasses).par iters.tasksupport = new ForkJoinTaskSupport( - new ForkJoinPool(getParallelism) + new ForkJoinPool(Math.min(getParallelism, numClasses)) ) // create k columns, one for each binary classifier. 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala index 091296cc7028..9d8056fc5ec4 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala @@ -101,38 +101,35 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400) } - test("one-vs-rest: tuning parallelism produces correct output") { + test("one-vs-rest: tuning parallelism does not change output") { val numClasses = 3 - val ova = new OneVsRest() + val ovaPar2 = new OneVsRest() .setClassifier(new LogisticRegression) - .setParallelism(8) - assert(ova.getLabelCol === "label") - assert(ova.getPredictionCol === "prediction") - val ovaModel = ova.fit(dataset) + .setParallelism(2) - MLTestingUtils.checkCopyAndUids(ova, ovaModel) - - assert(ovaModel.models.length === numClasses) - val transformedDataset = ovaModel.transform(dataset) + val ovaModelPar2 = ovaPar2.fit(dataset) - // check for label metadata in prediction col - val predictionColSchema = transformedDataset.schema(ovaModel.getPredictionCol) - assert(MetadataUtils.getNumClasses(predictionColSchema) === Some(3)) + val transformedDatasetPar2 = ovaModelPar2.transform(dataset) - val ovaResults = transformedDataset.select("prediction", "label").rdd.map { + val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map { row => (row.getDouble(0), row.getDouble(1)) } - val lr = new LogisticRegressionWithLBFGS().setIntercept(true).setNumClasses(numClasses) - lr.optimizer.setRegParam(0.1).setNumIterations(100) + val ovaPar4 = new OneVsRest() + .setClassifier(new LogisticRegression) + .setParallelism(4) - val model = lr.run(rdd.map(OldLabeledPoint.fromML)) - val results = model.predict(rdd.map(p => OldVectors.fromML(p.features))).zip(rdd.map(_.label)) - // determine the #confusion matrix in each class. - // bound how much error we allow compared to multinomial logistic regression. - val expectedMetrics = new MulticlassMetrics(results) - val ovaMetrics = new MulticlassMetrics(ovaResults) - assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400) + val ovaModelPar4 = ovaPar4.fit(dataset) + + val transformedDatasetPar4 = ovaModelPar4.transform(dataset) + + val ovaResultsPar4 = transformedDatasetPar4.select("prediction", "label").rdd.map { + row => (row.getDouble(0), row.getDouble(1)) + } + + val metricsPar2 = new MulticlassMetrics(ovaResultsPar2) + val metricsPar4 = new MulticlassMetrics(ovaResultsPar4) + assert(metricsPar2.confusionMatrix ~== metricsPar4.confusionMatrix absTol 400) } test("one-vs-rest: pass label metadata correctly during train") { diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 7b4a242445b5..741c9ceaefc2 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1511,8 +1511,9 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable): .. 
versionadded:: 2.0.0 """ + parallelism = Param(Params._dummy(), "parallelism", - "Number of models to fit in parallel", + "number of processors to use when fitting models in parallel", typeConverter=TypeConverters.toInt) @keyword_only @@ -1538,6 +1539,20 @@ def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, kwargs = self._input_kwargs return self._set(**kwargs) + @since("2.3.0") + def setParallelism(self, value): + """ + Sets the value of :py:attr:`parallelism`. + """ + return self._set(parallelism=value) + + @since("2.3.0") + def getParallelism(self): + """ + Gets the value of parallelism or its default value. + """ + return self.getOrDefault(self.parallelism) + def _fit(self, dataset): labelCol = self.getLabelCol() featuresCol = self.getFeaturesCol() @@ -1566,7 +1581,7 @@ def trainSingleClass(index): (classifier.predictionCol, predictionCol)]) return classifier.fit(trainingDataset, paramMap) - pool = ThreadPool(processes=self.getParallelism()) + pool = ThreadPool(processes=min(self.getParallelism(), numClasses)) models = pool.map(trainSingleClass, range(numClasses)) @@ -1575,18 +1590,6 @@ def trainSingleClass(index): return self._copyValues(OneVsRestModel(models=models)) - def setParallelism(self, value): - """ - Sets the value of :py:attr:`parallelism`. - """ - return self._set(parallelism=value) - - def getParallelism(self): - """ - Gets the value of parallelism or its default value. - """ - return self.getOrDefault(self.parallelism) - @since("2.0.0") def copy(self, extra=None): """ diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index d4f836f17747..d786de696397 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1234,33 +1234,16 @@ def test_output_columns(self): output = model.transform(df) self.assertEqual(output.columns, ["label", "features", "prediction"]) - -class ParOneVsRestTests(SparkSessionTestCase): - - def test_copy(self): - df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), - (1.0, Vectors.sparse(2, [], [])), - (2.0, Vectors.dense(0.5, 0.5))], - ["label", "features"]) - lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr, parallelism=8) - ovr1 = ovr.copy({lr.maxIter: 10}) - self.assertEqual(ovr.getClassifier().getMaxIter(), 5) - self.assertEqual(ovr1.getClassifier().getMaxIter(), 10) - model = ovr.fit(df) - model1 = model.copy({model.predictionCol: "indexed"}) - self.assertEqual(model1.getPredictionCol(), "indexed") - - def test_output_columns(self): + def test_parallelism_doesnt_change_output(self): df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)), (1.0, Vectors.sparse(2, [], [])), (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) - lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr, parallelism=8) - model = ovr.fit(df) - output = model.transform(df) - self.assertEqual(output.columns, ["label", "features", "prediction"]) + ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2) + modelPar2 = ovrPar2.fit(df) + ovrPar4 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=4) + modelPar4 = ovrPar4.fit(df) + self.assertEqual(modelPar2.getPredictionCol(), modelPar4.getPredictionCol()) class HashingTFTest(SparkSessionTestCase): From c59b1d897c24d88753f478243ac8428598108da3 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 22 Jun 2017 16:41:15 -0700 Subject: [PATCH 05/16] Modified parallel one vs rest to use futures. 
--- .../spark/ml/classification/OneVsRest.scala | 58 +++++++++++-------- .../spark/ml/util/ParallelismParam.scala | 28 +++++++++ .../ml/classification/OneVsRestSuite.scala | 13 ++--- 3 files changed, 69 insertions(+), 30 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 1d41bcabd43c..a0004178921f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -17,20 +17,19 @@ package org.apache.spark.ml.classification -import java.util.{List => JList} import java.util.UUID +import java.util.concurrent.ExecutorService -import scala.collection.JavaConverters._ -import scala.collection.parallel.ForkJoinTaskSupport -import scala.concurrent.forkjoin.ForkJoinPool +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.Duration import scala.language.existentials +import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.Path import org.json4s.{DefaultFormats, JObject, _} import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ -import org.apache.spark.SparkContext import org.apache.spark.annotation.Since import org.apache.spark.ml._ import org.apache.spark.ml.attribute._ @@ -40,7 +39,9 @@ import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ +import org.apache.spark.SparkContext import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.ThreadUtils private[ml] trait ClassifierTypeTrait { // scalastyle:off structural.type @@ -286,7 +287,7 @@ final class OneVsRest @Since("1.4.0") ( "the number of processes to use when running parallel one vs. rest", ParamValidators.gtEq(1)) setDefault( - parallelism -> 4 + parallelism -> 1 ) @Since("1.4.0") @@ -324,6 +325,14 @@ final class OneVsRest @Since("1.4.0") ( validateAndTransformSchema(schema, fitting = true, getClassifier.featuresDataType) } + def getExecutorService: ExecutorService = { + if (getParallelism == 1) { + return MoreExecutors.sameThreadExecutor() + } + ThreadUtils + .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", getParallelism) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): OneVsRestModel = { transformSchema(dataset.schema) @@ -350,25 +359,28 @@ final class OneVsRest @Since("1.4.0") ( multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) } - val iters = Range(0, numClasses).par - iters.tasksupport = new ForkJoinTaskSupport( - new ForkJoinPool(Math.min(getParallelism, numClasses)) - ) + val executor = getExecutorService + val executionContext = ExecutionContext.fromExecutorService(executor) // create k columns, one for each binary classifier. - val models = iters.map { index => - // generate new label metadata for the binary problem. 
- val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata() - val labelColName = "mc2b$" + index - val trainingDataset = multiclassLabeled.withColumn( - labelColName, when(col($(labelCol)) === index.toDouble, 1.0).otherwise(0.0), newLabelMeta) - val classifier = getClassifier - val paramMap = new ParamMap() - paramMap.put(classifier.labelCol -> labelColName) - paramMap.put(classifier.featuresCol -> getFeaturesCol) - paramMap.put(classifier.predictionCol -> getPredictionCol) - classifier.fit(trainingDataset, paramMap) - }.toArray[ClassificationModel[_, _]] + val modelFutures = Range(0, numClasses).map { index => + Future[ClassificationModel[_, _]] { + // generate new label metadata for the binary problem. + val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata() + val labelColName = "mc2b$" + index + val trainingDataset = multiclassLabeled.withColumn( + labelColName, when(col($(labelCol)) === index.toDouble, 1.0).otherwise(0.0), newLabelMeta) + val classifier = getClassifier + val paramMap = new ParamMap() + paramMap.put(classifier.labelCol -> labelColName) + paramMap.put(classifier.featuresCol -> getFeaturesCol) + paramMap.put(classifier.predictionCol -> getPredictionCol) + classifier.fit(trainingDataset, paramMap) + }(executionContext) + } + val models = modelFutures + .map(ThreadUtils.awaitResult(_, Duration.Inf)).toArray[ClassificationModel[_, _]] + instr.logNumFeatures(models.head.numFeatures) if (handlePersistence) { diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala new file mode 100644 index 000000000000..3483c032dceb --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.util + +import org.apache.spark.ml.param.Params + +/** + * Common parameter for estimators trained in a multithreaded environment. 
+ */ +private[ml] trait ParallelismParam extends Params { + + +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala index 9d8056fc5ec4..04f49941ada5 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala @@ -103,15 +103,14 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau test("one-vs-rest: tuning parallelism does not change output") { val numClasses = 3 - val ovaPar2 = new OneVsRest() + val ovaPar1 = new OneVsRest() .setClassifier(new LogisticRegression) - .setParallelism(2) - val ovaModelPar2 = ovaPar2.fit(dataset) + val ovaModelPar1 = ovaPar1.fit(dataset) - val transformedDatasetPar2 = ovaModelPar2.transform(dataset) + val transformedDatasetPar1 = ovaModelPar1.transform(dataset) - val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map { + val ovaResultsPar1 = transformedDatasetPar1.select("prediction", "label").rdd.map { row => (row.getDouble(0), row.getDouble(1)) } @@ -127,9 +126,9 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau row => (row.getDouble(0), row.getDouble(1)) } - val metricsPar2 = new MulticlassMetrics(ovaResultsPar2) + val metricsPar1 = new MulticlassMetrics(ovaResultsPar1) val metricsPar4 = new MulticlassMetrics(ovaResultsPar4) - assert(metricsPar2.confusionMatrix ~== metricsPar4.confusionMatrix absTol 400) + assert(metricsPar1.confusionMatrix ~== metricsPar4.confusionMatrix absTol 400) } test("one-vs-rest: pass label metadata correctly during train") { From 5f635a2e3746bb6f1c503ebf48a485c8217f555e Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 22 Jun 2017 17:00:48 -0700 Subject: [PATCH 06/16] Put the parallelism parameter as well as the function for getting an executor service with a given level of parallelism in a separat trait that OneVsRest inherits from. --- .../spark/ml/classification/OneVsRest.scala | 58 +++++++++---------- .../spark/ml/util/ParallelismParam.scala | 37 +++++++++++- 2 files changed, 65 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index a0004178921f..b4f1d0dfce8d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -274,27 +274,27 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] { @Since("1.4.0") final class OneVsRest @Since("1.4.0") ( @Since("1.4.0") override val uid: String) - extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable { - - /** - * param for the number of processes to use when running parallel one vs. rest - * The implementation of parallel one vs. rest runs the classification for - * each class in a separate process. - * @group param - */ - @Since("2.3.0") - val parallelism = new IntParam(this, "parallelism", - "the number of processes to use when running parallel one vs. rest", ParamValidators.gtEq(1)) - - setDefault( - parallelism -> 1 - ) + extends Estimator[OneVsRestModel] with OneVsRestParams with ParallelismParam with MLWritable { + +// /** +// * param for the number of processes to use when running parallel one vs. rest +// * The implementation of parallel one vs. 
rest runs the classification for +// * each class in a separate process. +// * @group param +// */ +// @Since("2.3.0") +// val parallelism = new IntParam(this, "parallelism", +// "the number of processes to use when running parallel one vs. rest", ParamValidators.gtEq(1)) +// +// setDefault( +// parallelism -> 1 +// ) @Since("1.4.0") def this() = this(Identifiable.randomUID("oneVsRest")) - /** @group getParam */ - def getParallelism: Int = $(parallelism) +// /** @group getParam */ +// def getParallelism: Int = $(parallelism) /** @group setParam */ @Since("1.4.0") @@ -302,11 +302,11 @@ final class OneVsRest @Since("1.4.0") ( set(classifier, value.asInstanceOf[ClassifierType]) } - /** @group setParam */ - @Since("2.3.0") - def setParallelism(value: Int): this.type = { - set(parallelism, value) - } +// /** @group setParam */ +// @Since("2.3.0") +// def setParallelism(value: Int): this.type = { +// set(parallelism, value) +// } /** @group setParam */ @Since("1.5.0") @@ -325,13 +325,13 @@ final class OneVsRest @Since("1.4.0") ( validateAndTransformSchema(schema, fitting = true, getClassifier.featuresDataType) } - def getExecutorService: ExecutorService = { - if (getParallelism == 1) { - return MoreExecutors.sameThreadExecutor() - } - ThreadUtils - .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", getParallelism) - } +// def getExecutorService: ExecutorService = { +// if (getParallelism == 1) { +// return MoreExecutors.sameThreadExecutor() +// } +// ThreadUtils +// .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", getParallelism) +// } @Since("2.0.0") override def fit(dataset: Dataset[_]): OneVsRestModel = { diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala b/mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala index 3483c032dceb..7ac6e98a7ee8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala @@ -17,12 +17,47 @@ package org.apache.spark.ml.util -import org.apache.spark.ml.param.Params +import java.util.concurrent.ExecutorService + +import com.google.common.util.concurrent.MoreExecutors + +import org.apache.spark.annotation.Since +import org.apache.spark.ml.param.{IntParam, Params, ParamValidators} +import org.apache.spark.util.ThreadUtils /** * Common parameter for estimators trained in a multithreaded environment. */ private[ml] trait ParallelismParam extends Params { + /** + * param for the number of processes to use when running parallel one vs. rest + * The implementation of parallel one vs. rest runs the classification for + * each class in a separate process. + * @group param + */ + @Since("2.3.0") + val parallelism = new IntParam(this, "parallelism", + "the number of processes to use when running parallel one vs. rest", ParamValidators.gtEq(1)) + + setDefault( + parallelism -> 1 + ) + + /** @group getParam */ + def getParallelism: Int = $(parallelism) + + /** @group setParam */ + @Since("2.3.0") + def setParallelism(value: Int): this.type = { + set(parallelism, value) + } + def getExecutorService: ExecutorService = { + if (getParallelism == 1) { + return MoreExecutors.sameThreadExecutor() + } + ThreadUtils + .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", getParallelism) + } } From 4431ffcd38843798db907842799fd81a30c7a5be Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Fri, 23 Jun 2017 11:54:32 -0700 Subject: [PATCH 07/16] Responded to pull request comments. 
--- .../spark/ml/classification/OneVsRest.scala | 58 +++++-------------- ...lelismParam.scala => HasParallelism.scala} | 20 ++++--- .../ml/classification/OneVsRestSuite.scala | 14 ++--- python/pyspark/ml/classification.py | 4 +- python/pyspark/ml/tests.py | 10 ++-- 5 files changed, 39 insertions(+), 67 deletions(-) rename mllib/src/main/scala/org/apache/spark/ml/util/{ParallelismParam.scala => HasParallelism.scala} (76%) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index b4f1d0dfce8d..34f3b9bf2972 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -274,40 +274,17 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] { @Since("1.4.0") final class OneVsRest @Since("1.4.0") ( @Since("1.4.0") override val uid: String) - extends Estimator[OneVsRestModel] with OneVsRestParams with ParallelismParam with MLWritable { - -// /** -// * param for the number of processes to use when running parallel one vs. rest -// * The implementation of parallel one vs. rest runs the classification for -// * each class in a separate process. -// * @group param -// */ -// @Since("2.3.0") -// val parallelism = new IntParam(this, "parallelism", -// "the number of processes to use when running parallel one vs. rest", ParamValidators.gtEq(1)) -// -// setDefault( -// parallelism -> 1 -// ) + extends Estimator[OneVsRestModel] with OneVsRestParams with HasParallelism with MLWritable { @Since("1.4.0") def this() = this(Identifiable.randomUID("oneVsRest")) -// /** @group getParam */ -// def getParallelism: Int = $(parallelism) - /** @group setParam */ @Since("1.4.0") def setClassifier(value: Classifier[_, _, _]): this.type = { set(classifier, value.asInstanceOf[ClassifierType]) } -// /** @group setParam */ -// @Since("2.3.0") -// def setParallelism(value: Int): this.type = { -// set(parallelism, value) -// } - /** @group setParam */ @Since("1.5.0") def setLabelCol(value: String): this.type = set(labelCol, value) @@ -325,14 +302,6 @@ final class OneVsRest @Since("1.4.0") ( validateAndTransformSchema(schema, fitting = true, getClassifier.featuresDataType) } -// def getExecutorService: ExecutorService = { -// if (getParallelism == 1) { -// return MoreExecutors.sameThreadExecutor() -// } -// ThreadUtils -// .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", getParallelism) -// } - @Since("2.0.0") override def fit(dataset: Dataset[_]): OneVsRestModel = { transformSchema(dataset.schema) @@ -359,22 +328,21 @@ final class OneVsRest @Since("1.4.0") ( multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK) } - val executor = getExecutorService - val executionContext = ExecutionContext.fromExecutorService(executor) + val executionContext = getExecutionContext // create k columns, one for each binary classifier. val modelFutures = Range(0, numClasses).map { index => - Future[ClassificationModel[_, _]] { - // generate new label metadata for the binary problem. 
- val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata() - val labelColName = "mc2b$" + index - val trainingDataset = multiclassLabeled.withColumn( - labelColName, when(col($(labelCol)) === index.toDouble, 1.0).otherwise(0.0), newLabelMeta) - val classifier = getClassifier - val paramMap = new ParamMap() - paramMap.put(classifier.labelCol -> labelColName) - paramMap.put(classifier.featuresCol -> getFeaturesCol) - paramMap.put(classifier.predictionCol -> getPredictionCol) + // generate new label metadata for the binary problem. + val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata() + val labelColName = "mc2b$" + index + val trainingDataset = multiclassLabeled.withColumn( + labelColName, when(col($(labelCol)) === index.toDouble, 1.0).otherwise(0.0), newLabelMeta) + val classifier = getClassifier + val paramMap = new ParamMap() + paramMap.put(classifier.labelCol -> labelColName) + paramMap.put(classifier.featuresCol -> getFeaturesCol) + paramMap.put(classifier.predictionCol -> getPredictionCol) + Future { classifier.fit(trainingDataset, paramMap) }(executionContext) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala b/mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala similarity index 76% rename from mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala rename to mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala index 7ac6e98a7ee8..7413baa3d3d4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/ParallelismParam.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala @@ -19,6 +19,8 @@ package org.apache.spark.ml.util import java.util.concurrent.ExecutorService +import scala.concurrent.ExecutionContext + import com.google.common.util.concurrent.MoreExecutors import org.apache.spark.annotation.Since @@ -28,17 +30,17 @@ import org.apache.spark.util.ThreadUtils /** * Common parameter for estimators trained in a multithreaded environment. */ -private[ml] trait ParallelismParam extends Params { +private[ml] trait HasParallelism extends Params { /** * param for the number of processes to use when running parallel one vs. rest * The implementation of parallel one vs. rest runs the classification for * each class in a separate process. - * @group param + * @group expertParam */ @Since("2.3.0") val parallelism = new IntParam(this, "parallelism", - "the number of processes to use when running parallel one vs. 
rest", ParamValidators.gtEq(1)) + "the number of processes to use when running parallel algorithms", ParamValidators.gtEq(1)) setDefault( parallelism -> 1 @@ -53,11 +55,13 @@ private[ml] trait ParallelismParam extends Params { set(parallelism, value) } - def getExecutorService: ExecutorService = { - if (getParallelism == 1) { - return MoreExecutors.sameThreadExecutor() + def getExecutionContext: ExecutionContext = { + getParallelism match { + case 1 => + ThreadUtils.sameThread + case n => + ExecutionContext.fromExecutorService(ThreadUtils + .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", getParallelism)) } - ThreadUtils - .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", getParallelism) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala index 04f49941ada5..1383ece81724 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala @@ -114,21 +114,21 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau row => (row.getDouble(0), row.getDouble(1)) } - val ovaPar4 = new OneVsRest() + val ovaPar2 = new OneVsRest() .setClassifier(new LogisticRegression) - .setParallelism(4) + .setParallelism(2) - val ovaModelPar4 = ovaPar4.fit(dataset) + val ovaModelPar2 = ovaPar2.fit(dataset) - val transformedDatasetPar4 = ovaModelPar4.transform(dataset) + val transformedDatasetPar2 = ovaModelPar2.transform(dataset) - val ovaResultsPar4 = transformedDatasetPar4.select("prediction", "label").rdd.map { + val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map { row => (row.getDouble(0), row.getDouble(1)) } val metricsPar1 = new MulticlassMetrics(ovaResultsPar1) - val metricsPar4 = new MulticlassMetrics(ovaResultsPar4) - assert(metricsPar1.confusionMatrix ~== metricsPar4.confusionMatrix absTol 400) + val metricsPar2 = new MulticlassMetrics(ovaResultsPar2) + assert(metricsPar1.confusionMatrix ~== metricsPar2.confusionMatrix absTol 400) } test("one-vs-rest: pass label metadata correctly during train") { diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 741c9ceaefc2..08feca9e2c64 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1518,13 +1518,13 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable): @keyword_only def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", - classifier=None, parallelism=4): + classifier=None, parallelism=1): """ __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \ classifier=None) """ super(OneVsRest, self).__init__() - self._setDefault(parallelism=4) + self._setDefault(parallelism=1) kwargs = self._input_kwargs self._set(**kwargs) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index d786de696397..f0d590ae9376 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -951,7 +951,7 @@ def test_onevsrest(self): (2.0, Vectors.dense(0.5, 0.5))] * 10, ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr, parallelism=8) + ovr = OneVsRest(classifier=lr, parallelism=1) model = ovr.fit(df) ovrPath = temp_path + "/ovr" ovr.save(ovrPath) @@ -1215,7 +1215,7 @@ def test_copy(self): (2.0, Vectors.dense(0.5, 
0.5))], ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr, parallelism=1) + ovr = OneVsRest(classifier=lr) ovr1 = ovr.copy({lr.maxIter: 10}) self.assertEqual(ovr.getClassifier().getMaxIter(), 5) self.assertEqual(ovr1.getClassifier().getMaxIter(), 10) @@ -1239,11 +1239,11 @@ def test_parallelism_doesnt_change_output(self): (1.0, Vectors.sparse(2, [], [])), (2.0, Vectors.dense(0.5, 0.5))], ["label", "features"]) + ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1) + modelPar1 = ovrPar1.fit(df) ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2) modelPar2 = ovrPar2.fit(df) - ovrPar4 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=4) - modelPar4 = ovrPar4.fit(df) - self.assertEqual(modelPar2.getPredictionCol(), modelPar4.getPredictionCol()) + self.assertEqual(modelPar1.getPredictionCol(), modelPar2.getPredictionCol()) class HashingTFTest(SparkSessionTestCase): From a841b3ea3c674b790948425c884661063f5e3520 Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Thu, 6 Jul 2017 16:35:15 -0700 Subject: [PATCH 08/16] Made changes based on pull request comments. --- .../spark/ml/classification/OneVsRest.scala | 7 +++---- .../apache/spark/ml/util/HasParallelism.scala | 18 ++++++------------ .../ml/classification/OneVsRestSuite.scala | 16 +++++++++++++++- python/pyspark/ml/classification.py | 6 +++--- python/pyspark/ml/tests.py | 10 +++++++++- 5 files changed, 36 insertions(+), 21 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 34f3b9bf2972..da01a49a73e1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -18,13 +18,11 @@ package org.apache.spark.ml.classification import java.util.UUID -import java.util.concurrent.ExecutorService -import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.language.existentials -import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.Path import org.json4s.{DefaultFormats, JObject, _} import org.json4s.JsonDSL._ @@ -34,7 +32,7 @@ import org.apache.spark.annotation.Since import org.apache.spark.ml._ import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.Vector -import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamPair, Params, ParamValidators} +import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -329,6 +327,7 @@ final class OneVsRest @Since("1.4.0") ( } val executionContext = getExecutionContext + instr.logParams(parallelism) // create k columns, one for each binary classifier. 
val modelFutures = Range(0, numClasses).map { index => diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala b/mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala index 7413baa3d3d4..6645439f31ec 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala @@ -17,12 +17,8 @@ package org.apache.spark.ml.util -import java.util.concurrent.ExecutorService - import scala.concurrent.ExecutionContext -import com.google.common.util.concurrent.MoreExecutors - import org.apache.spark.annotation.Since import org.apache.spark.ml.param.{IntParam, Params, ParamValidators} import org.apache.spark.util.ThreadUtils @@ -33,18 +29,16 @@ import org.apache.spark.util.ThreadUtils private[ml] trait HasParallelism extends Params { /** - * param for the number of processes to use when running parallel one vs. rest + * param for the number of threads to use when running parallel one vs. rest * The implementation of parallel one vs. rest runs the classification for - * each class in a separate process. + * each class in a separate threads. * @group expertParam */ @Since("2.3.0") val parallelism = new IntParam(this, "parallelism", - "the number of processes to use when running parallel algorithms", ParamValidators.gtEq(1)) + "the number of threads to use when running parallel algorithms", ParamValidators.gtEq(1)) - setDefault( - parallelism -> 1 - ) + setDefault(parallelism -> 1) /** @group getParam */ def getParallelism: Int = $(parallelism) @@ -55,13 +49,13 @@ private[ml] trait HasParallelism extends Params { set(parallelism, value) } - def getExecutionContext: ExecutionContext = { + protected def getExecutionContext: ExecutionContext = { getParallelism match { case 1 => ThreadUtils.sameThread case n => ExecutionContext.fromExecutorService(ThreadUtils - .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", getParallelism)) + .newDaemonCachedThreadPool(s"${this.getClass.getSimpleName}-thread-pool", n)) } } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala index 1383ece81724..1850f7940306 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala @@ -128,7 +128,21 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau val metricsPar1 = new MulticlassMetrics(ovaResultsPar1) val metricsPar2 = new MulticlassMetrics(ovaResultsPar2) - assert(metricsPar1.confusionMatrix ~== metricsPar2.confusionMatrix absTol 400) + assert(metricsPar1.confusionMatrix == metricsPar2.confusionMatrix) + + for (i <- 0 until ovaModelPar1.models.length) { + var foundCloseCoeffs = false + val currentCoeffs = ovaModelPar1.models(i) + .asInstanceOf[LogisticRegressionModel].coefficients + for (j <- 0 until ovaModelPar2.models.length) { + val otherCoeffs = ovaModelPar2.models(i) + .asInstanceOf[LogisticRegressionModel].coefficients + if (currentCoeffs == otherCoeffs) { + foundCloseCoeffs = true + } + } + assert(foundCloseCoeffs) + } } test("one-vs-rest: pass label metadata correctly during train") { diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 08feca9e2c64..6457ed9eb326 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1513,7 +1513,7 @@ class 
OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable): """ parallelism = Param(Params._dummy(), "parallelism", - "number of processors to use when fitting models in parallel", + "number of threads to use when fitting models in parallel", typeConverter=TypeConverters.toInt) @keyword_only @@ -1530,8 +1530,8 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred @keyword_only @since("2.0.0") - def setParams(self, featuresCol=None, labelCol=None, predictionCol=None, - classifier=None, parallelism=None): + def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", + classifier=None, parallelism=1): """ setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None): Sets params for OneVsRest. diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index f0d590ae9376..ad1d60b05dd1 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -951,7 +951,7 @@ def test_onevsrest(self): (2.0, Vectors.dense(0.5, 0.5))] * 10, ["label", "features"]) lr = LogisticRegression(maxIter=5, regParam=0.01) - ovr = OneVsRest(classifier=lr, parallelism=1) + ovr = OneVsRest(classifier=lr) model = ovr.fit(df) ovrPath = temp_path + "/ovr" ovr.save(ovrPath) @@ -1244,6 +1244,14 @@ def test_parallelism_doesnt_change_output(self): ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2) modelPar2 = ovrPar2.fit(df) self.assertEqual(modelPar1.getPredictionCol(), modelPar2.getPredictionCol()) + for model in modelPar1.models: + foundCloseCoeffs = False + for model2 in modelPar2.models: + if np.allclose(model.coefficients.toArray(), + model2.coefficients.toArray(), atol=1E-4): + foundCloseCoeffs = True + break + self.assertTrue(foundCloseCoeffs) class HashingTFTest(SparkSessionTestCase): From a95a8af2073b29aac751ae58489b737a3d7a39ae Mon Sep 17 00:00:00 2001 From: Ajay Saini Date: Fri, 14 Jul 2017 16:49:00 -0700 Subject: [PATCH 09/16] Fixed based on pull request comments --- .../spark/ml/classification/OneVsRest.scala | 4 +-- .../shared}/HasParallelism.scala | 13 ++++------ .../ml/classification/OneVsRestSuite.scala | 4 +-- python/pyspark/ml/classification.py | 25 +++---------------- .../ml/param/_shared_params_code_gen.py | 2 ++ python/pyspark/ml/param/shared.py | 24 ++++++++++++++++++ 6 files changed, 39 insertions(+), 33 deletions(-) rename mllib/src/main/scala/org/apache/spark/ml/{util => param/shared}/HasParallelism.scala (88%) diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index da01a49a73e1..50bf5b891aee 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -33,6 +33,7 @@ import org.apache.spark.ml._ import org.apache.spark.ml.attribute._ import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params} +import org.apache.spark.ml.param.shared.HasParallelism import org.apache.spark.ml.util._ import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions._ @@ -305,7 +306,7 @@ final class OneVsRest @Since("1.4.0") ( transformSchema(dataset.schema) val instr = Instrumentation.create(this, dataset) - instr.logParams(labelCol, featuresCol, predictionCol) + instr.logParams(labelCol, featuresCol, predictionCol, parallelism) instr.logNamedValue("classifier", 
$(classifier).getClass.getCanonicalName) // determine number of classes either from metadata if provided, or via computation. @@ -327,7 +328,6 @@ final class OneVsRest @Since("1.4.0") ( } val executionContext = getExecutionContext - instr.logParams(parallelism) // create k columns, one for each binary classifier. val modelFutures = Range(0, numClasses).map { index => diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala similarity index 88% rename from mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala rename to mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala index 6645439f31ec..73d6f9307fb5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/HasParallelism.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala @@ -15,11 +15,10 @@ * limitations under the License. */ -package org.apache.spark.ml.util +package org.apache.spark.ml.param.shared import scala.concurrent.ExecutionContext -import org.apache.spark.annotation.Since import org.apache.spark.ml.param.{IntParam, Params, ParamValidators} import org.apache.spark.util.ThreadUtils @@ -29,27 +28,25 @@ import org.apache.spark.util.ThreadUtils private[ml] trait HasParallelism extends Params { /** - * param for the number of threads to use when running parallel one vs. rest + * param for the number of threads to use when running parallel meta-algorithms * The implementation of parallel one vs. rest runs the classification for * each class in a separate threads. * @group expertParam */ - @Since("2.3.0") val parallelism = new IntParam(this, "parallelism", "the number of threads to use when running parallel algorithms", ParamValidators.gtEq(1)) setDefault(parallelism -> 1) - /** @group getParam */ + /** @group expertGetParam */ def getParallelism: Int = $(parallelism) - /** @group setParam */ - @Since("2.3.0") + /** @group expertSetParam */ def setParallelism(value: Int): this.type = { set(parallelism, value) } - protected def getExecutionContext: ExecutionContext = { + private[ml] def getExecutionContext: ExecutionContext = { getParallelism match { case 1 => ThreadUtils.sameThread diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala index 1850f7940306..bb6b0b2cdbf3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala @@ -133,10 +133,10 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau for (i <- 0 until ovaModelPar1.models.length) { var foundCloseCoeffs = false val currentCoeffs = ovaModelPar1.models(i) - .asInstanceOf[LogisticRegressionModel].coefficients + .asInstanceOf[LogisticRegressionModel].coefficients for (j <- 0 until ovaModelPar2.models.length) { val otherCoeffs = ovaModelPar2.models(i) - .asInstanceOf[LogisticRegressionModel].coefficients + .asInstanceOf[LogisticRegressionModel].coefficients if (currentCoeffs == otherCoeffs) { foundCloseCoeffs = true } diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py index 6457ed9eb326..dc3a600fe5a6 100644 --- a/python/pyspark/ml/classification.py +++ b/python/pyspark/ml/classification.py @@ -1444,7 +1444,7 @@ def weights(self): return self._call_java("weights") -class OneVsRestParams(HasFeaturesCol, HasLabelCol, 
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index 6457ed9eb326..dc3a600fe5a6 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -1444,7 +1444,7 @@ def weights(self):
         return self._call_java("weights")


-class OneVsRestParams(HasFeaturesCol, HasLabelCol, HasPredictionCol):
+class OneVsRestParams(HasFeaturesCol, HasLabelCol, HasParallelism, HasPredictionCol):
     """
     Parameters for OneVsRest and OneVsRestModel.
     """
@@ -1512,16 +1512,12 @@ class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable):

     .. versionadded:: 2.0.0
     """
-    parallelism = Param(Params._dummy(), "parallelism",
-                        "number of threads to use when fitting models in parallel",
-                        typeConverter=TypeConverters.toInt)
-
     @keyword_only
     def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                  classifier=None, parallelism=1):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
-                 classifier=None)
+                 classifier=None, parallelism=1)
         """
         super(OneVsRest, self).__init__()
         self._setDefault(parallelism=1)
@@ -1533,26 +1529,13 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   classifier=None, parallelism=1):
         """
-        setParams(self, featuresCol=None, labelCol=None, predictionCol=None, classifier=None):
+        setParams(self, featuresCol=None, labelCol=None, predictionCol=None,
+                  classifier=None, parallelism=1):
         Sets params for OneVsRest.
         """
         kwargs = self._input_kwargs
         return self._set(**kwargs)

-    @since("2.3.0")
-    def setParallelism(self, value):
-        """
-        Sets the value of :py:attr:`parallelism`.
-        """
-        return self._set(parallelism=value)
-
-    @since("2.3.0")
-    def getParallelism(self):
-        """
-        Gets the value of parallelism or its default value.
-        """
-        return self.getOrDefault(self.parallelism)
-
     def _fit(self, dataset):
         labelCol = self.getLabelCol()
         featuresCol = self.getFeaturesCol()
diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py
index 51d49b524c32..34a1773d34bf 100644
--- a/python/pyspark/ml/param/_shared_params_code_gen.py
+++ b/python/pyspark/ml/param/_shared_params_code_gen.py
@@ -152,6 +152,8 @@ def get$Name(self):
     ("varianceCol", "column name for the biased sample variance of prediction.",
      None, "TypeConverters.toString"),
     ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2",
+     "TypeConverters.toInt"),
+    ("parallelism", "number of threads to use when fitting models in parallel.", "1",
      "TypeConverters.toInt")]

 code = []
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 163a0e2b3a96..5c84b32255de 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -608,6 +608,30 @@ def getAggregationDepth(self):
         return self.getOrDefault(self.aggregationDepth)


+class HasParallelism(Params):
+    """
+    Mixin for param parallelism: number of threads to use when fitting models in parallel.
+    """
+
+    parallelism = Param(Params._dummy(), "parallelism", "number of threads to use when fitting models in parallel.", typeConverter=TypeConverters.toInt)
+
+    def __init__(self):
+        super(HasParallelism, self).__init__()
+        self._setDefault(parallelism=1)
+
+    def setParallelism(self, value):
+        """
+        Sets the value of :py:attr:`parallelism`.
+        """
+        return self._set(parallelism=value)
+
+    def getParallelism(self):
+        """
+        Gets the value of parallelism or its default value.
+        """
+        return self.getOrDefault(self.parallelism)
+
+
 class DecisionTreeParams(Params):
     """
     Mixin for Decision Tree parameters.
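The generated Python mixin above mirrors the Scala trait moved in this same patch, and both are consumed the same way: the estimator reads parallelism and sizes its worker pool accordingly, with the value-1 case short-circuiting to inline execution (the Scala side does this with ThreadUtils.sameThread in getExecutionContext). A minimal pure-Python sketch of that consumption pattern; HasParallelismSketch, ToyOneVsRest, and fit_one are illustrative names, not pyspark internals:

    from multiprocessing.pool import ThreadPool

    class HasParallelismSketch(object):
        """Illustrative stand-in for the generated HasParallelism mixin."""

        def __init__(self):
            self._parallelism = 1

        def setParallelism(self, value):
            if value < 1:  # mirrors ParamValidators.gtEq(1)
                raise ValueError("parallelism must be >= 1")
            self._parallelism = value
            return self

        def getParallelism(self):
            return self._parallelism

    class ToyOneVsRest(HasParallelismSketch):
        """Fits one 'model' per class, optionally on a thread pool."""

        def fit(self, num_classes):
            def fit_one(index):
                # Real code would block on a Spark job here; threads suffice
                # because the work happens on the JVM/cluster, not in Python.
                return "model-%d" % index

            if self.getParallelism() == 1:
                # Mirror of ThreadUtils.sameThread: no pool for the serial case.
                return [fit_one(i) for i in range(num_classes)]
            pool = ThreadPool(processes=min(self.getParallelism(), num_classes))
            try:
                return pool.map(fit_one, range(num_classes))
            finally:
                pool.close()

    print(ToyOneVsRest().setParallelism(2).fit(3))

Capping the pool at num_classes is a small design nicety: with k classes there are only k binary fits, so extra threads would sit idle.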
From d45bc233751e63f15d242da702fdad6102f8bd65 Mon Sep 17 00:00:00 2001
From: Ajay Saini
Date: Tue, 18 Jul 2017 11:25:50 -0700
Subject: [PATCH 10/16] Fixed based on comments

---
 .../spark/ml/classification/OneVsRest.scala    | 12 ++++++++++++
 .../ml/param/shared/HasParallelism.scala       |  2 --
 .../ml/classification/OneVsRestSuite.scala     | 19 +++++++------------
 python/pyspark/ml/tests.py                     | 11 +++--------
 4 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index 50bf5b891aee..6aa7593252c9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -296,6 +296,18 @@ final class OneVsRest @Since("1.4.0") (
   @Since("1.5.0")
   def setPredictionCol(value: String): this.type = set(predictionCol, value)

+  /** @group expertGetParam */
+  override def getParallelism: Int = $(parallelism)
+
+  /**
+   * @group expertSetParam
+   * The implementation of parallel one vs. rest runs the classification for
+   * each class in a separate thread.
+   */
+  override def setParallelism(value: Int): this.type = {
+    set(parallelism, value)
+  }
+
   @Since("1.4.0")
   override def transformSchema(schema: StructType): StructType = {
     validateAndTransformSchema(schema, fitting = true, getClassifier.featuresDataType)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala
index 73d6f9307fb5..cf5a5a89c822 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala
@@ -29,8 +29,6 @@ private[ml] trait HasParallelism extends Params {

   /**
    * param for the number of threads to use when running parallel meta-algorithms
-   * The implementation of parallel one vs. rest runs the classification for
-   * each class in a separate thread.
    * @group expertParam
    */
   val parallelism = new IntParam(this, "parallelism",
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
index bb6b0b2cdbf3..983f52403909 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
@@ -130,18 +130,13 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
     val metricsPar2 = new MulticlassMetrics(ovaResultsPar2)
     assert(metricsPar1.confusionMatrix == metricsPar2.confusionMatrix)

-    for (i <- 0 until ovaModelPar1.models.length) {
-      var foundCloseCoeffs = false
-      val currentCoeffs = ovaModelPar1.models(i)
-        .asInstanceOf[LogisticRegressionModel].coefficients
-      for (j <- 0 until ovaModelPar2.models.length) {
-        val otherCoeffs = ovaModelPar2.models(j)
-          .asInstanceOf[LogisticRegressionModel].coefficients
-        if (currentCoeffs == otherCoeffs) {
-          foundCloseCoeffs = true
-        }
-      }
-      assert(foundCloseCoeffs)
+    ovaModelPar1.models.zip(ovaModelPar2.models).foreach {
+      case (lrModel1: LogisticRegressionModel, lrModel2: LogisticRegressionModel) =>
+        assert(lrModel1.coefficients === lrModel2.coefficients)
+        assert(lrModel1.intercept === lrModel2.intercept)
+      case other =>
+        throw new AssertionError(s"Loaded OneVsRestModel expected model of type" +
+          s" LogisticRegressionModel but found ${other.getClass.getName}")
     }
   }

diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index ad1d60b05dd1..3cda347c2834 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1244,14 +1244,9 @@ def test_parallelism_doesnt_change_output(self):
         ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
         modelPar2 = ovrPar2.fit(df)
         self.assertEqual(modelPar1.getPredictionCol(), modelPar2.getPredictionCol())
-        for model in modelPar1.models:
-            foundCloseCoeffs = False
-            for model2 in modelPar2.models:
-                if np.allclose(model.coefficients.toArray(),
-                               model2.coefficients.toArray(), atol=1E-4):
-                    foundCloseCoeffs = True
-                    break
-            self.assertTrue(foundCloseCoeffs)
+        for i, model in enumerate(modelPar1.models):
+            assert(np.allclose(model.coefficients.toArray(),
+                   modelPar2.models[i].coefficients.toArray(), atol=1E-4))


 class HashingTFTest(SparkSessionTestCase):

From 30ac62d476e72fd2f70ef2f4a74a735ea5d98509 Mon Sep 17 00:00:00 2001
From: Ajay Saini
Date: Wed, 19 Jul 2017 13:24:48 -0700
Subject: [PATCH 11/16] Reverting merge and adding change that would fix merge
 conflict (making OneVsRest and OneVsRestModel JavaMLReadable and
 JavaMLWritable)

---
 python/pyspark/ml/classification.py | 38 +++--------------------------
 1 file changed, 3 insertions(+), 35 deletions(-)

diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index dc3a600fe5a6..b79300158967 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -1444,7 +1444,7 @@ def weights(self):
         return self._call_java("weights")


-class OneVsRestParams(HasFeaturesCol, HasLabelCol, HasParallelism, HasPredictionCol):
+class OneVsRestParams(HasFeaturesCol, HasLabelCol, HasPredictionCol):
     """
     Parameters for OneVsRest and OneVsRestModel.
     """
@@ -1469,7 +1469,7 @@ def getClassifier(self):


 @inherit_doc
-class OneVsRest(Estimator, OneVsRestParams, MLReadable, MLWritable):
+class OneVsRest(Estimator, OneVsRestParams, HasParallelism, JavaMLReadable, JavaMLWritable):
     """
     .. note:: Experimental
@@ -1590,22 +1590,6 @@ def copy(self, extra=None):
             newOvr.setClassifier(self.getClassifier().copy(extra))
         return newOvr

-    @since("2.0.0")
-    def write(self):
-        """Returns an MLWriter instance for this ML instance."""
-        return JavaMLWriter(self)
-
-    @since("2.0.0")
-    def save(self, path):
-        """Save this ML instance to the given path, a shortcut of `write().save(path)`."""
-        self.write().save(path)
-
-    @classmethod
-    @since("2.0.0")
-    def read(cls):
-        """Returns an MLReader instance for this class."""
-        return JavaMLReader(cls)
-
     @classmethod
     def _from_java(cls, java_stage):
         """
@@ -1638,7 +1622,7 @@ def _to_java(self):
         return _java_obj


-class OneVsRestModel(Model, OneVsRestParams, MLReadable, MLWritable):
+class OneVsRestModel(Model, OneVsRestParams, JavaMLReadable, JavaMLWritable):
     """
     .. note:: Experimental
@@ -1718,22 +1702,6 @@ def copy(self, extra=None):
             newModel.models = [model.copy(extra) for model in self.models]
         return newModel

-    @since("2.0.0")
-    def write(self):
-        """Returns an MLWriter instance for this ML instance."""
-        return JavaMLWriter(self)
-
-    @since("2.0.0")
-    def save(self, path):
-        """Save this ML instance to the given path, a shortcut of `write().save(path)`."""
-        self.write().save(path)
-
-    @classmethod
-    @since("2.0.0")
-    def read(cls):
-        """Returns an MLReader instance for this class."""
-        return JavaMLReader(cls)
-
     @classmethod
     def _from_java(cls, java_stage):
         """

From ce14172711b51a4321ed02a3cf8450a54374d4f5 Mon Sep 17 00:00:00 2001
From: Ajay Saini
Date: Wed, 19 Jul 2017 17:32:08 -0700
Subject: [PATCH 12/16] Style fix with docstring

---
 python/pyspark/ml/classification.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index a07b68057937..017c5a4e25a9 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -1533,7 +1533,7 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
     def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
                   classifier=None, parallelism=1):
         """
-        setParams(self, featuresCol=None, labelCol=None, predictionCol=None,
+        setParams(self, featuresCol=None, labelCol=None, predictionCol=None, \
                   classifier=None, parallelism=1):
         Sets params for OneVsRest.
         """
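The switch above from hand-rolled write/save/read to the JavaMLReadable/JavaMLWritable mixins is invisible at the call site: persistence still round-trips through the wrapped Java stage, which the mixins' writer and reader handle for free, and since the wrapper forwards parallelism through _to_java/_from_java the setting survives a round trip. A sketch of that round trip, assuming an active SparkSession and a pyspark build with these patches applied:

    import tempfile

    from pyspark.ml.classification import LogisticRegression, OneVsRest

    ovr = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=0.01),
                    parallelism=2)

    # save() and load() are now inherited from the mixins rather than
    # reimplemented on OneVsRest and OneVsRestModel.
    path = tempfile.mkdtemp() + "/ovr"
    ovr.save(path)
    loaded = OneVsRest.load(path)
    assert loaded.getParallelism() == 2
    assert loaded.getClassifier().getMaxIter() == 5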
From 1c9de16d2919205542712b57de8bf4d866e17d95 Mon Sep 17 00:00:00 2001
From: Ajay Saini
Date: Wed, 26 Jul 2017 18:56:31 -0700
Subject: [PATCH 13/16] Fixed based on comments.

---
 .../org/apache/spark/ml/classification/OneVsRestSuite.scala | 1 -
 python/pyspark/ml/tests.py                                  | 4 ++--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
index 983f52403909..567093dfbc62 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala
@@ -102,7 +102,6 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
   }

   test("one-vs-rest: tuning parallelism does not change output") {
-    val numClasses = 3
     val ovaPar1 = new OneVsRest()
       .setClassifier(new LogisticRegression)
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index dd4107c82f44..13ddacda496b 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1403,10 +1403,10 @@ def test_parallelism_doesnt_change_output(self):
         modelPar1 = ovrPar1.fit(df)
         ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
         modelPar2 = ovrPar2.fit(df)
-        self.assertEqual(modelPar1.getPredictionCol(), modelPar2.getPredictionCol())
         for i, model in enumerate(modelPar1.models):
-            assert(np.allclose(model.coefficients.toArray(),
+            self.assertTrue(np.allclose(model.coefficients.toArray(),
                    modelPar2.models[i].coefficients.toArray(), atol=1E-4))
+            self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4))


 class HashingTFTest(SparkSessionTestCase):

From 9f3440412515df1630072c01ef642ee5e3723eb0 Mon Sep 17 00:00:00 2001
From: Ajay Saini
Date: Wed, 26 Jul 2017 19:04:43 -0700
Subject: [PATCH 14/16] Fixed style issue.

---
 python/pyspark/ml/tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index 13ddacda496b..6f3b8102bcac 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1405,7 +1405,7 @@ def test_parallelism_doesnt_change_output(self):
         modelPar2 = ovrPar2.fit(df)
         for i, model in enumerate(modelPar1.models):
             self.assertTrue(np.allclose(model.coefficients.toArray(),
-                   modelPar2.models[i].coefficients.toArray(), atol=1E-4))
+                            modelPar2.models[i].coefficients.toArray(), atol=1E-4))
             self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4))


From f65381afe20f5458ff7d706c6d803355c1b89f00 Mon Sep 17 00:00:00 2001
From: Ajay Saini
Date: Wed, 23 Aug 2017 13:51:01 -0400
Subject: [PATCH 15/16] Fixed remaining part of merge conflict.
---
 .../org/apache/spark/ml/classification/OneVsRest.scala | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index cf2df82d1f53..cbeeddf2738e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -33,11 +33,7 @@ import org.apache.spark.ml._
 import org.apache.spark.ml.attribute._
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
-<<<<<<< HEAD
-import org.apache.spark.ml.param.shared.HasParallelism
-=======
-import org.apache.spark.ml.param.shared.HasWeightCol
->>>>>>> master
+import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
 import org.apache.spark.ml.util._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._

From 2a335fec1a4527e4970e88c10555d7d3f0375c35 Mon Sep 17 00:00:00 2001
From: Ajay Saini
Date: Wed, 23 Aug 2017 14:34:48 -0400
Subject: [PATCH 16/16] Fixed style problem

---
 .../scala/org/apache/spark/ml/classification/OneVsRest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
index cbeeddf2738e..ad08d572a0b2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala
@@ -28,6 +28,7 @@ import org.json4s.{DefaultFormats, JObject, _}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._

+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
 import org.apache.spark.ml.attribute._
@@ -38,7 +39,6 @@ import org.apache.spark.ml.util._
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.SparkContext
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.ThreadUtils
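Taken together, the series leaves a single user-facing knob on both the Scala and Python APIs. An end-to-end sketch against the Python API as it stands after these patches (Spark 2.3-era; the local master and toy data are illustrative, not canonical):

    from pyspark.sql import SparkSession
    from pyspark.ml.linalg import Vectors
    from pyspark.ml.classification import LogisticRegression, OneVsRest

    spark = SparkSession.builder.master("local[4]").getOrCreate()
    df = spark.createDataFrame([(0.0, Vectors.dense(0.0, 0.5)),
                                (1.0, Vectors.dense(1.0, 0.5)),
                                (2.0, Vectors.dense(0.5, 0.5))] * 10,
                               ["label", "features"])

    lr = LogisticRegression(maxIter=5, regParam=0.01)
    # With k classes there are only k binary fits, so parallelism above k
    # buys nothing; 3 saturates this 3-class problem.
    ovr = OneVsRest(classifier=lr, parallelism=3)
    model = ovr.fit(df)
    model.transform(df).select("features", "prediction").show(3)

The output of fit is identical for any parallelism value; only wall-clock time changes, which is exactly what the suites in patches 08 through 14 pin down.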