Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pairwise distance scalability #375

Closed
alimanfoo opened this issue Nov 6, 2020 · 49 comments · Fixed by #447
Closed

Pairwise distance scalability #375

alimanfoo opened this issue Nov 6, 2020 · 49 comments · Fixed by #447

Comments

@alimanfoo
Copy link
Collaborator

Raising this issue to revisit the scalability of our pairwise distance calculation and whether it's worth returning to a map-reduce style implementation that would allow chunking along both dimensions.

In the work that @aktech is doing on early scalability demonstrations (#345) there are some memory usage difficulties emerging. @aktech is I believe trying to run a pairwise distance computation over data from Ag1000G phase 2, using all SNPs and samples from a single chromosome arm. This is of the order ~20 million variants and ~1000 samples. With the current implementation, it is hard to get this to run on systems with average memory/CPU ratios, below 12G/CPU. My understanding is that, essentially, this is because the pairwise distance implementation currently does not support chunking along the variants dimension, and so to reduce memory footprint you need short chunks along the samples dimension. Depending on how the input data have been chunked natively, this may be suboptimal, i.e., you may need to run the computation with sample chunks that are (much) shorter than the native storage.

If this is correct, then it raises two questions for discussion.

First, should we revisit the map-reduce implementation of pairwise distance? This would allow chunking on both samples and variants dimensions, and so could naturally make use of whatever the underlying chunking of the data in storage, without large memory footprint.

Secondly, do we ever really need to run pairwise distance on arrays that are large in the variants dimension? I.e., do we care about scaling this up to large numbers of variants? xref https://github.com/pystatgen/sgkit/pull/306#issuecomment-714654217

@jeromekelleher
Copy link
Collaborator

Secondly, do we ever really need to run pairwise distance on arrays that are large in the variants dimension? I.e., do we care about scaling this up to large numbers of variants? xref #306 (comment)

I think you mean in the samples dimension? (If so, I agree - ~10K is as many samples as we could hope to support for O(n^2) like this)

@alimanfoo
Copy link
Collaborator Author

Secondly, do we ever really need to run pairwise distance on arrays that are large in the variants dimension? I.e., do we care about scaling this up to large numbers of variants? xref #306 (comment)

On this, it is true that for many analyses it's entirely reasonable to work around memory limitations by downsampling data along the variants dimensions. E.g., often when doing a PCA we randomly downsample variants, and this gives a very reasonable approximation. I.e., typically information about population structure is very clear from just a fraction of the variants.

In other words, the scalability test that @aktech is trying, running pairwise distance over all 20 million SNPs from a chromosome arm, is perhaps somewhat artificial, as you could reasonably downsample to 100,000 SNPs and get the information you need.

On the other side, we do have use cases where we want to compute pairwise distance using all SNPs. E.g., during QC, we check for samples that are technically identical, and check these match our expectations for replicate samples. To do this we compute pairwise distance at all SNPs. (To avoid unnecessary computation, we do an initial pass with a downsampled set of SNPs to exclude pairs of individuals that are obviously not identical, then rerun with all SNPs on remaining samples. But that's a detail.)

@alimanfoo
Copy link
Collaborator Author

Secondly, do we ever really need to run pairwise distance on arrays that are large in the variants dimension? I.e., do we care about scaling this up to large numbers of variants? xref #306 (comment)

I think you mean in the samples dimension? (If so, I agree - ~10K is as many samples as we could hope to support for O(n^2) like this)

No, I did mean the variants dimension.

Re the samples dimension, yes agreed, probably ~10k samples is the most we could hope to support due to computational complexity (although if you have access to GPUs then perhaps that allows you to reach a little further).

My main point was about the variants dimension. Computation scales linearly on the variants dimension, there's no computational reason to expect any limit there. We currently have a practical limit due to memory requirements, which could be removed if we supported chunking along the variants dimension via the original map/reduce implementation. Should we revisit that?

@jeromekelleher
Copy link
Collaborator

jeromekelleher commented Nov 6, 2020

I see, thanks. Well, from my perspective I'd find it surprising as a user to be limited on the number of variants we could process by memory restrictions - since it's a sample x sample comparison.

How have you used this in the past? Do you subset down to a representative set of sites, or just throw compute at it and let it run over the whole lot?

@alimanfoo
Copy link
Collaborator Author

I see, thanks. Well, from my perspective I'd find it surprising as a user to be limited on the number of variants we could process by memory restrictions - since it's a sample x sample comparison.

Yes, exactly.

How have you used this in the past? Do you subset down to a representative set of sites, or just throw compute at it and let it run over the whole lot?

We've done both, more details above.

@alimanfoo
Copy link
Collaborator Author

Just to add, memory restrictions will be even tighter when moving this to GPU.

@ravwojdyla
Copy link
Collaborator

Re the samples dimension, yes agreed, probably ~10k samples is the most we could hope to support due to computational complexity (although if you have access to GPUs then perhaps that allows you to reach a little further).

@alimanfoo I'm curious, I understand that the O(n^2) complexity is vicious, but I'm curious how exactly have you arrived at ~10k (including potential horizontal scaling via dask)?

Also a quick point regarding our current implementation, our current blockwise:

    x_distance = da.blockwise(
        # Lambda wraps reshape for broadcast
        lambda _x, _y: metric_ufunc(_x[:, None, :], _y),
        "jk",
        x,
        "ji",
        x,
        "ki",
        dtype="float64",
        concatenate=True,
    )

we specify concatenate=True, from dask:

Any index, like i missing from the output index is interpreted as a contraction (note that this differs from Einstein convention; repeated indices do not imply contraction.) In the case of a contraction the passed function should expect an iterable of blocks on any array that holds that index. To receive arrays concatenated along contracted dimensions instead pass concatenate=True.

So afaiu in #345 we are concatenating on the variants blocks, and thus potentially the memory issue(?). @aktech what's the reasoning behind concatenate=True? I might have missed it, but have we evaluated an implementation that takes concatenate=None (default)?

@jeromekelleher
Copy link
Collaborator

@alimanfoo I'm curious, I understand that the O(n^2) complexity is vicious, but I'm curious how exactly have you arrived at ~10k (including potential horizontal scaling via dask)?

I pulled this one out of thin air first. It was just an arbitrary number from experience - in practise, somewhere between 1K and 100K.

@aktech
Copy link
Contributor

aktech commented Nov 8, 2020

So afaiu in #345 we are concatenating on the variants blocks, and thus potentially the memory issue(?). @aktech what's the reasoning behind concatenate=True? I might have missed it, but have we evaluated an implementation that takes concatenate=None (default)?

@ravwojdyla The concatenate=True as you noted is concatenation along variant blocks is made to pass full vectors to the distance metrics functions for distance calculation. If it is None the x and y will be passed to that lamba function as a list of chunks and you cannot calculate distance metric with non-full vectors i.e. chunks, unless you do it via Map-Reduce approach, which we did evaluated, here is the implementation in this commit. This was slower than the current implementation, but perhaps seems like more scalable.

To summarise what I think of the current state is it's worth evaluating the map reduce implementation for scalability.

@ravwojdyla
Copy link
Collaborator

ravwojdyla commented Nov 9, 2020

Thanks @aktech, I was curious about the use of blockwise with concatenate=False, and specifically the memory footprint. The commit you reference doesn't use blockwise but instead works directly on blocks. I understand that both might look similar. I was also curious if the iterable of blocks on any array that holds that index was lazy or not(?) I can't find a documentation on this, and when I try it on a dummy data, I actually get a np.ndarray (which would suggest it isn't lazy), in which case the memory consumption is likely the same for concatenate equal True/False, which would be rather disappointing (it would still be interesting to double check that tho, plus whether it duplicates the memory required for chunks), I've also noticed @eric-czech comment:

I'm also thinking that there is a way to express the loop we have now that references .blocks as a blockwise call like this, though I haven't been able to get it to work yet.

which might suggest we have not evaluated it.

+1 to trying the "map-reduce" implementation on the #345. Going forward iff the performance is important right now, we could also have both implementations that switch based on the shape of the data (and/or parameter), unfortunately that would have a real maintenance cost (so that's something we need to be cautious about).

@aktech
Copy link
Contributor

aktech commented Nov 9, 2020

Thanks @aktech, I was curious about the use of blockwise with concatenate=False, and specifically the memory footprint. The commit you reference doesn't use blockwise but instead works directly on blocks. I understand that both might look similar. I was also curious if the iterable of blocks on any array that holds that index was lazy or not(?) I can't find a documentation on this, and when I try it on a dummy data, I actually get a np.ndarray (which would suggest it isn't lazy), in which case the memory consumption is likely the same for concatenate equal True/False, which would be rather disappointing (it would still be interesting to double check that tho, plus whether it duplicates the memory required for chunks), I've also noticed @eric-czech comment:

@ravwojdyla Yes, that's very true, I was also expecting the iterable of blocks to be lazy, which would make the memory consumption very low compared to the current implementation.

I only realised this after I wrote a map reduce implementation with blockwise yesterday. Here is the map reduce implementation with blockwise: https://github.com/aktech/sgkit/blob/pairwise-mr/sgkit/distance/api.py#L53. It might not be the most efficient though (could probably be improved), but gives a rough idea of how it might look like. I tried it yesterday and the memory consumption was not any better (if not worse). So far the the previous map reduce implementation I shared in the commit in my previous comment was better than these two.

Also, one of the reasons that the iterable of blocks are not lazy is probably due to the fact, that the func (blockwise's first argument) is not called unless you call .compute on it. You would rather expect it would call it before you do .compute on it and make an efficient graph. I don't know if that's a feature or a bug, but in most of dask's internal codebase blockwise is called with concatenate=True, which gives me the impression that they are not scalable in both dimenstions.

@ravwojdyla
Copy link
Collaborator

ravwojdyla commented Nov 9, 2020

@aktech so in general it "seems" like for blockwise iff you have a contraction for specific index, all blocks along that index must fit into memory regardless of the concatenate option (which might actually double the memory required for those blocks, first to keep the chunks, then to build up the input for the blockwise function (?), tho this part likely depends on dask). I wish this was in the documentation, plus the iterable of blocks is a bit misleading. I wonder if we should open an issue to add that information, wdyt?

@aktech
Copy link
Contributor

aktech commented Nov 9, 2020

@aktech so in general it "seems" like for blockwise iff you have a contraction for specific index, all block along that index must fit into memory regardless of the concatenate option

Yes, true.

(which might actually double the memory required for those blocks, first to keep the chunks, then to build up the input for the blockwise function (?), tho this part likely depends on dask).

What do you mean by "keep the chunks", those chunks are computed once, from the original array to make the input for blockwise's func I believe. The memory consumption would be twice atleast for sure from the chunks from _x and _y, which are passed to the func.

I wish this was in the documentation, plus the iterable of blocks is a bit misleading. I wonder if we should open an issue to add that information, wdyt?

Yes, I think so. That would be worth mentioning.

@ravwojdyla
Copy link
Collaborator

ravwojdyla commented Nov 9, 2020

What do you mean by "keep the chunks", those chunks are computed once, from the original array to make the input for blockwise's func I believe. The memory consumption would be twice atleast for sure from the chunks from _x and _y, which are passed to the func.

@aktech yea, per process (threads within process will reuse memory if needed), I just wonder at which stage exactly will Dask free up the memory required for chunks on each process and how exactly will it build up the input for blockwise. On a dummy data it looks like the chunks are coming in a ndarray (not py list). Also if the cluster has multiple processes and they require the same chunks they will likely fetch that from a process that computed them. So essentially it might be that we would need memory for chunks, _and_ memory for concatenated array or an "array of chunks" (depending on the concatenate parameter). The documentation says that blockwise for concatenate=None will pass list or iterable (depending where you look) of blocks, but locally I'm actually seeing a numpy array of blocks being pass into the function given to blockwise, which afaiu might requires more memory to build up the array than to pass a list/iterable of materialized/numpy chunks (depending on how soon can dask free up memory of the chunk). I wonder if this is some kind of local optimization or a bug. (clarification https://github.com/pystatgen/sgkit/issues/375#issuecomment-724204043)

Edit: but just to be clear, I agree that the blockwise implementations do not look at all promising for any computation that has a large number of elements in the contraction index.

@aktech
Copy link
Contributor

aktech commented Nov 9, 2020

On a dummy data it looks like the chunks are coming in a ndarray (not py list).

Ah that's interesting, for me that is only in the first iteration on the func you'll see an empty numpy array, if you check on the next iteration it will be a Python list. That execution of the function happens even before you call .compute, with empty arrays. Do you see arrays in all iterations?

@ravwojdyla
Copy link
Collaborator

@aktech I think I found the issue, if there is a contraction and concatenate=None the input will be lists (as expected), if concatenate=True the input will be numpy array, but also if there is no contraction, the input is numpy arrays (regardless of the concatenate).

@aktech
Copy link
Contributor

aktech commented Nov 9, 2020

but also if there is no contraction, the input is numpy arrays (regardless of the concatenate).

Interesting, good to know that.

By the way, I discovered something interesting about utilising the symmetry of matrix in dask. We had assumed so far that we can drop duplicate calculation via the following line, this is basically making the lower triangular matrix 0 as the distance matrix is a symmetric matrix.

https://github.com/pystatgen/sgkit/blob/682d58fbb39f91fcf7e441e3d4a937b91123fcb4/sgkit/distance/api.py#L102

I was trying to prove this today, but from what is looks like, our assumption was not true, unless I am missing something. Here is a notebook of that analysis showing the the above line doesn't makes dask utilise the symmetry of a matrix to drop duplicate calculations, in-fact that takes more time.

At the moment I am trying to evaluate the map-reduce approach, which does not use the blockwise.

@eric-czech
Copy link
Collaborator

FYI: https://stackoverflow.com/questions/64774771/does-blockwise-allow-iteration-over-out-of-core-arrays

cc: @mrocklin, if you get a moment we could use your help understanding the scalability limits of blockwise. A more specific question is framed in the SO post.

@mrocklin
Copy link

mrocklin commented Nov 11, 2020 via email

@eric-czech
Copy link
Collaborator

If folks here were interested in implementing some of the automatic rechunking heuristics mentioned in that issue that would be very welcome.

Thanks @mrocklin. Would a simplistic example of this be to decrease chunking along non-contracted axes as the contracted axes get larger so that the concatenated chunks still fit in memory? As a simpler example, say we want to do matrix multiplication via blockwise on 100 x 100 arrays with 10 x 10 chunks. Now say we want to do the same on 1M x 1M arrays with 10 x 10 chunks -- we now need to fit 1M / 10 chunks in memory instead of 100 / 10 chunks so we could decrease the chunks in the first axes to try to compensate for this. This is what those heuristics would accomplish correct?

@mrocklin
Copy link

mrocklin commented Nov 11, 2020 via email

@aktech
Copy link
Contributor

aktech commented Nov 12, 2020

I ran the map-reduce pairwise implementation on the MalariaGEN dataset on Coiled Cloud dask cluster with the following configuration:

  • Workers: 25
  • Memory: 16 GB each
  • CPU: 4
  • Nthreads: 1

Code:

  • Implementation in branch:pairwise-mr-blocks (this includes missing value handling, similar to current implementation)
  • Notebook used is here
  • Metric used: euclidean

Here are the results:

1. Chunk size: 100 MB

Takeaway: Everything is quick except the euclidean_map task, which is the bottleneck as you can see from the report above, which makes sense because that is something non-parallelizable from a dask point of view. The chunk size is 100 MB, which means a 100 MB chunk is passed to guvectorized euclidean_map, which runs serially.

2. (a) Chunk Size 100 MB, using Numba's target="parallel"

Takeaway: Since, the chunk is non parallelizable from dask point of view as it is dispatched to guvectorize, one way to speed things is to use Numba’s target="parallel", which would parallelize the individual chunk.

One thing to note about this approach is ideally dask’s nthreads should be 1 in this case, because Dask and Numba don’t share their threadpool, which might cause contention, see this issue

2. (b) Chunk Size 50 MB, using Numba's target="parallel"

Since the bottleneck in all these cases is the map function, any kind of parallelisation can speed things up, another approach could be parallelising the chunk with Dask instead of Numba, by getting rid of broadcasting and parallelising calls to guvectorize with two 1D arrays via Dask.


As mentioned earlier, I was not able to run the blockwise implementation due to memory issues, and the above map-reduce implementation seems to scale well.

Update:

@eric-czech @alimanfoo

We are able to achieve similar speeds of numba target="parallel" by increasing number of threads in above dask workers, as suggested by Eric:

3. Same configuration as "1" above with nthreads: 4

@aktech
Copy link
Contributor

aktech commented Nov 12, 2020

@eric-czech @alimanfoo

I was comparing it with scikit-allel as suggested by Eric in the dev call today. It doesn't seems to work with even a small array of the MalariaGEN data. Here is the notebook for the same.

@alimanfoo
Copy link
Collaborator Author

Hi @aktech

I was comparing it with scikit-allel as suggested by Eric in the dev call today. It doesn't seems to work with even a small array of the MalariaGEN data. Here is the notebook for the same.

Looks like you need a petabyte of memory :-)

Perhaps Eric was suggesting to compare with the scikit-allel v2 prototype? The pairwise distance function in scikit-allel v1 is just scipy pdist, not dask enabled.

@aktech
Copy link
Contributor

aktech commented Nov 12, 2020

Perhaps Eric was suggesting to compare with the scikit-allel v2 prototype? The pairwise distance function in scikit-allel v1 is just scipy pdist, not dask enabled.

Yes, that's the one I tried. Here is the environment (Note that, I had to remove pinning from libraries to resolve conflicts) I used for workers. I also faced this issue, which was fixed by upgrading llvmlite.

Although, I just realised that code was calling numpy backend for some reason, even though a dask array was passed to it. I will play with it again to see why.

@alimanfoo
Copy link
Collaborator Author

Yes, that's the one I tried.

Sorry, I should've looked more carefully.

I think the problem is transposing the input array. The skallel-stats pairwise distance function computes pairwise distance between columns, not rows. This is because the genotype data we have is typically arranged this way (columns are samples).

@alimanfoo
Copy link
Collaborator Author

Just to add I have run the skallel pairwise distance on a simulated dataset approaching the size of Ag1000G: https://github.com/scikit-allel/skallel-stats/blob/master/examples/pairwise_distance.ipynb

Note that I created the simulated zarr dataset with chunking best for pairwise distance, i.e., no chunks along the samples dimension.

I just tried running on Ag1000G on malariagen datalab and it's a little awkward because the chunks are quite tall (~500,000) and skallel rechunks the data to remove any chunks in the samples dimension, so memory is a challenge. I can get it to run if I use dask rechunk on the input data to make chunks shorter, which is then suboptimal for I/O (because the same chunk is being read multiple times) but does work. But it doesn't seem to be running particularly well.

Perhaps try a simulated dataset for now?

@aktech
Copy link
Contributor

aktech commented Nov 18, 2020

I just tried running on Ag1000G on malariagen datalab and it's a little awkward because the chunks are quite tall (~500,000) and skallel rechunks the data to remove any chunks in the samples dimension, so memory is a challenge. I can get it to run if I use dask rechunk on the input data to make chunks shorter, which is then suboptimal for I/O (because the same chunk is being read multiple times) but does work. But it doesn't seem to be running particularly well.

I also had hard time trying to run it on the MalariaGEN data, couldn't get it working.

Perhaps try a simulated dataset for now?

I ran it on a simulated dataset, similar to the size of MalariaGEN. It seems to run very well, here is the notebook for the same. It took about 5min 32s, on 25 workers (same configuration as above).

@aktech
Copy link
Contributor

aktech commented Nov 18, 2020

I ran it on a simulated dataset, similar to the size of MalariaGEN. It seems to run very well, here is the notebook for the same. It took about 5min 32s, on 25 workers (same configuration as above).

Also, to compare with the map-reduce implementation the same takes 18 minutes roughly, whereas it takes ~10 minutes on the MalariaGEN data. Also, note that the MalariaGEN data and simulated data are of nearly the same size (~28.5 GB)

It seems generating random numbers is more expensive than reading from GCS in the map-reduce implementation.
Update: (This sentence is incorrect, the slow speed was due to the chunking shape not due to random number generation)

@alimanfoo @eric-czech any thoughts on this? Is that fast enough?

@jeromekelleher
Copy link
Collaborator

So, just to clarify, we're looking at about a 4X slow-down for the map-reduce implementation? This seems OK, if it allows us to scale.

@alimanfoo
Copy link
Collaborator Author

Hi @aktech, echoing @jeromekelleher, I would say that speed per se is less important than scalability. I.e., whether it takes 5 minutes or 18 minutes is less important than being able to run this robustly within reasonable memory limits, and demonstrating that increasing the cluster size makes the whole computation complete faster (up to some natural limit such as the number of chunks in the data).

@aktech
Copy link
Contributor

aktech commented Nov 18, 2020

So, just to clarify, we're looking at about a 4X slow-down for the map-reduce implementation? This seems OK, if it allows us to scale.

That is true for simulated data with random values, although on the actual data its ~10 mins, which is 2X slow, assuming the skallel implementation is able to run on the MalariaGEN data with same performance as it runs on the simulated data.

@aktech
Copy link
Contributor

aktech commented Nov 19, 2020

Also, to compare with the map-reduce implementation the same takes 18 minutes roughly.

So, this is reduced to 10 minutes with better chunking.

@alimanfoo I think it is able to scale well and memory hasn't been a problem from what I have seen from various experiments, I did on Map-reduce implementation (if each worker has 8GB memory allowance, which seems reasonable to me).

The chunk size was 100MB in all of these cases.

1. Macbook Pro 16 GB (2 workers, 4threads)

I am able to run the simulated data of same size as MalariaGEN on my laptop as well, here is the notebook. Following are the links to reports:

It took about 8 hours 19 minutes overnight, given the fact that there would be some background tasks running on my laptop too and I also suspect, it might have slept and paused the workflow at night, but the point is I was able to run it on my laptop.

I chose two workers to give 8GB memory to each worker. I didn't had any memory issues. 8GB per worker seems like a good number for memory allowance.

2. Coiled cluster: 24 Worker (16 GB, 4 Cores each): MalariaGEN data

Time Taken: 10 min 48 sec

3. Coiled cluster: 50 Worker (16 GB, 4 Cores each): MalariaGEN data

Time Taken: 5 min 54 sec

Let me know if this sounds reasonable.


Also, thanks to @mrocklin's @coiled for generously increasing my cores limit to 200.

@mrocklin
Copy link

mrocklin commented Nov 19, 2020 via email

@mrocklin
Copy link

Looking at the performance reports it seems like you're definitely not dask-communication bound, so I would not expect decreasing chunking to improve performance. This aligns with @aktech 's intutition from the meeting.

@necaris
Copy link

necaris commented Nov 19, 2020

Also, thanks to @mrocklin

I don't remember doing this, but I'm glad that it happened regardless. If you all want an team account with more capacity then do let me know.

That was me 😄 but yes, if more capacity is needed either of us is happy to help!

@eric-czech
Copy link
Collaborator

Hey @mrocklin looking at matmul, it looks like it must have contracted axes .. right? It would be great to know if it's possible to implement matrix multiplication using reductions and no contractions instead since we could easily extend that to our more general pairwise metrics.

@mrocklin
Copy link

mrocklin commented Nov 20, 2020 via email

@tomwhite
Copy link
Collaborator

It would be great to know if it's possible to implement matrix multiplication using reductions and no contractions instead since we could easily extend that to our more general pairwise metrics.

Could we try replacing calls to x @ y (in the GWAS code) with something like da.tensordot(x, y, (1, 0)) to see if that has any effect?

@eric-czech
Copy link
Collaborator

eric-czech commented Nov 20, 2020

Good idea @tomwhite, I tried swapping that in the notebook mentioned at https://github.com/pystatgen/sgkit/issues/390#issuecomment-730449672 and saw this on small simulated arrays:

Screen Shot 2020-11-20 at 6 38 12 AM

Do you think it is worth starting a conversation as a dask issue about changing the matmul implementation @mrocklin?

@ravwojdyla
Copy link
Collaborator

ravwojdyla commented Nov 20, 2020

Btw in the meantime for 2d input we could also use dot: x @ y -> da.dot(x, y) which in dask under the hood uses tensordot (and should give the same results as matmul for 2d). +1 to starting the discussion about matmul using the same trick of blockwise product followed by reduction, and maybe potentially having a common code for tensordot, matmul, dot, einsum (it looks like it's also using this trick btw with it's own implementation).

@mrocklin
Copy link

mrocklin commented Nov 20, 2020 via email

@mrocklin
Copy link

mrocklin commented Nov 20, 2020 via email

@aktech
Copy link
Contributor

aktech commented Jan 21, 2021

I have implemented a blockwise version of pairwise in this branch (based on @ravwojdyla’s implementation of matmul in this PR: dask/dask#7000).

The blockwise implementation doesn’t seems to be any better than the blocks implementation (the one for which I shared the results above). I ran some benchmarks to compare them, here is the comparison of memory and CPU usage between the two implementations:

Screenshot 2021-01-21 at 14 53 13

Data:

  • Random data of size about 10% of malariaGEN: 2.83 GB,
  • 50 MB chunks
  • Machine: 48 cores and 64GB memory

In general the blockwise implementation was a bit slower than the blocks implementation, it took the following times:

  • Without blockwise: 6min 24s
  • With blockwise: 10min 42s

Example notebook for this comparison is here.

@ravwojdyla
Copy link
Collaborator

ravwojdyla commented Jan 25, 2021

@aktech reviewed this version, I believe the blockwise implementation can perform as well as the bespoke blocks implementation, here are some comments:

  • what's the need for the extra l dimension here?
  • this should be implemented as a da.reduction

Furthermore (orthogonal to the blockwise investigation):

  • instead of summing two separate da.triu, would it be possible to reuse a single result of da.triu? (and please use the same trick in both blocks and blockwise)
  • what's the advantage of having euclidean_reduce guvectorized?
  • there seem to be a substantial overhead of handling "missing" (negative) values in the current euclidean_map, once we choose how we orchestrate blocks (blocks vs blockwise etc), we should research if there is a way to optimize it

Btw for future reference:

  • having the performance notebook collocated with the code that is required (extra methods etc) and producing the graphs would make things easier to review, I can only assume that you have used this branch as the "environment" for that notebook?
  • having performance reports linked would help too

Don't hesitate to ping me if something is not clear or problematic.

@aktech
Copy link
Contributor

aktech commented Jan 25, 2021

@ravwojdyla

what's the need for the extra l dimension here?

When a pair of partial vectors are evaluated for the map step of the pairwise for correlation metric, it gives an array of size 6, to capture that in the output of the blockwise and be able to reshape it we need to add that dimension (l in this case) which is a function of the metric.

To elaborate on this based on my understanding of blockwise, when we perform blockwise the actual output dimension is not known to dask, since it would never know what the function is doing to the dimension of the output. The only way for blockwise to know its dimension is via its arguments, which is out_ind (and adjust_chunks), I call that as the anticipated dimension of blockwise’s output. The anticipated dimension is not always same as actual dimension, so if you perform a reshape on the output, it has to be valid w.r.t the anticipated dimension (irrespective of the fact that it's valid for the actual dimension). This makes it important to make the anticipated dimension suitable for reshaping in this case, hence adding the 4th dimension ‘l’, which makes the output dimension a function of nparams.

Also, note that the np.new_axis is not really required, in this case it might just work without that, I added that to be able to do out.compute() on blockwise’s output prior to reshaping to see what blockwise returned.

Let me know if this doesn’t makes sense or seems inaccurate.

this should be implemented as a da.reduction

I see, I have never used that. I’ll have a look, do you reckon it will make it more efficient? Or is that a standard way of doing reduction in dask?

instead of summing two separate da.triu, would it be possible to reuse a single result of da.triu? (and please use the same trick in both blocks and blockwise)

I think so, I'll give it a go.

what's the advantage of having euclidean_reduce guvectorized?

Not much really, I think it started with having some for loops initially maybe, then its was never removed.

Btw for future reference:

Yes, sure. Apologies for the missing information. I used this branch for comparison. I have now updated the notebook to reflect the actual experiment.

@ravwojdyla
Copy link
Collaborator

Let me know if this doesn’t makes sense or seems inaccurate.

Ah, I see, it's for the correlation metric, ok, if we decide to use blockwise, it should be probably explicitly stated. Thanks for the clarification tho.

this should be implemented as a da.reduction

I see, I have never used that. I’ll have a look, do you reckon it will make it more efficient? Or is that a standard way of doing reduction in dask?

Yes, here's a sketch of the euclidean distance variant:

    ...
    o = da.blockwise(
        _pairwise,
        'ijk',
        x,
        'ik',
        x,
        'jk',
        adjust_chunks={'k': 1},
        dtype=x.dtype,
        concatenate=False,
        h=np.empty(n_map_param, dtype=x.dtype),
    )

    r = da.reduction(o,
                     chunk=lambda x_chunk, axis, keepdims: x_chunk,
                     combine=lambda x_chunk, axis, keepdims: x_chunk.sum(-1)[..., np.newaxis],
                     aggregate=lambda x_chunk, axis, keepdims: metric_reduce_ufunc(x_chunk),
                     axis=-1,
                     dtype=np.float,
                     name="pairwise")
    t = da.triu(r)
    return t + t.T

On my laptop, this performs as well as the custom blocks. Here it's also worth mentioned that da.reduction has a handy parameter split_every, that could potentially be exposed on the pairwise_distance method.

what's the advantage of having euclidean_reduce guvectorized?

Not much really, I think it started with having some for loops initially maybe, then its was never removed.

If there is not need for it, it should be removed?

Yes, sure. Apologies for the missing information. I used this branch for comparison. I have now updated the notebook to reflect the actual experiment.

Having notebook as GH gist isn't great, because it's hard to rerun it or track it, I assume the gist should be run using the code from the branch, but it would be a lot more clear if the notebook just lived in the branch for the time of the comparison, so it's easy to just clone it and run it or change stuff. Does that make sense @aktech?

@aktech
Copy link
Contributor

aktech commented Jan 26, 2021

On my laptop, this performs as well as the custom blocks. Here it's also worth mentioned that da.reduction has a handy parameter split_every, that could potentially be exposed on the pairwise_distance method.

Oh, nice! Thanks, let me do the same for correlation too.

If there is not need for it, it should be removed?

Yes, definitely.

Having notebook as GH gist isn't great, because it's hard to rerun it or track it, I assume the gist should be run using the code from the branch, but it would be a lot more clear if the notebook just lived in the branch for the time of the comparison, so it's easy to just clone it and run it or change stuff. Does that make sense @aktech?

Ah, I see what you mean. I did that because many times the IPython notebooks in repository are not loaded at all, so if people just want to see the results there isn't a way and nbviewer doesn't seems to load from a repo link. Although I completely agree with your point it makes review difficult. I'll keep that in mind for future, thanks!

@aktech
Copy link
Contributor

aktech commented Jan 27, 2021

@ravwojdyla I have now used the da.reduction approach suggested above to compute the pairwise distance in blockwise. I did a quick comparison by running both locally on about 1 GB data and they are performing nearly the same,
roughly:

  • blocks implementation: 9 min 5 sec
  • blockwise implementation: 9 min 42 sec

I'll run it again on a little larger dataset to compare and will share a notebook thereafter, meanwhile the implementation is in this branch. I have also removed guvectorize from euclidean_reduce.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

8 participants