Implement pairwise distance calculation #306

Merged: 20 commits merged into sgkit-dev:master on Oct 22, 2020

Conversation

@aktech (Contributor) commented Oct 5, 2020

This is an attempt to implement: #241

It is based on @eric-czech's suggestion here: https://github.com/pystatgen/sgkit/issues/241#issuecomment-698878378. It implements Euclidean distance and Pearson correlation using the map/reduce idea. I have tried to document the algorithm in the pairwise function. It only calculates the upper triangular matrix to avoid repeated calculations. For the public API, I have taken inspiration from scipy.spatial.distance.pdist. I haven't added it to api.rst yet.
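
As a rough, conceptual sketch of that map/reduce idea for the Euclidean metric (illustrative only; the function names here are made up, and the actual PR applies guvectorize kernels over dask blocks):

import numpy as np

def euclidean_map(x_chunk_i, x_chunk_j):
    # Map step: partial sum of squared differences over one column chunk
    # for a single pair of (partial) vectors.
    return np.sum((x_chunk_i - x_chunk_j) ** 2)

def euclidean_reduce(partials):
    # Reduce step: combine the partial sums from all column chunks
    # and finish the metric.
    return np.sqrt(np.sum(partials))

# For two vectors split into two column chunks:
x = np.array([0.0, 1.0, 2.0, 3.0])
y = np.array([1.0, 1.0, 0.0, 5.0])
partials = [euclidean_map(x[:2], y[:2]), euclidean_map(x[2:], y[2:])]
assert np.isclose(euclidean_reduce(partials), np.linalg.norm(x - y))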

I haven't run this on a GPU, but for the API I am thinking of using the target='cuda' argument of guvectorize. From the user's point of view, I think we can let the user enable it either via an environment variable, say SGKIT_USE_GPU, or via some kind of sgkit config similar to dask's config object, and then compile the guvectorize functions to run on the GPU. I am open to opinions on both.

cc @eric-czech @jeromekelleher

@eric-czech (Collaborator)

Thanks @aktech! Looks good, nice work figuring out how to omit the blocks for one of the triangular matrices.

I was feeling a little nervous about the map/reduce recommendation so I did some more research and found a couple key things:

  1. Dask array ops like matmul (which is a pairwise calculation when multiplying a matrix by itself) are apparently not actually block-scalable in both dimensions. I was surprised that they work this way, but the da.blockwise usage like this pairs up whole rows of blocks, concatenates them all together and then runs np.matmul on them. This is effectively like running an x.rechunk((None, -1)) beforehand, which is a bit disappointing.
  2. guvectorize definitely does not materialize arrays that need to be broadcast together

It is certainly worth considering the fact that the level of scalability we're trying for here isn't something dask does in core operations so I put together a bunch of different options and compared them in this notebook. My takeaways from this:

  1. The only way to get performance comparable to functions like da.corrcoef is to feed guvectorize 2D inputs and apply vectorized functions within that custom function. This is quite hard though since numba doesn't implement a lot of numpy functions that would make this more usable and realistically, we need to be able to run our own loops to handle things like missing values and metrics that can't be expressed as numpy.
  2. Everything is significantly faster than the implementation I suggested unless you rechunk to remove column chunks first, in which case it's about the same as most of the others
    • This makes sense since this implementation should scale linearly with the number of column chunks
  3. Using njit/cuda.jit instead of guvectorize doesn't help

tl;dr I think it still makes sense to move forward with this implementation, but we need to communicate the fact that performance at least somewhat comparable to functions like da.corrcoef is only possible when column chunks are removed. I would suggest we make this the default behavior. And if we encounter a metric that won't work in the map/reduce model, this is a nice simple alternative:

import dask.array as da
from numba import guvectorize

@guvectorize(['void(int8[:], int8[:], float64[:])'], '(n),(n)->()', fastmath=True)
def corrcoef(x, y, out):
    # Equivalent to np.corrcoef(x, y)[0, 1] but 2x faster
    cov = ((x - x.mean()) * (y - y.mean())).sum() 
    out[0] = cov / (x.std() * y.std()) / x.shape[0]
    
def pairwise(x, fn):
    return da.blockwise(
        # Lambda wraps reshape for broadcast
        lambda x, y: fn(x[:, None, :], y), 'jk', 
        x, 'ji', x, 'ki', 
        dtype='float64',
        concatenate=True
    )
    
pairwise(x, corrcoef)  # x: a 2D dask array of int8 values

I'm also thinking that there is a way to express the loop we have now that references .blocks as a blockwise call like this, though I haven't been able to get it to work yet.

@aktech (Contributor, Author) commented Oct 7, 2020

@eric-czech Thanks for the analysis, that's really useful. Option 6 is indeed much faster than 5 with a slight change; let me try to get rid of the column chunks to incorporate that. Also, since the current implementation omits the blocks for one of the triangular matrices, it should be even faster!

Also, do you have any comments on the proposed approach for running on GPU?

@eric-czech (Collaborator)

Also, do you have any comments on the proposed approach for running on GPU?

For posterity after talking on our dev call, it would be ideal if a user could initiate GPU computations like this:

(
    ds
    .pipe(sg.count_call_alternate_alleles)
    # Move to gpu as in https://docs.dask.org/en/latest/gpu.html#arrays
    .assign(count_call_alternate_alleles=lambda ds: ds.count_call_alternate_alleles.map_blocks(cupy.asarray))
    # pairwise_distance knows how to pick guvectorize functions based on type of array._meta
    .pipe(sg.pairwise_distance, metric='euclidean') 
)

I think that would be best since many other dask functions will work automatically when cupy is the chunk backend. As far as detecting this goes, type(ddc._meta) == cupy.core.core.ndarray does seem like a reasonable way to do it (as used in dask here). That seems to be the intent of that attribute based on dask/dask#2977. I'm not sure why it's hidden.
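
A minimal sketch of that kind of detection (the helper and kernel names here are hypothetical, and isinstance is used rather than an exact type comparison):

import dask.array as da

def uses_cupy_chunks(x: da.Array) -> bool:
    # True when the dask array's blocks are cupy arrays (GPU-backed chunks)
    try:
        import cupy
    except ImportError:
        return False
    return isinstance(x._meta, cupy.ndarray)

# A hypothetical pairwise_distance could then pick its guvectorize kernel per backend:
#   kernel = euclidean_gpu if uses_cupy_chunks(x) else euclidean_cpu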

@codecov-io commented Oct 9, 2020

Codecov Report

Merging #306 into master will decrease coverage by 1.16%.
The diff coverage is 43.47%.


@@            Coverage Diff             @@
##           master     #306      +/-   ##
==========================================
- Coverage   96.49%   95.33%   -1.17%     
==========================================
  Files          29       31       +2     
  Lines        2053     2099      +46     
==========================================
+ Hits         1981     2001      +20     
- Misses         72       98      +26     
Impacted Files Coverage Δ
sgkit/distance/metrics.py 21.21% <21.21%> (ø)
sgkit/distance/api.py 100.00% <100.00%> (ø)

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@aktech (Contributor, Author) commented Oct 12, 2020

@eric-czech I was updating the implementation to get rid of column chunks and realised that the last dimension is not required anymore, i.e. the complete distance calculation can be done in the map step itself, because the map step receives a full pair of vectors, which means the reduce step can be performed right after it (together, basically), as there are no chunks left to aggregate in the reduce step. Let me know if that doesn't make sense.

@eric-czech (Collaborator)

I was updating the implementation to get rid of column chunks and realised that the last dimension is not required anymore, i.e. the complete distance calculation can be done in the map step itself, because the map step receives a full pair of vectors, which means the reduce step can be performed right after it (together, basically), as there are no chunks left to aggregate in the reduce step.

Makes sense, the custom functions can use full vectors if there are no column chunks (which may not necessarily be true). Is there a question I can help answer with that though?

@aktech (Contributor, Author) commented Oct 13, 2020

if there are no column chunks (which may not necessarily be true)

Since we are getting rid of column chunks (via x.rechunk((None, -1))), there won't be any column chunks and it would not be map/reduce anymore; there will be just one dispatch function, basically, instead of two. I am not sure what you meant by "which may not necessarily be true"?

@eric-czech (Collaborator)

Since we are getting rid of column chunks (via x.rechunk((None, -1))), there won't be any column chunks

When I suggested that I meant it as default but optional behavior. There would definitely be no need for map/reduce functions if we made that mandatory.

Two things would make sense here moving forward:

  1. Keep going with the map/reduce functions
  • Make .rechunk((None, -1)) default but optional behavior, perhaps with a keep_chunks=False flag
  • Validate that the performance of loops around .blocks references is the same as da.blockwise for fairly large arrays with large numbers of blocks
  • Communicate the performance/scalability tradeoff in the docs
  2. Switch to https://github.com/pystatgen/sgkit/pull/306#issuecomment-704294220
  • This wouldn't require benchmarking to prove anything

In the interest of time/simplicity, I'll recommend switching to 2 now. We can just get something basic in that way and iterate from there. Does that make sense @aktech? Then all the custom functions can work on whole vectors.

@aktech (Contributor, Author) commented Oct 13, 2020

In the interest of time/simplicity, I'll recommend switching to 2 now. We can just get something basic in that way and iterate from there. Does that make sense @aktech? Then all the custom functions can work on whole vectors.

Yes, that makes perfect sense, but one quick question: by moving forward with 2 now, are you suggesting we should skip 1 for now, i.e. in this PR? Also, the 1st and 2nd points you made overlap; for example, "Make .rechunk((None, -1)) default but optional behavior" and "Communicate the performance/scalability tradeoff in the docs" are mentioned in https://github.com/pystatgen/sgkit/pull/306#issuecomment-704294220 as well, which is why I am not entirely sure about the suggestion.

@eric-czech (Collaborator)

Yes, that makes perfect sense, but one quick question: by moving forward with 2 now, are you suggesting we should skip 1 for now, i.e. in this PR?

Yes.

Also, the 1st and 2nd points you made overlap; for example, "Make .rechunk((None, -1)) default but optional behavior" and "Communicate the performance/scalability tradeoff in the docs" are mentioned in #306 (comment) as well, which is why I am not entirely sure about the suggestion.

Yep -- you can omit any code or documentation related to chunking. I meant the 1 and 2 scenarios to be mutually exclusive. I am imagining a single pairwise function that wraps the da.blockwise call to individual numba functions, each of which will take whole vector inputs.
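
For illustration, a minimal sketch of a single pairwise function in that shape (names and details are illustrative, not the exact code that landed in this PR; missing-value handling is omitted here):

import dask.array as da
import numpy as np
from numba import guvectorize

@guvectorize(["void(float64[:], float64[:], float64[:])"], "(n),(n)->()")
def euclidean(x, y, out):
    # Whole-vector kernel compiled by numba
    s = 0.0
    for i in range(x.shape[0]):
        d = x[i] - y[i]
        s += d * d
    out[0] = np.sqrt(s)

METRICS = {"euclidean": euclidean}

def pairwise_distance(x: da.Array, metric: str = "euclidean") -> np.ndarray:
    kernel = METRICS[metric]
    # Drop column chunks so each kernel call sees full row vectors,
    # then broadcast every row block against every other row block.
    x = x.rechunk({1: -1})
    d = da.blockwise(
        lambda a, b: kernel(a[:, None, :], b), "jk",
        x, "ji", x, "ki",
        dtype="float64",
        concatenate=True,
    )
    return d.compute()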

@aktech (Contributor, Author) commented Oct 14, 2020

@eric-czech Thanks for the clarification, I have pushed the changes. I'll update the docs in a bit. Can you confirm whether the change seems appropriate for what you suggested?

Also, I did a comparison between Alistair's prototype and the map/reduce implementation in a notebook, which I'll share in a bit. (The gist was that his application of guvectorize to scipy's pdist was faster than the map/reduce implementation.)

@eric-czech (Collaborator)

@eric-czech Thanks for the clarification, I have pushed the changes. I'll update the docs in a bit. Can you confirm whether the change seems appropriate for what you suggested?

Thanks @aktech, that's more or less what I was picturing. I'll hold off on any specific questions/suggestions though until the PR comes out of draft status.

@aktech (Contributor, Author) commented Oct 14, 2020

Thanks, I'll bring this out of draft in an hour or so, after I update the documentation.

aktech marked this pull request as ready for review on October 14, 2020 14:16
@aktech (Contributor, Author) commented Oct 14, 2020

Here is a brief comparison of speed, memory and CPU utilisation between this implementation, the previous implementation and Alistair's prototype on a matrix of size 100 million: notebook.

Note that the branch was changed for 3, 4 and 5 in the above notebook, which means the function used is still pdist from sgkit but the underlying implementation is different; the links are mentioned in the notebook.

@aktech (Contributor, Author) commented Oct 18, 2020

@eric-czech @alimanfoo Thanks for enlightening me on this; it makes sense to me now. I have updated the metrics to handle missing values: basically, negative and NaN values are considered missing, as per the discussion above. I have also added tests for this and for all the functionality so far.

Also, I had to get rid of the fastmath=True param from guvectorize due to this issue in Numba.
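
A minimal sketch of that missing-value handling, extending the Euclidean kernel idea from earlier in the thread (illustrative only; the exact comparison and missing-value encoding in the PR may differ):

import numpy as np
from numba import guvectorize

@guvectorize(["void(float64[:], float64[:], float64[:])"], "(n),(n)->()")
def euclidean(x, y, out):
    s = 0.0
    for i in range(x.shape[0]):
        # Skip a pair of entries unless both are valid; negative values encode
        # missing calls, and NaN also fails the comparison, which is why
        # fastmath=True had to be dropped.
        if x[i] >= 0 and y[i] >= 0:
            d = x[i] - y[i]
            s += d * d
    out[0] = np.sqrt(s)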

@eric-czech (Collaborator)

Thanks @aktech, this looks great!

Whatever happened with the GPU experiments? Can you drop some details on that into an issue if you weren't able to get it working?

Also, I had to get rid of the fastmath=True param from guvectorize due to this issue in Numba.

Good to know, but does it matter if all the checks in the guvectorize functions are for values > 0?

@aktech (Contributor, Author) commented Oct 19, 2020

Whatever happened with the GPU experiments? Can you drop some details on that into an issue if you weren't able to get it working?

I am planning to work on it this week. I gave it a quick try on a GPU on a borrowed machine last week and it didn't work straight away, throwing a NotImplementedError from cupy, probably due to the guvectorize functions; I didn't put enough time into it last week to conclude anything. I don't have access to that machine anymore, so now I am setting one up properly in the cloud on GCP to do proper testing. I will share the results after that; meanwhile, I'll create an issue for it.

Update: GPU issue: https://github.com/pystatgen/sgkit/issues/338

Good to know, but does it matter if all the checks in the guvectorize functions are for values > 0?

It does actually, for filtering NaN values: when comparing > 0, it needs to know that np.nan > 0 returns False, which it doesn't in the case of fastmath=True.

@alimanfoo (Collaborator) commented Oct 20, 2020 via email

@aktech (Contributor, Author) commented Oct 20, 2020

For GPU experiments, have you tried Google Colab? I think they give you an Nvidia GPU for free.

I gave it a quick try yesterday and setting up the environment was a bit non-trivial: installing conda, adding things to the path, and installing system requirements. I didn't think it would be easily usable, so I didn't put much effort into it; now that you have suggested it, I'll give it another go.

@eric-czech (Collaborator)

@alimanfoo did you want to take another look at this? Otherwise, I'll set it to merge today.

eric-czech added the auto-merge label (Auto merge label for mergify test flight) on Oct 22, 2020
mergify bot merged commit ac70d43 into sgkit-dev:master on Oct 22, 2020
aktech deleted the pair-wise branch on October 22, 2020 15:16
@alimanfoo (Collaborator) left a review:

Some very minor suggestions, otherwise looks good. There really is some magic here! At some point I'd love to understand how this has eliminated the need for separate map and reduce steps.

"""

try:
metric_ufunc = getattr(metrics, f"{metric}")
@alimanfoo (Collaborator), suggested change:

    metric_ufunc = getattr(metrics, metric)

x: ArrayLike,
metric: str = "euclidean",
) -> np.ndarray:
"""Calculates the pairwise distance between all pairs of vectors in the
@alimanfoo (Collaborator), suggested change:

    """Calculates the pairwise distance between all pairs of row vectors in the

Just to help make it obvious that this is computing distance between rows. (And thus, when computing distance between samples, the user will have to transpose the input.)
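
As a hypothetical usage note for that point (the import path is assumed from this PR's file layout and the variable names are made up):

import dask.array as da
from sgkit.distance.api import pairwise_distance  # path assumed from this PR

# calls: (variants, samples); transpose so distances are computed between samples (rows)
calls = da.random.randint(0, 3, size=(10_000, 100), chunks=(2_000, 100)).astype("float64")
sample_dist = pairwise_distance(calls.T, metric="euclidean")  # shape (100, 100)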

Comment on lines +54 to +57
[array-like, shape: (M, N)]
A two dimensional distance matrix, which will be symmetric. The dimension
will be (M, N). The (i, j) position in the resulting array
(matrix) denotes the distance between ith and jth vectors.
@alimanfoo (Collaborator), suggested change:

    [array-like, shape: (M, M)]
    A two dimensional distance matrix, which will be symmetric. The dimension
    will be (M, M). The (i, j) position in the resulting array
    (matrix) denotes the distance between ith and jth row vectors in the input array.

@alimanfoo (Collaborator)

Oops, I didn't notice this was merged already. I hope the review comments are helpful anyway; perhaps a small follow-up PR to make the suggested clarifications would be good.

@eric-czech (Collaborator)

There really is some magic here! At some point I'd love to understand how this has eliminated the need for separate map and reduce steps.

FYI @alimanfoo this implementation, like matrix multiplication in dask, assumes whole rows of blocks fit in memory. It isn't particularly scalable this way but is in greater alignment with how dask does similar things (e.g. da.corrcoef). IMO this is ok since you can't really scale quadratic operations up to gigantic datasets anyhow. Let me know if you disagree.
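
As a rough, hypothetical illustration of what that assumption means in practice (numbers made up for the example, not from this PR): for an int8 array with 1,000,000 columns, a row block of 1,000 rows held as full rows is about 1,000 × 1,000,000 bytes ≈ 1 GB, so once column chunks are removed the row chunk size effectively bounds per-task memory.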

@aktech (Contributor, Author) commented Oct 23, 2020

@alimanfoo I'll fix these in a follow-up PR.

Labels: auto-merge (Auto merge label for mergify test flight)
Projects: None yet
4 participants