Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
minmingzhu committed Oct 15, 2024
1 parent 90d015d commit 9df5c66
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ public class ColumnAccessor {
private long cObject;
private Common.ComputeDevice cDevice;

public ColumnAccessor(long cObject) {
this.cObject = cObject;
this.cDevice = Common.ComputeDevice.HOST;
}

public ColumnAccessor(long cObject, Common.ComputeDevice device) {
this.cObject = cObject;
this.cDevice = device;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@
public class RowAccessor {
private long cObject;
private Common.ComputeDevice cDevice;
public RowAccessor(long cObject) {
this.cObject = cObject;
this.cDevice = Common.ComputeDevice.HOST;
}

public RowAccessor(long cObject, Common.ComputeDevice device) {
this.cObject = cObject;
this.cDevice = device;
Expand Down
45 changes: 27 additions & 18 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ object OneDAL {
matrix
}

def homogenTableToMatrix(table: HomogenTable): Matrix = {
def homogenTableToMatrix(table: HomogenTable,
device: Common.ComputeDevice = Common.ComputeDevice.HOST): Matrix = {
val numRows = table.getRowCount.toInt
val numCols = table.getColumnCount.toInt

val accessor = new RowAccessor(table.getcObejct())
val accessor = new RowAccessor(table.getcObejct(), device)
val arrayDouble: Array[Double] = accessor.pullDouble(0, numRows)

// Transpose, because the DAL numeric table is row-major while DenseMatrix is column-major
Expand All @@ -82,11 +83,13 @@ object OneDAL {
matrix
}

def homogenTableToOldMatrix(table: HomogenTable): OldMatrix = {
def homogenTableToOldMatrix(table: HomogenTable,
device: Common.ComputeDevice = Common.ComputeDevice.HOST)
: OldMatrix = {
val numRows = table.getRowCount.toInt
val numCols = table.getColumnCount.toInt

val accessor = new RowAccessor(table.getcObejct())
val accessor = new RowAccessor(table.getcObejct(), device)
val arrayDouble: Array[Double] = accessor.pullDouble(0, numRows)

// Transpose, because the DAL numeric table is row-major while DenseMatrix is column-major
Expand Down Expand Up @@ -115,8 +118,9 @@ object OneDAL {
Vectors.dense(arrayDouble)
}

def homogenTableNx1ToVector(cTable: Long): Vector = {
val columnAcc = new ColumnAccessor(cTable)
def homogenTableNx1ToVector(cTable: Long,
device: Common.ComputeDevice = Common.ComputeDevice.HOST): Vector = {
val columnAcc = new ColumnAccessor(cTable, device)
val arrayDouble = columnAcc.pullDouble(0)
Vectors.dense(arrayDouble)
}
Expand All @@ -135,8 +139,9 @@ object OneDAL {
Vectors.dense(arrayDouble)
}

def homogenTable1xNToVector(table: HomogenTable): Vector = {
val rowAcc = new RowAccessor(table.getcObejct)
def homogenTable1xNToVector(table: HomogenTable,
device: Common.ComputeDevice = Common.ComputeDevice.HOST): Vector = {
val rowAcc = new RowAccessor(table.getcObejct, device)
val arrayDouble = rowAcc.pullDouble(0, 1)
Vectors.dense(arrayDouble)
}
Expand All @@ -159,10 +164,12 @@ object OneDAL {
resArray
}

def homogenTableToVectors(table: HomogenTable): Array[Vector] = {
def homogenTableToVectors(table: HomogenTable,
device: Common.ComputeDevice = Common.ComputeDevice.HOST)
: Array[Vector] = {
val numRows = table.getRowCount.toInt

val rowAcc = new RowAccessor(table.getcObejct())
val rowAcc = new RowAccessor(table.getcObejct(), device)

val resArray = new Array[Vector](numRows.toInt)

Expand Down Expand Up @@ -254,7 +261,7 @@ object OneDAL {
}

def makeHomogenTable(arrayVectors: Array[Vector],
device: Common.ComputeDevice): HomogenTable = {
device: Common.ComputeDevice = Common.ComputeDevice.HOST): HomogenTable = {
val numCols = arrayVectors.head.size
val numRows: Int = arrayVectors.size
val arrayDouble = new Array[Double](numRows * numCols)
Expand All @@ -274,7 +281,7 @@ object OneDAL {
}

def makeHomogenTable(arrayVectors: Array[OldVector],
device: Common.ComputeDevice): HomogenTable = {
device: Common.ComputeDevice = Common.ComputeDevice.HOST): HomogenTable = {
val numCols = arrayVectors.head.size
val numRows: Int = arrayVectors.size
val arrayDouble = new Array[Double](numRows * numCols)
Expand All @@ -294,16 +301,16 @@ object OneDAL {

private[mllib] def doubleArrayToHomogenTable(
points: Array[Double],
device: Common.ComputeDevice): HomogenTable = {
val table = new HomogenTable(points.length,1, points, device)
device: Common.ComputeDevice = Common.ComputeDevice.HOST): HomogenTable = {
val table = new HomogenTable(points.length, 1, points, device)
table
}

private def vectorsToDenseHomogenTable(
it: Iterator[Vector],
numRows: Int,
numCols: Int,
device: Common.ComputeDevice): HomogenTable = {
device: Common.ComputeDevice = Common.ComputeDevice.HOST): HomogenTable = {
printf(s"vectorsToDenseHomogenTable numRows: $numRows numCols: $numCols \n")
val arrayDouble = new Array[Double](numRows * numCols)
var index = 0
Expand Down Expand Up @@ -403,7 +410,8 @@ object OneDAL {
labelCol: String,
featuresCol: String,
executorNum: Int,
device: Common.ComputeDevice): RDD[(Tuple3[Long, Long, Long], Tuple3[Long, Long, Long])] = {
device: Common.ComputeDevice = Common.ComputeDevice.HOST)
: RDD[(Tuple3[Long, Long, Long], Tuple3[Long, Long, Long])] = {
require(executorNum > 0)

logger.info(s"Processing partitions with $executorNum executors")
Expand Down Expand Up @@ -586,9 +594,10 @@ object OneDAL {
* Return a new RDD containing targetArrayAddress, numRows, numCols in this RDD.
*/
def coalesceVectorsToHomogenTables(data: RDD[Vector], executorNum: Int,
device: Common.ComputeDevice): RDD[Tuple3[Long, Long, Long]] = {
device: Common.ComputeDevice = Common.ComputeDevice.HOST)
: RDD[Tuple3[Long, Long, Long]] = {
logger.info(s"Processing partitions with $executorNum executors")
val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1)
val numberCores: Int = data.sparkContext.getConf.getInt("spark.executor.cores", 1)

// Repartition to executorNum if not enough partitions
val dataForConversion = if (data.getNumPartitions < executorNum) {
Expand Down

0 comments on commit 9df5c66

Please sign in to comment.