Skip to content

Commit

Permalink
Add cudf/dask-cudf support for points/line/area rendering and shade
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmmease committed Oct 12, 2019
1 parent 13d651e commit 8d06620
Show file tree
Hide file tree
Showing 21 changed files with 1,327 additions and 540 deletions.
33 changes: 19 additions & 14 deletions datashader/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@


@memoize
def compile_components(agg, schema, glyph):
def compile_components(agg, schema, glyph, cuda=False):
"""Given a ``Aggregation`` object and a schema, return 5 sub-functions.
Parameters
Expand Down Expand Up @@ -51,20 +51,20 @@ def compile_components(agg, schema, glyph):
reds = list(traverse_aggregation(agg))

# List of base reductions (actually computed)
bases = list(unique(concat(r._build_bases() for r in reds)))
bases = list(unique(concat(r._build_bases(cuda) for r in reds)))
dshapes = [b.out_dshape(schema) for b in bases]
# List of tuples of (append, base, input columns, temps)
calls = [_get_call_tuples(b, d, schema) for (b, d) in zip(bases, dshapes)]
calls = [_get_call_tuples(b, d, schema, cuda) for (b, d) in zip(bases, dshapes)]
# List of unique column names needed
cols = list(unique(concat(pluck(2, calls))))
# List of temps needed
temps = list(pluck(3, calls))

create = make_create(bases, dshapes)
create = make_create(bases, dshapes, cuda)
info = make_info(cols)
append = make_append(bases, cols, calls, glyph)
combine = make_combine(bases, dshapes, temps)
finalize = make_finalize(bases, agg, schema)
finalize = make_finalize(bases, agg, schema, cuda)

return create, info, append, combine, finalize

Expand All @@ -79,13 +79,18 @@ def traverse_aggregation(agg):
yield agg


def _get_call_tuples(base, dshape, schema):
return base._build_append(dshape, schema), (base,), base.inputs, base._build_temps()
def _get_call_tuples(base, dshape, schema, cuda):
return (base._build_append(dshape, schema, cuda),
(base,), base.inputs, base._build_temps(cuda))


def make_create(bases, dshapes):
def make_create(bases, dshapes, cuda):
creators = [b._build_create(d) for (b, d) in zip(bases, dshapes)]
array_module = np
if cuda:
import cupy
array_module = cupy
else:
array_module = np
return lambda shape: tuple(c(shape, array_module) for c in creators)


Expand Down Expand Up @@ -145,22 +150,22 @@ def combine(base_tuples):
return combine


def make_finalize(bases, agg, schema):
def make_finalize(bases, agg, schema, cuda):
arg_lk = dict((k, v) for (v, k) in enumerate(bases))
if isinstance(agg, summary):
calls = []
for key, val in zip(agg.keys, agg.values):
f = make_finalize(bases, val, schema)
f = make_finalize(bases, val, schema, cuda)
try:
# Override bases if possible
bases = val._build_bases()
bases = val._build_bases(cuda)
except AttributeError:
pass
inds = [arg_lk[b] for b in bases]
calls.append((key, f, inds))

def finalize(bases, **kwargs):
data = {key: finalizer(get(inds, bases), **kwargs)
def finalize(bases, cuda=False, **kwargs):
data = {key: finalizer(get(inds, bases), cuda, **kwargs)
for (key, finalizer, inds) in calls}
return xr.Dataset(data)
return finalize
Expand Down
14 changes: 12 additions & 2 deletions datashader/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@
from .resampling import resample_2d, resample_2d_distributed
from . import reductions as rd

try:
import cudf
except ImportError:
cudf = None

try:
import dask_cudf
except ImportError:
dask_cudf = None

class Axis(object):
"""Interface for implementing axis transformations.
Expand Down Expand Up @@ -107,7 +116,7 @@ class LogAxis(Axis):
@staticmethod
@ngjit
def mapper(val):
return log10(val)
return log10(float(val))

@staticmethod
@ngjit
Expand Down Expand Up @@ -993,7 +1002,8 @@ def bypixel(source, canvas, glyph, agg):
source = source.drop([col for col in columns if col not in cols_to_keep])
source = source.to_dask_dataframe()

if isinstance(source, pd.DataFrame):
if (isinstance(source, pd.DataFrame) or
(cudf and isinstance(source, cudf.DataFrame))):
# Avoid datashape.Categorical instantiation bottleneck
# by only retaining the necessary columns:
# https://github.com/bokeh/datashader/issues/396
Expand Down
11 changes: 11 additions & 0 deletions datashader/data_libraries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,14 @@
from . import dask # noqa (API import)
except ImportError:
pass

try:
import cudf as _cudf # noqa (Test cudf installed)
import cupy as _cupy # noqa (Test cupy installed)
from . import cudf # noqa (API import)

import dask_cudf as _dask_cudf # noqa (Test dask_cudf installed)
from . import dask_cudf # noqa (API import)

except ImportError:
pass
9 changes: 9 additions & 0 deletions datashader/data_libraries/cudf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import absolute_import
from datashader.data_libraries.pandas import default
from datashader.core import bypixel
import cudf


@bypixel.pipeline.register(cudf.DataFrame)
def cudf_pipeline(df, schema, canvas, glyph, summary):
return default(glyph, df, schema, canvas, summary, cuda=True)
24 changes: 14 additions & 10 deletions datashader/data_libraries/dask.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import absolute_import, division

import dask
import pandas as pd
import dask.dataframe as dd
from collections import OrderedDict
from dask.base import tokenize, compute
Expand All @@ -16,8 +15,8 @@


@bypixel.pipeline.register(dd.DataFrame)
def dask_pipeline(df, schema, canvas, glyph, summary):
dsk, name = glyph_dispatch(glyph, df, schema, canvas, summary)
def dask_pipeline(df, schema, canvas, glyph, summary, cuda=False):
dsk, name = glyph_dispatch(glyph, df, schema, canvas, summary, cuda=cuda)

# Get user configured scheduler (if any), or fall back to default
# scheduler for dask DataFrame
Expand Down Expand Up @@ -61,12 +60,12 @@ def shape_bounds_st_and_axis(df, canvas, glyph):


@glyph_dispatch.register(Glyph)
def default(glyph, df, schema, canvas, summary):
def default(glyph, df, schema, canvas, summary, cuda=False):
shape, bounds, st, axis = shape_bounds_st_and_axis(df, canvas, glyph)

# Compile functions
create, info, append, combine, finalize = \
compile_components(summary, schema, glyph)
compile_components(summary, schema, glyph, cuda=cuda)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append)
Expand All @@ -81,25 +80,30 @@ def chunk(df):
keys2 = [(name, i) for i in range(len(keys))]
dsk = dict((k2, (chunk, k)) for (k2, k) in zip(keys2, keys))
dsk[name] = (apply, finalize, [(combine, keys2)],
dict(coords=axis, dims=[glyph.y_label, glyph.x_label]))
dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label]))
return dsk, name


@glyph_dispatch.register(LineAxis0)
def line(glyph, df, schema, canvas, summary):
def line(glyph, df, schema, canvas, summary, cuda=False):
if cuda:
from cudf import concat
else:
from pandas import concat

shape, bounds, st, axis = shape_bounds_st_and_axis(df, canvas, glyph)

# Compile functions
create, info, append, combine, finalize = \
compile_components(summary, schema, glyph)
compile_components(summary, schema, glyph, cuda=cuda)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append)

def chunk(df, df2=None):
plot_start = True
if df2 is not None:
df = pd.concat([df.iloc[-1:], df2])
df = concat([df.iloc[-1:], df2])
plot_start = False
aggs = create(shape)
extend(aggs, df, st, bounds, plot_start=plot_start)
Expand All @@ -112,5 +116,5 @@ def chunk(df, df2=None):
dsk[(name, i)] = (chunk, (old_name, i - 1), (old_name, i))
keys2 = [(name, i) for i in range(df.npartitions)]
dsk[name] = (apply, finalize, [(combine, keys2)],
dict(coords=axis, dims=[glyph.y_label, glyph.x_label]))
dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label]))
return dsk, name
9 changes: 9 additions & 0 deletions datashader/data_libraries/dask_cudf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from __future__ import absolute_import
from datashader.data_libraries.dask import dask_pipeline
from datashader.core import bypixel
import dask_cudf


@bypixel.pipeline.register(dask_cudf.DataFrame)
def dask_cudf_pipeline(df, schema, canvas, glyph, summary):
return dask_pipeline(df, schema, canvas, glyph, summary, cuda=True)
5 changes: 3 additions & 2 deletions datashader/data_libraries/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ def pandas_pipeline(df, schema, canvas, glyph, summary):

@glyph_dispatch.register(_PointLike)
@glyph_dispatch.register(_AreaToLineLike)
def default(glyph, source, schema, canvas, summary):
create, info, append, _, finalize = compile_components(summary, schema, glyph)
def default(glyph, source, schema, canvas, summary, cuda=False):
create, info, append, _, finalize = compile_components(summary, schema, glyph, cuda)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append)
Expand All @@ -44,6 +44,7 @@ def default(glyph, source, schema, canvas, summary):
extend(bases, source, x_st + y_st, x_range + y_range)

return finalize(bases,
cuda=cuda,
coords=OrderedDict([(glyph.x_label, x_axis),
(glyph.y_label, y_axis)]),
dims=[glyph.y_label, glyph.x_label])
Loading

0 comments on commit 8d06620

Please sign in to comment.