diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
index 7d36402a5937..de52740e7d98 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/RandomForestClassifier.scala
@@ -21,7 +21,6 @@ import org.json4s.{DefaultFormats, JObject}
 import org.json4s.JsonDSL._
 
 import org.apache.spark.annotation.Since
-import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.tree._
@@ -33,7 +32,6 @@ import org.apache.spark.ml.util.DefaultParamsReader.Metadata
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{RandomForestModel => OldRandomForestModel}
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions.{col, udf}
 import org.apache.spark.sql.types.StructType
@@ -69,6 +67,10 @@ class RandomForestClassifier @Since("1.4.0") (
   @Since("1.4.0")
   def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, value)
 
+  /** @group setParam */
+  @Since("3.0.0")
+  def setMinWeightFractionPerNode(value: Double): this.type = set(minWeightFractionPerNode, value)
+
   /** @group setParam */
   @Since("1.4.0")
   def setMinInfoGain(value: Double): this.type = set(minInfoGain, value)
@@ -118,6 +120,16 @@ class RandomForestClassifier @Since("1.4.0") (
   def setFeatureSubsetStrategy(value: String): this.type =
     set(featureSubsetStrategy, value)
 
+  /**
+   * Sets the value of param [[weightCol]].
+   * If this is not set or empty, we treat all instance weights as 1.0.
+   * By default the weightCol is not set, so all instances have weight 1.0.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+
   override protected def train(
       dataset: Dataset[_]): RandomForestClassificationModel = instrumented { instr =>
     instr.logPipelineStage(this)
@@ -132,14 +144,14 @@ class RandomForestClassifier @Since("1.4.0") (
         s" numClasses=$numClasses, but thresholds has length ${$(thresholds).length}")
     }
 
-    val instances: RDD[Instance] = extractLabeledPoints(dataset, numClasses).map(_.toInstance)
+    val instances = extractInstances(dataset, numClasses)
     val strategy =
       super.getOldStrategy(categoricalFeatures, numClasses, OldAlgo.Classification, getOldImpurity)
 
-    instr.logParams(this, labelCol, featuresCol, predictionCol, probabilityCol, rawPredictionCol,
-      leafCol, impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB,
-      minInfoGain, minInstancesPerNode, seed, subsamplingRate, thresholds, cacheNodeIds,
-      checkpointInterval)
+    instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, probabilityCol,
+      rawPredictionCol, leafCol, impurity, numTrees, featureSubsetStrategy, maxDepth, maxBins,
+      maxMemoryInMB, minInfoGain, minInstancesPerNode, minWeightFractionPerNode, seed,
+      subsamplingRate, thresholds, cacheNodeIds, checkpointInterval)
 
     val trees = RandomForest
       .run(instances, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
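The two setters above are the entire user-facing change to the classifier. A minimal usage sketch, not part of the patch — the DataFrame `train` with `label`, `features`, and a numeric `weight` column is an assumption:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier

// Assumed input: DataFrame `train` with "label", "features" and a numeric "weight" column.
val rf = new RandomForestClassifier()
  .setNumTrees(20)
  .setWeightCol("weight")            // new in this patch: per-instance weights
  .setMinWeightFractionPerNode(0.05) // new in this patch: each child must keep >= 5% of total weight
val model = rf.fit(train)
```

Leaving `weightCol` unset keeps the previous behavior, since every instance then gets weight 1.0.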
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
index fa4dbbb47079..88377d610f29 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/RandomForestRegressor.scala
@@ -64,6 +64,10 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
   @Since("1.4.0")
   def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, value)
 
+  /** @group setParam */
+  @Since("3.0.0")
+  def setMinWeightFractionPerNode(value: Double): this.type = set(minWeightFractionPerNode, value)
+
   /** @group setParam */
   @Since("1.4.0")
   def setMinInfoGain(value: Double): this.type = set(minInfoGain, value)
@@ -113,20 +117,31 @@ class RandomForestRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: S
   def setFeatureSubsetStrategy(value: String): this.type =
     set(featureSubsetStrategy, value)
 
+  /**
+   * Sets the value of param [[weightCol]].
+   * If this is not set or empty, we treat all instance weights as 1.0.
+   * By default the weightCol is not set, so all instances have weight 1.0.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+
   override protected def train(
       dataset: Dataset[_]): RandomForestRegressionModel = instrumented { instr =>
     val categoricalFeatures: Map[Int, Int] =
       MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
 
-    val instances = extractLabeledPoints(dataset).map(_.toInstance)
+    val instances = extractInstances(dataset)
     val strategy =
       super.getOldStrategy(categoricalFeatures, numClasses = 0, OldAlgo.Regression, getOldImpurity)
 
     instr.logPipelineStage(this)
     instr.logDataset(instances)
-    instr.logParams(this, labelCol, featuresCol, predictionCol, leafCol, impurity, numTrees,
-      featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
-      minInstancesPerNode, seed, subsamplingRate, cacheNodeIds, checkpointInterval)
+    instr.logParams(this, labelCol, featuresCol, weightCol, predictionCol, leafCol, impurity,
+      numTrees, featureSubsetStrategy, maxDepth, maxBins, maxMemoryInMB, minInfoGain,
+      minInstancesPerNode, minWeightFractionPerNode, seed, subsamplingRate, cacheNodeIds,
+      checkpointInterval)
 
     val trees = RandomForest
       .run(instances, strategy, getNumTrees, getFeatureSubsetStrategy, getSeed, Some(instr))
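Both `train` methods now call `extractInstances` instead of `extractLabeledPoints(...).map(_.toInstance)`. A conceptual sketch of what that helper does — illustrative only, not the actual `PredictorParams` implementation:

```scala
import org.apache.spark.ml.feature.Instance
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, lit}

// Sketch: pull (label, weight, features) triples out of a DataFrame,
// defaulting the weight to 1.0 when no weight column is configured.
def extractInstancesSketch(
    dataset: Dataset[_],
    labelCol: String,
    featuresCol: String,
    weightCol: Option[String]): RDD[Instance] = {
  val w = weightCol.filter(_.nonEmpty).map(col).getOrElse(lit(1.0))
  dataset.select(col(labelCol).cast("double"), w.cast("double"), col(featuresCol))
    .rdd.map { case Row(label: Double, weight: Double, features: Vector) =>
      Instance(label, weight, features)
    }
}
```

The point of the switch is that the weight travels with each `Instance`, so the tree-growing code below sees weighted data without further plumbing.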
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala
index aac3dbf6c5a6..0ec7c6f03a11 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/BaggedPoint.scala
@@ -65,7 +65,8 @@ private[spark] object BaggedPoint {
       seed: Long = Utils.random.nextLong()): RDD[BaggedPoint[Datum]] = {
     // TODO: implement weighted bootstrapping
     if (withReplacement) {
-      convertToBaggedRDDSamplingWithReplacement(input, subsamplingRate, numSubsamples, seed)
+      convertToBaggedRDDSamplingWithReplacement(input, subsamplingRate, numSubsamples,
+        extractSampleWeight, seed)
     } else {
       if (numSubsamples == 1 && subsamplingRate == 1.0) {
         convertToBaggedRDDWithoutSampling(input, extractSampleWeight)
@@ -104,6 +105,7 @@ private[spark] object BaggedPoint {
       input: RDD[Datum],
      subsample: Double,
      numSubsamples: Int,
+      extractSampleWeight: (Datum => Double),
       seed: Long): RDD[BaggedPoint[Datum]] = {
     input.mapPartitionsWithIndex { (partitionIndex, instances) =>
       // Use random seed = seed + partitionIndex + 1 to make generation reproducible.
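With-replacement bagging draws each instance's per-tree count from a Poisson distribution; the change here threads `extractSampleWeight` through so the bagged point keeps its weight (note the `// TODO: implement weighted bootstrapping` left in place — the counts themselves are still drawn without regard to weight). A local, non-RDD sketch of the resulting behavior; `MiniBaggedPoint` and `bagWithReplacement` are illustrative names, not Spark API:

```scala
import org.apache.commons.math3.distribution.PoissonDistribution

// Simplified stand-in for BaggedPoint: (datum, per-subsample counts, its sample weight).
final case class MiniBaggedPoint[D](datum: D, subsampleCounts: Array[Int], sampleWeight: Double)

// Poisson bootstrap over a local Seq; subsamplingRate must be > 0.
def bagWithReplacement[D](
    data: Seq[D],
    subsamplingRate: Double,
    numSubsamples: Int,
    extractSampleWeight: D => Double,
    seed: Long): Seq[MiniBaggedPoint[D]] = {
  val poisson = new PoissonDistribution(subsamplingRate)
  poisson.reseedRandomGenerator(seed)
  data.map { d =>
    // One Poisson-distributed count per subsample, as in the RDD version.
    val counts = Array.fill(numSubsamples)(poisson.sample())
    MiniBaggedPoint(d, counts, extractSampleWeight(d)) // previously hard-coded to 1.0
  }
}
```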
@@ -116,7 +118,7 @@ private[spark] object BaggedPoint {
           subsampleCounts(subsampleIndex) = poisson.sample()
           subsampleIndex += 1
         }
-        new BaggedPoint(instance, subsampleCounts)
+        new BaggedPoint(instance, subsampleCounts, extractSampleWeight(instance))
       }
     }
   }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
index bf0f521cd28d..b21ab81438d2 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/RandomForestClassifierSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.ml.classification
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.classification.LinearSVCSuite.generateSVMInput
 import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
@@ -41,6 +42,8 @@ class RandomForestClassifierSuite extends MLTest with DefaultReadWriteTest {
 
   private var orderedLabeledPoints50_1000: RDD[LabeledPoint] = _
   private var orderedLabeledPoints5_20: RDD[LabeledPoint] = _
+  private var binaryDataset: DataFrame = _
+  private val seed = 42
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -50,6 +53,7 @@ class RandomForestClassifierSuite extends MLTest with DefaultReadWriteTest {
     orderedLabeledPoints5_20 =
       sc.parallelize(EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 5, 20))
         .map(_.asML)
+    binaryDataset = generateSVMInput(0.01, Array[Double](-1.5, 1.0), 1000, seed).toDF()
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -259,6 +263,37 @@
     })
   }
 
+  test("training with sample weights") {
+    val df = binaryDataset
+    val numClasses = 2
+    // (numTrees, maxDepth, subsamplingRate, fractionInTol)
+    val testParams = Seq(
+      (20, 5, 1.0, 0.96),
+      (20, 10, 1.0, 0.96),
+      (20, 10, 0.95, 0.96)
+    )
+
+    for ((numTrees, maxDepth, subsamplingRate, tol) <- testParams) {
+      val estimator = new RandomForestClassifier()
+        .setNumTrees(numTrees)
+        .setMaxDepth(maxDepth)
+        .setSubsamplingRate(subsamplingRate)
+        .setSeed(seed)
+        .setMinWeightFractionPerNode(0.049)
+
+      MLTestingUtils.testArbitrarilyScaledWeights[RandomForestClassificationModel,
+        RandomForestClassifier](df.as[LabeledPoint], estimator,
+        MLTestingUtils.modelPredictionEquals(df, _ == _, tol))
+      MLTestingUtils.testOutliersWithSmallWeights[RandomForestClassificationModel,
+        RandomForestClassifier](df.as[LabeledPoint], estimator,
+        numClasses, MLTestingUtils.modelPredictionEquals(df, _ == _, tol),
+        outlierRatio = 2)
+      MLTestingUtils.testOversamplingVsWeighting[RandomForestClassificationModel,
+        RandomForestClassifier](df.as[LabeledPoint], estimator,
+        MLTestingUtils.modelPredictionEquals(df, _ == _, tol), seed)
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////////////
   // Tests of model save/load
   /////////////////////////////////////////////////////////////////////////////
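The first check in the new test, `testArbitrarilyScaledWeights`, rests on a simple invariant: multiplying every weight by the same positive constant must not change the fitted model's predictions. Sketched with illustrative names — `train`, `test`, a configured `rf`, and an existing "weight" column are all assumptions:

```scala
import org.apache.spark.sql.functions.{col, lit}

// Fit once on the original weights and once on uniformly rescaled weights.
val rfWeighted = rf.setWeightCol("weight")
val m1 = rfWeighted.fit(train)
val m2 = rfWeighted.fit(train.withColumn("weight", col("weight") * lit(47.0)))
// Expectation: m1.transform(test) and m2.transform(test) agree on at least
// the fractionInTol share of rows, as in the suite above.
```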
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
index f3b0f0470e57..fff5cdd4ec50 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/RandomForestRegressorSuite.scala
@@ -22,9 +22,11 @@ import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.Vector
 import org.apache.spark.ml.tree.impl.TreeTests
 import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
+import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
 import org.apache.spark.mllib.tree.{EnsembleTestHelper, RandomForest => OldRandomForest}
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
+import org.apache.spark.mllib.util.LinearDataGenerator
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}
 
@@ -37,12 +39,18 @@ class RandomForestRegressorSuite extends MLTest with DefaultReadWriteTest{
   import testImplicits._
 
   private var orderedLabeledPoints50_1000: RDD[LabeledPoint] = _
+  private var linearRegressionData: DataFrame = _
+  private val seed = 42
 
   override def beforeAll(): Unit = {
     super.beforeAll()
     orderedLabeledPoints50_1000 =
       sc.parallelize(EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 50, 1000)
         .map(_.asML))
+
+    linearRegressionData = sc.parallelize(LinearDataGenerator.generateLinearInput(
+      intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
+      xVariance = Array(0.7, 1.2), nPoints = 1000, seed, eps = 0.5), 2).map(_.asML).toDF()
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -158,6 +166,37 @@ class RandomForestRegressorSuite extends MLTest with DefaultReadWriteTest{
     })
   }
 
+  test("training with sample weights") {
+    val df = linearRegressionData
+    val numClasses = 0
+    // (numTrees, maxDepth, subsamplingRate, fractionInTol)
+    val testParams = Seq(
+      (50, 5, 1.0, 0.75),
+      (50, 10, 1.0, 0.75),
+      (50, 10, 0.95, 0.78)
+    )
+
+    for ((numTrees, maxDepth, subsamplingRate, tol) <- testParams) {
+      val estimator = new RandomForestRegressor()
+        .setNumTrees(numTrees)
+        .setMaxDepth(maxDepth)
+        .setSubsamplingRate(subsamplingRate)
+        .setSeed(seed)
+        .setMinWeightFractionPerNode(0.05)
+
+      MLTestingUtils.testArbitrarilyScaledWeights[RandomForestRegressionModel,
+        RandomForestRegressor](df.as[LabeledPoint], estimator,
+        MLTestingUtils.modelPredictionEquals(df, _ ~= _ relTol 0.2, tol))
+      MLTestingUtils.testOutliersWithSmallWeights[RandomForestRegressionModel,
+        RandomForestRegressor](df.as[LabeledPoint], estimator,
+        numClasses, MLTestingUtils.modelPredictionEquals(df, _ ~= _ relTol 0.2, tol),
+        outlierRatio = 2)
+      MLTestingUtils.testOversamplingVsWeighting[RandomForestRegressionModel,
+        RandomForestRegressor](df.as[LabeledPoint], estimator,
+        MLTestingUtils.modelPredictionEquals(df, _ ~= _ relTol 0.2, tol), seed)
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////////////
   // Tests of model save/load
   /////////////////////////////////////////////////////////////////////////////
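`testOversamplingVsWeighting`, used in both suites, encodes the other natural equivalence: an instance with integer weight k should behave like k unweighted copies. A sketch under the same assumptions as before (`train` is a hypothetical DataFrame):

```scala
import org.apache.spark.sql.functions.{array, explode, lit}

// Duplicate every row twice by exploding a two-element array, then drop the helper column.
val oversampled = train
  .withColumn("copy", explode(array(lit(0), lit(1))))
  .drop("copy")
// Give every original row weight 2.0 instead.
val weighted = train.withColumn("weight", lit(2.0))
// Fitting on `oversampled` without weights and on `weighted` with
// setWeightCol("weight") should produce near-identical predictions.
```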
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala
index 1f7a1ba81624..2a95faef98b6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/BaggedPointSuite.scala
@@ -54,8 +54,7 @@ class BaggedPointSuite extends SparkFunSuite with MLlibTestSparkContext {
       baggedRDD.map(_.subsampleCounts.map(_.toDouble)).collect()
     EnsembleTestHelper.testRandomArrays(subsampleCounts, numSubsamples, expectedMean,
       expectedStddev, epsilon = 0.01)
-    // should ignore weight function for now
-    assert(baggedRDD.collect().forall(_.sampleWeight === 1.0))
+    assert(baggedRDD.collect().forall(_.sampleWeight === 2.0))
   }
 }
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index af295ac00845..95f8af99effd 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -1387,6 +1387,8 @@ class RandomForestClassifier(JavaProbabilisticClassifier, _RandomForestClassifie
     >>> td = si_model.transform(df)
     >>> rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol="indexed", seed=42,
     ...     leafCol="leafId")
+    >>> rf.getMinWeightFractionPerNode()
+    0.0
     >>> model = rf.fit(td)
     >>> model.getLabelCol()
     'indexed'
@@ -1441,14 +1443,14 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
                  maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                  maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini",
                  numTrees=20, featureSubsetStrategy="auto", seed=None, subsamplingRate=1.0,
-                 leafCol="", minWeightFractionPerNode=0.0):
+                 leafCol="", minWeightFractionPerNode=0.0, weightCol=None):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  probabilityCol="probability", rawPredictionCol="rawPrediction", \
                  maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
                  maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, impurity="gini", \
                  numTrees=20, featureSubsetStrategy="auto", seed=None, subsamplingRate=1.0, \
-                 leafCol="", minWeightFractionPerNode=0.0)
+                 leafCol="", minWeightFractionPerNode=0.0, weightCol=None)
         """
         super(RandomForestClassifier, self).__init__()
         self._java_obj = self._new_java_obj(
@@ -1467,14 +1469,14 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre
                   maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                   maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=None,
                   impurity="gini", numTrees=20, featureSubsetStrategy="auto", subsamplingRate=1.0,
-                  leafCol="", minWeightFractionPerNode=0.0):
+                  leafCol="", minWeightFractionPerNode=0.0, weightCol=None):
         """
         setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   probabilityCol="probability", rawPredictionCol="rawPrediction", \
                   maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
                   maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, seed=None, \
                   impurity="gini", numTrees=20, featureSubsetStrategy="auto", subsamplingRate=1.0, \
-                  leafCol="", minWeightFractionPerNode=0.0)
+                  leafCol="", minWeightFractionPerNode=0.0, weightCol=None)
         Sets params for linear classification.
         """
         kwargs = self._input_kwargs
@@ -1559,6 +1561,20 @@ def setCheckpointInterval(self, value):
         """
         return self._set(checkpointInterval=value)
 
+    @since("3.0.0")
+    def setWeightCol(self, value):
+        """
+        Sets the value of :py:attr:`weightCol`.
+        """
+        return self._set(weightCol=value)
+
+    @since("3.0.0")
+    def setMinWeightFractionPerNode(self, value):
+        """
+        Sets the value of :py:attr:`minWeightFractionPerNode`.
+        """
+        return self._set(minWeightFractionPerNode=value)
+
 
 class RandomForestClassificationModel(_TreeEnsembleModel, JavaProbabilisticClassificationModel,
                                       _RandomForestClassifierParams, JavaMLWritable,
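The doctest added above surfaces the `minWeightFractionPerNode` default of 0.0. What the parameter means during split search, restated in self-contained, illustrative form (this is not Spark's actual split-validation code):

```scala
// A candidate split is only usable when both children retain at least
// minWeightFractionPerNode of the node's total instance weight.
def splitIsValid(
    leftChildWeight: Double,
    rightChildWeight: Double,
    totalWeight: Double,
    minWeightFractionPerNode: Double): Boolean = {
  val minWeightPerNode = minWeightFractionPerNode * totalWeight
  leftChildWeight >= minWeightPerNode && rightChildWeight >= minWeightPerNode
}

// With total weight 100.0 and fraction 0.05, a 3.0 / 97.0 split is rejected:
assert(!splitIsValid(3.0, 97.0, 100.0, 0.05))
assert(splitIsValid(5.0, 95.0, 100.0, 0.05))
```

The default of 0.0 makes every split valid on weight grounds, which is why existing unweighted pipelines are unaffected.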
diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py
index 23c102b7643b..50d50ea8afa8 100644
--- a/python/pyspark/ml/regression.py
+++ b/python/pyspark/ml/regression.py
@@ -995,6 +995,8 @@ class RandomForestRegressor(JavaPredictor, _RandomForestRegressorParams, JavaMLW
     ...     (1.0, Vectors.dense(1.0)),
     ...     (0.0, Vectors.sparse(1, [], []))], ["label", "features"])
     >>> rf = RandomForestRegressor(numTrees=2, maxDepth=2)
+    >>> rf.getMinWeightFractionPerNode()
+    0.0
     >>> rf.setSeed(42)
     RandomForestRegressor...
     >>> model = rf.fit(df)
@@ -1044,13 +1046,15 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
                  maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                  maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
                  impurity="variance", subsamplingRate=1.0, seed=None, numTrees=20,
-                 featureSubsetStrategy="auto", leafCol="", minWeightFractionPerNode=0.0):
+                 featureSubsetStrategy="auto", leafCol="", minWeightFractionPerNode=0.0,
+                 weightCol=None):
         """
         __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                  maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
                  maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \
                  impurity="variance", subsamplingRate=1.0, seed=None, numTrees=20, \
-                 featureSubsetStrategy="auto", leafCol=", minWeightFractionPerNode=0.0")
+                 featureSubsetStrategy="auto", leafCol="", minWeightFractionPerNode=0.0, \
+                 weightCol=None)
         """
         super(RandomForestRegressor, self).__init__()
         self._java_obj = self._new_java_obj(
@@ -1068,13 +1072,15 @@ def setParams(self, featuresCol="features", labelCol="label", predictionCol="pre
                   maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0,
                   maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10,
                   impurity="variance", subsamplingRate=1.0, seed=None, numTrees=20,
-                  featureSubsetStrategy="auto", leafCol="", minWeightFractionPerNode=0.0):
+                  featureSubsetStrategy="auto", leafCol="", minWeightFractionPerNode=0.0,
+                  weightCol=None):
         """
         setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
                   maxDepth=5, maxBins=32, minInstancesPerNode=1, minInfoGain=0.0, \
                   maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10, \
                   impurity="variance", subsamplingRate=1.0, seed=None, numTrees=20, \
-                  featureSubsetStrategy="auto", leafCol="", minWeightFractionPerNode=0.0)
+                  featureSubsetStrategy="auto", leafCol="", minWeightFractionPerNode=0.0, \
+                  weightCol=None)
         Sets params for linear regression.
         """
         kwargs = self._input_kwargs
@@ -1159,6 +1165,20 @@ def setSeed(self, value):
         """
         return self._set(seed=value)
 
+    @since("3.0.0")
+    def setWeightCol(self, value):
+        """
+        Sets the value of :py:attr:`weightCol`.
+        """
+        return self._set(weightCol=value)
+
+    @since("3.0.0")
+    def setMinWeightFractionPerNode(self, value):
+        """
+        Sets the value of :py:attr:`minWeightFractionPerNode`.
+        """
+        return self._set(minWeightFractionPerNode=value)
+
 
 class RandomForestRegressionModel(_TreeEnsembleModel, _RandomForestRegressorParams,
                                   JavaMLWritable, JavaMLReadable):
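Finally, `testOutliersWithSmallWeights` (used by both suites above) checks that heavily down-weighted noise is effectively ignored. A sketch of the idea under stated assumptions — `train`, `test`, and a configured `rf` are hypothetical, and labels are assumed binary 0/1:

```scala
import org.apache.spark.sql.functions.{col, lit}

// Corrupt a copy of the data by flipping binary labels, but give those rows tiny weights.
val outliers = train
  .withColumn("label", lit(1.0) - col("label"))
  .withColumn("weight", lit(1e-4))
val corrupted = train.withColumn("weight", lit(1.0)).union(outliers)
val model = rf.setWeightCol("weight").fit(corrupted)
// Expectation: predictions on `test` still match a fit on the clean `train` data.
```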