diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index 9ac673078d4a..3bff236677e6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -53,7 +53,7 @@ private[spark] trait ClassifierParams
     val validateInstance = (instance: Instance) => {
       val label = instance.label
       require(label.toLong == label && label >= 0 && label < numClasses, s"Classifier was given" +
-        s" dataset with invalid label $label.  Labels must be integers in range" +
+        s" dataset with invalid label $label. Labels must be integers in range" +
         s" [0, $numClasses).")
     }
     extractInstances(dataset, validateInstance)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
index 74624be360c6..5bc45f2b02a4 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/GBTClassifier.scala
@@ -23,7 +23,7 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.regression.DecisionTreeRegressionModel
@@ -34,7 +34,7 @@ import org.apache.spark.ml.util.DefaultParamsReader.Metadata
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
 import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel => OldGBTModel}
-import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.functions._
 
 /**
@@ -79,6 +79,10 @@ class GBTClassifier @Since("1.4.0") (
   @Since("1.4.0")
   def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, value)
 
+  /** @group setParam */
+  @Since("3.0.0")
+  def setMinWeightFractionPerNode(value: Double): this.type = set(minWeightFractionPerNode, value)
+
   /** @group setParam */
   @Since("1.4.0")
   def setMinInfoGain(value: Double): this.type = set(minInfoGain, value)
@@ -152,36 +156,34 @@ class GBTClassifier @Since("1.4.0") (
     set(validationIndicatorCol, value)
   }
 
+  /**
+   * Sets the value of param [[weightCol]].
+   * If this is not set or empty, we treat all instance weights as 1.0.
+   * By default the weightCol is not set, so all instances have weight 1.0.
+   *
+   * @group setParam
+   */
+  @Since("3.0.0")
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+
   override protected def train(
       dataset: Dataset[_]): GBTClassificationModel = instrumented { instr =>
-    val categoricalFeatures: Map[Int, Int] =
-      MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
-
     val withValidation = isDefined(validationIndicatorCol) && $(validationIndicatorCol).nonEmpty
 
-    // We copy and modify this from Classifier.extractLabeledPoints since GBT only supports
-    // 2 classes now.  This lets us provide a more precise error message.
-    val convert2LabeledPoint = (dataset: Dataset[_]) => {
-      dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map {
-        case Row(label: Double, features: Vector) =>
-          require(label == 0 || label == 1, s"GBTClassifier was given" +
-            s" dataset with invalid label $label.  Labels must be in {0,1}; note that" +
-            s" GBTClassifier currently only supports binary classification.")
-          LabeledPoint(label, features)
-      }
+    val validateInstance = (instance: Instance) => {
+      val label = instance.label
+      require(label == 0 || label == 1, s"GBTClassifier was given" +
+        s" dataset with invalid label $label. Labels must be in {0,1}; note that" +
+        s" GBTClassifier currently only supports binary classification.")
     }
 
     val (trainDataset, validationDataset) = if (withValidation) {
-      (
-        convert2LabeledPoint(dataset.filter(not(col($(validationIndicatorCol))))),
-        convert2LabeledPoint(dataset.filter(col($(validationIndicatorCol))))
-      )
+      (extractInstances(dataset.filter(not(col($(validationIndicatorCol)))), validateInstance),
+        extractInstances(dataset.filter(col($(validationIndicatorCol))), validateInstance))
     } else {
-      (convert2LabeledPoint(dataset), null)
+      (extractInstances(dataset, validateInstance), null)
     }
 
-    val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Classification)
-
     val numClasses = 2
     if (isDefined(thresholds)) {
       require($(thresholds).length == numClasses, this.getClass.getSimpleName +
@@ -191,12 +193,14 @@ class GBTClassifier @Since("1.4.0") (
 
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
-    instr.logParams(this, labelCol, featuresCol, predictionCol, leafCol, impurity,
-      lossType, maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode,
-      seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, featureSubsetStrategy,
-      validationIndicatorCol, validationTol)
+    instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, leafCol,
+      impurity, lossType, maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain,
+      minInstancesPerNode, minWeightFractionPerNode, seed, stepSize, subsamplingRate, cacheNodeIds,
+      checkpointInterval, featureSubsetStrategy, validationIndicatorCol, validationTol)
     instr.logNumClasses(numClasses)
 
+    val categoricalFeatures = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol)))
+    val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Classification)
     val (baseLearners, learnerWeights) = if (withValidation) {
       GradientBoostedTrees.runWithValidation(trainDataset, validationDataset, boostingStrategy,
         $(seed), $(featureSubsetStrategy))
@@ -374,12 +378,9 @@ class GBTClassificationModel private[ml](
    */
   @Since("2.4.0")
   def evaluateEachIteration(dataset: Dataset[_]): Array[Double] = {
-    val data = dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map {
-      case Row(label: Double, features: Vector) => LabeledPoint(label, features)
-    }
+    val data = extractInstances(dataset)
     GradientBoostedTrees.evaluateEachIteration(data, trees, treeWeights, loss,
-      OldAlgo.Classification
-    )
+      OldAlgo.Classification)
   }
 
   @Since("2.0.0")
@@ -423,10 +424,9 @@ object GBTClassificationModel extends MLReadable[GBTClassificationModel] {
       val numFeatures = (metadata.metadata \ numFeaturesKey).extract[Int]
       val numTrees = (metadata.metadata \ numTreesKey).extract[Int]
 
-      val trees: Array[DecisionTreeRegressionModel] = treesData.map {
+      val trees = treesData.map {
         case (treeMetadata, root) =>
-          val tree =
-            new DecisionTreeRegressionModel(treeMetadata.uid, root, numFeatures)
+          val tree = new DecisionTreeRegressionModel(treeMetadata.uid, root, numFeatures)
           treeMetadata.getAndSetParams(tree)
           tree
       }
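With the two setters added above, GBTClassifier now accepts per-instance weights through the standard param machinery. A minimal usage sketch — the SparkSession value `spark`, the toy rows, and the column names are assumptions for illustration, not part of this patch:

    import org.apache.spark.ml.classification.GBTClassifier
    import org.apache.spark.ml.linalg.Vectors
    import spark.implicits._  // assumes an active SparkSession named `spark`

    // Assumed toy data: (label, weight, features).
    val trainingDF = Seq(
      (0.0, 1.0, Vectors.dense(0.1, 1.2)),
      (1.0, 2.5, Vectors.dense(1.3, -0.4))
    ).toDF("label", "weight", "features")

    val gbt = new GBTClassifier()
      .setWeightCol("weight")             // new in 3.0.0: per-instance weights
      .setMinWeightFractionPerNode(0.05)  // new in 3.0.0: weighted analogue of minInstancesPerNode
    val model = gbt.fit(trainingDF)

Leaving weightCol unset keeps the old behavior: every instance trains with weight 1.0.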
String) @Since("1.4.0") def setMinInstancesPerNode(value: Int): this.type = set(minInstancesPerNode, value) + /** @group setParam */ + @Since("3.0.0") + def setMinWeightFractionPerNode(value: Double): this.type = set(minWeightFractionPerNode, value) + /** @group setParam */ @Since("1.4.0") def setMinInfoGain(value: Double): this.type = set(minInfoGain, value) @@ -151,29 +154,35 @@ class GBTRegressor @Since("1.4.0") (@Since("1.4.0") override val uid: String) set(validationIndicatorCol, value) } - override protected def train(dataset: Dataset[_]): GBTRegressionModel = instrumented { instr => - val categoricalFeatures: Map[Int, Int] = - MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) + /** + * Sets the value of param [[weightCol]]. + * If this is not set or empty, we treat all instance weights as 1.0. + * By default the weightCol is not set, so all instances have weight 1.0. + * + * @group setParam + */ + @Since("3.0.0") + def setWeightCol(value: String): this.type = set(weightCol, value) + override protected def train(dataset: Dataset[_]): GBTRegressionModel = instrumented { instr => val withValidation = isDefined(validationIndicatorCol) && $(validationIndicatorCol).nonEmpty val (trainDataset, validationDataset) = if (withValidation) { - ( - extractLabeledPoints(dataset.filter(not(col($(validationIndicatorCol))))), - extractLabeledPoints(dataset.filter(col($(validationIndicatorCol)))) - ) + (extractInstances(dataset.filter(not(col($(validationIndicatorCol))))), + extractInstances(dataset.filter(col($(validationIndicatorCol))))) } else { - (extractLabeledPoints(dataset), null) + (extractInstances(dataset), null) } - val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression) instr.logPipelineStage(this) instr.logDataset(dataset) - instr.logParams(this, labelCol, featuresCol, predictionCol, leafCol, impurity, lossType, - maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode, - seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, featureSubsetStrategy, - validationIndicatorCol, validationTol) + instr.logParams(this, labelCol, featuresCol, predictionCol, leafCol, weightCol, impurity, + lossType, maxDepth, maxBins, maxIter, maxMemoryInMB, minInfoGain, minInstancesPerNode, + minWeightFractionPerNode, seed, stepSize, subsamplingRate, cacheNodeIds, checkpointInterval, + featureSubsetStrategy, validationIndicatorCol, validationTol) + val categoricalFeatures = MetadataUtils.getCategoricalFeatures(dataset.schema($(featuresCol))) + val boostingStrategy = super.getOldBoostingStrategy(categoricalFeatures, OldAlgo.Regression) val (baseLearners, learnerWeights) = if (withValidation) { GradientBoostedTrees.runWithValidation(trainDataset, validationDataset, boostingStrategy, $(seed), $(featureSubsetStrategy)) @@ -323,9 +332,7 @@ class GBTRegressionModel private[ml]( */ @Since("2.4.0") def evaluateEachIteration(dataset: Dataset[_], loss: String): Array[Double] = { - val data = dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map { - case Row(label: Double, features: Vector) => LabeledPoint(label, features) - } + val data = extractInstances(dataset) GradientBoostedTrees.evaluateEachIteration(data, trees, treeWeights, convertToOldLossType(loss), OldAlgo.Regression) } @@ -368,10 +375,9 @@ object GBTRegressionModel extends MLReadable[GBTRegressionModel] { val numFeatures = (metadata.metadata \ "numFeatures").extract[Int] val numTrees = (metadata.metadata \ "numTrees").extract[Int] - val trees: 
Array[DecisionTreeRegressionModel] = treesData.map { + val trees = treesData.map { case (treeMetadata, root) => - val tree = - new DecisionTreeRegressionModel(treeMetadata.uid, root, numFeatures) + val tree = new DecisionTreeRegressionModel(treeMetadata.uid, root, numFeatures) treeMetadata.getAndSetParams(tree) tree } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala index c31334c92e1c..744708258b0a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/GradientBoostedTrees.scala @@ -18,7 +18,7 @@ package org.apache.spark.ml.tree.impl import org.apache.spark.internal.Logging -import org.apache.spark.ml.feature.LabeledPoint +import org.apache.spark.ml.feature.Instance import org.apache.spark.ml.linalg.Vector import org.apache.spark.ml.regression.{DecisionTreeRegressionModel, DecisionTreeRegressor} import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo} @@ -34,13 +34,13 @@ private[spark] object GradientBoostedTrees extends Logging { /** * Method to train a gradient boosting model - * @param input Training dataset: RDD of `LabeledPoint`. + * @param input Training dataset: RDD of `Instance`. * @param seed Random seed. * @return tuple of ensemble models and weights: * (array of decision tree models, array of model weights) */ def run( - input: RDD[LabeledPoint], + input: RDD[Instance], boostingStrategy: OldBoostingStrategy, seed: Long, featureSubsetStrategy: String): (Array[DecisionTreeRegressionModel], Array[Double]) = { @@ -51,7 +51,7 @@ private[spark] object GradientBoostedTrees extends Logging { seed, featureSubsetStrategy) case OldAlgo.Classification => // Map labels to -1, +1 so binary classification can be treated as regression. - val remappedInput = input.map(x => new LabeledPoint((x.label * 2) - 1, x.features)) + val remappedInput = input.map(x => Instance((x.label * 2) - 1, x.weight, x.features)) GradientBoostedTrees.boost(remappedInput, remappedInput, boostingStrategy, validate = false, seed, featureSubsetStrategy) case _ => @@ -61,7 +61,7 @@ private[spark] object GradientBoostedTrees extends Logging { /** * Method to validate a gradient boosting model - * @param input Training dataset: RDD of `LabeledPoint`. + * @param input Training dataset: RDD of `Instance`. * @param validationInput Validation dataset. * This dataset should be different from the training dataset, * but it should follow the same distribution. @@ -72,8 +72,8 @@ private[spark] object GradientBoostedTrees extends Logging { * (array of decision tree models, array of model weights) */ def runWithValidation( - input: RDD[LabeledPoint], - validationInput: RDD[LabeledPoint], + input: RDD[Instance], + validationInput: RDD[Instance], boostingStrategy: OldBoostingStrategy, seed: Long, featureSubsetStrategy: String): (Array[DecisionTreeRegressionModel], Array[Double]) = { @@ -85,9 +85,9 @@ private[spark] object GradientBoostedTrees extends Logging { case OldAlgo.Classification => // Map labels to -1, +1 so binary classification can be treated as regression. 
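As above (and again in runWithValidation below), binary labels are remapped so boosting can fit regression trees against a signed target, and the instance weight now passes through unchanged. The mapping in isolation — a sketch that assumes Spark-internal scope, since Instance is private[spark]:

    import org.apache.spark.ml.feature.Instance

    // {0.0, 1.0} labels become {-1.0, +1.0}; weight and features are preserved.
    def remapBinaryLabel(x: Instance): Instance =
      Instance((x.label * 2) - 1, x.weight, x.features)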
@@ -61,7 +61,7 @@ private[spark] object GradientBoostedTrees extends Logging {
 
   /**
    * Method to validate a gradient boosting model
-   * @param input Training dataset: RDD of `LabeledPoint`.
+   * @param input Training dataset: RDD of `Instance`.
    * @param validationInput Validation dataset.
    *                        This dataset should be different from the training dataset,
    *                        but it should follow the same distribution.
@@ -72,8 +72,8 @@ private[spark] object GradientBoostedTrees extends Logging {
    *         (array of decision tree models, array of model weights)
    */
   def runWithValidation(
-      input: RDD[LabeledPoint],
-      validationInput: RDD[LabeledPoint],
+      input: RDD[Instance],
+      validationInput: RDD[Instance],
       boostingStrategy: OldBoostingStrategy,
       seed: Long,
       featureSubsetStrategy: String): (Array[DecisionTreeRegressionModel], Array[Double]) = {
@@ -85,9 +85,9 @@ private[spark] object GradientBoostedTrees extends Logging {
       case OldAlgo.Classification =>
         // Map labels to -1, +1 so binary classification can be treated as regression.
         val remappedInput = input.map(
-          x => new LabeledPoint((x.label * 2) - 1, x.features))
+          x => Instance((x.label * 2) - 1, x.weight, x.features))
         val remappedValidationInput = validationInput.map(
-          x => new LabeledPoint((x.label * 2) - 1, x.features))
+          x => Instance((x.label * 2) - 1, x.weight, x.features))
         GradientBoostedTrees.boost(remappedInput, remappedValidationInput, boostingStrategy,
           validate = true, seed, featureSubsetStrategy)
       case _ =>
@@ -106,13 +106,13 @@ private[spark] object GradientBoostedTrees extends Logging {
    *         corresponding to every sample.
    */
   def computeInitialPredictionAndError(
-      data: RDD[LabeledPoint],
+      data: RDD[Instance],
       initTreeWeight: Double,
       initTree: DecisionTreeRegressionModel,
       loss: OldLoss): RDD[(Double, Double)] = {
-    data.map { lp =>
-      val pred = updatePrediction(lp.features, 0.0, initTree, initTreeWeight)
-      val error = loss.computeError(pred, lp.label)
+    data.map { case Instance(label, _, features) =>
+      val pred = updatePrediction(features, 0.0, initTree, initTreeWeight)
+      val error = loss.computeError(pred, label)
       (pred, error)
     }
   }
@@ -129,20 +129,17 @@ private[spark] object GradientBoostedTrees extends Logging {
    *         corresponding to each sample.
    */
   def updatePredictionError(
-      data: RDD[LabeledPoint],
+      data: RDD[Instance],
       predictionAndError: RDD[(Double, Double)],
       treeWeight: Double,
       tree: DecisionTreeRegressionModel,
       loss: OldLoss): RDD[(Double, Double)] = {
-
-    val newPredError = data.zip(predictionAndError).mapPartitions { iter =>
-      iter.map { case (lp, (pred, error)) =>
-        val newPred = updatePrediction(lp.features, pred, tree, treeWeight)
-        val newError = loss.computeError(newPred, lp.label)
+    data.zip(predictionAndError).map {
+      case (Instance(label, _, features), (pred, _)) =>
+        val newPred = updatePrediction(features, pred, tree, treeWeight)
+        val newError = loss.computeError(newPred, label)
         (newPred, newError)
-      }
     }
-    newPredError
   }
 
   /**
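Boosting state is carried as an RDD[(Double, Double)] of (prediction, error) pairs zipped against the input; each iteration folds one more weighted tree into the prediction, as updatePredictionError does above. The per-instance step, reduced to plain Scala — a sketch, where `computeError` stands in for the OldLoss method of the same name:

    // One update step: add the new tree's scaled prediction, then re-evaluate the loss.
    def updateStep(
        pred: Double,
        label: Double,
        treePrediction: Double,
        treeWeight: Double,
        computeError: (Double, Double) => Double): (Double, Double) = {
      val newPred = pred + treePrediction * treeWeight
      (newPred, computeError(newPred, label))
    }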
@@ -166,29 +163,50 @@ private[spark] object GradientBoostedTrees extends Logging {
    * Method to calculate error of the base learner for the gradient boosting calculation.
    * Note: This method is not used by the gradient boosting algorithm but is useful for debugging
    * purposes.
-   * @param data Training dataset: RDD of `LabeledPoint`.
+   * @param data Training dataset: RDD of `Instance`.
    * @param trees Boosted Decision Tree models
    * @param treeWeights Learning rates at each boosting iteration.
    * @param loss evaluation metric.
    * @return Measure of model error on data
    */
-  def computeError(
-      data: RDD[LabeledPoint],
+  def computeWeightedError(
+      data: RDD[Instance],
       trees: Array[DecisionTreeRegressionModel],
       treeWeights: Array[Double],
       loss: OldLoss): Double = {
-    data.map { lp =>
+    val (errSum, weightSum) = data.map { case Instance(label, weight, features) =>
       val predicted = trees.zip(treeWeights).foldLeft(0.0) { case (acc, (model, weight)) =>
-        updatePrediction(lp.features, acc, model, weight)
+        updatePrediction(features, acc, model, weight)
       }
-      loss.computeError(predicted, lp.label)
-    }.mean()
+      (loss.computeError(predicted, label) * weight, weight)
+    }.treeReduce { case ((err1, weight1), (err2, weight2)) =>
+      (err1 + err2, weight1 + weight2)
+    }
+    errSum / weightSum
+  }
+
+  /**
+   * Method to calculate error of the base learner for the gradient boosting calculation.
+   * @param data Training dataset: RDD of `Instance`.
+   * @param predError Prediction and error.
+   * @return Measure of model error on data
+   */
+  def computeWeightedError(
+      data: RDD[Instance],
+      predError: RDD[(Double, Double)]): Double = {
+    val (errSum, weightSum) = data.zip(predError).map {
+      case (Instance(_, weight, _), (_, err)) =>
+        (err * weight, weight)
+    }.treeReduce { case ((err1, weight1), (err2, weight2)) =>
+      (err1 + err2, weight1 + weight2)
+    }
+    errSum / weightSum
   }
 
   /**
    * Method to compute error or loss for every iteration of gradient boosting.
    *
-   * @param data RDD of `LabeledPoint`
+   * @param data RDD of `Instance`
    * @param trees Boosted Decision Tree models
    * @param treeWeights Learning rates at each boosting iteration.
    * @param loss evaluation metric.
@@ -197,41 +215,34 @@ private[spark] object GradientBoostedTrees extends Logging {
    *         containing the first i+1 trees
    */
   def evaluateEachIteration(
-      data: RDD[LabeledPoint],
+      data: RDD[Instance],
       trees: Array[DecisionTreeRegressionModel],
       treeWeights: Array[Double],
       loss: OldLoss,
       algo: OldAlgo.Value): Array[Double] = {
-
-    val sc = data.sparkContext
     val remappedData = algo match {
-      case OldAlgo.Classification => data.map(x => new LabeledPoint((x.label * 2) - 1, x.features))
+      case OldAlgo.Classification =>
+        data.map(x => Instance((x.label * 2) - 1, x.weight, x.features))
       case _ => data
     }
 
-    val broadcastTrees = sc.broadcast(trees)
-    val localTreeWeights = treeWeights
-    val treesIndices = trees.indices
-
-    val dataCount = remappedData.count()
-    val evaluation = remappedData.map { point =>
-      treesIndices.map { idx =>
-        val prediction = broadcastTrees.value(idx)
-          .rootNode
-          .predictImpl(point.features)
-          .prediction
-        prediction * localTreeWeights(idx)
+    val numTrees = trees.length
+    val (errSum, weightSum) = remappedData.mapPartitions { iter =>
+      iter.map { case Instance(label, weight, features) =>
+        val pred = Array.tabulate(numTrees) { i =>
+          trees(i).rootNode.predictImpl(features)
+            .prediction * treeWeights(i)
+        }
+        val err = pred.scanLeft(0.0)(_ + _).drop(1)
+          .map(p => loss.computeError(p, label) * weight)
+        (err, weight)
       }
-        .scanLeft(0.0)(_ + _).drop(1)
-        .map(prediction => loss.computeError(prediction, point.label))
+    }.treeReduce { case ((err1, weight1), (err2, weight2)) =>
+      (0 until numTrees).foreach(i => err1(i) += err2(i))
+      (err1, weight1 + weight2)
     }
-      .aggregate(treesIndices.map(_ => 0.0))(
-        (aggregated, row) => treesIndices.map(idx => aggregated(idx) + row(idx)),
-        (a, b) => treesIndices.map(idx => a(idx) + b(idx)))
-      .map(_ / dataCount)
 
-    broadcastTrees.destroy()
-    evaluation.toArray
+    errSum.map(_ / weightSum)
   }
 
   /**
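Both computeWeightedError overloads, and the per-iteration evaluation above, reduce to the same weighted mean: the sum of w_i * L(pred_i, y_i) divided by the sum of w_i. With unit weights this degenerates to the old unweighted mean, so existing callers keep their semantics. The reduction in plain Scala for reference — a sketch, not Spark API:

    // Weighted-mean reduction mirroring the treeReduce above (no RDDs).
    def weightedMeanError(errorsAndWeights: Seq[(Double, Double)]): Double = {
      val (errSum, weightSum) = errorsAndWeights
        .map { case (err, w) => (err * w, w) }
        .reduce { case ((e1, w1), (e2, w2)) => (e1 + e2, w1 + w2) }
      errSum / weightSum
    }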
@@ -245,8 +256,8 @@ private[spark] object GradientBoostedTrees extends Logging {
    *         (array of decision tree models, array of model weights)
    */
   def boost(
-      input: RDD[LabeledPoint],
-      validationInput: RDD[LabeledPoint],
+      input: RDD[Instance],
+      validationInput: RDD[Instance],
       boostingStrategy: OldBoostingStrategy,
       validate: Boolean,
       seed: Long,
@@ -280,8 +291,10 @@ private[spark] object GradientBoostedTrees extends Logging {
     }
 
     // Prepare periodic checkpointers
+    // Note: this is checkpointing the unweighted training error
     val predErrorCheckpointer = new PeriodicRDDCheckpointer[(Double, Double)](
       treeStrategy.getCheckpointInterval, input.sparkContext)
+    // Note: this is checkpointing the unweighted validation error
     val validatePredErrorCheckpointer = new PeriodicRDDCheckpointer[(Double, Double)](
       treeStrategy.getCheckpointInterval, input.sparkContext)
 
@@ -299,26 +312,29 @@ private[spark] object GradientBoostedTrees extends Logging {
     baseLearners(0) = firstTreeModel
     baseLearnerWeights(0) = firstTreeWeight
 
-    var predError: RDD[(Double, Double)] =
-      computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
+    var predError = computeInitialPredictionAndError(input, firstTreeWeight, firstTreeModel, loss)
     predErrorCheckpointer.update(predError)
-    logDebug("error of gbt = " + predError.values.mean())
+    logDebug("error of gbt = " + computeWeightedError(input, predError))
 
     // Note: A model of type regression is used since we require raw prediction
     timer.stop("building tree 0")
 
-    var validatePredError: RDD[(Double, Double)] =
+    var validatePredError =
       computeInitialPredictionAndError(validationInput, firstTreeWeight, firstTreeModel, loss)
     if (validate) validatePredErrorCheckpointer.update(validatePredError)
-    var bestValidateError = if (validate) validatePredError.values.mean() else 0.0
+    var bestValidateError = if (validate) {
+      computeWeightedError(validationInput, validatePredError)
+    } else {
+      0.0
+    }
     var bestM = 1
 
     var m = 1
     var doneLearning = false
     while (m < numIterations && !doneLearning) {
       // Update data with pseudo-residuals
-      val data = predError.zip(input).map { case ((pred, _), point) =>
-        LabeledPoint(-loss.gradient(pred, point.label), point.features)
+      val data = predError.zip(input).map { case ((pred, _), Instance(label, weight, features)) =>
+        Instance(-loss.gradient(pred, label), weight, features)
       }
 
       timer.start(s"building tree $m")
@@ -339,7 +355,7 @@ private[spark] object GradientBoostedTrees extends Logging {
       predError = updatePredictionError(
         input, predError, baseLearnerWeights(m), baseLearners(m), loss)
       predErrorCheckpointer.update(predError)
-      logDebug("error of gbt = " + predError.values.mean())
+      logDebug("error of gbt = " + computeWeightedError(input, predError))
 
       if (validate) {
         // Stop training early if
@@ -350,7 +366,7 @@ private[spark] object GradientBoostedTrees extends Logging {
         validatePredError = updatePredictionError(
           validationInput, validatePredError, baseLearnerWeights(m), baseLearners(m), loss)
         validatePredErrorCheckpointer.update(validatePredError)
-        val currentValidateError = validatePredError.values.mean()
+        val currentValidateError = computeWeightedError(validationInput, validatePredError)
         if (bestValidateError - currentValidateError < validationTol * Math.max(
           currentValidateError, 0.01)) {
           doneLearning = true
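Inside boost(), each round fits the next tree to pseudo-residuals: the negative gradient of the loss at the current prediction, with the original weight riding along, exactly as in the zip/map above. Isolated — a sketch assuming Spark-internal scope for Instance, with `gradient` standing in for OldLoss.gradient:

    import org.apache.spark.ml.feature.Instance

    // Target for the next tree: negative loss gradient; weight and features unchanged.
    def toPseudoResidual(
        pred: Double,
        x: Instance,
        gradient: (Double, Double) => Double): Instance =
      Instance(-gradient(pred, x.label), x.weight, x.features)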
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
index d24d8da0dab4..d57f1b36a572 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/GradientBoostedTrees.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.tree
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.{LabeledPoint => NewLabeledPoint}
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.tree.impl.{GradientBoostedTrees => NewGBT}
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.tree.configuration.BoostingStrategy
@@ -67,8 +67,9 @@ class GradientBoostedTrees private[spark] (
   @Since("1.2.0")
   def run(input: RDD[LabeledPoint]): GradientBoostedTreesModel = {
     val algo = boostingStrategy.treeStrategy.algo
-    val (trees, treeWeights) = NewGBT.run(input.map { point =>
-      NewLabeledPoint(point.label, point.features.asML)
+    val (trees, treeWeights) = NewGBT.run(input.map {
+      case LabeledPoint(label, features) =>
+        Instance(label, 1.0, features.asML)
     }, boostingStrategy, seed.toLong, "all")
     new GradientBoostedTreesModel(algo, trees.map(_.toOld), treeWeights)
   }
@@ -97,10 +98,12 @@ class GradientBoostedTrees private[spark] (
       input: RDD[LabeledPoint],
       validationInput: RDD[LabeledPoint]): GradientBoostedTreesModel = {
     val algo = boostingStrategy.treeStrategy.algo
-    val (trees, treeWeights) = NewGBT.runWithValidation(input.map { point =>
-      NewLabeledPoint(point.label, point.features.asML)
-    }, validationInput.map { point =>
-      NewLabeledPoint(point.label, point.features.asML)
+    val (trees, treeWeights) = NewGBT.runWithValidation(input.map {
+      case LabeledPoint(label, features) =>
+        Instance(label, 1.0, features.asML)
+    }, validationInput.map {
+      case LabeledPoint(label, features) =>
+        Instance(label, 1.0, features.asML)
     }, boostingStrategy, seed.toLong, "all")
     new GradientBoostedTreesModel(algo, trees.map(_.toOld), treeWeights)
   }
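The RDD-based mllib entry points above expose no weight parameter, so every LabeledPoint is wrapped as an Instance with weight 1.0 and old callers see identical behavior. The wrapping as a helper — a sketch, not a method this patch adds:

    import org.apache.spark.ml.feature.Instance
    import org.apache.spark.mllib.regression.LabeledPoint

    // Unit-weight wrapping used by the mllib facade (asML converts the vector type).
    def toUnitWeightInstance(p: LabeledPoint): Instance =
      Instance(p.label, 1.0, p.features.asML)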
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala
index 530ca20d0eb0..fdca71f8911c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/GBTClassifierSuite.scala
@@ -20,7 +20,8 @@ package org.apache.spark.ml.classification
 import com.github.fommil.netlib.BLAS
 
 import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.classification.LinearSVCSuite.generateSVMInput
+import org.apache.spark.ml.feature.{Instance, LabeledPoint}
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.regression.DecisionTreeRegressionModel
@@ -52,8 +53,10 @@ class GBTClassifierSuite extends MLTest with DefaultReadWriteTest {
   private var data: RDD[LabeledPoint] = _
   private var trainData: RDD[LabeledPoint] = _
   private var validationData: RDD[LabeledPoint] = _
+  private var binaryDataset: DataFrame = _
   private val eps: Double = 1e-5
   private val absEps: Double = 1e-8
+  private val seed = 42
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -65,6 +68,7 @@ class GBTClassifierSuite extends MLTest with DefaultReadWriteTest {
     validationData =
       sc.parallelize(EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 20, 80), 2)
         .map(_.asML)
+    binaryDataset = generateSVMInput(0.01, Array[Double](-1.5, 1.0), 1000, seed).toDF()
   }
 
   test("params") {
@@ -362,7 +366,7 @@ class GBTClassifierSuite extends MLTest with DefaultReadWriteTest {
   test("Tests of feature subset strategy") {
     val numClasses = 2
     val gbt = new GBTClassifier()
-      .setSeed(42)
+      .setSeed(seed)
       .setMaxDepth(3)
       .setMaxIter(5)
       .setFeatureSubsetStrategy("all")
@@ -397,13 +401,15 @@ class GBTClassifierSuite extends MLTest with DefaultReadWriteTest {
       model3.trees.take(2), model3.treeWeights.take(2), model3.numFeatures, model3.numClasses)
 
     val evalArr = model3.evaluateEachIteration(validationData.toDF)
-    val remappedValidationData = validationData.map(
-      x => new LabeledPoint((x.label * 2) - 1, x.features))
-    val lossErr1 = GradientBoostedTrees.computeError(remappedValidationData,
+    val remappedValidationData = validationData.map {
+      case LabeledPoint(label, features) =>
+        Instance(label * 2 - 1, 1.0, features)
+    }
+    val lossErr1 = GradientBoostedTrees.computeWeightedError(remappedValidationData,
       model1.trees, model1.treeWeights, model1.getOldLossType)
-    val lossErr2 = GradientBoostedTrees.computeError(remappedValidationData,
+    val lossErr2 = GradientBoostedTrees.computeWeightedError(remappedValidationData,
       model2.trees, model2.treeWeights, model2.getOldLossType)
-    val lossErr3 = GradientBoostedTrees.computeError(remappedValidationData,
+    val lossErr3 = GradientBoostedTrees.computeWeightedError(remappedValidationData,
       model3.trees, model3.treeWeights, model3.getOldLossType)
 
     assert(evalArr(0) ~== lossErr1 relTol 1E-3)
@@ -433,16 +439,19 @@ class GBTClassifierSuite extends MLTest with DefaultReadWriteTest {
     assert(modelWithValidation.numTrees < numIter)
 
     val (errorWithoutValidation, errorWithValidation) = {
-      val remappedRdd = validationData.map(x => new LabeledPoint(2 * x.label - 1, x.features))
-      (GradientBoostedTrees.computeError(remappedRdd, modelWithoutValidation.trees,
+      val remappedRdd = validationData.map {
+        case LabeledPoint(label, features) =>
+          Instance(label * 2 - 1, 1.0, features)
+      }
+      (GradientBoostedTrees.computeWeightedError(remappedRdd, modelWithoutValidation.trees,
         modelWithoutValidation.treeWeights, modelWithoutValidation.getOldLossType),
-        GradientBoostedTrees.computeError(remappedRdd, modelWithValidation.trees,
+        GradientBoostedTrees.computeWeightedError(remappedRdd, modelWithValidation.trees,
          modelWithValidation.treeWeights, modelWithValidation.getOldLossType))
     }
     assert(errorWithValidation < errorWithoutValidation)
 
     val evaluationArray = GradientBoostedTrees
-      .evaluateEachIteration(validationData, modelWithoutValidation.trees,
+      .evaluateEachIteration(validationData.map(_.toInstance), modelWithoutValidation.trees,
        modelWithoutValidation.treeWeights, modelWithoutValidation.getOldLossType,
        OldAlgo.Classification)
     assert(evaluationArray.length === numIter)
@@ -472,6 +481,36 @@ class GBTClassifierSuite extends MLTest with DefaultReadWriteTest {
     })
   }
 
+  test("training with sample weights") {
+    val df = binaryDataset
+    val numClasses = 2
+    val predEquals = (x: Double, y: Double) => x == y
+    // (maxIter, maxDepth)
+    val testParams = Seq(
+      (5, 5),
+      (5, 10)
+    )
+
+    for ((maxIter, maxDepth) <- testParams) {
+      val estimator = new GBTClassifier()
+        .setMaxIter(maxIter)
+        .setMaxDepth(maxDepth)
+        .setSeed(seed)
+        .setMinWeightFractionPerNode(0.049)
+
+      MLTestingUtils.testArbitrarilyScaledWeights[GBTClassificationModel,
+        GBTClassifier](df.as[LabeledPoint], estimator,
+        MLTestingUtils.modelPredictionEquals(df, predEquals, 0.7))
+      MLTestingUtils.testOutliersWithSmallWeights[GBTClassificationModel,
+        GBTClassifier](df.as[LabeledPoint], estimator,
+        numClasses, MLTestingUtils.modelPredictionEquals(df, predEquals, 0.8),
+        outlierRatio = 2)
+      MLTestingUtils.testOversamplingVsWeighting[GBTClassificationModel,
+        GBTClassifier](df.as[LabeledPoint], estimator,
+        MLTestingUtils.modelPredictionEquals(df, predEquals, 0.7), seed)
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////////////
   // Tests of model save/load
   /////////////////////////////////////////////////////////////////////////////
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index 2b5a9a396eff..d2b8751360e9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -1425,8 +1425,6 @@ class LogisticRegressionSuite extends MLTest with DefaultReadWriteTest {
   }
 
   test("multinomial logistic regression with zero variance (SPARK-21681)") {
-    val sqlContext = multinomialDatasetWithZeroVar.sqlContext
-    import sqlContext.implicits._
     val mlr = new LogisticRegression().setFamily("multinomial").setFitIntercept(true)
       .setElasticNetParam(0.0).setRegParam(0.0).setStandardization(true).setWeightCol("weight")
 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
index e2462af2ac1d..b772a3b7737d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/regression/GBTRegressorSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
 import org.apache.spark.mllib.tree.{EnsembleTestHelper, GradientBoostedTrees => OldGBT}
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo}
+import org.apache.spark.mllib.util.LinearDataGenerator
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.functions.lit
@@ -46,6 +47,8 @@ class GBTRegressorSuite extends MLTest with DefaultReadWriteTest {
   private var data: RDD[LabeledPoint] = _
   private var trainData: RDD[LabeledPoint] = _
   private var validationData: RDD[LabeledPoint] = _
+  private var linearRegressionData: DataFrame = _
+  private val seed = 42
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -57,6 +60,9 @@ class GBTRegressorSuite extends MLTest with DefaultReadWriteTest {
     validationData =
       sc.parallelize(EnsembleTestHelper.generateOrderedLabeledPoints(numFeatures = 20, 80), 2)
         .map(_.asML)
+    linearRegressionData = sc.parallelize(LinearDataGenerator.generateLinearInput(
+      intercept = 6.3, weights = Array(4.7, 7.2), xMean = Array(0.9, -1.3),
+      xVariance = Array(0.7, 1.2), nPoints = 1000, seed, eps = 0.5), 2).map(_.asML).toDF()
   }
 
   test("Regression with continuous features") {
@@ -202,7 +208,7 @@ class GBTRegressorSuite extends MLTest with DefaultReadWriteTest {
     val gbt = new GBTRegressor()
       .setMaxDepth(3)
       .setMaxIter(5)
-      .setSeed(42)
+      .setSeed(seed)
       .setFeatureSubsetStrategy("all")
 
     // In this data, feature 1 is very important.
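The new "training with sample weights" tests (one per suite, above and below) lean on a single invariant: a row carrying weight k should train approximately like the same row duplicated k times at weight 1, which is what MLTestingUtils.testOversamplingVsWeighting checks. Schematically — a sketch of the idea assuming Spark-internal scope for Instance, not suite code:

    import org.apache.spark.ml.feature.Instance
    import org.apache.spark.ml.linalg.Vectors

    // A point with weight 3.0 ...
    val weighted = Seq(Instance(1.0, 3.0, Vectors.dense(0.5, -0.2)))
    // ... should train roughly like the same point repeated three times at weight 1.0.
    val oversampled = Seq.fill(3)(Instance(1.0, 1.0, Vectors.dense(0.5, -0.2)))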
@@ -237,11 +243,11 @@ class GBTRegressorSuite extends MLTest with DefaultReadWriteTest {
 
     for (evalLossType <- GBTRegressor.supportedLossTypes) {
       val evalArr = model3.evaluateEachIteration(validationData.toDF, evalLossType)
-      val lossErr1 = GradientBoostedTrees.computeError(validationData,
+      val lossErr1 = GradientBoostedTrees.computeWeightedError(validationData.map(_.toInstance),
        model1.trees, model1.treeWeights, model1.convertToOldLossType(evalLossType))
-      val lossErr2 = GradientBoostedTrees.computeError(validationData,
+      val lossErr2 = GradientBoostedTrees.computeWeightedError(validationData.map(_.toInstance),
        model2.trees, model2.treeWeights, model2.convertToOldLossType(evalLossType))
-      val lossErr3 = GradientBoostedTrees.computeError(validationData,
+      val lossErr3 = GradientBoostedTrees.computeWeightedError(validationData.map(_.toInstance),
        model3.trees, model3.treeWeights, model3.convertToOldLossType(evalLossType))
 
       assert(evalArr(0) ~== lossErr1 relTol 1E-3)
@@ -272,17 +278,19 @@ class GBTRegressorSuite extends MLTest with DefaultReadWriteTest {
     // early stop
     assert(modelWithValidation.numTrees < numIter)
 
-    val errorWithoutValidation = GradientBoostedTrees.computeError(validationData,
+    val errorWithoutValidation = GradientBoostedTrees.computeWeightedError(
+      validationData.map(_.toInstance),
       modelWithoutValidation.trees, modelWithoutValidation.treeWeights,
       modelWithoutValidation.getOldLossType)
-    val errorWithValidation = GradientBoostedTrees.computeError(validationData,
+    val errorWithValidation = GradientBoostedTrees.computeWeightedError(
+      validationData.map(_.toInstance),
       modelWithValidation.trees, modelWithValidation.treeWeights,
       modelWithValidation.getOldLossType)
 
     assert(errorWithValidation < errorWithoutValidation)
 
     val evaluationArray = GradientBoostedTrees
-      .evaluateEachIteration(validationData, modelWithoutValidation.trees,
+      .evaluateEachIteration(validationData.map(_.toInstance), modelWithoutValidation.trees,
        modelWithoutValidation.treeWeights, modelWithoutValidation.getOldLossType,
        OldAlgo.Regression)
     assert(evaluationArray.length === numIter)
@@ -310,6 +318,35 @@ class GBTRegressorSuite extends MLTest with DefaultReadWriteTest {
     })
   }
 
+  test("training with sample weights") {
+    val df = linearRegressionData
+    val numClasses = 0
+    // (maxIter, maxDepth)
+    val testParams = Seq(
+      (5, 5),
+      (5, 10)
+    )
+
+    for ((maxIter, maxDepth) <- testParams) {
+      val estimator = new GBTRegressor()
+        .setMaxIter(maxIter)
+        .setMaxDepth(maxDepth)
+        .setSeed(seed)
+        .setMinWeightFractionPerNode(0.1)
+
+      MLTestingUtils.testArbitrarilyScaledWeights[GBTRegressionModel,
+        GBTRegressor](df.as[LabeledPoint], estimator,
+        MLTestingUtils.modelPredictionEquals(df, _ ~= _ relTol 0.1, 0.95))
+      MLTestingUtils.testOutliersWithSmallWeights[GBTRegressionModel,
+        GBTRegressor](df.as[LabeledPoint], estimator, numClasses,
+        MLTestingUtils.modelPredictionEquals(df, _ ~= _ relTol 0.1, 0.95),
+        outlierRatio = 2)
+      MLTestingUtils.testOversamplingVsWeighting[GBTRegressionModel,
+        GBTRegressor](df.as[LabeledPoint], estimator,
+        MLTestingUtils.modelPredictionEquals(df, _ ~= _ relTol 0.01, 0.95), seed)
+    }
+  }
+
   /////////////////////////////////////////////////////////////////////////////
   // Tests of model save/load
   /////////////////////////////////////////////////////////////////////////////
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala
index 366d5ec3a53f..18fc1407557f 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/GradientBoostedTreesSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.tree.impl
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
-import org.apache.spark.ml.feature.LabeledPoint
+import org.apache.spark.ml.feature.Instance
 import org.apache.spark.mllib.tree.{GradientBoostedTreesSuite => OldGBTSuite}
 import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, Strategy}
 import org.apache.spark.mllib.tree.configuration.Algo._
@@ -32,15 +32,12 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
  */
 class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext with Logging {
 
-  import testImplicits._
-
   test("runWithValidation stops early and performs better on a validation dataset") {
     // Set numIterations large enough so that it stops early.
     val numIterations = 20
-    val trainRdd = sc.parallelize(OldGBTSuite.trainData, 2).map(_.asML)
-    val validateRdd = sc.parallelize(OldGBTSuite.validateData, 2).map(_.asML)
-    val trainDF = trainRdd.toDF()
-    val validateDF = validateRdd.toDF()
+    val trainRdd = sc.parallelize(OldGBTSuite.trainData, 2).map(_.asML.toInstance)
+    val validateRdd = sc.parallelize(OldGBTSuite.validateData, 2).map(_.asML.toInstance)
+    val seed = 42
 
     val algos = Array(Regression, Regression, Classification)
     val losses = Array(SquaredError, AbsoluteError, LogLoss)
@@ -50,21 +47,21 @@ class GradientBoostedTreesSuite extends SparkFunSuite with MLlibTestSparkContext
       val boostingStrategy =
         new BoostingStrategy(treeStrategy, loss, numIterations, validationTol = 0.0)
       val (validateTrees, validateTreeWeights) = GradientBoostedTrees
-        .runWithValidation(trainRdd, validateRdd, boostingStrategy, 42L, "all")
+        .runWithValidation(trainRdd, validateRdd, boostingStrategy, seed, "all")
       val numTrees = validateTrees.length
       assert(numTrees !== numIterations)
 
       // Test that it performs better on the validation dataset.
-      val (trees, treeWeights) = GradientBoostedTrees.run(trainRdd, boostingStrategy, 42L, "all")
+      val (trees, treeWeights) = GradientBoostedTrees.run(trainRdd, boostingStrategy, seed, "all")
       val (errorWithoutValidation, errorWithValidation) = {
         if (algo == Classification) {
-          val remappedRdd = validateRdd.map(x => new LabeledPoint(2 * x.label - 1, x.features))
-          (GradientBoostedTrees.computeError(remappedRdd, trees, treeWeights, loss),
-            GradientBoostedTrees.computeError(remappedRdd, validateTrees,
+          val remappedRdd = validateRdd.map(x => Instance(2 * x.label - 1, x.weight, x.features))
+          (GradientBoostedTrees.computeWeightedError(remappedRdd, trees, treeWeights, loss),
+            GradientBoostedTrees.computeWeightedError(remappedRdd, validateTrees,
              validateTreeWeights, loss))
        } else {
-          (GradientBoostedTrees.computeError(validateRdd, trees, treeWeights, loss),
-            GradientBoostedTrees.computeError(validateRdd, validateTrees,
+          (GradientBoostedTrees.computeWeightedError(validateRdd, trees, treeWeights, loss),
+            GradientBoostedTrees.computeWeightedError(validateRdd, validateTrees,
              validateTreeWeights, loss))
        }
      }
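One behavioral consequence worth noting: with a validation set, early stopping is now driven by the weighted validation error. The stopping rule itself is unchanged from boost() — the improvement over the best error so far must stay above validationTol, relative to the current error floored at 0.01. As a standalone predicate — a sketch of the condition above, not a method in the patch:

    // Early-stop test from boost(), now fed weighted errors.
    def shouldStopEarly(
        bestValidateError: Double,
        currentValidateError: Double,
        validationTol: Double): Boolean =
      bestValidateError - currentValidateError <
        validationTol * math.max(currentValidateError, 0.01)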