Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Review] Allow lbfgs and admm with dask cupy inputs #89

Merged
merged 11 commits into from
Nov 16, 2020

Conversation

daxiongshu
Copy link
Contributor

@daxiongshu daxiongshu commented Oct 10, 2020

This PR allows lbfgs solver to accept cupy inputs. The main changes are:

  • converting numpy array, beta, to cupy array to accelerate pointwise_loss and pointwise_gradient

  • converting cupy arrays, loss and gradients, to numpy arrays to utilize scipy.optimize.fmin_l_bfgs_b

Similar changes are made to add cupy support to admm.

dask cupy tests are added for all 5 solvers.

Allow cupy inputs in algorithms "newton", "gradient_descen" and "prox…
@daxiongshu
Copy link
Contributor Author

@pentschev Could you please take a look at this PR? Thank you so much!

Copy link
Member

@pentschev pentschev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks fine to me, given the current limitation with converting CuPy arrays back and forth. Please note that starting from NumPy 1.20 (pending confirmation), we will have an alternative to that, as seen in dask/dask#6738 .

Copy link
Member

@mrocklin mrocklin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this @daxiongshu . Two questions:

  1. Should there be tests for this? They probably wouldn't run on CI, but at least other devs with GPUs could check things over time.
  2. Do the other algorithms need this same treatment?

Comment on lines 360 to 362
if 'cupy' in str(type(X._meta)):
import cupy
return cupy.asarray(beta)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this function is only used once, maybe we should drop the function and use this logic directly in the function above?

Or maybe your intention was to replicate this logic to other algorithms in this file? Should the changes here be applied to the other algotihms as well?

I'm curious, what happens now if beta is a numpy array while X is a cupy array?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious, what happens now if beta is a numpy array while X is a cupy array?

That's a good point, I completely missed that. Unless we know X is necessarily a Dask array backed by CuPy, the conditions should probably be if any('cupy' in s for s in [str(type(X)), str(type(X._meta))]).

I can't answer the other questions though, would be good to hear from @daxiongshu .

Copy link
Contributor Author

@daxiongshu daxiongshu Oct 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great point! Yes, this function is meant to be reused by admm with cupy. Should I include my changes for admm in this PR as well?

Yes, I'll definitely add tests for all algorithms.

I'm curious, what happens now if beta is a numpy array while X is a cupy array?

I think dask_glm.estimators only accepts dask.array as inputs due to this line:

if is_dask_array_sparse(X):

Is dask_glm.algorithms used outside dask_glm.estimators?

Click to see the example code and error

Code:

from dask_glm.estimators import LogisticRegression
import numpy
x = numpy.random.rand(10,4)
y = numpy.random.rand(10)

lr = LogisticRegression()
lr.fit(x,y)

Error:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-14-e644bf405118> in <module>
----> 1 lr.fit(x,y)

~/rapids/daskml_cupy/dask-glm/dask_glm/estimators.py in fit(self, X, y)
     65         X_ = self._maybe_add_intercept(X)
     66         fit_kwargs = dict(self._fit_kwargs)
---> 67         if is_dask_array_sparse(X):
     68             fit_kwargs['normalize'] = False
     69 

~/rapids/daskml_cupy/dask-glm/dask_glm/utils.py in is_dask_array_sparse(X)
    122     Check using _meta if a dask array contains sparse arrays
    123     """
--> 124     return isinstance(X._meta, sparse.SparseArray)
    125 
    126 

AttributeError: 'numpy.ndarray' object has no attribute '_meta'

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like a bug to me, I would normally expect that it should either support a NumPy/CuPy array automatically or that we wanted to have a more user-friendly error. Since we don't have the latter, I'm guessing the implementation is missing a guard to check whether hasattr(X, "_meta") first.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the late reply. Here is what happens to fit lbfgs when the input X is a dask cupy array and beta0 is a numpy array

n, p = X.shape
beta0 = np.zeros(p)

Click to see the sample code and error
import cupy as cp
import dask.array as da
from dask_glm.estimators import LogisticRegression

x = cp.random.rand(128, 64)
y = cp.random.randint(0, 2, 128)

dx = da.from_array(x, chunks=(32, 64))
dy = da.from_array(y, chunks=(32))

lr = LogisticRegression(solver='lbfgs')
lr.fit(dx, dy)

The error is:

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/config.py:569: UserWarning: Configuration key "fuse_ave_width" has been deprecated. Please use "optimization.fuse.ave-width" instead
  'Please use "{}" instead'.format(key, new)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-7-ab1a4e3a2bf0> in <module>
     10 
     11 lr = LogisticRegression(solver='lbfgs')
---> 12 lr.fit(dx, dy)

~/rapids/dask-glm/dask_glm/estimators.py in fit(self, X, y)
     68             fit_kwargs['normalize'] = False
     69 
---> 70         self._coef = algorithms._solvers[self.solver](X_, y, **fit_kwargs)
     71 
     72         if self.fit_intercept:

~/rapids/dask-glm/dask_glm/utils.py in normalize_inputs(X, y, *args, **kwargs)
     26             mean = mean if len(intercept_idx[0]) else safe_zeros_like(X, shape=mean.shape)
     27             Xn = (X - mean) / std
---> 28             out = algo(Xn, y, *args, **kwargs).copy()
     29             i_adj = np.sum(out * mean / std)
     30             out[intercept_idx] -= i_adj

~/rapids/dask-glm/dask_glm/algorithms.py in lbfgs(X, y, regularizer, lamduh, max_iter, tol, family, verbose, **kwargs)
    351             compute_loss_grad, beta0, fprime=None,
    352             args=(X, y),
--> 353             iprint=(verbose > 0) - 1, pgtol=tol, maxiter=max_iter)
    354 
    355     return beta

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/scipy/optimize/lbfgsb.py in fmin_l_bfgs_b(func, x0, fprime, args, approx_grad, bounds, m, factr, pgtol, epsilon, iprint, maxfun, maxiter, disp, callback, maxls)
    196 
    197     res = _minimize_lbfgsb(fun, x0, args=args, jac=jac, bounds=bounds,
--> 198                            **opts)
    199     d = {'grad': res['jac'],
    200          'task': res['message'],

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/scipy/optimize/lbfgsb.py in _minimize_lbfgsb(fun, x0, args, jac, bounds, disp, maxcor, ftol, gtol, eps, maxfun, maxiter, iprint, callback, maxls, finite_diff_rel_step, **unknown_options)
    306     sf = _prepare_scalar_function(fun, x0, jac=jac, args=args, epsilon=eps,
    307                                   bounds=new_bounds,
--> 308                                   finite_diff_rel_step=finite_diff_rel_step)
    309 
    310     func_and_grad = sf.fun_and_grad

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/scipy/optimize/optimize.py in _prepare_scalar_function(fun, x0, jac, args, bounds, epsilon, finite_diff_rel_step, hess)
    260     # calculation reduces overall function evaluations.
    261     sf = ScalarFunction(fun, x0, args, grad, hess,
--> 262                         finite_diff_rel_step, bounds, epsilon=epsilon)
    263 
    264     return sf

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/scipy/optimize/_differentiable_functions.py in __init__(self, fun, x0, args, grad, hess, finite_diff_rel_step, finite_diff_bounds, epsilon)
     74 
     75         self._update_fun_impl = update_fun
---> 76         self._update_fun()
     77 
     78         # Gradient evaluation

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/scipy/optimize/_differentiable_functions.py in _update_fun(self)
    164     def _update_fun(self):
    165         if not self.f_updated:
--> 166             self._update_fun_impl()
    167             self.f_updated = True
    168 

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/scipy/optimize/_differentiable_functions.py in update_fun()
     71 
     72         def update_fun():
---> 73             self.f = fun_wrapped(self.x)
     74 
     75         self._update_fun_impl = update_fun

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/scipy/optimize/_differentiable_functions.py in fun_wrapped(x)
     68         def fun_wrapped(x):
     69             self.nfev += 1
---> 70             return fun(x, *args)
     71 
     72         def update_fun():

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/scipy/optimize/optimize.py in __call__(self, x, *args)
     72     def __call__(self, x, *args):
     73         """ returns the the function value """
---> 74         self._compute_if_needed(x, *args)
     75         return self._value
     76 

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/scipy/optimize/optimize.py in _compute_if_needed(self, x, *args)
     66         if not np.all(x == self.x) or self._value is None or self.jac is None:
     67             self.x = np.asarray(x).copy()
---> 68             fg = self.fun(x, *args)
     69             self.jac = fg[1]
     70             self._value = fg[0]

~/rapids/dask-glm/dask_glm/algorithms.py in compute_loss_grad(beta, X, y)
    344         loss_fn = pointwise_loss(scatter_beta, X, y)
    345         gradient_fn = pointwise_gradient(scatter_beta, X, y)
--> 346         loss, gradient = compute(loss_fn, gradient_fn)
    347         return loss, gradient.copy()
    348 

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    509         postcomputes.append(x.__dask_postcompute__())
    510 
--> 511     results = schedule(dsk, keys, **kwargs)
    512     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    513 

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, pool, **kwargs)
     82         get_id=_thread_get_id,
     83         pack_exception=pack_exception,
---> 84         **kwargs
     85     )
     86 

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/local.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, **kwargs)
    484                         _execute_task(task, data)  # Re-execute locally
    485                     else:
--> 486                         raise_exception(exc, tb)
    487                 res, worker_id = loads(res_info)
    488                 state["cache"][key] = res

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/local.py in reraise(exc, tb)
    314     if exc.__traceback__ is not tb:
    315         raise exc.with_traceback(tb)
--> 316     raise exc
    317 
    318 

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/local.py in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    220     try:
    221         task, data = loads(task_info)
--> 222         result = _execute_task(task, data)
    223         id = get_id()
    224         result = dumps((result, id))

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/optimization.py in __call__(self, *args)
    961         if not len(args) == len(self.inkeys):
    962             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 963         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    964 
    965     def __reduce__(self):

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/core.py in get(dsk, out, cache)
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/core.py in _execute_task(arg, cache, dsk)
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/utils.py in apply(func, args, kwargs)
     27 def apply(func, args, kwargs=None):
     28     if kwargs:
---> 29         return func(*args, **kwargs)
     30     else:
     31         return func(*args)

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/dask/array/routines.py in _tensordot(a, b, axes)
    227         )
    228     else:
--> 229         x = tensordot(a, b, axes=axes)
    230 
    231     ind = [slice(None, None)] * x.ndim

/raid/data/ml/anaconda3/envs/rapids-0.16/lib/python3.7/site-packages/cupy/linalg/product.py in tensordot(a, b, axes)
    317     m = b.size // k if k != 0 else 0
    318 
--> 319     return core.tensordot_core(a, b, None, n, m, k, ret_shape)
    320 
    321 

TypeError: Argument 'b' has incorrect type (expected cupy.core.core.ndarray, got numpy.ndarray)

Copy link
Member

@pentschev pentschev Nov 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And is the error what you want, or should beta0 be implicitly elevated to a CuPy array? In the latter case you may want to replace that line by:

from dask.array.utils import meta_from_array, zeros_like_safe

...

beta0 = zeros_like_safe(meta_from_array(X), p)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just added meta_from_array in the above, as we probably want the array to be of _meta type, should X be a Dask array too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And is the error what you want, or should beta0 be implicitly elevated to a CuPy array?

Definitely the latter. I'll use zeros_like_safe(meta_from_array(X) instead.
The current commits also solve this error but your method is better. I'll add more tests. Thank you!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, sorry I think I confused myself with the discussion #90 . This PR doesn't really need zeros_like_safe(meta_from_array(X). beta is expected to be initialized as a numpy array regardless of input X being a dask cupy array or not, because it relies on scipy fmin_l_bfgs_b which only accepts numpy array.

When I look at my commits, it is all about what I said in the main thread:

  • converting numpy array, beta, to cupy array to accelerate pointwise_loss and pointwise_gradient

  • converting cupy arrays, loss and gradients, to numpy arrays to utilize scipy.optimize.fmin_l_bfgs_b

So can we leave all the zeros_like_safe(meta_from_array(X) changes to #90? I'll just add tests to this PR.
Thank you!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So can we leave all the zeros_like_safe(meta_from_array(X) changes to #90?

Sounds good to me.

@daxiongshu daxiongshu changed the title Allow lbfgs with cupy inputs [WIP] Allow lbfgs with cupy inputs Oct 29, 2020
@daxiongshu daxiongshu changed the title [WIP] Allow lbfgs with cupy inputs [WIP] Allow lbfgs and admm with cupy inputs Nov 11, 2020
@daxiongshu daxiongshu changed the title [WIP] Allow lbfgs and admm with cupy inputs [Review] Allow lbfgs and admm with cupy inputs Nov 12, 2020
@daxiongshu
Copy link
Contributor Author

This PR is ready for review. Thank you.

Comment on lines 57 to 60
X = X.map_blocks(lambda x: cupy.asarray(x),
dtype=X.dtype, meta=cupy.asarray(X._meta))
y = y.map_blocks(lambda x: cupy.asarray(x),
dtype=y.dtype, meta=cupy.asarray(y._meta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks very convoluted, the best way to generate X is:

cupy = pytest.importorskip('cupy')
xp = cupy if is_cupy else np
rs = da.random.RandomState(RandomState=xp.random.RandomState)
X = rs.random((N, p), chunks=(N // nchunks, p))

It seems that make_y could also be changed to something such as:

def make_y(X, beta=np.array([1.5, -3]), chunks=2, rs=da.random.RandomState(RandomState=np.random.RandomState)):
    n, p = X.shape
    z0 = X.dot(beta)
    y = rs.random(z0.shape, chunks=z0.chunks) < sigmoid(z0)
    return y

And then you'd call it similar to:

y = make_y(X, beta=xp.array(beta), chunks=(N // nchunks,), rs=rs)

NOTE: I didn't actually test the code above, so it may need some fixing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You seem to be using this in 10 or so different tests, it's probably a good idea to write a small util to avoid copying it all back everywhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review! I notice that X and y are generated in different ways in the current tests. The following utils are used:

  • X, y = make_intercept_data(N, p, seed=seed)
  • X = da.random.random and y = make_y(...)
  • X, y = make_classification(...)
  • X, y = make_regression(...)
  • X, y = make_poisson(...)

I would like cupy inputs to be created using the same functions. Should I go into each of the utils above and add is_cupy branch? What I did currently is like just using one util function to convert the dask array to cupy after they are created.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point, maybe this is indeed too much work for very little benefit. With that said, I don't necessarily oppose to just leaving things as they are now, I'll defer to @mrocklin on whether he thinks it's acceptable. The only real downside I see with this is with people eventually referring to this code and thinking this is a good way to handle such cases, for tests this is ok but for real-world application this is not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. I wrapped up the repeated code in a util function to make it look nicer.

Comment on lines 54 to 57
X = X.map_blocks(lambda x: cupy.asarray(x),
dtype=X.dtype, meta=cupy.asarray(X._meta))
y = y.map_blocks(lambda x: cupy.asarray(x),
dtype=y.dtype, meta=cupy.asarray(y._meta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as comment above.

Comment on lines 83 to 86
X = X.map_blocks(lambda x: cupy.asarray(x),
dtype=X.dtype, meta=cupy.asarray(X._meta))
y = y.map_blocks(lambda x: cupy.asarray(x),
dtype=y.dtype, meta=cupy.asarray(y._meta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Comment on lines 117 to 120
X = X.map_blocks(lambda x: cupy.asarray(x),
dtype=X.dtype, meta=cupy.asarray(X._meta))
y = y.map_blocks(lambda x: cupy.asarray(x),
dtype=y.dtype, meta=cupy.asarray(y._meta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Comment on lines 152 to 155
X = X.map_blocks(lambda x: cupy.asarray(x),
dtype=X.dtype, meta=cupy.asarray(X._meta))
y = y.map_blocks(lambda x: cupy.asarray(x),
dtype=y.dtype, meta=cupy.asarray(y._meta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Comment on lines 56 to 59
X = X.map_blocks(lambda x: cupy.asarray(x),
dtype=X.dtype, meta=cupy.asarray(X._meta))
y = y.map_blocks(lambda x: cupy.asarray(x),
dtype=y.dtype, meta=cupy.asarray(y._meta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Comment on lines 76 to 79
X = X.map_blocks(lambda x: cupy.asarray(x),
dtype=X.dtype, meta=cupy.asarray(X._meta))
y = y.map_blocks(lambda x: cupy.asarray(x),
dtype=y.dtype, meta=cupy.asarray(y._meta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Comment on lines 97 to 100
X = X.map_blocks(lambda x: cupy.asarray(x),
dtype=X.dtype, meta=cupy.asarray(X._meta))
y = y.map_blocks(lambda x: cupy.asarray(x),
dtype=y.dtype, meta=cupy.asarray(y._meta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

Comment on lines 119 to 122
X = X.map_blocks(lambda x: cupy.asarray(x),
dtype=X.dtype, meta=cupy.asarray(X._meta))
y = y.map_blocks(lambda x: cupy.asarray(x),
dtype=y.dtype, meta=cupy.asarray(y._meta))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

@daxiongshu daxiongshu changed the title [Review] Allow lbfgs and admm with cupy inputs [Review] Allow lbfgs and admm with dask cupy inputs Nov 13, 2020
@daxiongshu
Copy link
Contributor Author

daxiongshu commented Nov 13, 2020

Locally all the added cupy tests passed. However, some of the added cupy tests are slower than their cpu counterparts due to small data size and small chunk size. With large data size and large chunks size the speedup is pretty decent. I published a notebook to show the speedup.
image

Copy link
Member

@pentschev pentschev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @daxiongshu , this looks good to me now. We still need @mrocklin or someone else to approve and merge this.

@TomAugspurger
Copy link
Member

I'll defer to @pentschev here. Thanks all!

@TomAugspurger TomAugspurger merged commit 1daf4c5 into dask:master Nov 16, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants