From 84f57a6c6c50c1d0a02620d504996e26437f2781 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Mon, 16 May 2022 14:45:13 +0800 Subject: [PATCH] [jvm-packages] move dmatrix building into rabit context for cpu pipeline --- .../scala/rapids/spark/GpuPreXGBoost.scala | 12 +++++------ .../xgboost4j/scala/spark/PreXGBoost.scala | 15 +++++++------- .../scala/spark/PreXGBoostProvider.scala | 5 ++--- .../dmlc/xgboost4j/scala/spark/XGBoost.scala | 20 +++++-------------- .../spark/FeatureSizeValidatingSuite.scala | 4 +--- 5 files changed, 20 insertions(+), 36 deletions(-) diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/rapids/spark/GpuPreXGBoost.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/rapids/spark/GpuPreXGBoost.scala index 756b7b54b161..08d186d6f84e 100644 --- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/rapids/spark/GpuPreXGBoost.scala +++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/rapids/spark/GpuPreXGBoost.scala @@ -61,15 +61,14 @@ class GpuPreXGBoost extends PreXGBoostProvider { * @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]] * @param dataset the training data * @param params all user defined and defaulted params - * @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ]) - * Boolean if building DMatrix in rabit context + * @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ]) * RDD[() => Watches] will be used as the training input * Option[ RDD[_] ] is the optional cached RDD */ override def buildDatasetToRDD(estimator: Estimator[_], dataset: Dataset[_], params: Map[String, Any]): - XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = { + XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = { GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params) } @@ -123,8 +122,7 @@ object GpuPreXGBoost extends PreXGBoostProvider { * @param estimator supports XGBoostClassifier and XGBoostRegressor * @param dataset the training data * @param params all user defined and defaulted params - * @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ]) - * Boolean if building DMatrix in rabit context + * @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ]) * RDD[() => Watches] will be used as the training input to build DMatrix * Option[ RDD[_] ] is the optional cached RDD */ @@ -132,7 +130,7 @@ object GpuPreXGBoost extends PreXGBoostProvider { estimator: Estimator[_], dataset: Dataset[_], params: Map[String, Any]): - XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = { + XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = { val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) = estimator match { @@ -170,7 +168,7 @@ object GpuPreXGBoost extends PreXGBoostProvider { xgbExecParams: XGBoostExecutionParams => val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers, xgbExecParams.cacheTrainingSet) - (true, buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None) + (buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None) } /** diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoost.scala index 01eb3d0a4f32..13484f490f5b 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoost.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoost.scala @@ -101,8 +101,7 @@ object PreXGBoost extends PreXGBoostProvider { * @param estimator supports XGBoostClassifier and XGBoostRegressor * @param dataset the training data * @param params all user defined and defaulted params - * @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ]) - * Boolean if building DMatrix in rabit context + * @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ]) * RDD[() => Watches] will be used as the training input * Option[RDD[_]\] is the optional cached RDD */ @@ -110,7 +109,7 @@ object PreXGBoost extends PreXGBoostProvider { estimator: Estimator[_], dataset: Dataset[_], params: Map[String, Any]): XGBoostExecutionParams => - (Boolean, RDD[() => Watches], Option[RDD[_]]) = { + (RDD[() => Watches], Option[RDD[_]]) = { if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) { return optionProvider.get.buildDatasetToRDD(estimator, dataset, params) @@ -172,12 +171,12 @@ object PreXGBoost extends PreXGBoostProvider { val cachedRDD = if (xgbExecParams.cacheTrainingSet) { Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK)) } else None - (false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) + (trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) case Right(trainingData) => val cachedRDD = if (xgbExecParams.cacheTrainingSet) { Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK)) } else None - (false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) + (trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) } } @@ -324,7 +323,7 @@ object PreXGBoost extends PreXGBoostProvider { trainingSet: RDD[XGBLabeledPoint], evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(), hasGroup: Boolean = false): - XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = { + XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = { xgbExecParams: XGBoostExecutionParams => composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match { @@ -332,12 +331,12 @@ object PreXGBoost extends PreXGBoostProvider { val cachedRDD = if (xgbExecParams.cacheTrainingSet) { Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK)) } else None - (false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) + (trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) case Right(trainingData) => val cachedRDD = if (xgbExecParams.cacheTrainingSet) { Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK)) } else None - (false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) + (trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD) } } diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoostProvider.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoostProvider.scala index d133aea288dd..4c4dbdec1e53 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoostProvider.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/PreXGBoostProvider.scala @@ -50,8 +50,7 @@ private[scala] trait PreXGBoostProvider { * @param estimator supports XGBoostClassifier and XGBoostRegressor * @param dataset the training data * @param params all user defined and defaulted params - * @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ]) - * Boolean if building DMatrix in rabit context + * @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ]) * RDD[() => Watches] will be used as the training input to build DMatrix * Option[ RDD[_] ] is the optional cached RDD */ @@ -59,7 +58,7 @@ private[scala] trait PreXGBoostProvider { estimator: Estimator[_], dataset: Dataset[_], params: Map[String, Any]): - XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) + XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) /** * Transform Dataset diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala index 6cfabcfaca17..fa22e8939e29 100644 --- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala +++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala @@ -286,7 +286,6 @@ object XGBoost extends Serializable { } private def buildDistributedBooster( - buildDMatrixInRabit: Boolean, buildWatches: () => Watches, xgbExecutionParam: XGBoostExecutionParams, rabitEnv: java.util.Map[String, String], @@ -295,11 +294,6 @@ object XGBoost extends Serializable { prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = { var watches: Watches = null - if (!buildDMatrixInRabit) { - // for CPU pipeline, we need to build DMatrix out of rabit context - watches = buildWatchesAndCheck(buildWatches) - } - val taskId = TaskContext.getPartitionId().toString val attempt = TaskContext.get().attemptNumber.toString rabitEnv.put("DMLC_TASK_ID", taskId) @@ -310,10 +304,7 @@ object XGBoost extends Serializable { try { Rabit.init(rabitEnv) - if (buildDMatrixInRabit) { - // for GPU pipeline, we need to move dmatrix building into rabit context - watches = buildWatchesAndCheck(buildWatches) - } + watches = buildWatchesAndCheck(buildWatches) val numEarlyStoppingRounds = xgbExecutionParam.earlyStoppingParams.numEarlyStoppingRounds val metrics = Array.tabulate(watches.size)(_ => Array.ofDim[Float](numRounds)) @@ -377,7 +368,7 @@ object XGBoost extends Serializable { @throws(classOf[XGBoostError]) private[spark] def trainDistributed( sc: SparkContext, - buildTrainingData: XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]), + buildTrainingData: XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]), params: Map[String, Any]): (Booster, Map[String, Array[Float]]) = { @@ -396,7 +387,7 @@ object XGBoost extends Serializable { }.orNull // Get the training data RDD and the cachedRDD - val (buildDMatrixInRabit, trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams) + val (trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams) try { // Train for every ${savingRound} rounds and save the partially completed booster @@ -413,9 +404,8 @@ object XGBoost extends Serializable { optionWatches = Some(iter.next()) } - optionWatches.map { buildWatches => buildDistributedBooster(buildDMatrixInRabit, - buildWatches, xgbExecParams, rabitEnv, xgbExecParams.obj, - xgbExecParams.eval, prevBooster)} + optionWatches.map { buildWatches => buildDistributedBooster(buildWatches, + xgbExecParams, rabitEnv, xgbExecParams.obj, xgbExecParams.eval, prevBooster)} .getOrElse(throw new RuntimeException("No Watches to train")) }} diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/FeatureSizeValidatingSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/FeatureSizeValidatingSuite.scala index 79562d1f428b..f96140555809 100644 --- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/FeatureSizeValidatingSuite.scala +++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/FeatureSizeValidatingSuite.scala @@ -65,8 +65,6 @@ class FeatureSizeValidatingSuite extends FunSuite with PerTest { (id, lp.label, lp.features) }.toDF("id", "label", "features") val xgb = new XGBoostClassifier(paramMap) - intercept[Exception] { - xgb.fit(repartitioned) - } + xgb.fit(repartitioned) } }