diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
index 0558e0092..b56b4ca52 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/OneDAL.scala
@@ -25,13 +25,18 @@ import org.apache.spark.rdd.{ExecutorInProcessCoalescePartitioner, RDD}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.storage.StorageLevel
 
 import java.lang
-import java.nio.DoubleBuffer
+import java.nio.{ByteBuffer, ByteOrder, DoubleBuffer}
 import java.util.logging.{Level, Logger}
+import com.intel.oap.mllib.recommendation.ALSPartitionInfo
+
 import scala.collection.mutable.ArrayBuffer
 
 object OneDAL {
 
+  // Size of one Rating record: Long + Long + Float
+  val RATING_SIZE = 8 + 8 + 4
+
   LibLoader.loadLibraries()
 
   private val logger = Logger.getLogger("util.OneDAL")
@@ -236,8 +241,8 @@
     matrixLabel
   }
 
-  private def vectorsToSparseNumericTable(vectors: Array[Vector],
-                                          nFeatures: Long): CSRNumericTable = {
+  def vectorsToSparseNumericTable(vectors: Array[Vector],
+                                  nFeatures: Long): CSRNumericTable = {
     require(vectors(0).isInstanceOf[SparseVector], "vectors should be sparse")
 
     println(s"Features row x column: ${vectors.length} x ${vectors(0).size}")
@@ -250,10 +255,10 @@
     val columnIndices = Array.fill(ratingsNum) {
       0L
     }
+    // First row offset is 1 (one-based)
     val rowOffsets = ArrayBuffer[Long](1L)
 
     var indexValues = 0
-    var curRow = 0L
 
     // Converted to one CSRNumericTable
     for (row <- 0 until vectors.length) {
@@ -263,20 +268,22 @@
         // one-based indexValues
         columnIndices(indexValues) = column + 1
 
-        if (row > curRow) {
-          curRow = row
-          // one-based indexValues
-          rowOffsets += indexValues + 1
-        }
-
         indexValues = indexValues + 1
       }
+      // one-based row offset
+      rowOffsets += indexValues + 1
     }
-    // one-based row indexValues
-    rowOffsets += indexValues + 1
 
     val contextLocal = new DaalContext()
 
+    // check CSR encoding
+    assert(values.length == ratingsNum,
+      "the length of values should be equal to the number of non-zero elements")
+    assert(columnIndices.length == ratingsNum,
+      "the length of columnIndices should be equal to the number of non-zero elements")
+    assert(rowOffsets.size == (csrRowNum + 1),
+      "the size of rowOffsets should be equal to the number of rows + 1")
+
     val cTable = OneDAL.cNewCSRNumericTableDouble(values, columnIndices, rowOffsets.toArray,
       nFeatures, csrRowNum)
     val table = new CSRNumericTable(contextLocal, cTable)
@@ -324,6 +331,72 @@
     tables
   }
 
+  private def bufferToCSRNumericTable(buffer: ByteBuffer, info: ALSPartitionInfo,
+                                      nVectors: Int, nFeatures: Int,
+                                      nBlocks: Int, rankId: Int): CSRNumericTable = {
+
+    // Use little endian
+    buffer.order(ByteOrder.LITTLE_ENDIAN)
+
+    val ratingsNum = info.ratingsNum
+    val csrRowNum = info.csrRowNum
+    val values = Array.fill(ratingsNum) {
+      0.0f
+    }
+    val columnIndices = Array.fill(ratingsNum) {
+      0L
+    }
+    val rowOffsets = ArrayBuffer[Long](1L)
+
+    var index = 0
+    var curRow = 0L
+    // Each partition converted to one CSRNumericTable
+    for (i <- 0 until ratingsNum) {
+      // Modify row index for each partition (start from 0)
+      val row = buffer.getLong(i * RATING_SIZE) - getPartitionOffset(rankId, nFeatures, nBlocks)
+      val column = buffer.getLong(i * RATING_SIZE + 8)
+      val rating = buffer.getFloat(i * RATING_SIZE + 16)
+
+      values(index) = rating
+      // one-based index
+      columnIndices(index) = column + 1
+
+      if (row > curRow) {
+        // fill in offsets for rows without non-zero elements
+        for (_ <- 0 until (row - curRow).toInt) {
+          // one-based row offset
+          rowOffsets += index + 1
+        }
+        curRow = row
+      }
+
+      index = index + 1
+    }
+    // one-based row offset
+    rowOffsets += index + 1
+
+    // check CSR encoding
+    assert(values.length == ratingsNum,
+      "the length of values should be equal to the number of non-zero elements")
+    assert(columnIndices.length == ratingsNum,
+      "the length of columnIndices should be equal to the number of non-zero elements")
+    assert(rowOffsets.size == (csrRowNum + 1),
+      "the size of rowOffsets should be equal to the number of rows + 1")
+
+    val contextLocal = new DaalContext()
+    val cTable = OneDAL.cNewCSRNumericTableFloat(values, columnIndices, rowOffsets.toArray,
+      nVectors, csrRowNum)
+    val table = new CSRNumericTable(contextLocal, cTable)
+
+    table
+  }
+
+  private def getPartitionOffset(partitionId: Int, nRatings: Int, nBlocks: Int): Int = {
+    require(partitionId >= 0 && partitionId < nBlocks)
+    val itemsInBlock = nRatings / nBlocks
+    partitionId * itemsInBlock
+  }
+
   def rddLabeledPointToMergedTables(labeledPoints: Dataset[_],
                                     labelCol: String,
                                     featuresCol: String,
diff --git a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala
index d920db537..d083fb350 100644
--- a/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala
+++ b/mllib-dal/src/main/scala/com/intel/oap/mllib/recommendation/ALSDALImpl.scala
@@ -87,7 +87,7 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
       OneCCL.init(executorNum, rank, kvsIPPort)
       val rankId = OneCCL.rankID()
 
-      println("rankId", rankId, "nUsers", nVectors, "nItems", nFeatures)
+      println("rankId", rankId, "nVectors", nVectors, "nFeatures", nFeatures)
 
       val buffer = ratingsToByteBuffer(iter.toArray)
       val bufferInfo = new ALSPartitionInfo
@@ -169,8 +169,9 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
   }
 
   private def bufferToCSRNumericTable(buffer: ByteBuffer, info: ALSPartitionInfo,
-                                     nVectors: Int, nFeatures: Int,
-                                     nBlocks: Int, rankId: Int): CSRNumericTable = {
+                                      nVectors: Int, nFeatures: Int,
+                                      nBlocks: Int, rankId: Int): CSRNumericTable = {
+
     // Use little endian
     buffer.order(ByteOrder.LITTLE_ENDIAN)
 
@@ -198,9 +199,12 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
       columnIndices(index) = column + 1
 
       if (row > curRow) {
+        // fill in offsets for rows without non-zero elements
+        for (_ <- 0 until (row - curRow).toInt) {
+          // one-based row offset
+          rowOffsets += index + 1
+        }
         curRow = row
-        // one-based index
-        rowOffsets += index + 1
       }
 
       index = index + 1
@@ -208,6 +212,14 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
     // one-based row index
     rowOffsets += index + 1
 
+    // check CSR encoding
+    assert(values.length == ratingsNum,
+      "the length of values should be equal to the number of non-zero elements")
+    assert(columnIndices.length == ratingsNum,
+      "the length of columnIndices should be equal to the number of non-zero elements")
+    assert(rowOffsets.size == (csrRowNum + 1),
+      "the size of rowOffsets should be equal to the number of rows + 1")
+
     val contextLocal = new DaalContext()
     val cTable = OneDAL.cNewCSRNumericTableFloat(values, columnIndices, rowOffsets.toArray,
       nVectors, csrRowNum)
diff --git a/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala b/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala
new file mode 100644
index 000000000..65b3acb32
--- /dev/null
+++ b/mllib-dal/src/test/scala/org/apache/spark/ml/oneDALSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml
+
+import com.intel.oap.mllib.OneDAL
+import org.apache.spark.internal.Logging
+import org.apache.spark.ml.linalg.{Matrices, Vector, Vectors}
+import org.apache.spark.ml.recommendation.ALS.Rating
+import org.apache.spark.sql.Row
+
+class oneDALSuite extends FunctionsSuite with Logging {
+
+  import testImplicits._
+
+  test("test sparse vector to CSRNumericTable") {
+    val data = Seq(
+      Vectors.sparse(3, Seq((0, 1.0), (1, 2.0), (2, 3.0))),
+      Vectors.sparse(3, Seq((0, 10.0), (1, 20.0), (2, 30.0))),
+      Vectors.sparse(3, Seq.empty),
+      Vectors.sparse(3, Seq.empty),
+      Vectors.sparse(3, Seq((0, 1.0), (1, 2.0))),
+      Vectors.sparse(3, Seq((0, 10.0), (2, 20.0)))
+    )
+    val df = data.map(Tuple1.apply).toDF("features")
+    df.show()
+    val rowsRDD = df.rdd.map {
+      case Row(features: Vector) => features
+    }
+    val results = rowsRDD.coalesce(1).mapPartitions { it: Iterator[Vector] =>
+      val vectors: Array[Vector] = it.toArray
+      val numColumns = vectors(0).size
+      val csrTable = OneDAL.vectorsToSparseNumericTable(vectors, numColumns)
+      Iterator(csrTable.getCNumericTable)
+    }.collect()
+    val csr = OneDAL.makeNumericTable(results(0))
+    val resultMatrix = OneDAL.numericTableToMatrix(csr)
+    val matrix = Matrices.fromVectors(data)
+
+    assert(resultMatrix.toArray sameElements matrix.toArray)
+  }
+}
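
Reviewer note, not part of the patch: the heart of this change is how the row offsets are built. The old code only appended an offset when the row index advanced, so rows with no non-zero elements (like rows 2 and 3 in the new test) were silently skipped and rowOffsets came up short; the new code emits an offset for every row, which is exactly the invariant the added asserts check (rowOffsets.size == csrRowNum + 1). Below is a minimal standalone Scala sketch of that one-based CSR encoding, using the same 6 x 3 matrix as the new test; all names here are illustrative, not part of the patched API.

// Standalone sketch (not part of the patch): one-based CSR encoding built the
// same way the fixed vectorsToSparseNumericTable builds it -- appending a row
// offset after every row, so empty rows are still represented.
import scala.collection.mutable.ArrayBuffer

object CsrSketch {
  def main(args: Array[String]): Unit = {
    // (columnIndex, value) pairs per row; rows 2 and 3 are empty
    val rows: Seq[Seq[(Int, Double)]] = Seq(
      Seq(0 -> 1.0, 1 -> 2.0, 2 -> 3.0),
      Seq(0 -> 10.0, 1 -> 20.0, 2 -> 30.0),
      Seq.empty,
      Seq.empty,
      Seq(0 -> 1.0, 1 -> 2.0),
      Seq(0 -> 10.0, 2 -> 20.0)
    )

    val values = ArrayBuffer[Double]()
    val columnIndices = ArrayBuffer[Long]()
    val rowOffsets = ArrayBuffer[Long](1L) // first row offset is 1 (one-based)

    for (row <- rows) {
      for ((col, v) <- row) {
        values += v
        columnIndices += col + 1 // one-based column index
      }
      // emitted once per row, so empty rows get an offset too
      rowOffsets += values.length + 1
    }

    // The invariants the new asserts in the patch check:
    assert(values.length == columnIndices.length) // one column index per value
    assert(rowOffsets.length == rows.length + 1)  // nRows + 1 offsets
    assert(rowOffsets.last == values.length + 1)  // last offset = nnz + 1

    println(s"values        = ${values.mkString(", ")}")
    println(s"columnIndices = ${columnIndices.mkString(", ")}")
    println(s"rowOffsets    = ${rowOffsets.mkString(", ")}")
    // rowOffsets = 1, 4, 7, 7, 7, 9, 11 -- the repeated 7s are the empty rows
  }
}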
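
Also not part of the patch: the new constant RATING_SIZE = 8 + 8 + 4 describes a fixed 20-byte record layout that bufferToCSRNumericTable reads back with absolute getLong/getFloat calls at computed offsets. The round-trip sketch below illustrates that layout; the (user, item, rating) field names follow Spark's ALS Rating, and the write side is an assumption here, since the actual buffer is produced elsewhere (ratingsToByteBuffer) and is not shown in this diff.

// Standalone sketch (not part of the patch): the 20-byte little-endian rating
// record, read back with the same absolute-offset calls the new code uses.
import java.nio.{ByteBuffer, ByteOrder}

object RatingBufferSketch {
  val RATING_SIZE = 8 + 8 + 4 // Long id + Long id + Float rating

  def main(args: Array[String]): Unit = {
    val ratings = Seq((0L, 2L, 0.5f), (1L, 0L, 1.5f), (3L, 1L, 2.5f))

    val buffer = ByteBuffer.allocate(ratings.length * RATING_SIZE)
      .order(ByteOrder.LITTLE_ENDIAN) // same byte order the patch sets
    for ((user, item, rating) <- ratings) {
      buffer.putLong(user).putLong(item).putFloat(rating)
    }

    // Absolute gets at fixed offsets, as in bufferToCSRNumericTable
    for (i <- 0 until ratings.length) {
      val user = buffer.getLong(i * RATING_SIZE)
      val item = buffer.getLong(i * RATING_SIZE + 8)
      val rating = buffer.getFloat(i * RATING_SIZE + 16)
      assert((user, item, rating) == ratings(i))
    }
    println(s"round-tripped ${ratings.length} records of $RATING_SIZE bytes each")
  }
}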
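
Finally, a worked example of getPartitionOffset, which is why bufferToCSRNumericTable can renumber global row ids to partition-local ones by plain subtraction: each rank owns a contiguous block of nRatings / nBlocks rows. The function body is copied from the patch; the surrounding object and the concrete numbers are illustrative only.

// Standalone sketch (not part of the patch): partition-local row renumbering.
object PartitionOffsetSketch {
  def getPartitionOffset(partitionId: Int, nRatings: Int, nBlocks: Int): Int = {
    require(partitionId >= 0 && partitionId < nBlocks)
    val itemsInBlock = nRatings / nBlocks
    partitionId * itemsInBlock
  }

  def main(args: Array[String]): Unit = {
    // 10 rows split across 2 blocks: rank 0 owns rows 0-4, rank 1 owns rows 5-9
    assert(getPartitionOffset(0, 10, 2) == 0)
    assert(getPartitionOffset(1, 10, 2) == 5)
    // global row 7 on rank 1 becomes local row 2, as in the new code's
    // "Modify row index for each partition (start from 0)" step
    assert(7 - getPartitionOffset(1, 10, 2) == 2)
    println("partition offsets OK")
  }
}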