[SPARK-16356][ML] Add testImplicits for ML unit tests and promote toDF() #14035
Changes from all commits: 4a04bab, d09a469, 30ae934, ad9d7ac, b60c952
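Every change below follows the same pattern: each ML test suite gains an `import testImplicits._`, and calls like `spark.createDataFrame(seq)` are promoted to the more concise `seq.toDF()`. Here is a minimal sketch of how such a `testImplicits` helper is typically wired into the shared test trait — modeled on the pattern Spark SQL's test harness uses, so treat the exact shape as an assumption rather than this PR's verbatim code:

```scala
import org.apache.spark.sql.{SparkSession, SQLContext, SQLImplicits}

trait MLlibTestSparkContext { self =>
  @transient var spark: SparkSession = _  // created in beforeAll()

  // Brings toDF()/toDS() and the implicit Encoders they need into scope.
  // _sqlContext is resolved lazily, so this object can exist before
  // beforeAll() has initialized `spark`.
  protected object testImplicits extends SQLImplicits {
    protected override def _sqlContext: SQLContext = self.spark.sqlContext
  }
}
```

Suites then opt in with `import testImplicits._`, after which any local `Seq` of case-class rows (or an `RDD`) picks up `toDF()`.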
LogisticRegressionSuite.scala
@@ -37,6 +37,8 @@ import org.apache.spark.sql.functions.lit
 class LogisticRegressionSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   @transient var smallBinaryDataset: Dataset[_] = _
   @transient var smallMultinomialDataset: Dataset[_] = _
   @transient var binaryDataset: Dataset[_] = _
@@ -46,8 +48,7 @@ class LogisticRegressionSuite
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    smallBinaryDataset =
-      spark.createDataFrame(generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42))
+    smallBinaryDataset = generateLogisticInput(1.0, 1.0, nPoints = 100, seed = 42).toDF()
 
     smallMultinomialDataset = {
       val nPoints = 100
@@ -61,7 +62,7 @@ class LogisticRegressionSuite
       val testData = generateMultinomialLogisticInput(
         coefficients, xMean, xVariance, addIntercept = true, nPoints, 42)
 
-      val df = spark.createDataFrame(sc.parallelize(testData, 4))
+      val df = sc.parallelize(testData, 4).toDF()
       df.cache()
       df
     }
@@ -76,7 +77,7 @@ class LogisticRegressionSuite
       generateMultinomialLogisticInput(coefficients, xMean, xVariance,
         addIntercept = true, nPoints, 42)
 
-      spark.createDataFrame(sc.parallelize(testData, 4))
+      sc.parallelize(testData, 4).toDF()
     }
 
     multinomialDataset = {
@@ -91,7 +92,7 @@ class LogisticRegressionSuite
       val testData = generateMultinomialLogisticInput(
         coefficients, xMean, xVariance, addIntercept = true, nPoints, 42)
 
-      val df = spark.createDataFrame(sc.parallelize(testData, 4))
+      val df = sc.parallelize(testData, 4).toDF()
       df.cache()
       df
     }
@@ -430,10 +431,10 @@ class LogisticRegressionSuite
     val model = new LogisticRegressionModel("mLogReg",
       Matrices.dense(3, 2, Array(0.0, 0.0, 0.0, 1.0, 2.0, 3.0)),
       Vectors.dense(0.0, 0.0, 0.0), 3, true)
-    val overFlowData = spark.createDataFrame(Seq(
+    val overFlowData = Seq(
       LabeledPoint(1.0, Vectors.dense(0.0, 1000.0)),
       LabeledPoint(1.0, Vectors.dense(0.0, -1.0))
-    ))
+    ).toDF()
     val results = model.transform(overFlowData).select("rawPrediction", "probability").collect()
 
     // probabilities are correct when margins have to be adjusted
@@ -1795,9 +1796,9 @@ class LogisticRegressionSuite
     val numPoints = 40
     val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
       numClasses, numPoints)
-    val testData = spark.createDataFrame(Array.tabulate[LabeledPoint](numClasses) { i =>
+    val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
       LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
-    })
+    }.toSeq.toDF()
     val lr = new LogisticRegression().setFamily("binomial").setWeightCol("weight")
     val model = lr.fit(outlierData)
     val results = model.transform(testData).select("label", "prediction").collect()
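Note the `.toSeq` in the replacement: `toDF()` comes from an implicit conversion defined for `Seq[T]` (`localSeqToDatasetHolder` in `SQLImplicits`), and Scala never chains two implicit conversions, so an `Array` has to be stepped up to a `Seq` explicitly before the enrichment applies. A hypothetical illustration, assuming a suite that mixes in `MLlibTestSparkContext`:

```scala
import testImplicits._

val labels = Array(0.0, 1.0, 2.0)
// labels.toDF("label")              // would not compile: no Array => DatasetHolder path
val df = labels.toSeq.toDF("label")  // Seq => DatasetHolder[Double] => DataFrame
```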
@@ -1819,9 +1820,9 @@ class LogisticRegressionSuite
     val numPoints = 40
     val outlierData = MLTestingUtils.genClassificationInstancesWithWeightedOutliers(spark,
       numClasses, numPoints)
-    val testData = spark.createDataFrame(Array.tabulate[LabeledPoint](numClasses) { i =>
+    val testData = Array.tabulate[LabeledPoint](numClasses) { i =>
      LabeledPoint(i.toDouble, Vectors.dense(i.toDouble))
-    })
+    }.toSeq.toDF()
     val mlr = new LogisticRegression().setFamily("multinomial").setWeightCol("weight")
     val model = mlr.fit(outlierData)
     val results = model.transform(testData).select("label", "prediction").collect()
@@ -1945,11 +1946,10 @@ class LogisticRegressionSuite
   }
 
   test("multiclass logistic regression with all labels the same") {
-    val constantData = spark.createDataFrame(Seq(
+    val constantData = Seq(
       LabeledPoint(4.0, Vectors.dense(0.0)),
       LabeledPoint(4.0, Vectors.dense(1.0)),
-      LabeledPoint(4.0, Vectors.dense(2.0)))
-    )
+      LabeledPoint(4.0, Vectors.dense(2.0))).toDF()
     val mlr = new LogisticRegression().setFamily("multinomial")
     val model = mlr.fit(constantData)
     val results = model.transform(constantData)
@@ -1961,11 +1961,10 @@ class LogisticRegressionSuite
     }
 
     // force the model to be trained with only one class
-    val constantZeroData = spark.createDataFrame(Seq(
+    val constantZeroData = Seq(
       LabeledPoint(0.0, Vectors.dense(0.0)),
       LabeledPoint(0.0, Vectors.dense(1.0)),
-      LabeledPoint(0.0, Vectors.dense(2.0)))
-    )
+      LabeledPoint(0.0, Vectors.dense(2.0))).toDF()
     val modelZeroLabel = mlr.setFitIntercept(false).fit(constantZeroData)
     val resultsZero = modelZeroLabel.transform(constantZeroData)
     resultsZero.select("rawPrediction", "probability", "prediction").collect().foreach {
@@ -1990,20 +1989,18 @@ class LogisticRegressionSuite
   }
 
   test("compressed storage") {
-    val moreClassesThanFeatures = spark.createDataFrame(Seq(
+    val moreClassesThanFeatures = Seq(
       LabeledPoint(4.0, Vectors.dense(0.0, 0.0, 0.0)),
       LabeledPoint(4.0, Vectors.dense(1.0, 1.0, 1.0)),
-      LabeledPoint(4.0, Vectors.dense(2.0, 2.0, 2.0)))
-    )
+      LabeledPoint(4.0, Vectors.dense(2.0, 2.0, 2.0))).toDF()
     val mlr = new LogisticRegression().setFamily("multinomial")
     val model = mlr.fit(moreClassesThanFeatures)
     assert(model.coefficientMatrix.isInstanceOf[SparseMatrix])
     assert(model.coefficientMatrix.asInstanceOf[SparseMatrix].colPtrs.length === 4)
-    val moreFeaturesThanClasses = spark.createDataFrame(Seq(
+    val moreFeaturesThanClasses = Seq(
       LabeledPoint(1.0, Vectors.dense(0.0, 0.0, 0.0)),
       LabeledPoint(1.0, Vectors.dense(1.0, 1.0, 1.0)),
-      LabeledPoint(1.0, Vectors.dense(2.0, 2.0, 2.0)))
-    )
+      LabeledPoint(1.0, Vectors.dense(2.0, 2.0, 2.0))).toDF()
     val model2 = mlr.fit(moreFeaturesThanClasses)
     assert(model2.coefficientMatrix.isInstanceOf[SparseMatrix])
     assert(model2.coefficientMatrix.asInstanceOf[SparseMatrix].colPtrs.length === 3)
NaiveBayesSuite.scala
@@ -35,6 +35,8 @@ import org.apache.spark.sql.{DataFrame, Dataset, Row}
 class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   @transient var dataset: Dataset[_] = _
 
   override def beforeAll(): Unit = {
@@ -47,7 +49,7 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
       Array(0.10, 0.10, 0.70, 0.10) // label 2
     ).map(_.map(math.log))
 
-    dataset = spark.createDataFrame(generateNaiveBayesInput(pi, theta, 100, 42))
+    dataset = generateNaiveBayesInput(pi, theta, 100, 42).toDF()
   }
 
   def validatePrediction(predictionAndLabels: DataFrame): Unit = {
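`generateNaiveBayesInput` returns a local `Seq[LabeledPoint]`, and calling `toDF()` on it requires an implicit `Encoder[LabeledPoint]` in scope. `SQLImplicits` derives one automatically (through `newProductEncoder`) because `LabeledPoint` is a case class. A small sketch of what the import buys, again assuming a suite context:

```scala
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import testImplicits._

val df = Seq(LabeledPoint(1.0, Vectors.dense(0.5, 1.5))).toDF()
df.printSchema()
// root
//  |-- label: double (nullable = false)
//  |-- features: vector (nullable = true)
```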
@@ -131,16 +133,16 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     val pi = Vectors.dense(piArray)
     val theta = new DenseMatrix(3, 4, thetaArray.flatten, true)
 
-    val testDataset = spark.createDataFrame(generateNaiveBayesInput(
-      piArray, thetaArray, nPoints, 42, "multinomial"))
+    val testDataset =
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 42, "multinomial").toDF()
     val nb = new NaiveBayes().setSmoothing(1.0).setModelType("multinomial")
     val model = nb.fit(testDataset)
 
     validateModelFit(pi, theta, model)
     assert(model.hasParent)
 
-    val validationDataset = spark.createDataFrame(generateNaiveBayesInput(
-      piArray, thetaArray, nPoints, 17, "multinomial"))
+    val validationDataset =
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 17, "multinomial").toDF()
 
     val predictionAndLabels = model.transform(validationDataset).select("prediction", "label")
     validatePrediction(predictionAndLabels)
@@ -161,16 +163,16 @@ class NaiveBayesSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
     val pi = Vectors.dense(piArray)
     val theta = new DenseMatrix(3, 12, thetaArray.flatten, true)
 
-    val testDataset = spark.createDataFrame(generateNaiveBayesInput(
-      piArray, thetaArray, nPoints, 45, "bernoulli"))
+    val testDataset =
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 45, "bernoulli").toDF()
     val nb = new NaiveBayes().setSmoothing(1.0).setModelType("bernoulli")
     val model = nb.fit(testDataset)
 
     validateModelFit(pi, theta, model)
     assert(model.hasParent)
 
-    val validationDataset = spark.createDataFrame(generateNaiveBayesInput(
-      piArray, thetaArray, nPoints, 20, "bernoulli"))
+    val validationDataset =
+      generateNaiveBayesInput(piArray, thetaArray, nPoints, 20, "bernoulli").toDF()
 
     val predictionAndLabels = model.transform(validationDataset).select("prediction", "label")
     validatePrediction(predictionAndLabels)
OneVsRestSuite.scala
@@ -37,6 +37,8 @@ import org.apache.spark.sql.types.Metadata
 class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
+  import testImplicits._
+
   @transient var dataset: Dataset[_] = _
   @transient var rdd: RDD[LabeledPoint] = _
@@ -55,7 +57,7 @@ class OneVsRestSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
     val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
     rdd = sc.parallelize(generateMultinomialLogisticInput(
       coefficients, xMean, xVariance, true, nPoints, 42), 2)
-    dataset = spark.createDataFrame(rdd)
+    dataset = rdd.toDF()
   }
 
   test("params") {
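The `rdd.toDF()` above shows that the same import covers distributed data as well: `SQLImplicits` defines `rddToDatasetHolder` alongside `localSeqToDatasetHolder`, so local and parallelized collections convert the same way. A brief sketch (suite context and ML imports assumed):

```scala
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import testImplicits._

val points = Seq(
  LabeledPoint(0.0, Vectors.dense(1.0)),
  LabeledPoint(1.0, Vectors.dense(2.0)))
val fromSeq = points.toDF()                    // via localSeqToDatasetHolder
val fromRdd = sc.parallelize(points, 2).toDF() // via rddToDatasetHolder
assert(fromSeq.schema == fromRdd.schema)
```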
RandomForestClassifierSuite.scala
@@ -39,6 +39,7 @@ class RandomForestClassifierSuite
   extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
 
   import RandomForestClassifierSuite.compareAPIs
+  import testImplicits._
 
   private var orderedLabeledPoints50_1000: RDD[LabeledPoint] = _
   private var orderedLabeledPoints5_20: RDD[LabeledPoint] = _
@@ -158,7 +159,7 @@ class RandomForestClassifierSuite
   }
 
   test("Fitting without numClasses in metadata") {
-    val df: DataFrame = spark.createDataFrame(TreeTests.featureImportanceData(sc))
+    val df: DataFrame = TreeTests.featureImportanceData(sc).toDF()
     val rf = new RandomForestClassifier().setMaxDepth(1).setNumTrees(1)
     rf.fit(df)
   }
Review comment: Wonder why this line is separate and not part of line 139? Any reason?