[ML-178] Fix error when converting buffer to CSRNumericTable #181

Closed · wants to merge 4 commits
97 changes: 85 additions & 12 deletions mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
@@ -25,13 +25,18 @@ import org.apache.spark.rdd.{ExecutorInProcessCoalescePartitioner, RDD}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import java.lang
import java.nio.DoubleBuffer
import java.nio.{ByteBuffer, ByteOrder, DoubleBuffer}
import java.util.logging.{Level, Logger}

import com.intel.oap.mllib.recommendation.ALSPartitionInfo

import scala.collection.mutable.ArrayBuffer

object OneDAL {

// Size of one serialized Rating record: Long + Long + Float
val RATING_SIZE = 8 + 8 + 4

LibLoader.loadLibraries()

private val logger = Logger.getLogger("util.OneDAL")
@@ -236,8 +241,8 @@ object OneDAL {
matrixLabel
}

private def vectorsToSparseNumericTable(vectors: Array[Vector],
nFeatures: Long): CSRNumericTable = {
def vectorsToSparseNumericTable(vectors: Array[Vector],
nFeatures: Long): CSRNumericTable = {
require(vectors(0).isInstanceOf[SparseVector], "vectors should be sparse")

println(s"Features row x column: ${vectors.length} x ${vectors(0).size}")
@@ -250,10 +255,10 @@
val columnIndices = Array.fill(ratingsNum) {
0L
}
// First row index is 1
val rowOffsets = ArrayBuffer[Long](1L)

var indexValues = 0
var curRow = 0L

// Convert all rows into one CSRNumericTable
for (row <- 0 until vectors.length) {
@@ -263,20 +268,22 @@
// one-based column indices
columnIndices(indexValues) = column + 1

if (row > curRow) {
curRow = row
// one-based indexValues
rowOffsets += indexValues + 1
}

indexValues = indexValues + 1
}
// one-based row offset
rowOffsets += indexValues + 1
}
// one-based row offset
rowOffsets += indexValues + 1

val contextLocal = new DaalContext()

// check CSR encoding
assert(values.length == ratingsNum,
"the length of values should be equal to the number of non-zero elements")
assert(columnIndices.length == ratingsNum,
"the length of columnIndices should be equal to the number of non-zero elements")
assert(rowOffsets.size == (csrRowNum + 1),
"the size of rowOffsets should be equal to the number of rows + 1")

val cTable = OneDAL.cNewCSRNumericTableDouble(values, columnIndices, rowOffsets.toArray,
nFeatures, csrRowNum)
val table = new CSRNumericTable(contextLocal, cTable)
@@ -324,6 +331,72 @@ object OneDAL {
tables
}

private def bufferToCSRNumericTable(buffer: ByteBuffer, info: ALSPartitionInfo,
nVectors: Int, nFeatures: Int,
nBlocks: Int, rankId: Int): CSRNumericTable = {

// Use little-endian byte order
buffer.order(ByteOrder.LITTLE_ENDIAN)

val ratingsNum = info.ratingsNum
val csrRowNum = info.csrRowNum
val values = Array.fill(ratingsNum) {
0.0f
}
val columnIndices = Array.fill(ratingsNum) {
0L
}
val rowOffsets = ArrayBuffer[Long](1L)

var index = 0
var curRow = 0L
// Each partition is converted to one CSRNumericTable
for (i <- 0 until ratingsNum) {
// Shift the row index so that each partition's rows start from 0
val row = buffer.getLong(i * RATING_SIZE) - getPartitionOffset(rankId, nFeatures, nBlocks)
val column = buffer.getLong(i * RATING_SIZE + 8)
val rating = buffer.getFloat(i * RATING_SIZE + 16)

values(index) = rating
// one-based column index
columnIndices(index) = column + 1

if (row > curRow) {
// emit one offset per skipped row so that rows with no non-zero elements are still counted
for (_ <- 0 until (row - curRow).toInt) {
// one-based index
rowOffsets += index + 1
}
curRow = row
}

index = index + 1
}
// closing one-based row offset
rowOffsets += index + 1

// check CSR encoding
assert(values.length == ratingsNum,
"the length of values should be equal to the number of non-zero elements")
assert(columnIndices.length == ratingsNum,
"the length of columnIndices should be equal to the number of non-zero elements")
assert(rowOffsets.size == (csrRowNum + 1),
"the size of rowOffsets should be equal to the number of rows + 1")

val contextLocal = new DaalContext()
val cTable = OneDAL.cNewCSRNumericTableFloat(values, columnIndices, rowOffsets.toArray,
nVectors, csrRowNum)
val table = new CSRNumericTable(contextLocal, cTable)

table
}

private def getPartitionOffset(partitionId: Int, nRatings: Int, nBlocks: Int): Int = {
require(partitionId >= 0 && partitionId < nBlocks)
val itemsInBlock = nRatings / nBlocks
partitionId * itemsInBlock
}

def rddLabeledPointToMergedTables(labeledPoints: Dataset[_],
labelCol: String,
featuresCol: String,
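Aside: the bug this PR fixes (ML-178) is in the row-offset bookkeeping. The old loop appended a single offset whenever the row index advanced, so a gap of two or more consecutive empty rows produced fewer than csrRowNum + 1 offsets and the resulting CSRNumericTable was malformed; the patched loop emits one offset per skipped row, and the new asserts make the invariant explicit. Below is a minimal self-contained sketch of the corrected encoding, assuming row-sorted (row, column, value) triples as input; the helper name cooToOneBasedCsr and its signature are illustrative, not part of the PR.

import scala.collection.mutable.ArrayBuffer

def cooToOneBasedCsr(entries: Seq[(Long, Long, Float)],
                     numRows: Long): (Array[Float], Array[Long], Array[Long]) = {
  val values = ArrayBuffer[Float]()
  val columnIndices = ArrayBuffer[Long]()
  // DAAL's CSR layout is one-based, so row 0 starts at offset 1
  val rowOffsets = ArrayBuffer[Long](1L)

  var index = 0
  var curRow = 0L
  for ((row, column, value) <- entries) {
    values += value
    columnIndices += column + 1 // one-based column index
    if (row > curRow) {
      // one offset per skipped row keeps rowOffsets.size == numRows + 1
      // even when several consecutive rows are empty (the ML-178 bug)
      for (_ <- 0L until (row - curRow)) {
        rowOffsets += index + 1
      }
      curRow = row
    }
    index += 1
  }
  // closing offset, plus offsets for any trailing empty rows
  for (_ <- curRow until numRows) {
    rowOffsets += index + 1
  }
  (values.toArray, columnIndices.toArray, rowOffsets.toArray)
}

The second file in the diff, ALSDALImpl.scala, applies the same empty-row handling to its own copy of bufferToCSRNumericTable: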
@@ -87,7 +87,7 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
OneCCL.init(executorNum, rank, kvsIPPort)
val rankId = OneCCL.rankID()

println("rankId", rankId, "nUsers", nVectors, "nItems", nFeatures)
println("rankId", rankId, "nVectors", nVectors, "nFeatures", nFeatures)

val buffer = ratingsToByteBuffer(iter.toArray)
val bufferInfo = new ALSPartitionInfo
@@ -169,8 +169,9 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
}

private def bufferToCSRNumericTable(buffer: ByteBuffer, info: ALSPartitionInfo,
nVectors: Int, nFeatures: Int,
nBlocks: Int, rankId: Int): CSRNumericTable = {
nVectors: Int, nFeatures: Int,
nBlocks: Int, rankId: Int): CSRNumericTable = {

// Use little-endian byte order
buffer.order(ByteOrder.LITTLE_ENDIAN)

@@ -198,16 +199,27 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
columnIndices(index) = column + 1

if (row > curRow) {
// emit one offset per skipped row so that rows with no non-zero elements are still counted
for (_ <- 0 until (row - curRow).toInt) {
// one-based index
rowOffsets += index + 1
}
curRow = row
// one-based index
rowOffsets += index + 1
}

index = index + 1
}
// closing one-based row offset
rowOffsets += index + 1

// check CSR encoding
assert(values.length == ratingsNum,
"the length of values should be equal to the number of non-zero elements")
assert(columnIndices.length == ratingsNum,
"the length of columnIndices should be equal to the number of non-zero elements")
assert(rowOffsets.size == (csrRowNum + 1),
"the size of rowOffsets should be equal to the number of rows + 1")

val contextLocal = new DaalContext()
val cTable = OneDAL.cNewCSRNumericTableFloat(values, columnIndices, rowOffsets.toArray,
nVectors, csrRowNum)
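Both copies of bufferToCSRNumericTable assume the same wire format for the ratings exchanged between executors: RATING_SIZE = 8 + 8 + 4, i.e. each record is two Longs followed by a Float, written little-endian, and record i is read at absolute offset i * RATING_SIZE. A small illustrative sketch of that decoding (RatingCodec, RawRating, and readRating are hypothetical names, not from this codebase):

import java.nio.{ByteBuffer, ByteOrder}

object RatingCodec {
  // matches RATING_SIZE = 8 + 8 + 4 in OneDAL.scala
  val RATING_SIZE = 8 + 8 + 4

  // two Longs plus a Float, as the getLong/getLong/getFloat reads imply
  final case class RawRating(row: Long, column: Long, rating: Float)

  // decode the i-th fixed-width record using the same absolute offsets
  // as bufferToCSRNumericTable: getLong(0), getLong(+8), getFloat(+16)
  def readRating(buffer: ByteBuffer, i: Int): RawRating = {
    buffer.order(ByteOrder.LITTLE_ENDIAN)
    val base = i * RATING_SIZE
    RawRating(
      row = buffer.getLong(base),
      column = buffer.getLong(base + 8),
      rating = buffer.getFloat(base + 16))
  }
}

The decoded row is then shifted by getPartitionOffset(rankId, n, nBlocks) = rankId * (n / nBlocks), so each partition's rows are renumbered from zero before CSR encoding.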
57 changes: 57 additions & 0 deletions mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ml

import com.intel.oap.mllib.OneDAL
import org.apache.spark.internal.Logging
import org.apache.spark.ml.linalg.{Matrices, Vector, Vectors}
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.sql.Row

class oneDALSuite extends FunctionsSuite with Logging {

import testImplicits._

test("test sparse vector to CSRNumericTable") {
val data = Seq(
Vectors.sparse(3, Seq((0, 1.0), (1, 2.0), (2, 3.0))),
Vectors.sparse(3, Seq((0, 10.0), (1, 20.0), (2, 30.0))),
Vectors.sparse(3, Seq.empty),
Vectors.sparse(3, Seq.empty),
Vectors.sparse(3, Seq((0, 1.0), (1, 2.0))),
Vectors.sparse(3, Seq((0, 10.0), (2, 20.0))),
)
val df = data.map(Tuple1.apply).toDF("features")
df.show()
val rowsRDD = df.rdd.map {
case Row(features: Vector) => features
}
val results = rowsRDD.coalesce(1).mapPartitions { it: Iterator[Vector] =>
val vectors: Array[Vector] = it.toArray
val numColumns = vectors(0).size
val csrTable = OneDAL.vectorsToSparseNumericTable(vectors, numColumns)
Iterator(csrTable.getCNumericTable)
}.collect()
val csr = OneDAL.makeNumericTable(results(0))
val resultMatrix = OneDAL.numericTableToMatrix(csr)
val matrix = Matrices.fromVectors(data)

assert(resultMatrix.toArray sameElements matrix.toArray)
}
}
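For reference, hand-encoding the six test vectors under the one-based CSR convention used above (these expected arrays are worked out by hand, not asserted in the PR):

// rows 2 and 3 are empty, so offset 7 repeats; 6 rows give 7 offsets
val expectedValues = Array(1.0, 2.0, 3.0, 10.0, 20.0, 30.0, 1.0, 2.0, 10.0, 20.0)
val expectedColumnIndices = Array(1L, 2L, 3L, 1L, 2L, 3L, 1L, 2L, 1L, 3L)
val expectedRowOffsets = Array(1L, 4L, 7L, 7L, 7L, 9L, 11L)

Before the fix, the jump from row 1 straight to row 4 would have appended a single offset where three are needed, leaving rowOffsets with 5 entries instead of csrRowNum + 1 = 7; this round trip through numericTableToMatrix is the regression the test guards against.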