[ML-298][Core] Optimize LabelPoints conversion from single-threaded to multi-threaded and rename the coalesce functions #302

Merged · 3 commits · Jul 3, 2023
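This PR renames OneDAL's table-conversion helpers to a uniform coalesce* scheme (rddLabeledPointToMergedTables to coalesceLabelPointsToNumericTables, rddLabeledPointToSparseTables to coalesceSparseLabelPointsToSparseNumericTables, rddLabeledPointToMergedHomogenTables to coalesceLabelPointsToHomogenTables, coalesceToHomogenTables to coalesceVectorsToHomogenTables, and rddVectorToMergedTables to coalesceVectorsToNumericTables), deletes several unused helpers, and parallelizes the per-partition copy of labeled points across spark.executor.cores worker threads. Below is a minimal, self-contained sketch of that copy pattern, with a toy Point case class standing in for Spark's Row; the actual coalesceLabelPointsToHomogenTables in the diff slices each partition across numberCores Futures that write labels and flattened features into shared arrays, which then back oneDAL HomogenTables.

import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object ParallelCopySketch {
  // Toy stand-in for a Spark Row carrying (label, features); illustrative only.
  final case class Point(label: Double, features: Array[Double])

  def toFlatArrays(rows: Array[Point], numberCores: Int): (Array[Double], Array[Double]) = {
    val numRows = rows.length
    val numCols = rows(0).features.length
    val labelsArray = new Array[Double](numRows)
    val featuresArray = new Array[Double](numRows * numCols)
    val subRowCount = numRows / numberCores

    val futures = new ListBuffer[Future[Unit]]()
    for (i <- 0 until numberCores) {
      futures += Future {
        // The last worker also picks up the remainder rows.
        val end = if (i == numberCores - 1) numRows else subRowCount * (i + 1)
        for (r <- subRowCount * i until end) {
          System.arraycopy(rows(r).features, 0, featuresArray, r * numCols, numCols)
          labelsArray(r) = rows(r).label
        }
      }
    }
    // Block until every slice has been copied before handing the arrays on.
    Await.result(Future.sequence(futures), Duration.Inf)
    (labelsArray, featuresArray)
  }
}

The sketch awaits all Futures once, after the loop, so the slice copies run concurrently.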
261 changes: 66 additions & 195 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
@@ -20,7 +20,6 @@ import com.intel.daal.data_management.data.{CSRNumericTable, HomogenNumericTable
import com.intel.daal.services.DaalContext
import org.apache.spark.{Partition, SparkContext, SparkException}
import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector, Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Matrix => OldMatrix, Vector => OldVector}
import org.apache.spark.rdd.{ExecutorInProcessCoalescePartitioner, PartitionGroup, RDD}
import org.apache.spark.storage.StorageLevel

@@ -31,6 +30,7 @@ import com.intel.oneapi.dal.table.Common.ComputeDevice
import com.intel.oneapi.dal.table.{ColumnAccessor, Common, HomogenTable, RowAccessor, Table}
import com.intel.daal.data_management.data.{CSRNumericTable, HomogenNumericTable, NumericTable, RowMergedNumericTable, Matrix => DALMatrix}
import com.intel.daal.services.DaalContext
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.{DenseMatrix, DenseVector, Matrix, SparseVector, Vector, Vectors}
import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Matrix => OldMatrix, Vector => OldVector}
import org.apache.spark.rdd.{ExecutorInProcessCoalescePartitioner, RDD}
@@ -44,7 +44,6 @@ import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, Future}
import scala.sys.exit


object OneDAL {
@@ -85,24 +84,6 @@ object OneDAL {
matrix
}

def numericTableToOldMatrix(table: NumericTable): OldMatrix = {
val numRows = table.getNumberOfRows.toInt
val numCols = table.getNumberOfColumns.toInt

var dataDouble: DoubleBuffer = null
// the returned DoubleBuffer is a ByteBuffer; copy it into a double array
dataDouble = table.getBlockOfRows(0, numRows, dataDouble)
val arrayDouble = new Array[Double](numRows * numCols)
dataDouble.get(arrayDouble)

// Transpose as DAL numeric table is row-major and DenseMatrix is column major
val OldMatrix = new OldDenseMatrix(numRows, numCols, arrayDouble, isTransposed = false)

table.releaseBlockOfRows(0, numRows, dataDouble)

OldMatrix
}

def homogenTableToOldMatrix(table: HomogenTable, device: Common.ComputeDevice): OldMatrix = {
val numRows = table.getRowCount.toInt
val numCols = table.getColumnCount.toInt
@@ -226,27 +207,7 @@ object OneDAL {
table
}

def rddDoubleToNumericTables(doubles: RDD[Double], executorNum: Int): RDD[Long] = {
require(executorNum > 0)

val doublesTables = doubles.repartition(executorNum).mapPartitions { it: Iterator[Double] =>
val data = it.toArray
// Build DALMatrix, this will load libJavaAPI, libtbb, libtbbmalloc
val context = new DaalContext()
val matrix = new DALMatrix(context, classOf[java.lang.Double],
1, data.length, NumericTable.AllocationFlag.DoAllocate)

data.zipWithIndex.foreach { case (value: Double, index: Int) =>
cSetDouble(matrix.getCNumericTable, index, 0, value)
}
Iterator(matrix.getCNumericTable)
}
doublesTables.count()

doublesTables
}

def rddLabeledPointToSparseTables(labeledPoints: Dataset[_],
def coalesceSparseLabelPointsToSparseNumericTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int): RDD[(Long, Long)] = {
Expand Down Expand Up @@ -333,46 +294,6 @@ object OneDAL {
table
}

def rddLabeledPointToSparseTables_shuffle(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int): RDD[(Long, Long)] = {
require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")

val spark = SparkSession.active

val labeledPointsRDD = labeledPoints.select(labelCol, featuresCol).rdd.map {
case Row(label: Double, features: Vector) => (features, label)
}

// Repartition to executorNum
val dataForConversion = labeledPointsRDD.repartition(executorNum)
.setName("Repartitioned for conversion")

val tables = dataForConversion.mapPartitions { it: Iterator[(Vector, Double)] =>
val points: Array[(Vector, Double)] = it.toArray

val features = points.map(_._1)
val labels = points.map(_._2)

if (features.size == 0) {
Iterator()
} else {
val numColumns = features(0).size
val featuresTable = vectorsToSparseNumericTable(features, numColumns)
val labelsTable = doubleArrayToNumericTable(labels)

Iterator((featuresTable.getCNumericTable, labelsTable.getCNumericTable))
}
}.cache()

tables.count()

tables
}

private[mllib] def doubleArrayToHomogenTable(
points: Array[Double],
device: Common.ComputeDevice): HomogenTable = {
Expand Down Expand Up @@ -402,7 +323,7 @@ object OneDAL {
table
}

def rddLabeledPointToMergedTables(labeledPoints: Dataset[_],
def coalesceLabelPointsToNumericTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int): RDD[(Long, Long)] = {
Expand Down Expand Up @@ -476,73 +397,91 @@ object OneDAL {
mergedTables
}

def rddLabeledPointToMergedHomogenTables(labeledPoints: Dataset[_],
def coalesceLabelPointsToHomogenTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int,
device: Common.ComputeDevice): RDD[(Long, Long)] = {
require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")
val numberCores: Int = labeledPoints.sparkSession.sparkContext.getConf.getInt("spark.executor.cores", 1)

val spark = SparkSession.active
import spark.implicits._
val labeledPointsRDD = labeledPoints.rdd

// Repartition to executorNum if not enough partitions
val dataForConversion = if (labeledPoints.rdd.getNumPartitions < executorNum) {
labeledPoints.repartition(executorNum).cache()
val dataForConversion = if (labeledPointsRDD.getNumPartitions < executorNum) {
logger.info(s"Repartition to executorNum if not enough partitions")
val rePartitions = labeledPoints.repartition(executorNum).cache()
rePartitions.count()
rePartitions
} else {
labeledPoints
}

val tables = dataForConversion.select(labelCol, featuresCol)
.toDF().mapPartitions { it: Iterator[Row] =>
val rows = it.toArray

val features = rows.map {
case Row(label: Double, features: Vector) => features
}
// Get dimensions for each partition
val partitionDims = Utils.getPartitionDims(dataForConversion.select(featuresCol).rdd.map{ row =>
val vector = row.getAs[Vector](0)
vector
})

val labels = rows.map {
case Row(label: Double, features: Vector) => label
}
// Filter out empty partitions; otherwise ExecutorInProcessCoalescePartitioner fails with
// "No partitions or no locations for partitions found".
// TODO: ML-312: Improve ExecutorInProcessCoalescePartitioner
val nonEmptyPartitions = dataForConversion.select(labelCol, featuresCol).toDF().rdd.mapPartitionsWithIndex {
(index: Int, it: Iterator[Row]) => Iterator(Tuple3(partitionDims(index)._1, index, it))
}.filter {
_._1 > 0
}.flatMap {
entry => entry._3
}

if (features.size == 0) {
Iterator()
} else {
val numColumns = features(0).size
val coalescedRdd = nonEmptyPartitions.coalesce(executorNum,
partitionCoalescer = Some(new ExecutorInProcessCoalescePartitioner()))
.setName("coalescedRdd")

val featuresTable: Table = if (features(0).isInstanceOf[DenseVector]) {
vectorsToDenseHomogenTable(features.toIterator, features.length, numColumns, device)
} else {
throw new Exception("oneAPI implementation doesn't support sparse datasets")
// convert RDD to HomogenTable
val coalescedTables = coalescedRdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Row]) =>
val list = it.toList
val subRowCount: Int = list.size / numberCores
val labeledPointsList: ListBuffer[Future[(Array[Double], Array[Double])]] =
new ListBuffer[Future[(Array[Double], Array[Double])]]()
val numRows = list.size
val numCols = list(0).getAs[Vector](1).toArray.size
val labelsArray = new Array[Double](numRows)
val featuresArray = new Array[Double](numRows * numCols)
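// Split the rows into numberCores contiguous slices; each Future below copies
// its slice of labels and flattened features into the two shared arrays above.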
for ( i <- 0 until numberCores) {
val f = Future {
val iter = list.iterator
val slice = if (i == numberCores - 1) {
iter.slice(subRowCount * i, numRows) // the last slice takes the remainder rows
} else {
iter.slice(subRowCount * i, subRowCount * i + subRowCount)
}
slice.toArray.zipWithIndex.map { case (row, index) =>
val values = row.getAs[Vector](1).toArray
System.arraycopy(values, 0, featuresArray,
subRowCount * numCols * i + values.length * index, values.length)
labelsArray(subRowCount * i + index) = row.getAs[Double](0)
}
(labelsArray, featuresArray)
}
labeledPointsList += f

val labelsTable = doubleArrayToHomogenTable(labels, device)

Iterator((featuresTable.getcObejct(), labelsTable.getcObejct()))
}
}.cache()

tables.count()

// Coalesce partitions belonging to the same executor
val coalescedTables = tables.rdd.coalesce(executorNum,
partitionCoalescer = Some(new ExecutorInProcessCoalescePartitioner()))

val mergedTables = coalescedTables.mapPartitions { iter =>
val mergedFeatures = new HomogenTable(device)
val mergedLabels = new HomogenTable(device)
iter.foreach { case (featureAddr, labelAddr) =>
mergedFeatures.addHomogenTable(featureAddr)
mergedLabels.addHomogenTable(labelAddr)
val result = Future.sequence(labeledPointsList)
Await.result(result, Duration.Inf)
}
Iterator((mergedFeatures.getcObejct(), mergedLabels.getcObejct()))
}.cache()
val labelsTable = new HomogenTable(numRows.toLong, 1, labelsArray,
device)
val featuresTable = new HomogenTable(numRows.toLong, numCols.toLong, featuresArray,
device)

mergedTables.count()
Iterator((featuresTable.getcObejct(), labelsTable.getcObejct()))
}.setName("coalescedTables").cache()

mergedTables
coalescedTables.count()
coalescedTables
}
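For context on how these table handles are consumed: the LabelPoints variants above return an RDD[(Long, Long)] of native (features, labels) table addresses, one pair per executor after coalescing, which the *DALImpl classes map over and pass to JNI training calls. A hedged caller-side sketch, where cTrainStub is a hypothetical stand-in for such a native entry point:

import org.apache.spark.rdd.RDD

object CoalesceUsageSketch {
  // Hypothetical stand-in for a native (JNI) training call; not part of this PR.
  private def cTrainStub(rank: Int, featuresAddr: Long, labelsAddr: Long): Long = 0L

  def train(labeledPointsTables: RDD[(Long, Long)]): RDD[Long] =
    labeledPointsTables.mapPartitionsWithIndex { (rank, iter) =>
      iter.map { case (featuresAddr, labelsAddr) =>
        cTrainStub(rank, featuresAddr, labelsAddr)
      }
    }
}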


@@ -644,19 +583,7 @@ object OneDAL {
matrix
}

def partitionsToNumericTables(partitions: RDD[Vector], executorNum: Int): RDD[NumericTable] = {
val dataForConversion = partitions
.repartition(executorNum)
.setName("Repartitioned for conversion")
.cache()

dataForConversion.mapPartitionsWithIndex { (index: Int, it: Iterator[Vector]) =>
val table = makeNumericTable(it.toArray)
Iterator(table)
}
}

def coalesceToHomogenTables(data: RDD[Vector], executorNum: Int,
def coalesceVectorsToHomogenTables(data: RDD[Vector], executorNum: Int,
device: Common.ComputeDevice): RDD[Long] = {
logger.info(s"Processing partitions with $executorNum executors")
val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1)
@@ -747,7 +674,7 @@ object OneDAL {
matrix
}

def rddVectorToMergedTables(vectors: RDD[Vector], executorNum: Int): RDD[Long] = {
def coalesceVectorsToNumericTables(vectors: RDD[Vector], executorNum: Int): RDD[Long] = {
require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")
@@ -806,62 +733,6 @@ object OneDAL {
coalescedTables
}

def rddVectorToMergedHomogenTables(vectors: RDD[Vector], executorNum: Int,
device: Common.ComputeDevice): RDD[Long] = {

require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")

// Repartition to executorNum if not enough partitions
val dataForConversion = if (vectors.getNumPartitions < executorNum) {
vectors.repartition(executorNum).setName("Repartitioned for conversion").cache()
} else {
vectors
}

// Get dimensions for each partition
val partitionDims = Utils.getPartitionDims(dataForConversion)

// Filter out empty partitions
val nonEmptyPartitions = dataForConversion.mapPartitionsWithIndex {
(index: Int, it: Iterator[Vector]) => Iterator(Tuple3(partitionDims(index)._1, index, it))
}.filter {
_._1 > 0
}

// Convert to RDD[HomogenTable]
val homogenTables = nonEmptyPartitions.map { entry =>
val numRows = entry._1
val index = entry._2
val it = entry._3
val numCols = partitionDims(index)._2

logger.info(s"Partition index: $index, numCols: $numCols, numRows: $numRows")

val table = vectorsToDenseHomogenTable(it, numRows, numCols, device)
table.getcObejct()
}.setName("homogenTables").cache()

homogenTables.count()

// Unpersist instances RDD
if (vectors.getStorageLevel != StorageLevel.NONE) {
vectors.unpersist()
}
// Coalesce partitions belonging to the same executor
val coalescedRdd = homogenTables.coalesce(executorNum,
partitionCoalescer = Some(new ExecutorInProcessCoalescePartitioner()))

val coalescedTables = coalescedRdd.mapPartitions { iter =>
val mergedData = new HomogenTable(device)
iter.foreach { address =>
mergedData.addHomogenTable(address)
}
Iterator(mergedData.getcObejct())
}.cache()
coalescedTables
}

@native def cAddNumericTable(cObject: Long, numericTableAddr: Long)

@@ -44,9 +44,10 @@ class NaiveBayesDALImpl(val uid: String,
val kvsIPPort = getOneCCLIPPort(labeledPoints.rdd)

val labeledPointsTables = if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
OneDAL.rddLabeledPointToMergedTables(labeledPoints, labelCol, featuresCol, executorNum)
OneDAL.coalesceLabelPointsToNumericTables(labeledPoints, labelCol, featuresCol, executorNum)
} else {
OneDAL.rddLabeledPointToSparseTables(labeledPoints, labelCol, featuresCol, executorNum)
OneDAL.coalesceSparseLabelPointsToSparseNumericTables(labeledPoints,
labelCol, featuresCol, executorNum)
}

val results = labeledPointsTables.mapPartitionsWithIndex {
@@ -63,7 +63,7 @@ class RandomForestClassifierDALImpl(val uid: String,
rfcTimer.record("Preprocessing")
val labeledPointsTables = if (useDevice == "GPU") {
if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
OneDAL.rddLabeledPointToMergedHomogenTables(labeledPoints,
OneDAL.coalesceLabelPointsToHomogenTables(labeledPoints,
labelCol, featuresCol, executorNum, computeDevice)
} else {
throw new Exception("oneAPI implementation doesn't support sparse datasets")
@@ -44,9 +44,9 @@ class KMeansDALImpl(var nClusters: Int,
kmeansTimer.record("Preprocessing")

val coalescedTables = if (useDevice == "GPU") {
OneDAL.coalesceToHomogenTables(data, executorNum, computeDevice)
OneDAL.coalesceVectorsToHomogenTables(data, executorNum, computeDevice)
} else {
OneDAL.rddVectorToMergedTables(data, executorNum)
OneDAL.coalesceVectorsToNumericTables(data, executorNum)
}
kmeansTimer.record("Data Conversion")

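The three call sites above share one dispatch pattern: the GPU path converts input to oneAPI HomogenTables while the CPU path builds DAAL NumericTables, and only the CPU path handles sparse data. A condensed sketch of that dispatch, with stubs standing in for the OneDAL.coalesceVectorsTo*Tables calls shown in the KMeans hunk:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.rdd.RDD

object DeviceDispatchSketch {
  // Stubs standing in for OneDAL's converters; illustrative only.
  def toHomogenTables(data: RDD[Vector], executorNum: Int): RDD[Long] =
    sys.error("stub: GPU path, oneAPI HomogenTable addresses")
  def toNumericTables(data: RDD[Vector], executorNum: Int): RDD[Long] =
    sys.error("stub: CPU path, DAAL NumericTable addresses")

  def coalesce(useDevice: String, data: RDD[Vector], executorNum: Int): RDD[Long] =
    if (useDevice == "GPU") toHomogenTables(data, executorNum)
    else toNumericTables(data, executorNum)
}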