25 commits
b69f201
Added tunable parallelism to the pyspark implementation of one vs. re…
ajaysaini725 Jun 12, 2017
e750d3e
Fixed python style.
ajaysaini725 Jun 12, 2017
81d458b
Added functionality for tuning parellelism in the Scala implementatio…
ajaysaini725 Jun 13, 2017
2133378
Fixed code according to comments. Added both annotations and unit tes…
ajaysaini725 Jun 13, 2017
c59b1d8
Modified parallel one vs rest to use futures.
ajaysaini725 Jun 22, 2017
5f635a2
Put the parallelism parameter as well as the function for getting an …
ajaysaini725 Jun 23, 2017
4431ffc
Responded to pull request comments.
ajaysaini725 Jun 23, 2017
a841b3e
Made changes based on pull request comments.
ajaysaini725 Jul 6, 2017
a95a8af
Fixed based on pull request comments
ajaysaini725 Jul 14, 2017
d45bc23
Fixed based on comments
ajaysaini725 Jul 18, 2017
30ac62d
Reverting merge and adding change that would fix merge conflict (maki…
ajaysaini725 Jul 19, 2017
cc634d2
Merge branch 'master' into spark-21027
ajaysaini725 Jul 19, 2017
ce14172
Style fix with docstring
ajaysaini725 Jul 20, 2017
1c9de16
Fixed based on comments.
ajaysaini725 Jul 27, 2017
9f34404
Fixed style issue.
ajaysaini725 Jul 27, 2017
585a3f8
Fixed merge conflict
ajaysaini725 Aug 12, 2017
f65381a
Fixed remaining part of merge conflict.
ajaysaini725 Aug 23, 2017
2a335fe
Fixed style problem
ajaysaini725 Aug 23, 2017
049f371
Merge branch 'master' into spark-21027
WeichenXu123 Sep 2, 2017
ddc2ff4
address review feedback issues
WeichenXu123 Sep 3, 2017
fc6fd5e
update migration guide
WeichenXu123 Sep 3, 2017
7d0849e
update desc
WeichenXu123 Sep 6, 2017
edcf85c
fix style
WeichenXu123 Sep 6, 2017
7a1d404
merge master & resolve conflicts
WeichenXu123 Sep 6, 2017
c24d4e2
update out-of-date shared.py
WeichenXu123 Sep 12, 2017
18 changes: 18 additions & 0 deletions docs/ml-guide.md
@@ -105,6 +105,24 @@ MLlib is under active development.
The APIs marked `Experimental`/`DeveloperApi` may change in future releases,
and the migration guide below will explain all changes between releases.

## From 2.2 to 2.3

### Breaking changes

There are no breaking changes.

### Deprecations and changes of behavior

**Deprecations**

There are no deprecations.

**Changes of behavior**

* [SPARK-21027](https://issues.apache.org/jira/browse/SPARK-21027):
The default parallelism used in `OneVsRest` is now 1 (i.e. serial). In 2.2 and earlier versions,
`OneVsRest` trained the per-class classifiers with the parallelism of Scala's default thread pool. To recover concurrent training, set the parameter explicitly, as in the sketch below.
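
A minimal sketch (the value 8 is an arbitrary illustration, not from the PR):

```scala
import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}

// Train the per-class classifiers on up to 8 threads instead of the
// new serial default of 1.
val ovr = new OneVsRest()
  .setClassifier(new LogisticRegression())
  .setParallelism(8)
```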

## From 2.1 to 2.2

### Breaking changes
mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala

@@ -17,10 +17,10 @@

package org.apache.spark.ml.classification

import java.util.{List => JList}
import java.util.UUID

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.language.existentials

import org.apache.hadoop.fs.Path
@@ -34,12 +34,13 @@ import org.apache.spark.ml._
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.{Param, ParamMap, ParamPair, Params}
import org.apache.spark.ml.param.shared.HasWeightCol
import org.apache.spark.ml.param.shared.{HasParallelism, HasWeightCol}
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ThreadUtils

private[ml] trait ClassifierTypeTrait {
// scalastyle:off structural.type
@@ -273,7 +274,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] {
@Since("1.4.0")
final class OneVsRest @Since("1.4.0") (
@Since("1.4.0") override val uid: String)
extends Estimator[OneVsRestModel] with OneVsRestParams with MLWritable {
extends Estimator[OneVsRestModel] with OneVsRestParams with HasParallelism with MLWritable {

@Since("1.4.0")
def this() = this(Identifiable.randomUID("oneVsRest"))
@@ -296,6 +297,16 @@ final class OneVsRest @Since("1.4.0") (
@Since("1.5.0")
def setPredictionCol(value: String): this.type = set(predictionCol, value)

/**
* The implementation of parallel one vs. rest runs the classification for
* each class in a separate thread.
*
* @group expertSetParam
*/
def setParallelism(value: Int): this.type = {
  set(parallelism, value)
}

Review comment (Contributor): missing since annotation

Reply (Contributor Author): Thanks! I created a PR to fix this.
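
The `parallelism` param comes from the shared `HasParallelism` trait, whose Scala side this diff does not show. A minimal sketch of how such a trait could map the param onto an `ExecutionContext`, assuming Spark's existing `ThreadUtils.sameThread` and `ThreadUtils.newDaemonCachedThreadPool` helpers in `org.apache.spark.util`; the trait body here is an assumption, not the merged source:

```scala
import scala.concurrent.ExecutionContext

import org.apache.spark.ml.param.{IntParam, ParamValidators, Params}
import org.apache.spark.util.ThreadUtils

private[ml] trait HasParallelism extends Params {

  // Number of threads to use when fitting models in parallel (>= 1).
  val parallelism = new IntParam(this, "parallelism",
    "the number of threads to use when running parallel algorithms (>= 1)",
    ParamValidators.gtEq(1))

  setDefault(parallelism -> 1)

  def getParallelism: Int = $(parallelism)

  // parallelism == 1 runs on the caller's thread; otherwise use a
  // dedicated daemon thread pool capped at the requested size.
  private[ml] def getExecutionContext: ExecutionContext = {
    getParallelism match {
      case 1 => ThreadUtils.sameThread
      case n => ExecutionContext.fromExecutorService(
        ThreadUtils.newDaemonCachedThreadPool(
          s"${this.getClass.getSimpleName}-thread-pool", n))
    }
  }
}
```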

/**
* Sets the value of param [[weightCol]].
*
@@ -318,7 +329,7 @@ final class OneVsRest @Since("1.4.0") (
transformSchema(dataset.schema)

val instr = Instrumentation.create(this, dataset)
instr.logParams(labelCol, featuresCol, predictionCol)
instr.logParams(labelCol, featuresCol, predictionCol, parallelism)
instr.logNamedValue("classifier", $(classifier).getClass.getCanonicalName)

// determine number of classes either from metadata if provided, or via computation.
@@ -352,8 +363,10 @@ final class OneVsRest @Since("1.4.0") (
multiclassLabeled.persist(StorageLevel.MEMORY_AND_DISK)
}

val executionContext = getExecutionContext

// create k columns, one for each binary classifier.
val models = Range(0, numClasses).par.map { index =>
val modelFutures = Range(0, numClasses).map { index =>
// generate new label metadata for the binary problem.
val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
val labelColName = "mc2b$" + index
@@ -364,14 +377,18 @@
paramMap.put(classifier.labelCol -> labelColName)
paramMap.put(classifier.featuresCol -> getFeaturesCol)
paramMap.put(classifier.predictionCol -> getPredictionCol)
if (weightColIsUsed) {
val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
paramMap.put(classifier_.weightCol -> getWeightCol)
classifier_.fit(trainingDataset, paramMap)
} else {
classifier.fit(trainingDataset, paramMap)
}
}.toArray[ClassificationModel[_, _]]
Future {
if (weightColIsUsed) {
val classifier_ = classifier.asInstanceOf[ClassifierType with HasWeightCol]
paramMap.put(classifier_.weightCol -> getWeightCol)
classifier_.fit(trainingDataset, paramMap)
} else {
classifier.fit(trainingDataset, paramMap)
}
}(executionContext)
}
val models = modelFutures
.map(ThreadUtils.awaitResult(_, Duration.Inf)).toArray[ClassificationModel[_, _]]
instr.logNumFeatures(models.head.numFeatures)

if (handlePersistence) {
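Stripped of the Spark specifics, the pattern in this hunk — one `Future` per class submitted to a bounded execution context, then an ordered blocking wait for all results — can be run standalone. A self-contained sketch; every name in it is illustrative, not from the PR:

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object BoundedParallelFitSketch {
  def main(args: Array[String]): Unit = {
    val numClasses = 4
    val parallelism = 2 // analogous to OneVsRest's new parallelism param
    val ec = ExecutionContext.fromExecutorService(
      Executors.newFixedThreadPool(parallelism))

    // One "training task" per class index; Thread.sleep stands in for
    // classifier.fit(trainingDataset, paramMap).
    val modelFutures = Range(0, numClasses).map { index =>
      Future {
        Thread.sleep(100)
        s"model-$index"
      }(ec)
    }

    // Awaiting each future in sequence keeps the fitted models in class
    // order, mirroring ThreadUtils.awaitResult(_, Duration.Inf) above.
    val models = modelFutures.map(Await.result(_, Duration.Inf))
    println(models.mkString(", "))
    ec.shutdown()
  }
}
```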
mllib/src/test/scala/org/apache/spark/ml/classification/OneVsRestSuite.scala

@@ -25,12 +25,12 @@ import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.{ParamMap, ParamsSuite}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MetadataUtils, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions._
@@ -98,7 +98,45 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
// bound how much error we allow compared to multinomial logistic regression.
val expectedMetrics = new MulticlassMetrics(results)
val ovaMetrics = new MulticlassMetrics(ovaResults)
assert(expectedMetrics.confusionMatrix ~== ovaMetrics.confusionMatrix absTol 400)
assert(expectedMetrics.confusionMatrix.asML ~== ovaMetrics.confusionMatrix.asML absTol 400)
}

test("one-vs-rest: tuning parallelism does not change output") {
val ovaPar1 = new OneVsRest()
.setClassifier(new LogisticRegression)

val ovaModelPar1 = ovaPar1.fit(dataset)

val transformedDatasetPar1 = ovaModelPar1.transform(dataset)

val ovaResultsPar1 = transformedDatasetPar1.select("prediction", "label").rdd.map {
row => (row.getDouble(0), row.getDouble(1))
}

val ovaPar2 = new OneVsRest()
.setClassifier(new LogisticRegression)
.setParallelism(2)

val ovaModelPar2 = ovaPar2.fit(dataset)

val transformedDatasetPar2 = ovaModelPar2.transform(dataset)

val ovaResultsPar2 = transformedDatasetPar2.select("prediction", "label").rdd.map {
row => (row.getDouble(0), row.getDouble(1))
}

val metricsPar1 = new MulticlassMetrics(ovaResultsPar1)
val metricsPar2 = new MulticlassMetrics(ovaResultsPar2)
assert(metricsPar1.confusionMatrix == metricsPar2.confusionMatrix)

ovaModelPar1.models.zip(ovaModelPar2.models).foreach {
case (lrModel1: LogisticRegressionModel, lrModel2: LogisticRegressionModel) =>
assert(lrModel1.coefficients ~== lrModel2.coefficients relTol 1E-3)
assert(lrModel1.intercept ~== lrModel2.intercept relTol 1E-3)
case other =>
throw new AssertionError(s"Loaded OneVsRestModel expected model of type" +
s" LogisticRegressionModel but found ${other.getClass.getName}")
}
}

test("one-vs-rest: pass label metadata correctly during train") {
25 changes: 15 additions & 10 deletions python/pyspark/ml/classification.py
@@ -16,6 +16,7 @@
#

import operator
from multiprocessing.pool import ThreadPool

from pyspark import since, keyword_only
from pyspark.ml import Estimator, Model
@@ -1562,7 +1563,7 @@ def getClassifier(self):


@inherit_doc
class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):
class OneVsRest(Estimator, OneVsRestParams, HasParallelism, JavaMLReadable, JavaMLWritable):
"""
.. note:: Experimental

@@ -1607,22 +1608,23 @@ class OneVsRest(Estimator, OneVsRestParams, JavaMLReadable, JavaMLWritable):

@keyword_only
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
classifier=None, weightCol=None):
classifier=None, weightCol=None, parallelism=1):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
classifier=None, weightCol=None)
classifier=None, weightCol=None, parallelism=1):
"""
super(OneVsRest, self).__init__()
self._setDefault(parallelism=1)
kwargs = self._input_kwargs
self._set(**kwargs)

@keyword_only
@since("2.0.0")
def setParams(self, featuresCol=None, labelCol=None, predictionCol=None,
classifier=None, weightCol=None):
def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
classifier=None, weightCol=None, parallelism=1):
"""
setParams(self, featuresCol=None, labelCol=None, predictionCol=None, \
classifier=None, weightCol=None):
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
classifier=None, weightCol=None, parallelism=1):
Sets params for OneVsRest.
"""
kwargs = self._input_kwargs
@@ -1669,8 +1671,9 @@ def trainSingleClass(index):
paramMap[classifier.weightCol] = weightCol
return classifier.fit(trainingDataset, paramMap)

# TODO: Parallel training for all classes.
models = [trainSingleClass(i) for i in range(numClasses)]
pool = ThreadPool(processes=min(self.getParallelism(), numClasses))

models = pool.map(trainSingleClass, range(numClasses))

if handlePersistence:
multiclassLabeled.unpersist()
@@ -1704,8 +1707,9 @@ def _from_java(cls, java_stage):
labelCol = java_stage.getLabelCol()
predictionCol = java_stage.getPredictionCol()
classifier = JavaParams._from_java(java_stage.getClassifier())
parallelism = java_stage.getParallelism()
py_stage = cls(featuresCol=featuresCol, labelCol=labelCol, predictionCol=predictionCol,
classifier=classifier)
classifier=classifier, parallelism=parallelism)
py_stage._resetUid(java_stage.uid())
return py_stage

@@ -1718,6 +1722,7 @@ def _to_java(self):
_java_obj = JavaParams._new_java_obj("org.apache.spark.ml.classification.OneVsRest",
self.uid)
_java_obj.setClassifier(self.getClassifier()._to_java())
_java_obj.setParallelism(self.getParallelism())
_java_obj.setFeaturesCol(self.getFeaturesCol())
_java_obj.setLabelCol(self.getLabelCol())
_java_obj.setPredictionCol(self.getPredictionCol())
4 changes: 3 additions & 1 deletion python/pyspark/ml/param/_shared_params_code_gen.py
@@ -152,7 +152,9 @@ def get$Name(self):
("varianceCol", "column name for the biased sample variance of prediction.",
None, "TypeConverters.toString"),
("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2",
"TypeConverters.toInt")]
"TypeConverters.toInt"),
("parallelism", "the number of threads to use when running parallel algorithms (>= 1).",
"1", "TypeConverters.toInt")]

code = []
for name, doc, defaultValueStr, typeConverter in shared:
24 changes: 24 additions & 0 deletions python/pyspark/ml/param/shared.py
@@ -608,6 +608,30 @@ def getAggregationDepth(self):
return self.getOrDefault(self.aggregationDepth)


class HasParallelism(Params):
"""
Mixin for param parallelism: the number of threads to use when running parallel algorithms (>= 1).
"""

parallelism = Param(Params._dummy(), "parallelism", "the number of threads to use when running parallel algorithms (>= 1).", typeConverter=TypeConverters.toInt)

def __init__(self):
super(HasParallelism, self).__init__()
self._setDefault(parallelism=1)

def setParallelism(self, value):
"""
Sets the value of :py:attr:`parallelism`.
"""
return self._set(parallelism=value)

def getParallelism(self):
"""
Gets the value of parallelism or its default value.
"""
return self.getOrDefault(self.parallelism)


class DecisionTreeParams(Params):
"""
Mixin for Decision Tree parameters.
16 changes: 15 additions & 1 deletion python/pyspark/ml/tests.py
@@ -1533,11 +1533,25 @@ def test_output_columns(self):
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01)
ovr = OneVsRest(classifier=lr)
ovr = OneVsRest(classifier=lr, parallelism=1)
model = ovr.fit(df)
output = model.transform(df)
self.assertEqual(output.columns, ["label", "features", "prediction"])

def test_parallelism_doesnt_change_output(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1)
modelPar1 = ovrPar1.fit(df)
ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
modelPar2 = ovrPar2.fit(df)
for i, model in enumerate(modelPar1.models):
self.assertTrue(np.allclose(model.coefficients.toArray(),
modelPar2.models[i].coefficients.toArray(), atol=1E-4))
self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4))

def test_support_for_weightCol(self):
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8), 1.0),
(1.0, Vectors.sparse(2, [], []), 1.0),