[SPARK-18218][ML][MLLib] Reduce shuffled data size of BlockMatrix multiplication and solve potential OOM and low parallelism usage problem By split middle dimension in matrix multiplication #15730
Conversation
@yanboliang @sethah I would be very pleased to hear your opinions, thanks! This optimization may also provide some inspiration for a series of algorithms based on distributed matrices.
Test build #67969 has finished for PR 15730 at commit

Test build #67977 has finished for PR 15730 at commit
ping @brkyvz who wrote these libraries.
Hi @WeichenXu123 Thank you for this PR. Sorry for taking so long to get back to you. Your optimization would be very helpful. I have a couple of thoughts, though. Your examples always assume fully dense matrices, i.e. that all blocks exist all the time. How would sparsity affect shuffling? Would there ever be a case where sparsity of blocks and unlucky alignment of blocks could actually cause a lot more shuffling with your parameter? Nevertheless, I can see fully dense matrix multiplications benefiting significantly from your optimization. I guess we will need to work on the APIs a bit and document it a bit more clearly.
Good question about the shuffled data in the sparse case. Here is some simple (perhaps not very rigorous) analysis of it: Now I am considering improving the API interface to make it easier to use. Thanks for the careful review!
You shouldn't change the signature of a public method and still call it `@Since("1.3.0")`. Maybe move the inside of this to

```
private def multiplyImpl(
    other: BlockMatrix,
    shufflePartitioner: GridPartitioner,
    midDimPartNum: Int,
    resultPartitioner: GridPartitioner): BlockMatrix
```

but keep the

```
@Since("1.3.0")
def multiply(other: BlockMatrix): BlockMatrix = {
  multiplyImpl(other, ...)
}
```
I guess this is not used?
@WeichenXu123 How about if we only add:

```
def multiply(other: BlockMatrix, numMidDimSplits: Integer): BlockMatrix
```

as the public API. Users shouldn't have to define
This way, all that needs to change in the implementation of `multiply` is:

```
val intermediatePartitioner = new Partitioner {
  override def numPartitions: Int = resultPartitioner.numPartitions * numMidDimSplits
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
val newBlocks = flatA.cogroup(flatB, intermediatePartitioner).flatMap { case (pId, (a, b)) =>
```
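For context, a minimal sketch of how the flat key consumed by such a partitioner could be computed upstream; this is illustrative only, and the helper name and exact key layout are assumptions rather than the actual MLlib code:

```
// Assumed flat-key encoding: one partition id per (result partition, middle split).
// A block A(i, k) or B(k, j) that contributes to result block (i, j) would be keyed by:
def intermediateKey(
    resultPartitioner: org.apache.spark.Partitioner,
    i: Int, j: Int, k: Int,
    numMidDimSplits: Int): Int = {
  val resultPart = resultPartitioner.getPartition((i, j)) // partition of result block (i, j)
  val midSplit = k % numMidDimSplits                      // which middle-dimension split
  resultPart * numMidDimSplits + midSplit                 // flat id in [0, numPartitions)
}
```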
@brkyvz All right, I'll update the code ASAP. Thanks!
Test build #70574 has finished for PR 15730 at commit
@brkyvz I updated the code and attached a screenshot of a running result; waiting for your review, thanks!
@WeichenXu123 Thanks! Will take a look once I get back from vacation (in a week). Happy new year!
2.2.0
Could you please add documentation for this? This is the most important part of this PR: understanding how this parameter improves performance. You may copy most of your PR description.
I would rather write a test function `testMultiply` which takes A, B, and the expected C, and makes `numSplits` configurable. Here you can simply have:

```
testMultiply(largeA, largeB, largeC, 1)
testMultiply(largeA, largeB, largeC, 2)
testMultiply(largeA, largeB, largeC, 3)
testMultiply(largeA, largeB, largeC, 4)
```
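A minimal sketch of what such a helper could look like (hypothetical; the actual test code in the PR may differ):

```
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

def testMultiply(
    a: BlockMatrix,
    b: BlockMatrix,
    expectedC: Matrix,
    numMidDimSplits: Int): Unit = {
  val actualC = a.multiply(b, numMidDimSplits).toLocalMatrix()
  // Element-wise comparison with a small tolerance.
  actualC.toArray.zip(expectedC.toArray).foreach { case (x, y) =>
    assert(math.abs(x - y) < 1e-8)
  }
}
```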
Looks pretty good overall. Left one major comment about documentation and one about tests.
@brkyvz Done. Thanks!
Test build #71219 has finished for PR 15730 at commit
Jenkins, test this please.
Test build #71242 has finished for PR 15730 at commit
cc @yanboliang Thanks!
```
 * Left multiplies this [[BlockMatrix]] to `other`, another [[BlockMatrix]]. This method add
```
Ok, I feel this is very verbose in terms of documentation. Can we summarize it somehow?

```
/**
 * Left multiplies this [[BlockMatrix]] to `other`, another [[BlockMatrix]]. The `colsPerBlock`
 * of this matrix must equal the `rowsPerBlock` of `other`. If `other` contains
 * `SparseMatrix`, they will have to be converted to a `DenseMatrix`. The output
 * [[BlockMatrix]] will only consist of blocks of `DenseMatrix`. This may cause
 * some performance issues until support for multiplying two sparse matrices is added.
 * Blocks with duplicate indices will be added with each other.
 *
 * @param other Matrix `B` in `A * B = C`
 * @param numMidDimSplits Number of splits to cut on the middle dimension when doing multiplication.
   For example, when multiplying a Matrix `A` of size `m x n` with Matrix `B` of size `n x k`, this parameter
   configures the parallelism to use when grouping the matrices. The parallelism will increase from `m x k` to
   `m x k x numMidDimSplits`, which in some cases also reduces total shuffled data.
 */
```

I wrote out a sketch; please put it in proper format (i.e. I omitted the `*` on the last lines).
All right, thanks!
Test build #71390 has finished for PR 15730 at commit
@WeichenXu123 This LGTM, thanks. @mengxr Would you also like to take a look?
The API looks good to me. I have not reviewed the internals carefully. One comment: let's add a check to verify that `numMidDimSplits` is > 0.
Test build #71691 has finished for PR 15730 at commit
| s"of B must be equal. A.numCols: ${numCols()}, B.numRows: ${other.numRows()}. If you " + | ||
| "think they should be equal, try setting the dimensions of A and B explicitly while " + | ||
| "initializing them.") | ||
| require(numMidDimSplits > 0, "numMidDimSplits should be positive value.") |
ultra nit: the message should be "should be a positive value" or "should be a positive integer"
@WeichenXu123 This LGTM! Thank you for providing this functionality. I left one final comment, then I'll merge this.
@brkyvz Also, thanks for your careful code review! ^_^
Test build #72063 has finished for PR 15730 at commit
Merging to master! Thanks!
## What changes were proposed in this pull request?

### The problem in current block matrix multiplication
As described in JIRA https://issues.apache.org/jira/browse/SPARK-18218, block matrix multiplication in Spark may cause problems. Suppose we multiply an `M*N` matrix A by an `N*P` matrix B. When N is much larger than M and P, the following problems may occur:

- When the middle dimension N is too large, it causes reducer OOM.
- Even if OOM does not occur, parallelism is still too low.
- When N is much larger than M and P, and matrices A and B have many partitions, there may be too many partitions on the M and P dimensions, which greatly increases the shuffled data size. (I will explain this in detail below.)

### Key point of my improvement

In this PR, I introduce a `numMidDimSplits` parameter and improve the algorithm to resolve this problem.

To understand the improvement, let me first give a simple case that explains how the current multiplication works and what causes the problems above:
Suppose block matrix A contains 200 blocks (`2 numRowBlocks * 100 numColBlocks`), arranged in 2 block rows and 100 block columns:

```
A00 A01 A02 ... A0,99
A10 A11 A12 ... A1,99
```

and block matrix B also contains 200 blocks (`100 numRowBlocks * 2 numColBlocks`), arranged in 100 block rows and 2 block columns:

```
B00   B01
B10   B11
B20   B21
...
B99,0 B99,1
```

Suppose all blocks in the two matrices are dense for now.
Now we call A.multiply(B). Suppose the generated `resultPartitioner` contains 2 rowPartitions and 2 colPartitions (there cannot be more partitions because the result matrix only contains `2 * 2` blocks). The current algorithm contains two shuffle steps:

**step-1**

Step-1 generates 4 reducers, which I tag as reducer-00, reducer-01, reducer-10, reducer-11, and shuffles data as follows:

```
A00 A01 A02 ... A0,99  B00 B10 B20 ... B99,0  shuffled into reducer-00
A00 A01 A02 ... A0,99  B01 B11 B21 ... B99,1  shuffled into reducer-01
A10 A11 A12 ... A1,99  B00 B10 B20 ... B99,0  shuffled into reducer-10
A10 A11 A12 ... A1,99  B01 B11 B21 ... B99,1  shuffled into reducer-11
```
The shuffling above is a `cogroup` transform; note that each reducer contains **only one group**.

**step-2**
Step-2 does an `aggregateByKey` transform on the result of step-1. It also generates 4 reducers and produces the final result RDD, which contains 4 partitions, each holding one block.

The main problems are in step-1. We have only 4 reducers, but matrices A and B have 400 blocks in total, so the reducer count is obviously too small. Moreover, each reducer contains only one group (in the `cogroup` sense), and that group contains 200 blocks. This is bad because `cogroup` loads each group into memory when computing, so the algorithm does not scale: if matrix A had 10000 column blocks or more instead of 100, each reducer would load 20000 blocks into memory, easily causing reducer OOM.

This PR tries to resolve the problem described above.
When a matrix A of dimension M * N multiplies a matrix B of dimension N * P, the middle dimension N is the key point. If N is large, the current multiplication implementation works badly.

In this PR, I introduce a `numMidDimSplits` parameter representing how many splits to cut the middle dimension N into.

Still using the example described above, if we now set `numMidDimSplits = 10`, we can generate 40 reducers in **step-1**: each reducer-ij above is split into 10 reducers, reducer-ij0, reducer-ij1, ..., reducer-ij9, and each reducer receives 20 blocks.
Now the shuffle works as follows:

**reducer-000 to reducer-009**

```
A0,0 A0,10 A0,20 ... A0,90  B0,0 B10,0 B20,0 ... B90,0  shuffled into reducer-000
A0,1 A0,11 A0,21 ... A0,91  B1,0 B11,0 B21,0 ... B91,0  shuffled into reducer-001
A0,2 A0,12 A0,22 ... A0,92  B2,0 B12,0 B22,0 ... B92,0  shuffled into reducer-002
...
A0,9 A0,19 A0,29 ... A0,99  B9,0 B19,0 B29,0 ... B99,0  shuffled into reducer-009
```

**reducer-010 to reducer-019**

```
A0,0 A0,10 A0,20 ... A0,90  B0,1 B10,1 B20,1 ... B90,1  shuffled into reducer-010
A0,1 A0,11 A0,21 ... A0,91  B1,1 B11,1 B21,1 ... B91,1  shuffled into reducer-011
A0,2 A0,12 A0,22 ... A0,92  B2,1 B12,1 B22,1 ... B92,1  shuffled into reducer-012
...
A0,9 A0,19 A0,29 ... A0,99  B9,1 B19,1 B29,1 ... B99,1  shuffled into reducer-019
```

**reducer-100 to reducer-109** and **reducer-110 to reducer-119** are similar to the above, so I omit them.
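To make the routing above concrete, here is a small hedged sketch (illustrative only; the helper name and the split-selection rule `k % numMidDimSplits` are assumptions that match the example, not necessarily the exact MLlib code):

```
// Which (result block, middle split) sub-reducers a block of A is sent to.
// A block A(i, k) contributes to every result block (i, j), and in this example
// lands in the middle-dimension split k % numMidDimSplits.
def destinationsForABlock(
    i: Int, k: Int,
    numColBlocksB: Int,
    numMidDimSplits: Int): Seq[(Int, Int, Int)] = {
  val split = k % numMidDimSplits
  (0 until numColBlocksB).map(j => (i, j, split)) // (result row, result col, mid split)
}

// e.g. with numMidDimSplits = 10 and B having 2 block columns:
// destinationsForABlock(0, 10, 2, 10) == Seq((0, 0, 0), (0, 1, 0))
// i.e. A(0,10) goes to reducer-000 and reducer-010, matching the listing above.
```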
### API for this optimized algorithm

I add a new API as follows:

```
def multiply(
    other: BlockMatrix,
    numMidDimSplits: Int // middle dimension split number, explained above
  ): BlockMatrix
```
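For illustration, here is a hedged usage sketch; the block sizes and values are made up, and it assumes an existing `SparkContext` named `sc`:

```
import org.apache.spark.mllib.linalg.Matrices
import org.apache.spark.mllib.linalg.distributed.BlockMatrix

// A: 2 x 4 matrix stored as one row of two 2x2 blocks.
val blocksA = sc.parallelize(Seq(
  ((0, 0), Matrices.dense(2, 2, Array(1.0, 2.0, 3.0, 4.0))),
  ((0, 1), Matrices.dense(2, 2, Array(5.0, 6.0, 7.0, 8.0)))))
// B: 4 x 2 matrix stored as one column of two 2x2 blocks.
val blocksB = sc.parallelize(Seq(
  ((0, 0), Matrices.dense(2, 2, Array(1.0, 0.0, 0.0, 1.0))),
  ((1, 0), Matrices.dense(2, 2, Array(2.0, 0.0, 0.0, 2.0)))))

val matA = new BlockMatrix(blocksA, 2, 2)
val matB = new BlockMatrix(blocksB, 2, 2)

// Split the middle dimension (the 4 shared rows/columns) into 2 groups.
val matC = matA.multiply(matB, 2)
println(matC.toLocalMatrix())
```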
### Shuffled data size analysis (compared under the same parallelism)

The optimization has a subtle influence on the total shuffled data size. An appropriate `numMidDimSplits` will significantly reduce the shuffled data size, but a too-large `numMidDimSplits` may increase it instead. For now I don't want to introduce a formula and make things too complex, so I only use a simple case to illustrate it here:

Suppose we have two same-size square matrices X and Y, both with `16 numRowBlocks * 16 numColBlocks`. X and Y are both dense. Now let me analyze the shuffled data size in the following cases:

**case 1: X and Y both partitioned in 16 rowPartitions and 16 colPartitions, numMidDimSplits = 1**
ShufflingDataSize = (16 * 16 * (16 + 16) + 16 * 16) blocks = 8448 blocks

parallelism = 16 * 16 * 1 = 256  // use the step-1 reducer count as the parallelism, because step-1 costs most of the computation time in this algorithm.

**case 2: X and Y both partitioned in 8 rowPartitions and 8 colPartitions, numMidDimSplits = 4**

ShufflingDataSize = (8 * 8 * (32 + 32) + 16 * 16 * 4) blocks = 5120 blocks

parallelism = 8 * 8 * 4 = 256  // use the step-1 reducer count as the parallelism, because step-1 costs most of the computation time in this algorithm.
**Both cases above have parallelism = 256.** Case 1 (`numMidDimSplits = 1`) is equivalent to the current implementation in MLlib, but case 2 shuffles only 60.6% as much data as case 1. **This shows that, under the same parallelism, a proper `numMidDimSplits` will significantly reduce the shuffled data size.**
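As a sanity check of the arithmetic above, a small hedged helper (the function and its simplifying assumptions, fully dense square matrices with square partition grids, are mine and not part of the PR):

```
// Counts shuffled blocks: step-1 cogroup stripes plus step-2 partial-result blocks.
def shuffledBlocks(nBlocks: Int, resultParts: Int, numMidDimSplits: Int): Int = {
  val blockRowsPerPart = nBlocks / resultParts
  // step-1: each of the resultParts^2 groups pulls a stripe of X and a stripe of Y.
  val step1 = resultParts * resultParts * (2 * blockRowsPerPart * nBlocks)
  // step-2: each of the nBlocks^2 result blocks arrives as numMidDimSplits partial sums.
  val step2 = nBlocks * nBlocks * numMidDimSplits
  step1 + step2
}

shuffledBlocks(16, 16, 1) // 8448 blocks, case 1
shuffledBlocks(16, 8, 4)  // 5120 blocks, case 2
```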
## How was this patch tested?

Test suites added.

Running result: