Commit 38140d4
fix: add logging for number of columns and rows when creating datasets, set useSingleDatasetMode=True by default

imatiach-msft committed Nov 11, 2021
1 parent c0b516b commit 38140d4
Showing 3 changed files with 18 additions and 9 deletions.
@@ -21,6 +21,7 @@ import org.apache.spark.ml.param.shared.{HasFeaturesCol => HasFeaturesColSpark,
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.sql._
 import org.apache.spark.sql.types._
+import org.slf4j.Logger
 
 import java.net.{ServerSocket, Socket}
 import java.util.concurrent.Executors
@@ -277,9 +278,10 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
   private def generateDataset(ac: BaseAggregatedColumns,
                               referenceDataset: Option[LightGBMDataset],
                               schema: StructType,
-                              datasetParams: String): LightGBMDataset = {
+                              datasetParams: String,
+                              log: Logger): LightGBMDataset = {
     val dataset = try {
-      val datasetInner = ac.generateDataset(referenceDataset, datasetParams)
+      val datasetInner = ac.generateDataset(referenceDataset, datasetParams, log)
       getOptGroupCol.foreach(_ => datasetInner.addGroupColumn(ac.getGroups))
       datasetInner.setFeatureNames(getSlotNamesWithMetadata(schema(getFeaturesCol)), ac.getNumCols)
       datasetInner
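
The change threads the caller's Logger into the dataset helpers instead of having each aggregator construct its own. A minimal standalone sketch of that style (hypothetical names, not SynapseML code):

import org.slf4j.{Logger, LoggerFactory}

object LoggerThreadingSketch {
  // The caller owns the Logger and passes it down, so helpers that run
  // inside Spark tasks can log without creating their own logger instances.
  def buildDataset(datasetParams: String, log: Logger): Unit =
    log.info(s"building dataset with params: $datasetParams")

  def main(args: Array[String]): Unit = {
    val log: Logger = LoggerFactory.getLogger("LightGBMTrainer")
    buildDataset("max_bin=255", log)
  }
}
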
@@ -301,13 +303,13 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
     val columnParams = getColumnParams
     val datasetParams = getDatasetParams(trainParams.categoricalFeatures, trainParams.executionParams.numThreads)
     beforeGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams)
-    val trainDataset = generateDataset(aggregatedColumns, None, schema, datasetParams)
+    val trainDataset = generateDataset(aggregatedColumns, None, schema, datasetParams, log)
     try {
       afterGenerateTrainDataset(batchIndex, columnParams, schema, log, trainParams)
 
       val validDatasetOpt = validationData.map { vd =>
         beforeGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams)
-        val out = generateDataset(vd, Some(trainDataset), schema, datasetParams)
+        val out = generateDataset(vd, Some(trainDataset), schema, datasetParams, log)
         afterGenerateValidDataset(batchIndex, columnParams, schema, log, trainParams)
         out
       }
@@ -11,6 +11,7 @@ import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
+import org.slf4j.Logger
 
 import java.util.concurrent.atomic.AtomicLong
 import scala.collection.mutable.ListBuffer
@@ -211,7 +212,7 @@ private[lightgbm] abstract class BaseAggregatedColumns(val chunkSize: Int) {
     initScores.foreach(_.delete())
   }
 
-  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset
+  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String, log: Logger): LightGBMDataset
 
   def incrementCount(chunkedCols: BaseChunkedColumns): Unit = {
     rowCount.addAndGet(chunkedCols.rowCount)
@@ -327,14 +328,17 @@ private[lightgbm] abstract class BaseDenseAggregatedColumns(chunkSize: Int) exte
 
   def getFeatures: DoubleSwigArray = features
 
-  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = {
+  def generateDataset(referenceDataset: Option[LightGBMDataset],
+                      datasetParams: String, log: Logger): LightGBMDataset = {
     val pointer = lightgbmlib.voidpp_handle()
     try {
+      val numRows = rowCount.get().toInt
+      log.info(s"LightGBM task generating dense dataset with $numRows rows and $numCols columns")
       // Generate the dataset for features
       LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromMat(
         lightgbmlib.double_to_voidp_ptr(features.array),
         lightgbmlibConstants.C_API_DTYPE_FLOAT64,
-        rowCount.get().toInt,
+        numRows,
         numCols,
         1,
         datasetParams,
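
The new log line is worth watching because the dense path hands LightGBM a numRows x numCols FLOAT64 buffer, so the logged counts also bound the native memory in play. A back-of-the-envelope sketch with hypothetical sizes (8 bytes per double):

object DenseBufferEstimate {
  def main(args: Array[String]): Unit = {
    val numRows = 1000000L                // hypothetical partition row count
    val numCols = 100L                    // hypothetical feature count
    val bytes   = numRows * numCols * 8L  // C_API_DTYPE_FLOAT64 = 8 bytes/value
    println(f"dense feature buffer: ${bytes / 1e9}%.1f GB")  // prints 0.8 GB here
  }
}
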
@@ -434,9 +438,12 @@ private[lightgbm] abstract class BaseSparseAggregatedColumns(chunkSize: Int)
     }
   }
 
-  def generateDataset(referenceDataset: Option[LightGBMDataset], datasetParams: String): LightGBMDataset = {
+  def generateDataset(referenceDataset: Option[LightGBMDataset],
+                      datasetParams: String, log: Logger): LightGBMDataset = {
     indexPointerArrayIncrement(getIndexPointers.array)
     val pointer = lightgbmlib.voidpp_handle()
+    val numRows = indptrCount.get() - 1
+    log.info(s"LightGBM task generating sparse dataset with $numRows rows and $numCols columns")
     // Generate the dataset for features
     LightGBMUtils.validate(lightgbmlib.LGBM_DatasetCreateFromCSR(
       lightgbmlib.int_to_voidp_ptr(indexPointers.array),
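
In the sparse case the row count comes from the CSR index-pointer array, which holds one entry per row plus a trailing sentinel, hence the "- 1". A tiny illustration with hypothetical arrays:

object CsrRowCount {
  def main(args: Array[String]): Unit = {
    // CSR layout: indexPointers(i) marks where row i's values start; the
    // final entry is a sentinel equal to the total number of stored values.
    val indexPointers = Array(0L, 2L, 3L, 5L)
    val numRows = indexPointers.length - 1  // 3 rows, mirroring indptrCount - 1
    println(s"rows: $numRows")
  }
}
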
@@ -61,7 +61,7 @@ trait LightGBMExecutionParams extends Wrappable {
   val useSingleDatasetMode = new BooleanParam(this, "useSingleDatasetMode",
     "Use single dataset execution mode to create a single native dataset per executor (singleton) " +
     "to reduce memory and communication overhead. Note this is disabled when running spark in local mode.")
-  setDefault(useSingleDatasetMode -> false)
+  setDefault(useSingleDatasetMode -> true)
 
   def getUseSingleDatasetMode: Boolean = $(useSingleDatasetMode)
   def setUseSingleDatasetMode(value: Boolean): this.type = set(useSingleDatasetMode, value)
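
Since the default flips to true, jobs that depended on per-task datasets now have to opt out explicitly via the setter defined above. A minimal usage sketch, assuming SynapseML's LightGBMClassifier (the package path below varies across releases):

import com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier

object SingleDatasetModeOptOut {
  // Restores the pre-change behavior on an estimator that mixes in
  // LightGBMExecutionParams.
  val classifier = new LightGBMClassifier()
    .setLabelCol("label")
    .setFeaturesCol("features")
    .setUseSingleDatasetMode(false)
}
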
