Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[jvm-packages] move dmatrix building into rabit context for cpu pipeline #7908

Merged
merged 1 commit into from
May 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@ class GpuPreXGBoost extends PreXGBoostProvider {
* @param estimator [[XGBoostClassifier]] or [[XGBoostRegressor]]
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* RDD[() => Watches] will be used as the training input
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {
GpuPreXGBoost.buildDatasetToRDD(estimator, dataset, params)
}

Expand Down Expand Up @@ -123,16 +122,15 @@ object GpuPreXGBoost extends PreXGBoostProvider {
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {

val (Seq(labelName, weightName, marginName), feturesCols, groupName, evalSets) =
estimator match {
Expand Down Expand Up @@ -170,7 +168,7 @@ object GpuPreXGBoost extends PreXGBoostProvider {
xgbExecParams: XGBoostExecutionParams =>
val dataMap = prepareInputData(trainingData, evalDataMap, xgbExecParams.numWorkers,
xgbExecParams.cacheTrainingSet)
(true, buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
(buildRDDWatches(dataMap, xgbExecParams, evalDataMap.isEmpty), None)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,15 @@ object PreXGBoost extends PreXGBoostProvider {
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* RDD[() => Watches] will be used as the training input
* Option[ RDD[_] ] is the optional cached RDD
*/
override def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]): XGBoostExecutionParams =>
(Boolean, RDD[() => Watches], Option[RDD[_]]) = {
(RDD[() => Watches], Option[RDD[_]]) = {

if (optionProvider.isDefined && optionProvider.get.providerEnabled(Some(dataset))) {
return optionProvider.get.buildDatasetToRDD(estimator, dataset, params)
Expand Down Expand Up @@ -172,12 +171,12 @@ object PreXGBoost extends PreXGBoostProvider {
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
}

}
Expand Down Expand Up @@ -324,20 +323,20 @@ object PreXGBoost extends PreXGBoostProvider {
trainingSet: RDD[XGBLabeledPoint],
evalRDDMap: Map[String, RDD[XGBLabeledPoint]] = Map(),
hasGroup: Boolean = false):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]) = {
XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]) = {

xgbExecParams: XGBoostExecutionParams =>
composeInputData(trainingSet, hasGroup, xgbExecParams.numWorkers) match {
case Left(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(false, trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(trainForRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
case Right(trainingData) =>
val cachedRDD = if (xgbExecParams.cacheTrainingSet) {
Some(trainingData.persist(StorageLevel.MEMORY_AND_DISK))
} else None
(false, trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
(trainForNonRanking(trainingData, xgbExecParams, evalRDDMap), cachedRDD)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,15 @@ private[scala] trait PreXGBoostProvider {
* @param estimator supports XGBoostClassifier and XGBoostRegressor
* @param dataset the training data
* @param params all user defined and defaulted params
* @return [[XGBoostExecutionParams]] => (Boolean, RDD[[() => Watches]], Option[ RDD[_] ])
* Boolean if building DMatrix in rabit context
* @return [[XGBoostExecutionParams]] => (RDD[[() => Watches]], Option[ RDD[_] ])
* RDD[() => Watches] will be used as the training input to build DMatrix
* Option[ RDD[_] ] is the optional cached RDD
*/
def buildDatasetToRDD(
estimator: Estimator[_],
dataset: Dataset[_],
params: Map[String, Any]):
XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]])
XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]])

/**
* Transform Dataset
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ object XGBoost extends Serializable {
}

private def buildDistributedBooster(
buildDMatrixInRabit: Boolean,
buildWatches: () => Watches,
xgbExecutionParam: XGBoostExecutionParams,
rabitEnv: java.util.Map[String, String],
Expand All @@ -295,11 +294,6 @@ object XGBoost extends Serializable {
prevBooster: Booster): Iterator[(Booster, Map[String, Array[Float]])] = {

var watches: Watches = null
if (!buildDMatrixInRabit) {
// for the CPU pipeline, we need to build the DMatrix outside of the rabit context
watches = buildWatchesAndCheck(buildWatches)
}

val taskId = TaskContext.getPartitionId().toString
val attempt = TaskContext.get().attemptNumber.toString
rabitEnv.put("DMLC_TASK_ID", taskId)
Expand All @@ -310,10 +304,7 @@ object XGBoost extends Serializable {
try {
Rabit.init(rabitEnv)

if (buildDMatrixInRabit) {
// for GPU pipeline, we need to move dmatrix building into rabit context
watches = buildWatchesAndCheck(buildWatches)
}
watches = buildWatchesAndCheck(buildWatches)

val numEarlyStoppingRounds = xgbExecutionParam.earlyStoppingParams.numEarlyStoppingRounds
val metrics = Array.tabulate(watches.size)(_ => Array.ofDim[Float](numRounds))
Expand Down Expand Up @@ -377,7 +368,7 @@ object XGBoost extends Serializable {
@throws(classOf[XGBoostError])
private[spark] def trainDistributed(
sc: SparkContext,
buildTrainingData: XGBoostExecutionParams => (Boolean, RDD[() => Watches], Option[RDD[_]]),
buildTrainingData: XGBoostExecutionParams => (RDD[() => Watches], Option[RDD[_]]),
params: Map[String, Any]):
(Booster, Map[String, Array[Float]]) = {

Expand All @@ -396,7 +387,7 @@ object XGBoost extends Serializable {
}.orNull

// Get the training data RDD and the cachedRDD
val (buildDMatrixInRabit, trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)
val (trainingRDD, optionalCachedRDD) = buildTrainingData(xgbExecParams)

try {
// Train for every ${savingRound} rounds and save the partially completed booster
Expand All @@ -413,9 +404,8 @@ object XGBoost extends Serializable {
optionWatches = Some(iter.next())
}

optionWatches.map { buildWatches => buildDistributedBooster(buildDMatrixInRabit,
buildWatches, xgbExecParams, rabitEnv, xgbExecParams.obj,
xgbExecParams.eval, prevBooster)}
optionWatches.map { buildWatches => buildDistributedBooster(buildWatches,
xgbExecParams, rabitEnv, xgbExecParams.obj, xgbExecParams.eval, prevBooster)}
.getOrElse(throw new RuntimeException("No Watches to train"))

}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ class FeatureSizeValidatingSuite extends FunSuite with PerTest {
(id, lp.label, lp.features)
}.toDF("id", "label", "features")
val xgb = new XGBoostClassifier(paramMap)
intercept[Exception] {
trivialfis marked this conversation as resolved.
Show resolved Hide resolved
xgb.fit(repartitioned)
}
xgb.fit(repartitioned)
}
}