Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,146 +18,111 @@
package org.apache.spark.mllib.linalg.distributed

import breeze.linalg.{DenseMatrix => BDM}
import org.apache.spark.util.Utils

import org.apache.spark.{Logging, Partitioner}
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.linalg.{DenseMatrix, Matrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

/**
* A grid partitioner, which stores every block in a separate partition.
* A grid partitioner, which uses a regular grid to partition coordinates.
*
* @param numRowBlocks Number of blocks that form the rows of the matrix.
* @param numColBlocks Number of blocks that form the columns of the matrix.
* @param suggestedNumPartitions Number of partitions to partition the rdd into. The final number
* of partitions will be set to `min(suggestedNumPartitions,
* numRowBlocks * numColBlocks)`, because setting the number of
* partitions greater than the number of sub matrices is not useful.
* @param rows Number of rows.
* @param cols Number of columns.
* @param rowsPerPart Number of rows per partition, which may be less at the bottom edge.
* @param colsPerPart Number of columns per partition, which may be less at the right edge.
*/
private[mllib] class GridPartitioner(
val numRowBlocks: Int,
val numColBlocks: Int,
suggestedNumPartitions: Int) extends Partitioner {
private val totalBlocks = numRowBlocks.toLong * numColBlocks
// Having the number of partitions greater than the number of sub matrices does not help
override val numPartitions = math.min(suggestedNumPartitions, totalBlocks).toInt

private val blockLengthsPerPartition = findOptimalBlockLengths
// Number of neighboring blocks to take in each row
private val numRowBlocksPerPartition = blockLengthsPerPartition._1
// Number of neighboring blocks to take in each column
private val numColBlocksPerPartition = blockLengthsPerPartition._2
// Number of rows of partitions
private val blocksPerRow = math.ceil(numRowBlocks * 1.0 / numRowBlocksPerPartition).toInt
val rows: Int,
val cols: Int,
val rowsPerPart: Int,
val colsPerPart: Int) extends Partitioner {

// All four grid parameters must be strictly positive; a zero cell size would also make the
// divisions below fail.
require(rows > 0, s"rows must be positive but got $rows.")
require(cols > 0, s"cols must be positive but got $cols.")
require(rowsPerPart > 0, s"rowsPerPart must be positive but got $rowsPerPart.")
require(colsPerPart > 0, s"colsPerPart must be positive but got $colsPerPart.")

// Number of partitions along each axis. The division is promoted to Double on purpose: with
// Int / Int the quotient is truncated before math.ceil sees it, so the partial partition at the
// bottom/right edge would not be counted and getPartitionId could return an id that is
// >= numPartitions.
private val rowPartitions = math.ceil(rows * 1.0 / rowsPerPart).toInt
private val colPartitions = math.ceil(cols * 1.0 / colsPerPart).toInt

// Total number of grid cells; every cell is its own partition.
override val numPartitions: Int = rowPartitions * colPartitions

/**
* Returns the index of the partition the SubMatrix belongs to. Tries to achieve block wise
* partitioning.
* Returns the index of the partition the input coordinate belongs to.
*
* @param key The key for the SubMatrix. Can be its position in the grid (its column major index)
* or a tuple of three integers that are the final row index after the multiplication,
* the index of the block to multiply with, and the final column index after the
* multiplication.
* @return The index of the partition, which the SubMatrix belongs to.
* @param key The coordinate (i, j) or a tuple (i, j, k), where k is the inner index used in
* multiplication. k is ignored in computing partitions.
* @return The index of the partition, which the coordinate belongs to.
*/
override def getPartition(key: Any): Int = {
key match {
case (blockRowIndex: Int, blockColIndex: Int) =>
getPartitionId(blockRowIndex, blockColIndex)
case (blockRowIndex: Int, innerIndex: Int, blockColIndex: Int) =>
getPartitionId(blockRowIndex, blockColIndex)
case (i: Int, j: Int) =>
getPartitionId(i, j)
case (i: Int, j: Int, _: Int) =>
getPartitionId(i, j)
case _ =>
throw new IllegalArgumentException(s"Unrecognized key. key: $key")
throw new IllegalArgumentException(s"Unrecognized key: $key.")
}
}

/** Partitions sub-matrices as blocks with neighboring sub-matrices. */
private def getPartitionId(blockRowIndex: Int, blockColIndex: Int): Int = {
require(0 <= blockRowIndex && blockRowIndex < numRowBlocks, "The blockRowIndex in the key " +
s"must be in the range 0 <= blockRowIndex < numRowBlocks. blockRowIndex: $blockRowIndex," +
s"numRowBlocks: $numRowBlocks")
require(0 <= blockRowIndex && blockColIndex < numColBlocks, "The blockColIndex in the key " +
s"must be in the range 0 <= blockRowIndex < numColBlocks. blockColIndex: $blockColIndex, " +
s"numColBlocks: $numColBlocks")
// Coordinates of the block
val i = blockRowIndex / numRowBlocksPerPartition
val j = blockColIndex / numColBlocksPerPartition
// The mod shouldn't be required but is added as a guarantee for possible corner cases
Utils.nonNegativeMod(j * blocksPerRow + i, numPartitions)
/**
 * Maps the coordinate (i, j) to a partition id. Both indices are range-checked against
 * `rows` and `cols`; the id is the row-cell index plus the column-cell index times
 * `rowPartitions`.
 */
private def getPartitionId(i: Int, j: Int): Int = {
  require(0 <= i && i < rows, s"Row index $i out of range [0, $rows).")
  require(0 <= j && j < cols, s"Column index $j out of range [0, $cols).")
  val rowCell = i / rowsPerPart
  val colCell = j / colsPerPart
  rowCell + colCell * rowPartitions
}

/** Tries to calculate the optimal number of blocks that should be in each partition. */
private def findOptimalBlockLengths: (Int, Int) = {
// Gives the optimal number of blocks that need to be in each partition
val targetNumBlocksPerPartition = math.ceil(totalBlocks * 1.0 / numPartitions).toInt
// Number of neighboring blocks to take in each row
var m = math.ceil(math.sqrt(targetNumBlocksPerPartition)).toInt
// Number of neighboring blocks to take in each column
var n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt
// Try to make m and n close to each other while making sure that we don't exceed the number
// of partitions
var numBlocksForRows = math.ceil(numRowBlocks * 1.0 / m)
var numBlocksForCols = math.ceil(numColBlocks * 1.0 / n)
while ((numBlocksForRows * numBlocksForCols > numPartitions) && (m * n != 0)) {
if (numRowBlocks <= numColBlocks) {
m += 1
n = math.ceil(targetNumBlocksPerPartition * 1.0 / m).toInt
} else {
n += 1
m = math.ceil(targetNumBlocksPerPartition * 1.0 / n).toInt
}
numBlocksForRows = math.ceil(numRowBlocks * 1.0 / m)
numBlocksForCols = math.ceil(numColBlocks * 1.0 / n)
}
// If a good partitioning scheme couldn't be found, set the side with the smaller dimension to
// 1 and the other to the number of targetNumBlocksPerPartition
if (m * n == 0) {
if (numRowBlocks <= numColBlocks) {
m = 1
n = targetNumBlocksPerPartition
} else {
n = 1
m = targetNumBlocksPerPartition
}
}
(m, n)
}

/** Checks whether the partitioners have the same characteristics */
override def equals(obj: Any): Boolean = {
obj match {
case r: GridPartitioner =>
(this.numRowBlocks == r.numRowBlocks) && (this.numColBlocks == r.numColBlocks) &&
(this.numPartitions == r.numPartitions)
(this.rows == r.rows) && (this.cols == r.cols) &&
(this.rowsPerPart == r.rowsPerPart) && (this.colsPerPart == r.colsPerPart)
case _ =>
false
}
}
}

private[mllib] object GridPartitioner {

  /** Builds a [[GridPartitioner]] from the grid size and an explicit cell size per partition. */
  def apply(rows: Int, cols: Int, rowsPerPart: Int, colsPerPart: Int): GridPartitioner =
    new GridPartitioner(rows, cols, rowsPerPart, colsPerPart)

  /**
   * Builds a [[GridPartitioner]] from the grid size and a suggested number of partitions.
   *
   * Each side of the grid is scaled by `1 / sqrt(suggestedNumPartitions)` and rounded to obtain
   * the cell size along that side, with a floor of 1.
   */
  def apply(rows: Int, cols: Int, suggestedNumPartitions: Int): GridPartitioner = {
    require(suggestedNumPartitions > 0)
    val shrink = 1.0 / math.sqrt(suggestedNumPartitions)
    // Cell size along one side: scaled extent, never below one, rounded to the nearest integer.
    def cellSize(extent: Int): Int = math.round(math.max(shrink * extent, 1.0)).toInt
    new GridPartitioner(rows, cols, cellSize(rows), cellSize(cols))
  }
}

/**
* Represents a distributed matrix in blocks of local matrices.
*
* @param rdd The RDD of SubMatrices (local matrices) that form this matrix
* @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
* the number of rows will be calculated when `numRows` is invoked.
* @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
* zero, the number of columns will be calculated when `numCols` is invoked.
* @param blocks The RDD of sub-matrix blocks (blockRowIndex, blockColIndex, sub-matrix) that form
* this distributed matrix.
* @param rowsPerBlock Number of rows that make up each block. The blocks forming the final
* rows are not required to have the given number of rows
* @param colsPerBlock Number of columns that make up each block. The blocks forming the final
* columns are not required to have the given number of columns
* @param nRows Number of rows of this matrix. If the supplied value is less than or equal to zero,
* the number of rows will be calculated when `numRows` is invoked.
* @param nCols Number of columns of this matrix. If the supplied value is less than or equal to
* zero, the number of columns will be calculated when `numCols` is invoked.
*/
class BlockMatrix(
val rdd: RDD[((Int, Int), Matrix)],
private var nRows: Long,
private var nCols: Long,
val blocks: RDD[((Int, Int), Matrix)],
val rowsPerBlock: Int,
val colsPerBlock: Int) extends DistributedMatrix with Logging {
val colsPerBlock: Int,
private var nRows: Long,
private var nCols: Long) extends DistributedMatrix with Logging {

private type SubMatrix = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), matrix)
private type MatrixBlock = ((Int, Int), Matrix) // ((blockRowIndex, blockColIndex), sub-matrix)

/**
* Alternate constructor for BlockMatrix without the input of the number of rows and columns.
Expand All @@ -172,45 +137,48 @@ class BlockMatrix(
rdd: RDD[((Int, Int), Matrix)],
rowsPerBlock: Int,
colsPerBlock: Int) = {
this(rdd, 0L, 0L, rowsPerBlock, colsPerBlock)
this(rdd, rowsPerBlock, colsPerBlock, 0L, 0L)
}

private lazy val dims: (Long, Long) = getDim

override def numRows(): Long = {
if (nRows <= 0L) nRows = dims._1
if (nRows <= 0L) estimateDim()
nRows
}

override def numCols(): Long = {
if (nCols <= 0L) nCols = dims._2
if (nCols <= 0L) estimateDim()
nCols
}

val numRowBlocks = math.ceil(numRows() * 1.0 / rowsPerBlock).toInt
val numColBlocks = math.ceil(numCols() * 1.0 / colsPerBlock).toInt

private[mllib] var partitioner: GridPartitioner =
new GridPartitioner(numRowBlocks, numColBlocks, rdd.partitions.length)

/** Returns the dimensions of the matrix. */
private def getDim: (Long, Long) = {
val (rows, cols) = rdd.map { case ((blockRowIndex, blockColIndex), mat) =>
(blockRowIndex * rowsPerBlock + mat.numRows, blockColIndex * colsPerBlock + mat.numCols)
}.reduce((x0, x1) => (math.max(x0._1, x1._1), math.max(x0._2, x1._2)))

(math.max(rows, nRows), math.max(cols, nCols))
GridPartitioner(numRowBlocks, numColBlocks, suggestedNumPartitions = blocks.partitions.size)

/**
 * Estimates the dimensions of the matrix from its blocks.
 *
 * For every block the far edge along each axis is computed as the block index times the nominal
 * block size plus the block's own size; the maximum edge per axis is the estimate. A non-positive
 * `nRows` / `nCols` is overwritten with the estimate, and an assertion fails when the blocks
 * extend past an already supplied dimension.
 */
private def estimateDim(): Unit = {
  // The block index is widened to Long before multiplying, so the product is a Long.
  val edges = blocks.map { case ((blockRowIndex, blockColIndex), mat) =>
    val rowEdge = blockRowIndex.toLong * rowsPerBlock + mat.numRows
    val colEdge = blockColIndex.toLong * colsPerBlock + mat.numCols
    (rowEdge, colEdge)
  }
  val (rows, cols) = edges.reduce { (left, right) =>
    (math.max(left._1, right._1), math.max(left._2, right._2))
  }
  if (nRows <= 0L) nRows = rows
  assert(rows <= nRows, s"The number of rows $rows is more than claimed $nRows.")
  if (nCols <= 0L) nCols = cols
  assert(cols <= nCols, s"The number of columns $cols is more than claimed $nCols.")
}

/** Cache the underlying RDD. */
def cache(): BlockMatrix = {
rdd.cache()
/** Calls `cache()` on the underlying RDD of blocks and returns this matrix. */
def cache(): this.type = { blocks.cache(); this }

/** Set the storage level for the underlying RDD. */
def persist(storageLevel: StorageLevel): BlockMatrix = {
rdd.persist(storageLevel)
/** Calls `persist(storageLevel)` on the underlying RDD of blocks and returns this matrix. */
def persist(storageLevel: StorageLevel): this.type = { blocks.persist(storageLevel); this }

Expand All @@ -222,22 +190,22 @@ class BlockMatrix(
s"Int.MaxValue. Currently numCols: ${numCols()}")
require(numRows() * numCols() < Int.MaxValue, "The length of the values array must be " +
s"less than Int.MaxValue. Currently numRows * numCols: ${numRows() * numCols()}")
val nRows = numRows().toInt
val nCols = numCols().toInt
val mem = nRows * nCols / 125000
val m = numRows().toInt
val n = numCols().toInt
val mem = m * n / 125000
if (mem > 500) logWarning(s"Storing this matrix will require $mem MB of memory!")

val parts = rdd.collect()
val values = new Array[Double](nRows * nCols)
parts.foreach { case ((blockRowIndex, blockColIndex), block) =>
val localBlocks = blocks.collect()
val values = new Array[Double](m * n)
localBlocks.foreach { case ((blockRowIndex, blockColIndex), submat) =>
val rowOffset = blockRowIndex * rowsPerBlock
val colOffset = blockColIndex * colsPerBlock
block.foreachActive { (i, j, v) =>
val indexOffset = (j + colOffset) * nRows + rowOffset + i
submat.foreachActive { (i, j, v) =>
val indexOffset = (j + colOffset) * m + rowOffset + i
values(indexOffset) = v
}
}
new DenseMatrix(nRows, nCols, values)
new DenseMatrix(m, n, values)
}

/** Collects data and assembles a local dense breeze matrix (for test only). */
Expand Down
Loading