Commit
[ML-298][Core] Optimize LabeledPoint conversion from single-threaded to multi-threaded and rename coalesce functions (oap-project#302)

* Use multiple threads to copy data into a contiguous array, optimizing LabeledPoint data conversion (a minimal sketch of the pattern follows this commit message)

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update OneDAL.scala

Signed-off-by: minmingzhu <minming.zhu@intel.com>

optimize LabeledPoint data conversion

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update logs

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

1. remove debug log
2. format code style

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update OneDAL.scala

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update OneDAL.scala

Signed-off-by: minmingzhu <minming.zhu@intel.com>

[ML-282] Refactor CPU & GPU examples (oap-project#306)

* First move

* Move device discover for scala

* Delete old gpu discover

* Add run-all-gpu

* Add clean up

* Add tmp utils file

* Add exe

* Rename run script

* Scala gpu done

* Scala cpu done

* For ci

* pyspark ci

* Rename scala

* Rename scala file in scripts

* Pyspark unit done

* Update pyspark utils

* Update ci

* Remove tmp utils

* Rename utils

* Change absolute path, rm als gpu.sh

* Scala absolute path

* Change sanity check

* Rename ci

* Split random_forest

* Fix name change in ci

* Fix path typo

* Fix typo

update OneDAL.scala

Signed-off-by: minmingzhu <minming.zhu@intel.com>

Update run-gpu.sh

update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update

Signed-off-by: minmingzhu <minming.zhu@intel.com>

update OneDAL.scala

Signed-off-by: minmingzhu <minming.zhu@intel.com>

Update OneDAL.scala

* 1. rename function names
2. remove unused functions

Signed-off-by: minmingzhu <minming.zhu@intel.com>

* update OneDAL.scala

Signed-off-by: minmingzhu <minming.zhu@intel.com>

---------

Signed-off-by: minmingzhu <minming.zhu@intel.com>
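
A minimal, self-contained sketch of the copy pattern this commit introduces (an illustration, not part of the patch: copyRowsInParallel, rows, and numCores are made-up names; the real code in OneDAL.scala below works on Spark Row objects and fills a labels array as well). Each partition's rows are split into one slice per executor core, and every slice is copied into a shared contiguous array by its own Future:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

def copyRowsInParallel(rows: Array[Array[Double]], numCores: Int): Array[Double] = {
  if (rows.isEmpty) return Array.emptyDoubleArray
  val numRows = rows.length
  val numCols = rows(0).length
  val out = new Array[Double](numRows * numCols)
  val sliceSize = math.max(1, numRows / numCores)

  val futures = (0 until numCores).map { i =>
    Future {
      val start = i * sliceSize
      // the last slice absorbs any remainder rows
      val end = if (i == numCores - 1) numRows else math.min(numRows, start + sliceSize)
      var r = start
      while (r < end) {
        // each row lands at its own offset, so the threads never overlap
        System.arraycopy(rows(r), 0, out, r * numCols, numCols)
        r += 1
      }
    }
  }
  Await.result(Future.sequence(futures), Duration.Inf)
  out
}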
minmingzhu committed Jul 4, 2023
1 parent 36ac8ca commit c136298
Showing 10 changed files with 87 additions and 213 deletions.
261 changes: 66 additions & 195 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
@@ -20,7 +20,6 @@ import com.intel.daal.data_management.data.{CSRNumericTable, HomogenNumericTable
import com.intel.daal.services.DaalContext
import org.apache.spark.{Partition, SparkContext, SparkException}
import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector, Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Matrix => OldMatrix, Vector => OldVector}
import org.apache.spark.rdd.{ExecutorInProcessCoalescePartitioner, PartitionGroup, RDD}
import org.apache.spark.storage.StorageLevel

@@ -31,6 +30,7 @@ import com.intel.oneapi.dal.table.Common.ComputeDevice
import com.intel.oneapi.dal.table.{ColumnAccessor, Common, HomogenTable, RowAccessor, Table}
import com.intel.daal.data_management.data.{CSRNumericTable, HomogenNumericTable, NumericTable, RowMergedNumericTable, Matrix => DALMatrix}
import com.intel.daal.services.DaalContext
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector, Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Matrix => OldMatrix, Vector => OldVector}
import org.apache.spark.rdd.{ExecutorInProcessCoalescePartitioner, RDD}
@@ -44,7 +44,6 @@ import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future}
import scala.sys.exit


object OneDAL {
@@ -85,24 +84,6 @@ object OneDAL {
matrix
}

def numericTableToOldMatrix(table: NumericTable): OldMatrix = {
val numRows = table.getNumberOfRows.toInt
val numCols = table.getNumberOfColumns.toInt

var dataDouble: DoubleBuffer = null
// the returned DoubleBuffer is backed by a ByteBuffer; copy it out as a double array
dataDouble = table.getBlockOfRows(0, numRows, dataDouble)
val arrayDouble = new Array[Double](numRows * numCols)
dataDouble.get(arrayDouble)

// Transpose as DAL numeric table is row-major and DenseMatrix is column major
val OldMatrix = new OldDenseMatrix(numRows, numCols, arrayDouble, isTransposed = false)

table.releaseBlockOfRows(0, numRows, dataDouble)

OldMatrix
}
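// (Aside, not part of the diff: this legacy NumericTable-based helper was
// unused after the GPU path moved to HomogenTable; its counterpart
// homogenTableToOldMatrix below remains.)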

def homogenTableToOldMatrix(table: HomogenTable, device: Common.ComputeDevice): OldMatrix = {
val numRows = table.getRowCount.toInt
val numCols = table.getColumnCount.toInt
@@ -226,27 +207,7 @@ object OneDAL {
table
}

def rddDoubleToNumericTables(doubles: RDD[Double], executorNum: Int): RDD[Long] = {
require(executorNum > 0)

val doublesTables = doubles.repartition(executorNum).mapPartitions { it: Iterator[Double] =>
val data = it.toArray
// Build DALMatrix, this will load libJavaAPI, libtbb, libtbbmalloc
val context = new DaalContext()
val matrix = new DALMatrix(context, classOf[java.lang.Double],
1, data.length, NumericTable.AllocationFlag.DoAllocate)

data.zipWithIndex.foreach { case (value: Double, index: Int) =>
cSetDouble(matrix.getCNumericTable, index, 0, value)
}
Iterator(matrix.getCNumericTable)
}
doublesTables.count()

doublesTables
}

def rddLabeledPointToSparseTables(labeledPoints: Dataset[_],
def coalesceSparseLabelPointsToSparseNumericTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int): RDD[(Long, Long)] = {
@@ -333,46 +294,6 @@ object OneDAL {
table
}

def rddLabeledPointToSparseTables_shuffle(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int): RDD[(Long, Long)] = {
require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")

val spark = SparkSession.active

val labeledPointsRDD = labeledPoints.select(labelCol, featuresCol).rdd.map {
case Row(label: Double, features: Vector) => (features, label)
}

// Repartition to executorNum
val dataForConversion = labeledPointsRDD.repartition(executorNum)
.setName("Repartitioned for conversion")

val tables = dataForConversion.mapPartitions { it: Iterator[(Vector, Double)] =>
val points: Array[(Vector, Double)] = it.toArray

val features = points.map(_._1)
val labels = points.map(_._2)

if (features.size == 0) {
Iterator()
} else {
val numColumns = features(0).size
val featuresTable = vectorsToSparseNumericTable(features, numColumns)
val labelsTable = doubleArrayToNumericTable(labels)

Iterator((featuresTable.getCNumericTable, labelsTable.getCNumericTable))
}
}.cache()

tables.count()

tables
}

private[mllib] def doubleArrayToHomogenTable(
points: Array[Double],
device: Common.ComputeDevice): HomogenTable = {
@@ -402,7 +323,7 @@ object OneDAL {
table
}

def rddLabeledPointToMergedTables(labeledPoints: Dataset[_],
def coalesceLabelPointsToNumericTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int): RDD[(Long, Long)] = {
@@ -476,73 +397,91 @@
mergedTables
}

def rddLabeledPointToMergedHomogenTables(labeledPoints: Dataset[_],
def coalesceLabelPointsToHomogenTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int,
device: Common.ComputeDevice): RDD[(Long, Long)] = {
require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")
val numberCores: Int = labeledPoints.sparkSession.sparkContext.getConf.getInt("spark.executor.cores", 1)

val spark = SparkSession.active
import spark.implicits._
val labeledPointsRDD = labeledPoints.rdd

// Repartition to executorNum if not enough partitions
val dataForConversion = if (labeledPoints.rdd.getNumPartitions < executorNum) {
labeledPoints.repartition(executorNum).cache()
val dataForConversion = if (labeledPointsRDD.getNumPartitions < executorNum) {
logger.info(s"Repartition to executorNum if not enough partitions")
val rePartitions = labeledPoints.repartition(executorNum).cache()
rePartitions.count()
rePartitions
} else {
labeledPoints
}

val tables = dataForConversion.select(labelCol, featuresCol)
.toDF().mapPartitions { it: Iterator[Row] =>
val rows = it.toArray

val features = rows.map {
case Row(label: Double, features: Vector) => features
}
// Get dimensions for each partition
val partitionDims = Utils.getPartitionDims(dataForConversion.select(featuresCol).rdd.map{ row =>
val vector = row.getAs[Vector](0)
vector
})

val labels = rows.map {
case Row(label: Double, features: Vector) => label
}
// Filter out empty partitions; if they are left in, coalesce fails with
// "No partitions or no locations for partitions found".
// TODO: ML-312: Improve ExecutorInProcessCoalescePartitioner
val nonEmptyPartitions = dataForConversion.select(labelCol, featuresCol).toDF().rdd.mapPartitionsWithIndex {
(index: Int, it: Iterator[Row]) => Iterator(Tuple3(partitionDims(index)._1, index, it))
}.filter {
_._1 > 0
}.flatMap {
entry => entry._3
}

if (features.size == 0) {
Iterator()
} else {
val numColumns = features(0).size
val coalescedRdd = nonEmptyPartitions.coalesce(executorNum,
partitionCoalescer = Some(new ExecutorInProcessCoalescePartitioner()))
.setName("coalescedRdd")

val featuresTable: Table = if (features(0).isInstanceOf[DenseVector]) {
vectorsToDenseHomogenTable(features.toIterator, features.length, numColumns, device)
} else {
throw new Exception("Oneapi didn't implement sparse dataset")
// convert RDD to HomogenTable
val coalescedTables = coalescedRdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Row]) =>
val list = it.toList
val subRowCount: Int = list.size / numberCores
val labeledPointsList: ListBuffer[Future[(Array[Double], Array[Double])]] =
new ListBuffer[Future[(Array[Double], Array[Double])]]()
val numRows = list.size
val numCols = list(0).getAs[Vector](1).toArray.size
val labelsArray = new Array[Double](numRows)
val featuresArray = new Array[Double](numRows * numCols)
for ( i <- 0 until numberCores) {
val f = Future {
val iter = list.iterator
val slice = if (i == numberCores - 1) {
iter.slice(subRowCount * i, numRows * numCols)
} else {
iter.slice(subRowCount * i, subRowCount * i + subRowCount)
}
slice.toArray.zipWithIndex.map { case (row, index) =>
val length = row.getAs[Vector](1).toArray.length
System.arraycopy(row.getAs[Vector](1).toArray, 0, featuresArray, subRowCount * numCols * i + length * index, length)
labelsArray(subRowCount * i + index) = row.getAs[Double](0)
}
(labelsArray, featuresArray)
}
labeledPointsList += f

val labelsTable = doubleArrayToHomogenTable(labels, device)

Iterator((featuresTable.getcObejct(), labelsTable.getcObejct()))
}
}.cache()

tables.count()

// Coalesce partitions belonging to the same executor
val coalescedTables = tables.rdd.coalesce(executorNum,
partitionCoalescer = Some(new ExecutorInProcessCoalescePartitioner()))

val mergedTables = coalescedTables.mapPartitions { iter =>
val mergedFeatures = new HomogenTable(device)
val mergedLabels = new HomogenTable(device)
iter.foreach { case (featureAddr, labelAddr) =>
mergedFeatures.addHomogenTable(featureAddr)
mergedLabels.addHomogenTable(labelAddr)
val result = Future.sequence(labeledPointsList)
Await.result(result, Duration.Inf)
}
Iterator((mergedFeatures.getcObejct(), mergedLabels.getcObejct()))
}.cache()
val labelsTable = new HomogenTable(numRows.toLong, 1, labelsArray,
device)
val featuresTable = new HomogenTable(numRows.toLong, numCols.toLong, featuresArray,
device)

mergedTables.count()
Iterator((featuresTable.getcObejct(), labelsTable.getcObejct()))
}.setName("coalescedTables").cache()

mergedTables
coalescedTables.count()
coalescedTables
}
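// Aside, not part of the diff: the Futures above add thread-level
// parallelism inside each Spark task, so one coalesced partition can use
// all spark.executor.cores while flattening rows into the contiguous
// labelsArray/featuresArray buffers. Note the last slice's end index
// (numRows * numCols) over-shoots; Iterator.slice clamps to the elements
// actually available, so the final thread simply takes all leftover rows.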


@@ -644,19 +583,7 @@ object OneDAL {
matrix
}

def partitionsToNumericTables(partitions: RDD[Vector], executorNum: Int): RDD[NumericTable] = {
val dataForConversion = partitions
.repartition(executorNum)
.setName("Repartitioned for conversion")
.cache()

dataForConversion.mapPartitionsWithIndex { (index: Int, it: Iterator[Vector]) =>
val table = makeNumericTable(it.toArray)
Iterator(table)
}
}

def coalesceToHomogenTables(data: RDD[Vector], executorNum: Int,
def coalesceVectorsToHomogenTables(data: RDD[Vector], executorNum: Int,
device: Common.ComputeDevice): RDD[Long] = {
logger.info(s"Processing partitions with $executorNum executors")
val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1)
@@ -747,7 +674,7 @@ object OneDAL {
matrix
}

def rddVectorToMergedTables(vectors: RDD[Vector], executorNum: Int): RDD[Long] = {
def coalesceVectorsToNumericTables(vectors: RDD[Vector], executorNum: Int): RDD[Long] = {
require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")
@@ -806,62 +733,6 @@ object OneDAL {
coalescedTables
}

def rddVectorToMergedHomogenTables(vectors: RDD[Vector], executorNum: Int,
device: Common.ComputeDevice): RDD[Long] = {

require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")

// Repartition to executorNum if not enough partitions
val dataForConversion = if (vectors.getNumPartitions < executorNum) {
vectors.repartition(executorNum).setName("Repartitioned for conversion").cache()
} else {
vectors
}

// Get dimensions for each partition
val partitionDims = Utils.getPartitionDims(dataForConversion)

// Filter out empty partitions
val nonEmptyPartitions = dataForConversion.mapPartitionsWithIndex {
(index: Int, it: Iterator[Vector]) => Iterator(Tuple3(partitionDims(index)._1, index, it))
}.filter {
_._1 > 0
}

// Convert to RDD[HomogenTable]
val homogenTables = nonEmptyPartitions.map { entry =>
val numRows = entry._1
val index = entry._2
val it = entry._3
val numCols = partitionDims(index)._2

logger.info(s"Partition index: $index, numCols: $numCols, numRows: $numRows")

val table = vectorsToDenseHomogenTable(it, numRows, numCols, device)
table.getcObejct()
}.setName("homogenTables").cache()

homogenTables.count()

// Unpersist instances RDD
if (vectors.getStorageLevel != StorageLevel.NONE) {
vectors.unpersist()
}
// Coalesce partitions belonging to the same executor
val coalescedRdd = homogenTables.coalesce(executorNum,
partitionCoalescer = Some(new ExecutorInProcessCoalescePartitioner()))

val coalescedTables = coalescedRdd.mapPartitions { iter =>
val mergedData = new HomogenTable(device)
iter.foreach { address =>
mergedData.addHomogenTable(address)
}
Iterator(mergedData.getcObejct())
}.cache()
coalescedTables
}
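// Aside, not part of the diff: this removed path built one HomogenTable
// per partition and merged them afterwards with addHomogenTable. The
// surviving coalesceVectorsToHomogenTables keeps the same shape as
// coalesceLabelPointsToHomogenTables above: coalesce raw rows per
// executor first, then materialise a single contiguous table, so no
// merge step is needed.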

@native def cAddNumericTable(cObject: Long, numericTableAddr: Long)

NaiveBayesDALImpl.scala
@@ -44,9 +44,10 @@ class NaiveBayesDALImpl(val uid: String,
val kvsIPPort = getOneCCLIPPort(labeledPoints.rdd)

val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
OneDAL.rddLabeledPointToMergedTables(labeledPoints, labelCol, featuresCol, executorNum)
OneDAL.coalesceLabelPointsToNumericTables(labeledPoints, labelCol, featuresCol, executorNum)
} else {
OneDAL.rddLabeledPointToSparseTables(labeledPoints, labelCol, featuresCol, executorNum)
OneDAL.coalesceSparseLabelPointsToSparseNumericTables(labeledPoints,
labelCol, featuresCol, executorNum)
}

val results = labeledPointsTables.mapPartitionsWithIndex {
RandomForestClassifierDALImpl.scala
@@ -63,7 +63,7 @@ class RandomForestClassifierDALImpl(val uid: String,
rfcTimer.record("Preprocessing")
val labeledPointsTables = if (useDevice == "GPU") {
if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
OneDAL.rddLabeledPointToMergedHomogenTables(labeledPoints,
OneDAL.coalesceLabelPointsToHomogenTables(labeledPoints,
labelCol, featuresCol, executorNum, computeDevice)
} else {
throw new Exception("Oneapi didn't implement sparse dataset")
KMeansDALImpl.scala
@@ -44,9 +44,9 @@ class KMeansDALImpl(var nClusters: Int,
kmeansTimer.record("Preprocessing")

val coalescedTables = if (useDevice == "GPU") {
OneDAL.coalesceToHomogenTables(data, executorNum, computeDevice)
OneDAL.coalesceVectorsToHomogenTables(data, executorNum, computeDevice)
} else {
OneDAL.rddVectorToMergedTables(data, executorNum)
OneDAL.coalesceVectorsToNumericTables(data, executorNum)
}
kmeansTimer.record("Data Convertion")
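
Across the touched call sites the renames settle on one convention, coalesce<Input>To<TableKind>: coalesceLabelPointsToNumericTables and coalesceLabelPointsToHomogenTables for labeled data (plus coalesceSparseLabelPointsToSparseNumericTables for the sparse path), and coalesceVectorsToNumericTables and coalesceVectorsToHomogenTables for plain vectors. The Homogen variants, used on the GPU path, take an extra ComputeDevice argument.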

(Diff of the remaining changed files omitted.)