fix: add seed parameters to lightgbm
imatiach-msft committed Feb 28, 2022
1 parent f7e3728 commit 64e20fc
Showing 12 changed files with 253 additions and 84 deletions.
@@ -255,6 +255,11 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[TrainedModel]
     ExecutionParams(getChunkSize, getMatrixType, execNumThreads, getUseSingleDatasetMode)
   }

+  /**
+    * Constructs the ColumnParams.
+    *
+    * @return ColumnParams object containing the parameters related to LightGBM columns.
+    */
   protected def getColumnParams: ColumnParams = {
     ColumnParams(getLabelCol, getFeaturesCol, get(weightCol), get(initScoreCol), getOptGroupCol)
   }
@@ -268,13 +273,25 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[TrainedModel]
     ObjectiveParams(getObjective, if (isDefined(fobj)) Some(getFObj) else None)
   }

+  /**
+    * Constructs the SeedParams.
+    *
+    * @return SeedParams object containing the parameters related to LightGBM seeds and determinism.
+    */
+  protected def getSeedParams: SeedParams = {
+    SeedParams(get(seed), get(deterministic), get(baggingSeed), get(featureFractionSeed),
+      get(extraSeed), get(dropSeed), get(dataRandomSeed), get(objectiveSeed), getBoostingType, getObjective)
+  }
+
   def getDatasetParams(categoricalIndexes: Array[Int], numThreads: Int): String = {
+    val seedParam = get(dataRandomSeed).orElse(get(seed))
     val datasetParams = s"max_bin=$getMaxBin is_pre_partition=True " +
       s"bin_construct_sample_cnt=$getBinSampleCount " +
       s"min_data_in_leaf=$getMinDataInLeaf " +
       s"num_threads=$numThreads " +
       (if (categoricalIndexes.isEmpty) ""
-      else s"categorical_feature=${categoricalIndexes.mkString(",")}")
+      else s"categorical_feature=${categoricalIndexes.mkString(",")} ") +
+      seedParam.map(dataRandomSeedOpt => s"data_random_seed=$dataRandomSeedOpt ").getOrElse("")
     datasetParams
   }
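A subtlety worth noting: in Spark ML, get(param) returns only an explicitly set value, never the default, so dataRandomSeed overrides seed only when the caller actually set it. A standalone sketch of the fallback logic above, with hypothetical values:

    // Hypothetical values for illustration; not taken from this commit.
    val seed: Option[Int] = Some(42)             // user called setSeed(42)
    val dataRandomSeed: Option[Int] = None       // never set explicitly
    val seedParam = dataRandomSeed.orElse(seed)  // Some(42); an explicit dataRandomSeed would win
    val suffix = seedParam.map(s => s"data_random_seed=$s ").getOrElse("")
    // suffix == "data_random_seed=42 "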

@@ -424,7 +441,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[TrainedModel]
       }
     }
     // Concatenate with commas, eg: host1:port1,host2:port2, ... etc
-    val allConnections = hostAndPorts.map(_._2).mkString(",")
+    val allConnections = hostAndPorts.map(_._2).sorted.mkString(",")
     log.info(s"driver writing back to all connections: $allConnections")
     // Send data back to all tasks and helper tasks on executors
     sendDataToExecutors(hostAndPorts, allConnections)
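The added .sorted is itself part of the determinism fix: executor tasks connect back to the driver in scheduler-dependent order, so the concatenated connection list could differ between otherwise identical runs. A minimal standalone sketch with made-up host:port strings:

    val arrivalRun1 = Seq("host2:12402", "host1:12401")  // connection order in one run
    val arrivalRun2 = Seq("host1:12401", "host2:12402")  // order in a rerun
    // Both runs now normalize to "host1:12401,host2:12402".
    assert(arrivalRun1.sorted.mkString(",") == arrivalRun2.sorted.mkString(","))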
@@ -54,7 +54,7 @@ class LightGBMClassifier(override val uid: String)
       getIsUnbalance, getVerbosity, categoricalIndexes, actualNumClasses, getBoostFromAverage,
       getBoostingType, get(lambdaL1), get(lambdaL2), get(isProvideTrainingMetric),
       get(metric), get(minGainToSplit), get(maxDeltaStep), getMaxBinByFeature, get(minDataInLeaf), getSlotNames,
-      getDelegate, getDartParams, getExecutionParams(numTasksPerExec), getObjectiveParams)
+      getDelegate, getDartParams, getExecutionParams(numTasksPerExec), getObjectiveParams, getSeedParams)
   }

   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMClassificationModel = {
@@ -60,7 +60,7 @@ class LightGBMRanker(override val uid: String)
       getVerbosity, categoricalIndexes, getBoostingType, get(lambdaL1), get(lambdaL2), getMaxPosition, getLabelGain,
       get(isProvideTrainingMetric), get(metric), getEvalAt, get(minGainToSplit), get(maxDeltaStep),
       getMaxBinByFeature, get(minDataInLeaf), getSlotNames, getDelegate, getDartParams,
-      getExecutionParams(numTasksPerExec), getObjectiveParams)
+      getExecutionParams(numTasksPerExec), getObjectiveParams, getSeedParams)
   }

   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRankerModel = {
@@ -70,7 +70,7 @@ class LightGBMRegressor(override val uid: String)
       getBoostFromAverage, getBoostingType, get(lambdaL1), get(lambdaL2), get(isProvideTrainingMetric),
       get(metric), get(minGainToSplit), get(maxDeltaStep),
       getMaxBinByFeature, get(minDataInLeaf), getSlotNames, getDelegate,
-      getDartParams, getExecutionParams(numTasksPerExec), getObjectiveParams)
+      getDartParams, getExecutionParams(numTasksPerExec), getObjectiveParams, getSeedParams)
   }

   def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRegressionModel = {
@@ -116,6 +116,17 @@ object LightGBMUtils {
     idAsInt
   }

+  /** Returns the partition ID for the spark Dataset.
+    *
+    * Used to make operations deterministic on the same dataset.
+    *
+    * @return Returns the partition id.
+    */
+  def getPartitionId: Int = {
+    val ctx = TaskContext.get
+    ctx.partitionId
+  }
+
   /** Returns true if spark is run in local mode.
     * @return True if spark is run in local mode.
     */
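Keying state by partition id, rather than by thread identity or arrival order, is what makes the aggregation changes below reproducible: the partition id is a stable function of the dataset and its partitioning. A quick standalone check (assumes a SparkSession named spark):

    import org.apache.spark.TaskContext

    val partitionIds = spark.range(0, 8, 1, 4)  // 8 rows across 4 partitions
      .rdd
      .mapPartitions(_ => Iterator.single(TaskContext.get.partitionId))
      .collect()
    // Always Array(0, 1, 2, 3), regardless of which tasks the scheduler ran first.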
@@ -15,6 +15,7 @@ import org.apache.spark.sql.types.StructType

 import java.util.concurrent.atomic.AtomicLong
 import scala.collection.mutable.ListBuffer
+import scala.collection.concurrent.TrieMap

 private[lightgbm] object ChunkedArrayUtils {
   def copyChunkedArray[T: Numeric](chunkedArray: ChunkedArray[T],
@@ -193,6 +194,8 @@ private[lightgbm] abstract class BaseAggregatedColumns(val chunkSize: Int) extends Serializable {
     */
   protected val rowCount = new AtomicLong(0L)
   protected val initScoreCount = new AtomicLong(0L)
+  protected val pIdToRowCountOffset = new TrieMap[Long, Long]()
+  protected val pIdToInitScoreCountOffset = new TrieMap[Long, Long]()

   protected var numCols = 0

@@ -216,7 +219,10 @@

   def incrementCount(chunkedCols: BaseChunkedColumns): Unit = {
     rowCount.addAndGet(chunkedCols.rowCount)
+    pIdToRowCountOffset.update(LightGBMUtils.getPartitionId, chunkedCols.rowCount)
     initScoreCount.addAndGet(chunkedCols.numInitScores)
+    pIdToInitScoreCountOffset.update(
+      LightGBMUtils.getPartitionId, chunkedCols.numInitScores)
   }

   def addRows(chunkedCols: BaseChunkedColumns): Unit = {
@@ -232,6 +238,18 @@ private[lightgbm] abstract class BaseAggregatedColumns(val chunkSize: Int) extends Serializable {
     initScores = chunkedCols.initScores.map(_ => new DoubleSwigArray(isc))
     initializeFeatures(chunkedCols, rc)
     groups = new Array[Any](rc.toInt)
+    updateConcurrentMapOffsets(pIdToRowCountOffset)
+    updateConcurrentMapOffsets(pIdToInitScoreCountOffset)
   }
+
+  protected def updateConcurrentMapOffsets(concurrentIdToOffset: TrieMap[Long, Long],
+                                           initialValue: Long = 0L): Unit = {
+    val sortedKeys = concurrentIdToOffset.keys.toSeq.sorted
+    sortedKeys.foldRight(initialValue: Long)((key, offset) => {
+      val partitionRowCount = concurrentIdToOffset(key)
+      concurrentIdToOffset.update(key, offset)
+      partitionRowCount + offset
+    })
+  }

 }
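This is the heart of the fix: incrementCount records each partition's row count, and updateConcurrentMapOffsets then rewrites the map in place so each partition gets a fixed, disjoint start offset into the shared arrays. Note that foldRight visits the sorted keys from highest to lowest, so the highest partition id receives the initial offset. A standalone trace with made-up counts:

    import scala.collection.concurrent.TrieMap

    val counts = TrieMap[Long, Long](0L -> 3L, 1L -> 2L, 2L -> 4L)  // partition id -> row count
    counts.keys.toSeq.sorted.foldRight(0L) { (key, offset) =>
      val partitionRowCount = counts(key)
      counts.update(key, offset)  // replace the count with this partition's start offset
      partitionRowCount + offset
    }
    // counts is now {2 -> 0, 1 -> 4, 0 -> 6}: partition 2 writes rows [0,4),
    // partition 1 writes [4,6), partition 0 writes [6,9), identically on every run.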
@@ -254,12 +272,6 @@ private[lightgbm] trait DisjointAggregatedColumns extends BaseAggregatedColumns
 }

 private[lightgbm] trait SyncAggregatedColumns extends BaseAggregatedColumns {
-  /**
-    * Variables for current thread to use in order to update common arrays in parallel
-    */
-  protected val threadRowStartIndex = new AtomicLong(0L)
-  protected val threadInitScoreStartIndex = new AtomicLong(0L)
-
   /** Adds the rows to the internal data structure.
     */
   override def addRows(chunkedCols: BaseChunkedColumns): Unit = {
@@ -289,10 +301,9 @@ private[lightgbm] trait SyncAggregatedColumns extends BaseAggregatedColumns {
     var threadInitScoreStartIndex = 0L
     val featureIndexes =
       this.synchronized {
-        val labelsSize = chunkedCols.labels.getAddCount
-        threadRowStartIndex = this.threadRowStartIndex.getAndAdd(labelsSize.toInt)
-        val initScoreSize = chunkedCols.initScores.map(_.getAddCount)
-        initScoreSize.foreach(size => threadInitScoreStartIndex = this.threadInitScoreStartIndex.getAndAdd(size))
+        val partitionId = LightGBMUtils.getPartitionId
+        threadRowStartIndex = pIdToRowCountOffset.get(partitionId).get
+        threadInitScoreStartIndex = chunkedCols.initScores.map(_ => pIdToInitScoreCountOffset(partitionId)).getOrElse(0)
         updateThreadLocalIndices(chunkedCols, threadRowStartIndex)
       }
     ChunkedArrayUtils.copyChunkedArray(chunkedCols.labels, labels, threadRowStartIndex, chunkSize)
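For contrast, the removed lines show why the old approach was nondeterministic: each task's offset came from an atomic getAndAdd, so it depended on which task reached the counter first. A standalone sketch with two hypothetical tasks holding 3 and 2 rows:

    import java.util.concurrent.atomic.AtomicLong

    val rowStart = new AtomicLong(0L)
    val firstArrival = rowStart.getAndAdd(3L)   // 0: whichever task arrives first claims the front
    val secondArrival = rowStart.getAndAdd(2L)  // 3: swap the arrival order and the layout flips
    // Same totals either way, but the row order in the shared arrays varies run to run.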
@@ -393,6 +404,8 @@ private[lightgbm] abstract class BaseSparseAggregatedColumns(chunkSize: Int)
     */
   protected var indexesCount = new AtomicLong(0L)
   protected var indptrCount = new AtomicLong(0L)
+  protected val pIdToIndexesCountOffset = new TrieMap[Long, Long]()
+  protected val pIdToIndptrCountOffset = new TrieMap[Long, Long]()

   def getNumColsFromChunkedArray(chunkedCols: BaseChunkedColumns): Int = {
     chunkedCols.asInstanceOf[SparseChunkedColumns].numCols
@@ -402,7 +415,9 @@
     super.incrementCount(chunkedCols)
     val sparseChunkedCols = chunkedCols.asInstanceOf[SparseChunkedColumns]
     indexesCount.addAndGet(sparseChunkedCols.getNumIndexes)
+    pIdToIndexesCountOffset.update(LightGBMUtils.getPartitionId, sparseChunkedCols.getNumIndexes)
     indptrCount.addAndGet(sparseChunkedCols.getNumIndexPointers)
+    pIdToIndptrCountOffset.update(LightGBMUtils.getPartitionId, sparseChunkedCols.getNumIndexPointers)
   }

   protected def initializeFeatures(chunkedCols: BaseChunkedColumns, rowCount: Long): Unit = {
@@ -412,6 +427,8 @@
     values = new DoubleSwigArray(indexesCount)
     indexPointers = new IntSwigArray(indptrCount)
     indexPointers.setItem(0, 0)
+    updateConcurrentMapOffsets(pIdToIndexesCountOffset)
+    updateConcurrentMapOffsets(pIdToIndptrCountOffset, 1L)
   }

   def getIndexes: IntSwigArray = indexes
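The 1L initial value reflects the CSR convention used here: slot 0 of indexPointers is pinned to 0 just above, so partition offsets into the index-pointer array start from slot 1. Re-running the same fold with made-up counts shows the layout:

    import scala.collection.concurrent.TrieMap

    val indptrCounts = TrieMap[Long, Long](0L -> 2L, 1L -> 3L)  // partition id -> indptr entries
    indptrCounts.keys.toSeq.sorted.foldRight(1L) { (key, offset) =>
      val count = indptrCounts(key)
      indptrCounts.update(key, offset)
      count + offset
    }
    // indptrCounts is now {1 -> 1, 0 -> 4}: partitions fill slots [1,4) and [4,6),
    // while slot 0 keeps the leading zero that every CSR indptr array needs.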
@@ -489,25 +506,16 @@ private[lightgbm] final class SparseAggregatedColumns(chunkSize: Int)
   */
 private[lightgbm] final class SparseSyncAggregatedColumns(chunkSize: Int)
   extends BaseSparseAggregatedColumns(chunkSize) with SyncAggregatedColumns {
-  /**
-    * Variables for current thread to use in order to update common arrays in parallel
-    */
-  protected val threadIndexesStartIndex = new AtomicLong(0L)
-  protected val threadIndptrStartIndex = new AtomicLong(1L)
-
   override protected def initializeRows(chunkedCols: BaseChunkedColumns): Unit = {
     // Add extra 0 for start of indptr in parallel case
     this.indptrCount.addAndGet(1L)
     super.initializeRows(chunkedCols)
   }

   protected def updateThreadLocalIndices(chunkedCols: BaseChunkedColumns, threadRowStartIndex: Long): List[Long] = {
-    val sparseChunkedCols = chunkedCols.asInstanceOf[SparseChunkedColumns]
-    val indexesSize = sparseChunkedCols.indexes.getAddCount
-    val threadIndexesStartIndex = this.threadIndexesStartIndex.getAndAdd(indexesSize)
-
-    val indPtrSize = sparseChunkedCols.indexPointers.getAddCount
-    val threadIndPtrStartIndex = this.threadIndptrStartIndex.getAndAdd(indPtrSize)
+    val partitionId = LightGBMUtils.getPartitionId
+    val threadIndexesStartIndex = pIdToIndexesCountOffset.get(partitionId).get
+    val threadIndPtrStartIndex = pIdToIndptrCountOffset.get(partitionId).get
     List(threadIndexesStartIndex, threadIndPtrStartIndex)
   }

@@ -4,7 +4,6 @@
 package com.microsoft.azure.synapse.ml.lightgbm.dataset

 import com.microsoft.azure.synapse.ml.lightgbm.ColumnParams
-import com.microsoft.azure.synapse.ml.lightgbm.swig.DoubleChunkedArray
 import com.microsoft.ml.lightgbm.{doubleChunkedArray, floatChunkedArray}
 import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
@@ -317,12 +317,71 @@ trait LightGBMObjectiveParams extends Wrappable {
   def setFObj(value: FObjTrait): this.type = set(fobj, value)
 }

+/** Defines common parameters related to seed and determinism
+  */
+trait LightGBMSeedParams extends Wrappable {
+  val seed = new IntParam(this, "seed", "Main seed, used to generate other seeds")
+
+  def getSeed: Int = $(seed)
+  def setSeed(value: Int): this.type = set(seed, value)
+
+  val deterministic = new BooleanParam(this, "deterministic", "Used only with cpu " +
+    "device type. Setting this to true should ensure stable results when using the same data and the " +
+    "same parameters. Note: setting this to true may slow down training. To avoid potential instability " +
+    "due to numerical issues, please set force_col_wise=true or force_row_wise=true when setting " +
+    "deterministic=true")
+  setDefault(deterministic->false)
+
+  def getDeterministic: Boolean = $(deterministic)
+  def setDeterministic(value: Boolean): this.type = set(deterministic, value)
+
+  val baggingSeed = new IntParam(this, "baggingSeed", "Bagging seed")
+  setDefault(baggingSeed->3)
+
+  def getBaggingSeed: Int = $(baggingSeed)
+  def setBaggingSeed(value: Int): this.type = set(baggingSeed, value)
+
+  val featureFractionSeed = new IntParam(this, "featureFractionSeed", "Feature fraction seed")
+  setDefault(featureFractionSeed->2)
+
+  def getFeatureFractionSeed: Int = $(featureFractionSeed)
+  def setFeatureFractionSeed(value: Int): this.type = set(featureFractionSeed, value)
+
+  val extraSeed = new IntParam(this, "extraSeed", "Random seed for selecting threshold " +
+    "when extra_trees is true")
+  setDefault(extraSeed->6)
+
+  def getExtraSeed: Int = $(extraSeed)
+  def setExtraSeed(value: Int): this.type = set(extraSeed, value)
+
+  val dropSeed = new IntParam(this, "dropSeed", "Random seed to choose dropping models. " +
+    "Only used in dart.")
+  setDefault(dropSeed->4)
+
+  def getDropSeed: Int = $(dropSeed)
+  def setDropSeed(value: Int): this.type = set(dropSeed, value)
+
+  val dataRandomSeed = new IntParam(this, "dataRandomSeed", "Random seed for sampling " +
+    "data to construct histogram bins.")
+  setDefault(dataRandomSeed->1)
+
+  def getDataRandomSeed: Int = $(dataRandomSeed)
+  def setDataRandomSeed(value: Int): this.type = set(dataRandomSeed, value)
+
+  val objectiveSeed = new IntParam(this, "objectiveSeed", "Random seed for objectives, " +
+    "if random process is needed. Currently used only for rank_xendcg objective.")
+  setDefault(objectiveSeed->5)
+
+  def getObjectiveSeed: Int = $(objectiveSeed)
+  def setObjectiveSeed(value: Int): this.type = set(objectiveSeed, value)
+}
+
 /** Defines common parameters across all LightGBM learners.
   */
 trait LightGBMParams extends Wrappable with DefaultParamsWritable with HasWeightCol
   with HasValidationIndicatorCol with HasInitScoreCol with LightGBMExecutionParams
   with LightGBMSlotParams with LightGBMFractionParams with LightGBMBinParams with LightGBMLearnerParams
-  with LightGBMDartParams with LightGBMPredictionParams with LightGBMObjectiveParams {
+  with LightGBMDartParams with LightGBMPredictionParams with LightGBMObjectiveParams with LightGBMSeedParams {
   val numIterations = new IntParam(this, "numIterations",
     "Number of iterations, LightGBM constructs num_class * num_iterations trees")
   setDefault(numIterations->100)
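Taken together, the new trait gives every LightGBM estimator the same seeding surface. A hypothetical end-to-end configuration for a reproducible run (illustrative usage, not part of this commit):

    import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier

    val classifier = new LightGBMClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setDeterministic(true)  // cpu device type only; may slow down training
      .setSeed(42)             // main seed; unset seeds derive from it
      .setDataRandomSeed(42)   // explicit bin-sampling seed, overriding the seed fallback
      .setBaggingSeed(7)
      .setObjective("binary")

The removal that follows is the flip side of the new trait: baggingSeed now lives in LightGBMSeedParams, so its old definition is deleted from LightGBMParams to avoid a duplicate.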
@@ -348,12 +407,6 @@ trait LightGBMParams extends Wrappable with DefaultParamsWritable with HasWeightCol
   def getBaggingFreq: Int = $(baggingFreq)
   def setBaggingFreq(value: Int): this.type = set(baggingFreq, value)

-  val baggingSeed = new IntParam(this, "baggingSeed", "Bagging seed")
-  setDefault(baggingSeed->3)
-
-  def getBaggingSeed: Int = $(baggingSeed)
-  def setBaggingSeed(value: Int): this.type = set(baggingSeed, value)
-
   val maxDepth = new IntParam(this, "maxDepth", "Max depth")
   setDefault(maxDepth-> -1)
