[ML-363] Copy double array to float Homogen Table #364

Closed
45 changes: 40 additions & 5 deletions mllib-dal/src/main/native/OneDAL.cpp
@@ -27,14 +27,17 @@ using namespace daal::data_management;
extern bool daal_check_is_intel_cpu();

// Define a global native array
typedef std::shared_ptr<double[]> NativeDoubleArrayPtr;
typedef std::shared_ptr<double> NativeDoubleArrayPtr;
typedef std::shared_ptr<float> NativeFloatArrayPtr;

std::mutex g_amtx;
std::vector<NativeDoubleArrayPtr> g_NativeDoubleArrayPtrVector;

void saveDoubleArrayPtrToVector(const NativeDoubleArrayPtr &ptr) {
template <typename T> std::vector<std::shared_ptr<T>> g_NativeArrayPtrVector;

template <typename T>
static void saveArrayPtrToVector(const std::shared_ptr<T> &ptr) {
g_amtx.lock();
g_NativeDoubleArrayPtrVector.push_back(ptr);
g_NativeArrayPtrVector<T>.push_back(ptr);
g_amtx.unlock();
}

@@ -180,7 +183,21 @@ JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cNewDoubleArray(

NativeDoubleArrayPtr arrayPtr(new double[size],
[](double *ptr) { delete[] ptr; });
saveDoubleArrayPtrToVector(arrayPtr);
saveArrayPtrToVector<double>(arrayPtr);
return (jlong)arrayPtr.get();
}

/*
* Class: com_intel_oap_mllib_OneDAL__
* Method: cNewFloatArray
* Signature: (J)J
*/
JNIEXPORT jlong JNICALL Java_com_intel_oap_mllib_OneDAL_00024_cNewFloatArray(
JNIEnv *env, jobject, jlong size) {
std::cout << "Create new native float array, size: " << size << std::endl;
NativeFloatArrayPtr arrayPtr(new float[size],
[](float *ptr) { delete[] ptr; });
saveArrayPtrToVector<float>(arrayPtr);
return (jlong)arrayPtr.get();
}

@@ -200,3 +217,21 @@ Java_com_intel_oap_mllib_OneDAL_00024_cCopyDoubleArrayToNative(
std::copy(source, source + sourceLength, nativeArray + index);
env->ReleasePrimitiveArrayCritical(sourceArray, source, 0);
}

/*
* Class: com_intel_oap_mllib_OneDAL__
* Method:    cCopyDoubleArrayToFloatNative
* Signature: (J[DJ)V
*/
JNIEXPORT void JNICALL
Java_com_intel_oap_mllib_OneDAL_00024_cCopyDoubleArrayToFloatNative(
JNIEnv *env, jobject, jlong nativeArrayPtr, jdoubleArray sourceArray,
jlong index) {
float *nativeArray = reinterpret_cast<float *>(nativeArrayPtr);
jsize sourceLength = env->GetArrayLength(sourceArray);
jdouble *source = static_cast<jdouble *>(
env->GetPrimitiveArrayCritical(sourceArray, NULL));
std::transform(source, source + sourceLength, nativeArray + index,
[](double d) { return static_cast<float>(d); });
env->ReleasePrimitiveArrayCritical(sourceArray, source, 0);
}
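
For context, the two new natives are meant to compose: cNewFloatArray allocates a native float buffer (kept alive by the global shared_ptr vector above), and cCopyDoubleArrayToFloatNative narrows one double row into it at an element offset. A minimal calling sketch from the Scala side, assuming a row-major rows x cols layout (sizes and values are illustrative, not part of the patch):

// Illustrative only: two double rows narrowed into one native float buffer.
// Offsets are element counts, not bytes; row r starts at r * cols.
val cols = 3
val addr = OneDAL.cNewFloatArray(2L * cols)
OneDAL.cCopyDoubleArrayToFloatNative(addr, Array(1.0, 2.0, 3.0), 0L)
OneDAL.cCopyDoubleArrayToFloatNative(addr, Array(4.0, 5.0, 6.0), cols.toLong)
// addr can then back a FLOAT32 table:
// new HomogenTable(2L, cols.toLong, addr, Common.DataType.FLOAT32, device)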
16 changes: 16 additions & 0 deletions mllib-dal/src/main/native/javah/com_intel_oap_mllib_OneDAL__.h

Some generated files are not rendered by default.

192 changes: 190 additions & 2 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
@@ -292,6 +292,25 @@ object OneDAL {
table
}

def makeFloatHomogenTable(arrayVectors: Array[OldVector],
device: Common.ComputeDevice): HomogenTable = {
val numCols = arrayVectors.head.size
val numRows: Int = arrayVectors.size
val arrayFloat = new Array[Float](numRows * numCols)
var index = 0
for (vector <- arrayVectors) {
  for (i <- 0 until vector.size) {
    arrayFloat(index) = vector(i).toFloat
    index += 1
  }
}
val table = new HomogenTable(numRows.toLong, numCols.toLong, arrayFloat,
device)
table
}
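
A quick usage sketch of the new helper (inputs hypothetical; device stands for the caller's Common.ComputeDevice): it flattens the vectors row-major while narrowing every element to Float.

import org.apache.spark.mllib.linalg.{Vectors => OldVectors}

// 2 rows x 2 cols; the backing float array becomes [1.0f, 2.0f, 3.0f, 4.0f].
val centers = Array(OldVectors.dense(1.0, 2.0), OldVectors.dense(3.0, 4.0))
val table = OneDAL.makeFloatHomogenTable(centers, device)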

private[mllib] def doubleArrayToHomogenTable(
points: Array[Double],
device: Common.ComputeDevice): HomogenTable = {
@@ -484,6 +503,94 @@ }
}


def coalesceLabelPointsToFloatHomogenTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
executorNum: Int,
device: Common.ComputeDevice): RDD[(Long, Long)] = {
require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")
val numberCores: Int = labeledPoints.sparkSession.sparkContext.getConf.getInt("spark.executor.cores", 1)

val spark = SparkSession.active
import spark.implicits._
val labeledPointsRDD = labeledPoints.rdd

// Repartition to executorNum if not enough partitions
val dataForConversion = if (labeledPointsRDD.getNumPartitions < executorNum) {
logger.info(s"Repartitioning to $executorNum partitions")
val rePartitions = labeledPoints.repartition(executorNum).cache()
rePartitions.count()
rePartitions
} else {
labeledPoints
}

// Get dimensions for each partition
val partitionDims = Utils.getPartitionDims(
  dataForConversion.select(featuresCol).rdd.map(_.getAs[Vector](0)))

// Filter out empty partitions; leaving them in makes coalesce fail with
// "No partitions or no locations for partitions found".
// TODO: ML-312: Improve ExecutorInProcessCoalescePartitioner
val nonEmptyPartitions = dataForConversion.select(labelCol, featuresCol).toDF().rdd.mapPartitionsWithIndex {
(index: Int, it: Iterator[Row]) => Iterator(Tuple3(partitionDims(index)._1, index, it))
}.filter {
_._1 > 0
}.flatMap {
entry => entry._3
}

val coalescedRdd = nonEmptyPartitions.coalesce(executorNum,
partitionCoalescer = Some(new ExecutorInProcessCoalescePartitioner()))
.setName("coalescedRdd")

// convert RDD to HomogenTable
val coalescedTables = coalescedRdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Row]) =>
val list = it.toList
val subRowCount: Int = list.size / numberCores
val labeledPointsList: ListBuffer[Future[(Array[Float], Long)]] =
new ListBuffer[Future[(Array[Float], Long)]]()
val numRows = list.size
val numCols = list.head.getAs[Vector](1).size

val labelsArray = new Array[Float](numRows)
val featuresAddress = OneDAL.cNewFloatArray(numRows.toLong * numCols)
for (i <- 0 until numberCores) {
val f = Future {
val iter = list.iterator
val slice = if (i == numberCores - 1) {
iter.slice(subRowCount * i, numRows)
} else {
iter.slice(subRowCount * i, subRowCount * i + subRowCount)
}
slice.toArray.zipWithIndex.map { case (row, index) =>
  val features = row.getAs[Vector](1).toArray
  OneDAL.cCopyDoubleArrayToFloatNative(featuresAddress, features,
    subRowCount.toLong * numCols * i + features.length.toLong * index)
  labelsArray(subRowCount * i + index) = row.getAs[Double](0).toFloat
}
(labelsArray, featuresAddress)
}
labeledPointsList += f
}
val result = Future.sequence(labeledPointsList)
Await.result(result, Duration.Inf)

val labelsTable = new HomogenTable(numRows.toLong, 1, labelsArray,
device)
val featuresTable = new HomogenTable(numRows.toLong, numCols.toLong, featuresAddress, Common.DataType.FLOAT32,
device)

Iterator((featuresTable.getcObejct(), labelsTable.getcObejct()))
}.setName("coalescedTables").cache()

coalescedTables.count()
coalescedTables
}
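
The per-core split above hands each Future a contiguous block of subRowCount = numRows / numberCores rows, with the last core absorbing the remainder when the division is not exact. A standalone illustration of the slice arithmetic (hypothetical helper, not part of the patch):

// Row ranges per core for numRows = 10, numberCores = 4:
// subRowCount = 2, so the cores get [0,2), [2,4), [4,6) and the last one [6,10).
def sliceRange(i: Int, numberCores: Int, numRows: Int): (Int, Int) = {
  val subRowCount = numRows / numberCores
  if (i == numberCores - 1) (subRowCount * i, numRows)
  else (subRowCount * i, subRowCount * (i + 1))
}
(0 until 4).map(sliceRange(_, 4, 10)) // Vector((0,2), (2,4), (4,6), (6,10))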

private[mllib] def doubleArrayToNumericTable(points: Array[Double]): NumericTable = {
// Build DALMatrix, this will load libJavaAPI, libtbb, libtbbmalloc
val context = new DaalContext()
@@ -639,10 +746,10 @@ object OneDAL {
targetArrayAddress
}
futureList += f

}
val result = Future.sequence(futureList)
Await.result(result, Duration.Inf)
}

val table = new HomogenTable(numRows.toLong, numCols.toLong, targetArrayAddress,
Common.DataType.FLOAT64, device)

@@ -656,6 +763,81 @@
coalescedTables
}

def coalesceVectorsToFloatHomogenTables(data: RDD[Vector], executorNum: Int,
device: Common.ComputeDevice): RDD[Long] = {
logger.info("coalesceVectorsToFloatHomogenTables")
logger.info(s"Processing partitions with $executorNum executors")
val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1)

// Repartition to executorNum if not enough partitions
val dataForConversion = if (data.getNumPartitions < executorNum) {
logger.info(s"Repartitioning to $executorNum partitions")
val reData = data.repartition(executorNum).setName("RepartitionedRDD")
reData.cache().count()
reData
} else {
data
}

// Get dimensions for each partition
val partitionDims = Utils.getPartitionDims(dataForConversion)

// Filter out empty partitions; leaving them in makes coalesce fail with
// "No partitions or no locations for partitions found".
// TODO: ML-312: Improve ExecutorInProcessCoalescePartitioner
val nonEmptyPartitions = dataForConversion.mapPartitionsWithIndex {
(index: Int, it: Iterator[Vector]) => Iterator(Tuple3(partitionDims(index)._1, index, it))
}.filter {
_._1 > 0
}.flatMap {
entry => entry._3
}

val coalescedRdd = nonEmptyPartitions.coalesce(executorNum,
partitionCoalescer = Some(new ExecutorInProcessCoalescePartitioner()))
.setName("coalescedRdd")

// convert RDD to HomogenTable
val coalescedTables = coalescedRdd.mapPartitionsWithIndex { (index: Int, it: Iterator[Vector]) =>
val list = it.toList
val subRowCount: Int = list.size / numberCores
val futureList: ListBuffer[Future[Long]] = new ListBuffer[Future[Long]]()
val numRows = list.size
val numCols = list.head.size
val size = numRows.toLong * numCols.toLong
val targetArrayAddress = OneDAL.cNewFloatArray(size)
for (i <- 0 until numberCores) {
val f = Future {
val iter = list.iterator
val slice = if (i == numberCores - 1) {
iter.slice(subRowCount * i, numRows)
} else {
iter.slice(subRowCount * i, subRowCount * i + subRowCount)
}
slice.toArray.zipWithIndex.map { case (vector, index) =>
  val values = vector.toArray
  OneDAL.cCopyDoubleArrayToFloatNative(targetArrayAddress, values,
    subRowCount.toLong * numCols * i + values.length.toLong * index)
}
targetArrayAddress
}
futureList += f
}
val result = Future.sequence(futureList)
Await.result(result, Duration.Inf)

val table = new HomogenTable(numRows.toLong, numCols.toLong, targetArrayAddress,
Common.DataType.FLOAT32, device)

Iterator(table.getcObejct())
}.setName("coalescedTables").cache()
coalescedTables.count()
// Unpersist instances RDD
if (data.getStorageLevel != StorageLevel.NONE) {
data.unpersist()
}
coalescedTables
}
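
Note that the FLOAT32 path trades precision for memory footprint and device throughput: every element is narrowed from 64-bit to 32-bit during the native copy. A small illustration of that loss (not part of the patch):

// Double -> Float narrowing is lossy; the round trip rarely recovers the original.
val d: Double = 0.1
val f: Float = d.toFloat
f.toDouble // 0.10000000149011612, not 0.1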

def makeNumericTable(arrayVectors: Array[Vector]): NumericTable = {

val numCols = arrayVectors.head.size
@@ -758,4 +940,10 @@ object OneDAL {
@native def cCopyDoubleArrayToNative(arrayAddr: Long,
data: Array[Double],
index: Long): Unit

@native def cNewFloatArray(size: Long): Long

@native def cCopyDoubleArrayToFloatNative(arrayAddr: Long,
data: Array[Double],
index: Long): Unit
}
@@ -44,7 +44,7 @@ class KMeansDALImpl(var nClusters: Int,
kmeansTimer.record("Preprocessing")

val coalescedTables = if (useDevice == "GPU") {
OneDAL.coalesceVectorsToHomogenTables(data, executorNum, computeDevice)
OneDAL.coalesceVectorsToFloatHomogenTables(data, executorNum, computeDevice)
} else {
OneDAL.coalesceVectorsToNumericTables(data, executorNum)
}
@@ -70,7 +70,7 @@

val tableArr = table.next()
val initCentroids = if (useDevice == "GPU") {
OneDAL.makeHomogenTable(centers, computeDevice).getcObejct()
OneDAL.makeFloatHomogenTable(centers, computeDevice).getcObejct()
} else {
OneDAL.makeNumericTable(centers).getCNumericTable
}
@@ -51,7 +51,7 @@ class PCADALImpl(val k: Int,
pcaTimer.record("Preprocessing")

val coalescedTables = if (useDevice == "GPU") {
OneDAL.coalesceVectorsToHomogenTables(normalizedData, executorNum,
OneDAL.coalesceVectorsToFloatHomogenTables(normalizedData, executorNum,
computeDevice)
} else {
OneDAL.coalesceVectorsToNumericTables(normalizedData, executorNum)
@@ -82,7 +82,7 @@ class LinearRegressionDALImpl( val fitIntercept: Boolean,

val labeledPointsTables = if (useDevice == "GPU") {
if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
OneDAL.coalesceLabelPointsToHomogenTables(labeledPoints,
OneDAL.coalesceLabelPointsToFloatHomogenTables(labeledPoints,
labelCol, featuresCol, executorNum, computeDevice)
} else {
val msg = "OAP MLlib: Sparse tables are not supported on GPU yet."
@@ -56,7 +56,7 @@ class RandomForestRegressorDALImpl(val uid: String,

val labeledPointsTables = if (useDevice == "GPU") {
if (OneDAL.isDenseDataset(labeledPoints, featuresCol)) {
OneDAL.coalesceLabelPointsToHomogenTables(labeledPoints,
OneDAL.coalesceLabelPointsToFloatHomogenTables(labeledPoints,
labelCol, featuresCol, executorNum, computeDevice)
} else {
throw new Exception("OAP MLlib: Sparse datasets are not supported on GPU yet.")
@@ -37,7 +37,7 @@ class CorrelationDALImpl(
corTimer.record("Preprocessing")

val coalescedTables = if (useDevice == "GPU") {
OneDAL.coalesceVectorsToHomogenTables(data, executorNum,
OneDAL.coalesceVectorsToFloatHomogenTables(data, executorNum,
computeDevice)
} else {
OneDAL.coalesceVectorsToNumericTables(data, executorNum)
@@ -38,7 +38,7 @@ class SummarizerDALImpl(val executorNum: Int,
sumTimer.record("Preprocessing")

val coalescedTables = if (useDevice == "GPU") {
OneDAL.coalesceVectorsToHomogenTables(data, executorNum,
OneDAL.coalesceVectorsToFloatHomogenTables(data, executorNum,
computeDevice)
} else {
OneDAL.coalesceVectorsToNumericTables(data, executorNum)