
Conversation

uzadude (Contributor) commented Jul 6, 2016

What changes were proposed in this pull request?

We have a use case of multiplying very big sparse matrices. We multiply distributed block matrices of about 1000x1000 blocks, and the simulate-multiply step scales like O(n^4) (n being 1000); it takes about 1.5 hours. We modified it slightly with a classical HashMap and it now runs in about 30 seconds, i.e. O(n^2).

How was this patch tested?

We have added a performance test and verified the reduced time.

srowen (Member) commented Jul 6, 2016

uzadude (Contributor, Author) commented Jul 7, 2016

Hi srowen,
I have read the "how to contribute" wiki. I thought this was too small an enhancement to open a JIRA for, and it passes the tests.

srowen (Member) commented Jul 7, 2016

I don't think this is trivial. You also need to explain the change in more detail.

uzadude (Contributor, Author) commented Jul 8, 2016

Sure.
The current method for multiplying distributed block matrices starts by deciding which block should be shuffled to which partition to do the actual multiplications. This stage is implemented in the "simulateMultiply" method of the BlockMatrix class.
Specifically, it iterates over all the blocks in the left matrix (let's assume there are NxN blocks) and, for each block, iterates over all the blocks in the right matrix (also NxN) to check for potential matches. This process is obviously on the order of O(N^4). The inner iteration can be improved trivially: it currently filters on a predetermined condition:

    val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
      val rightCounterparts = rightMatrix.filter(_._1 == colIndex)
      val partitions = rightCounterparts.map(b => partitioner.getPartition((rowIndex, b._2)))
      ((rowIndex, colIndex), partitions.toSet)
    }.toMap

More specifically, this part:

      val rightCounterparts = rightMatrix.filter(_._1 == colIndex)

So if we were to cache this check, for example in a HashMap:

    val rightCounterpartsHelper = rightMatrix.groupBy(_._1).map { case (rowIndex, arr) =>
      (rowIndex, arr.map(b => b._2))
    }

we can omit the inner filter and just use the cached lookup:

    val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
      ((rowIndex, colIndex), rightCounterpartsHelper.getOrElse(colIndex, Array()).map(b =>
        partitioner.getPartition((rowIndex, b))).toSet)
    }.toMap

And to put it all together:

    val rightCounterpartsHelper = rightMatrix.groupBy(_._1).map { case (rowIndex, arr) =>
      (rowIndex, arr.map(b => b._2))
    }
    val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
      ((rowIndex, colIndex), rightCounterpartsHelper.getOrElse(colIndex, Array()).map(b =>
        partitioner.getPartition((rowIndex, b))).toSet)
    }.toMap

And the same trick also applies to rightDestinations.
As I mentioned above, we encountered this while trying to multiply big sparse matrices: the driver got stuck for a very long time (~1.5 hours), and since we had to do it iteratively, the whole process took many hours. After the fix, this part was reduced to a few seconds.
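
For intuition on the asymptotics, here is a minimal standalone sketch (not part of the patch; the dense block-index sets and the getPartition stub are synthetic stand-ins for the real matrices and GridPartitioner) that times both lookup strategies:

    object SimulateMultiplySketch {
      def main(args: Array[String]): Unit = {
        val n = 100 // keep n small: the filter-based variant does ~n^4 work
        val leftMatrix  = (for (i <- 0 until n; j <- 0 until n) yield (i, j)).toArray
        val rightMatrix = (for (i <- 0 until n; j <- 0 until n) yield (i, j)).toArray

        // Synthetic stand-in for the real GridPartitioner.getPartition.
        def getPartition(block: (Int, Int)): Int = (block._1 * 31 + block._2) % 16

        def time[T](label: String)(f: => T): T = {
          val start = System.nanoTime()
          val result = f
          println(f"$label: ${(System.nanoTime() - start) / 1e9}%.2f s")
          result
        }

        // Current approach: scan the whole right matrix for every left block -> O(n^4).
        val slow = time("filter per block") {
          leftMatrix.map { case (rowIndex, colIndex) =>
            val rightCounterparts = rightMatrix.filter(_._1 == colIndex)
            ((rowIndex, colIndex), rightCounterparts.map(b => getPartition((rowIndex, b._2))).toSet)
          }.toMap
        }

        // Patched approach: group the right matrix once, then look up by column index -> O(n^2).
        val fast = time("groupBy lookup") {
          val rightCounterpartsHelper = rightMatrix.groupBy(_._1).map { case (idx, arr) =>
            (idx, arr.map(_._2))
          }
          leftMatrix.map { case (rowIndex, colIndex) =>
            val partitions = rightCounterpartsHelper.getOrElse(colIndex, Array.empty[Int]).map { b =>
              getPartition((rowIndex, b))
            }
            ((rowIndex, colIndex), partitions.toSet)
          }.toMap
        }

        assert(slow == fast) // both strategies compute identical destinations
      }
    }

The gap should already be visible at n = 100 and grows steeply with n, which matches the driver-side behaviour described above.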

Review comment (Member) on the new helper:

    val leftMatrix = blockInfo.keys.collect() // blockInfo should already be cached
    val rightMatrix = other.blocks.keys.collect()

    val rightCounterpartsHelper = rightMatrix.groupBy(_._1).map { case (rowIndex, arr) =>

Nit: could this just be rightMatrix.groupBy(_._1).mapValues(_.map(_._2))
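
For reference, a small sketch of the suggested equivalence (assuming rightMatrix is the collected array of right-block coordinates, as above); both forms build a map from a right block's row index to the column indices present in that row:

    val viaPatternMatch = rightMatrix.groupBy(_._1).map { case (rowIndex, arr) =>
      (rowIndex, arr.map(_._2))
    }
    // mapValues yields the same pairs, but as a lazily evaluated view of the grouped map.
    val viaMapValues = rightMatrix.groupBy(_._1).mapValues(_.map(_._2))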

srowen (Member) commented Jul 8, 2016

It does make sense. Please make a JIRA and connect this though.

uzadude changed the title from "enhanced simulate multiply" to "[SPARK-16469] enhanced simulate multiply" on Jul 10, 2016
uzadude (Contributor, Author) commented Jul 10, 2016

I have opened SPARK-16469.

srowen (Member) commented Jul 10, 2016

LGTM but CC @brkyvz

Review comment (Contributor) on the changed lines:

    -      val rightCounterparts = rightMatrix.filter(_._1 == colIndex)
    -      val partitions = rightCounterparts.map(b => partitioner.getPartition((rowIndex, b._2)))
    -      ((rowIndex, colIndex), partitions.toSet)
    +      ((rowIndex, colIndex), rightCounterpartsHelper.getOrElse(colIndex, Array()).map(b =>

nit: for readability, could you assign this to a variable instead of inlining it?
In addition, for multi-line expressions,

    .map { b =>
      blah
    }

is preferred.
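
For illustration, a rough sketch of the added expression after applying both parts of the nit (the variable name and the Array.empty[Int] default are assumptions, not the merged code):

    val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
      val partitions = rightCounterpartsHelper.getOrElse(colIndex, Array.empty[Int]).map { b =>
        partitioner.getPartition((rowIndex, b))
      }
      ((rowIndex, colIndex), partitions.toSet)
    }.toMap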

brkyvz (Contributor) commented Jul 10, 2016

LGTM, just a minor nit! Thanks for this PR

uzadude (Contributor, Author) commented Jul 13, 2016

I have made the requested nit changes.

srowen (Member) commented Jul 13, 2016

Jenkins retest this please

SparkQA commented Jul 13, 2016

Test build #62234 has finished for PR 14068 at commit 3ae9ac7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit closed this in ea06e4e on Jul 13, 2016
asfgit pushed a commit that referenced this pull request Jul 13, 2016
## What changes were proposed in this pull request?

We have a use case of multiplying very big sparse matrices. We multiply distributed block matrices of about 1000x1000 blocks, and the simulate-multiply step scales like O(n^4) (n being 1000); it takes about 1.5 hours. We modified it slightly with a classical HashMap and it now runs in about 30 seconds, i.e. O(n^2).

## How was this patch tested?

We have added a performance test and verified the reduced time.

Author: oraviv <oraviv@paypal.com>

Closes #14068 from uzadude/master.

(cherry picked from commit ea06e4e)
Signed-off-by: Sean Owen <sowen@cloudera.com>
akaltsikis commented:

@uzadude hey, I am looking to multiply VERY LARGE AND VERY SPARSE matrices using Spark. I would love some discussion about it. Can you give me a way to contact you?

uzadude (Contributor, Author) commented Jul 26, 2016

Sure, what size are you talking about? We had to do some internal code fixes for that. Right now the support for sparse matrices is pretty poor - mainly because Breeze doesn't support it.
I'm not sure how I can privately send you my contact details here. Could you send me a means of communication and I'll contact you?

akaltsikis commented:

@uzadude I'm looking to make it work for matrices bigger than 1Mx1M with a sparsity of 0.001-0.002.
That means I need to use distributed linear algebra methods. I am using the CoordinateMatrix format for my matrices right now, but it seems it cannot multiply the matrix by itself (square it) fast enough. I would be happy if you could send me an email to the same username that I have at gmail dot com.
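
For readers who land on this thread with the same question, the usual Spark MLlib route is CoordinateMatrix -> BlockMatrix -> multiply. A minimal sketch (the method name and block sizes are illustrative; as uzadude notes above, sparse support inside the block multiplication was still limited at the time):

    import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
    import org.apache.spark.rdd.RDD

    // entries: one MatrixEntry(i, j, value) per non-zero cell of the large sparse matrix.
    def squareViaBlocks(entries: RDD[MatrixEntry]): BlockMatrix = {
      val coordMat = new CoordinateMatrix(entries)
      // Block sizes are tunable; 1024 x 1024 is only an example.
      val blockMat = coordMat.toBlockMatrix(1024, 1024).cache()
      blockMat.validate()            // optional sanity check on block sizes and indices
      blockMat.multiply(blockMat)    // A * A; block pairs are shuffled as simulateMultiply dictates
    }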
