diff --git a/holoviews/core/data/__init__.py b/holoviews/core/data/__init__.py
index 75c23bb98f..949b9b4369 100644
--- a/holoviews/core/data/__init__.py
+++ b/holoviews/core/data/__init__.py
@@ -55,6 +55,12 @@
 except ImportError:
     pass
 
+try:
+    from .cudf import cuDFInterface # noqa (Conditional API import)
+    datatypes.append('cuDF')
+except ImportError:
+    pass
+
 if 'array' not in datatypes:
     datatypes.append('array')
 if 'multitabular' not in datatypes:
diff --git a/holoviews/core/data/cudf.py b/holoviews/core/data/cudf.py
new file mode 100644
index 0000000000..ea0f62b5a3
--- /dev/null
+++ b/holoviews/core/data/cudf.py
@@ -0,0 +1,340 @@
+from __future__ import absolute_import
+
+import sys
+import warnings
+
+try:
+    from itertools import izip as zip  # Python 2 compatibility
+except ImportError:
+    pass
+
+from itertools import product
+
+import numpy as np
+
+from .. import util
+from ..dimension import dimension_name
+from ..element import Element
+from ..ndmapping import NdMapping, item_check, sorted_context
+from .interface import DataError, Interface
+from .pandas import PandasInterface
+
+
+class cuDFInterface(PandasInterface):
+    """
+    The cuDFInterface allows a Dataset object to wrap a cuDF
+    DataFrame object. Using cuDF allows working with columnar
+    data on a GPU. Most operations leave the data in GPU memory;
+    to plot the data, however, it has to be loaded into host memory.
+
+    The cuDFInterface covers almost the complete API exposed
+    by the PandasInterface with two notable exceptions:
+
+    1) Aggregation and groupby do not have a consistent sort order
+       (see https://github.com/rapidsai/cudf/issues/4237)
+    2) Not all functions can be easily applied to a cuDF so
+       some functions applied with aggregate and reduce will not work.
+    """
+
+    datatype = 'cuDF'
+
+    types = ()
+
+    @classmethod
+    def loaded(cls):
+        return 'cudf' in sys.modules
+
+    @classmethod
+    def applies(cls, obj):
+        if not cls.loaded():
+            return False
+        import cudf
+        return isinstance(obj, (cudf.DataFrame, cudf.Series))
+
+    @classmethod
+    def init(cls, eltype, data, kdims, vdims):
+        import cudf
+        import pandas as pd
+
+        element_params = eltype.param.objects()
+        kdim_param = element_params['kdims']
+        vdim_param = element_params['vdims']
+
+        if isinstance(data, (cudf.Series, pd.Series)):
+            data = data.to_frame()
+
+        if not isinstance(data, cudf.DataFrame):
+            data, _, _ = PandasInterface.init(eltype, data, kdims, vdims)
+            data = cudf.from_pandas(data)
+
+        ncols = len(data.columns)
+        index_names = [data.index.name]
+        if index_names == [None]:
+            index_names = ['index']
+        if eltype._auto_indexable_1d and ncols == 1 and kdims is None:
+            kdims = list(index_names)
+
+        if isinstance(kdim_param.bounds[1], int):
+            ndim = min([kdim_param.bounds[1], len(kdim_param.default)])
+        else:
+            ndim = None
+        nvdim = vdim_param.bounds[1] if isinstance(vdim_param.bounds[1], int) else None
+        if kdims and vdims is None:
+            vdims = [c for c in data.columns if c not in kdims]
+        elif vdims and kdims is None:
+            kdims = [c for c in data.columns if c not in vdims][:ndim]
+        elif kdims is None:
+            kdims = list(data.columns[:ndim])
+            if vdims is None:
+                vdims = [d for d in data.columns[ndim:((ndim+nvdim) if nvdim else None)]
+                         if d not in kdims]
+        elif kdims == [] and vdims is None:
+            vdims = list(data.columns[:nvdim if nvdim else None])
+
+        # Handle reset of index if kdims reference index by name
+        for kd in kdims:
+            kd = dimension_name(kd)
+            if kd in data.columns:
+                continue
+            if any(kd == ('index' if name is None else name)
+                   for name in index_names):
+                data = data.reset_index()
+                break
+        if 
any(isinstance(d, (np.int64, int)) for d in kdims+vdims):
+            raise DataError("cudf DataFrame column names used as dimensions "
+                            "must be strings not integers.", cls)
+
+        if kdims:
+            kdim = dimension_name(kdims[0])
+            if eltype._auto_indexable_1d and ncols == 1 and kdim not in data.columns:
+                data = data.copy()
+                data.insert(0, kdim, np.arange(len(data)))
+
+        for d in kdims+vdims:
+            d = dimension_name(d)
+            if len([c for c in data.columns if c == d]) > 1:
+                raise DataError('Dimensions may not reference duplicated DataFrame '
+                                'columns (found duplicate %r columns). If you want to plot '
+                                'a column against itself simply declare two dimensions '
+                                'with the same name.' % d, cls)
+        return data, {'kdims':kdims, 'vdims':vdims}, {}
+
+
+    @classmethod
+    def range(cls, dataset, dimension):
+        column = dataset.data[dataset.get_dimension(dimension, strict=True).name]
+        if column.dtype.kind == 'O':
+            return np.NaN, np.NaN
+        else:
+            return (column.min(), column.max())
+
+
+    @classmethod
+    def values(cls, dataset, dim, expanded=True, flat=True, compute=True,
+               keep_index=False):
+        dim = dataset.get_dimension(dim, strict=True)
+        data = dataset.data[dim.name]
+        if not expanded:
+            data = data.unique()
+            return data.to_array() if compute else data
+        elif keep_index:
+            return data
+        elif compute:
+            return data.to_array()
+        return data
+
+
+    @classmethod
+    def groupby(cls, dataset, dimensions, container_type, group_type, **kwargs):
+        # Get dimensions information
+        dimensions = [dataset.get_dimension(d).name for d in dimensions]
+        kdims = [kdim for kdim in dataset.kdims if kdim not in dimensions]
+
+        # Update the kwargs appropriately for Element group types
+        group_kwargs = {}
+        group_type = dict if group_type == 'raw' else group_type
+        if issubclass(group_type, Element):
+            group_kwargs.update(util.get_param_values(dataset))
+            group_kwargs['kdims'] = kdims
+        group_kwargs.update(kwargs)
+
+        # Propagate dataset
+        group_kwargs['dataset'] = dataset.dataset
+
+        # Find all the keys along supplied dimensions
+        keys = product(*(dataset.data[d].unique() for d in dimensions))
+
+        # Iterate over the unique entries applying selection masks
+        grouped_data = []
+        for unique_key in util.unique_iterator(keys):
+            group_data = dataset.select(**dict(zip(dimensions, unique_key)))
+            if not len(group_data):
+                continue
+            group_data = group_type(group_data, **group_kwargs)
+            grouped_data.append((unique_key, group_data))
+
+        if issubclass(container_type, NdMapping):
+            with item_check(False), sorted_context(False):
+                kdims = [dataset.get_dimension(d) for d in dimensions]
+                return container_type(grouped_data, kdims=kdims)
+        else:
+            return container_type(grouped_data)
+
+
+    @classmethod
+    def select_mask(cls, dataset, selection):
+        """
+        Given a Dataset object and a dictionary with dimension keys and
+        selection keys (i.e. tuple ranges, slices, sets, lists or literals)
+        return a boolean mask over the rows in the Dataset object that
+        have been selected.
+        """
+        mask = None
+        for dim, sel in selection.items():
+            if isinstance(sel, tuple):
+                sel = slice(*sel)
+            arr = cls.values(dataset, dim, compute=False)
+            if util.isdatetime(arr) and util.pd:
+                try:
+                    sel = util.parse_datetime_selection(sel)
+                except Exception:
+                    pass
+
+            new_masks = []
+            if isinstance(sel, slice):
+                with warnings.catch_warnings():
+                    warnings.filterwarnings('ignore', r'invalid value encountered')
+                    if sel.start is not None:
+                        new_masks.append(sel.start <= arr)
+                    if sel.stop is not None:
+                        new_masks.append(arr < sel.stop)
+                if not new_masks:
+                    continue
+                new_mask = new_masks[0]
+                for imask in new_masks[1:]:
+                    new_mask &= imask
+            elif isinstance(sel, (set, list)):
+                for v in sel:
+                    new_masks.append(arr==v)
+                if not new_masks:
+                    continue
+                new_mask = new_masks[0]
+                for imask in new_masks[1:]:
+                    new_mask |= imask
+            elif callable(sel):
+                new_mask = sel(arr)
+            else:
+                new_mask = arr == sel
+
+            if mask is None:
+                mask = new_mask
+            else:
+                mask &= new_mask
+        return mask
+
+
+    @classmethod
+    def select(cls, dataset, selection_mask=None, **selection):
+        df = dataset.data
+        if selection_mask is None:
+            selection_mask = cls.select_mask(dataset, selection)
+
+        indexed = cls.indexed(dataset, selection)
+        if selection_mask is not None:
+            df = df[selection_mask]
+        if indexed and len(df) == 1 and len(dataset.vdims) == 1:
+            return df[dataset.vdims[0].name].iloc[0]
+        return df
+
+
+    @classmethod
+    def concat_fn(cls, dataframes, **kwargs):
+        import cudf
+        return cudf.concat(dataframes, **kwargs)
+
+
+    @classmethod
+    def add_dimension(cls, dataset, dimension, dim_pos, values, vdim):
+        data = dataset.data.copy()
+        if dimension.name not in data:
+            data[dimension.name] = values
+        return data
+
+
+    @classmethod
+    def aggregate(cls, dataset, dimensions, function, **kwargs):
+        data = dataset.data
+        cols = [d.name for d in dataset.kdims if d in dimensions]
+        vdims = dataset.dimensions('value', label='name')
+        reindexed = data[cols+vdims]
+        agg = function.__name__
+        if len(dimensions):
+            agg_map = {'amin': 'min', 'amax': 'max'}
+            agg = agg_map.get(agg, agg)
+            grouped = reindexed.groupby(cols, sort=False)
+            if not hasattr(grouped, agg):
+                raise ValueError('%s aggregation is not supported on cudf DataFrame.' % agg)
+            df = getattr(grouped, agg)().reset_index()
+        else:
+            agg_map = {'amin': 'min', 'amax': 'max', 'size': 'count'}
+            agg = agg_map.get(agg, agg)
+            if not hasattr(reindexed, agg):
+                raise ValueError('%s aggregation is not supported on cudf DataFrame.' 
% agg) + agg = getattr(reindexed, agg)() + data = dict(((col, [v]) for col, v in zip(agg.index, agg.to_array()))) + df = util.pd.DataFrame(data, columns=list(agg.index)) + + dropped = [] + for vd in vdims: + if vd not in df.columns: + dropped.append(vd) + return df, dropped + + + @classmethod + def iloc(cls, dataset, index): + import cudf + + rows, cols = index + scalar = False + columns = list(dataset.data.columns) + if isinstance(cols, slice): + cols = [d.name for d in dataset.dimensions()][cols] + elif np.isscalar(cols): + scalar = np.isscalar(rows) + cols = [dataset.get_dimension(cols).name] + else: + cols = [dataset.get_dimension(d).name for d in index[1]] + col_index = [columns.index(c) for c in cols] + if np.isscalar(rows): + rows = [rows] + + if scalar: + return dataset.data[cols[0]].iloc[rows[0]] + result = dataset.data.iloc[rows, col_index] + + # cuDF does not handle single rows and cols indexing correctly + # as of cudf=0.10.0 so we have to convert Series back to DataFrame + if isinstance(result, cudf.Series): + if len(cols) == 1: + result = result.to_frame(cols[0]) + else: + result = result.to_frame().T + return result + + + @classmethod + def sort(cls, dataset, by=[], reverse=False): + cols = [dataset.get_dimension(d, strict=True).name for d in by] + return dataset.data.sort_values(by=cols, ascending=not reverse) + + + @classmethod + def dframe(cls, dataset, dimensions): + if dimensions: + return dataset.data[dimensions].to_pandas() + else: + return dataset.data.to_pandas() + + +Interface.register(cuDFInterface) diff --git a/holoviews/core/data/dask.py b/holoviews/core/data/dask.py index 8dca6c321e..9b7755093d 100644 --- a/holoviews/core/data/dask.py +++ b/holoviews/core/data/dask.py @@ -270,15 +270,9 @@ def add_dimension(cls, dataset, dimension, dim_pos, values, vdim): return data @classmethod - def concat(cls, datasets, dimensions, vdims): + def concat_fn(cls, dataframes, **kwargs): import dask.dataframe as dd - dataframes = [] - for key, ds in datasets: - data = ds.data.copy() - for d, k in zip(dimensions, key): - data[d.name] = k - dataframes.append(data) - return dd.concat(dataframes) + return dd.concat(dataframes, **kwargs) @classmethod def dframe(cls, dataset, dimensions): diff --git a/holoviews/core/data/pandas.py b/holoviews/core/data/pandas.py index 692452d820..b8934d7de2 100644 --- a/holoviews/core/data/pandas.py +++ b/holoviews/core/data/pandas.py @@ -173,6 +173,13 @@ def range(cls, dataset, dimension): return (column.min(), column.max()) + @classmethod + def concat_fn(cls, dataframes, **kwargs): + if util.pandas_version >= '0.23.0': + kwargs['sort'] = False + return pd.concat(dataframes, **kwargs) + + @classmethod def concat(cls, datasets, dimensions, vdims): dataframes = [] @@ -181,8 +188,7 @@ def concat(cls, datasets, dimensions, vdims): for d, k in zip(dimensions, key): data[d.name] = k dataframes.append(data) - kwargs = dict(sort=False) if util.pandas_version >= '0.23.0' else {} - return pd.concat(dataframes, **kwargs) + return cls.concat_fn(dataframes) @classmethod @@ -315,13 +321,20 @@ def values( @classmethod def sample(cls, dataset, samples=[]): data = dataset.data - mask = False + mask = None for sample in samples: - sample_mask = True + sample_mask = None if np.isscalar(sample): sample = [sample] for i, v in enumerate(sample): - sample_mask = np.logical_and(sample_mask, data.iloc[:, i]==v) - mask |= sample_mask + submask = data.iloc[:, i]==v + if sample_mask is None: + sample_mask = submask + else: + sample_mask &= submask + if mask is None: + mask = 
sample_mask + else: + mask |= sample_mask return data[mask] diff --git a/holoviews/core/data/xarray.py b/holoviews/core/data/xarray.py index c7b77f823f..51b125a006 100644 --- a/holoviews/core/data/xarray.py +++ b/holoviews/core/data/xarray.py @@ -13,6 +13,13 @@ from .interface import Interface, DataError, dask_array_module +def is_cupy(array): + if 'cupy' not in sys.modules: + return False + from cupy import ndarray + return isinstance(array, ndarray) + + class XArrayInterface(GridInterface): types = () @@ -359,6 +366,9 @@ def values(cls, dataset, dim, expanded=True, flat=True, compute=True, keep_index da = dask_array_module() if compute and da and isinstance(data, da.Array): data = data.compute() + if is_cupy(data): + import cupy + data = cupy.asnumpy(data) data = cls.canonicalize(dataset, data, data_coords=data_coords, virtual_coords=virtual_coords) return data.T.flatten() if flat else data diff --git a/holoviews/operation/datashader.py b/holoviews/operation/datashader.py index 1fee7a0dc5..2adcb00879 100644 --- a/holoviews/operation/datashader.py +++ b/holoviews/operation/datashader.py @@ -22,7 +22,7 @@ from ..core import (Operation, Element, Dimension, NdOverlay, CompositeOverlay, Dataset, Overlay, OrderedDict) -from ..core.data import PandasInterface, XArrayInterface, DaskInterface +from ..core.data import PandasInterface, XArrayInterface, DaskInterface, cuDFInterface from ..core.util import ( Iterable, LooseVersion, basestring, cftime_types, cftime_to_timestamp, datetime_types, dt_to_int, isfinite, get_param_values, max_range) @@ -387,14 +387,14 @@ def get_agg_data(cls, obj, category=None): if category and df[category].dtype.name != 'category': df[category] = df[category].astype('category') - is_dask = isinstance(df, dd.DataFrame) - if any((not is_dask and len(df[d.name]) and isinstance(df[d.name].values[0], cftime_types)) or + is_custom = isinstance(df, dd.DataFrame) or cuDFInterface.applies(df) + if any((not is_custom and len(df[d.name]) and isinstance(df[d.name].values[0], cftime_types)) or df[d.name].dtype.kind == 'M' for d in (x, y)): df = df.copy() for d in (x, y): vals = df[d.name] - if not is_dask and len(vals) and isinstance(vals.values[0], cftime_types): + if not is_custom and len(vals) and isinstance(vals.values[0], cftime_types): vals = cftime_to_timestamp(vals, 'ns') elif df[d.name].dtype.kind == 'M': vals = vals.astype('datetime64[ns]') diff --git a/holoviews/tests/core/data/testcudfinterface.py b/holoviews/tests/core/data/testcudfinterface.py new file mode 100644 index 0000000000..89c0b4c3c4 --- /dev/null +++ b/holoviews/tests/core/data/testcudfinterface.py @@ -0,0 +1,107 @@ +import logging + +from unittest import SkipTest + +import numpy as np + +try: + import cudf +except: + raise SkipTest("Could not import cuDF, skipping cuDFInterface tests.") + +from holoviews.core.data import Dataset +from holoviews.core.spaces import HoloMap + +from .base import HeterogeneousColumnTests, InterfaceTests + + +class cuDFInterfaceTests(HeterogeneousColumnTests, InterfaceTests): + """ + Tests for the cuDFInterface. 
+    """
+
+    datatype = 'cuDF'
+    data_type = cudf.DataFrame
+
+    __test__ = True
+
+    def setUp(self):
+        super(cuDFInterfaceTests, self).setUp()
+        # Silence verbose numba CUDA driver output (level 30 == logging.WARNING)
+        logging.getLogger('numba.cuda.cudadrv.driver').setLevel(30)
+
+    def test_dataset_2D_aggregate_spread_fn_with_duplicates(self):
+        raise SkipTest("cuDF does not support variance aggregation")
+
+    def test_dataset_mixed_type_range(self):
+        ds = Dataset((['A', 'B', 'C', None],), 'A')
+        vmin, vmax = ds.range(0)
+        self.assertTrue(np.isnan(vmin))
+        self.assertTrue(np.isnan(vmax))
+
+    def test_dataset_groupby(self):
+        group1 = {'Age':[10,16], 'Weight':[15,18], 'Height':[0.8,0.6]}
+        group2 = {'Age':[12], 'Weight':[10], 'Height':[0.8]}
+        grouped = HoloMap([('M', Dataset(group1, kdims=['Age'], vdims=self.vdims)),
+                           ('F', Dataset(group2, kdims=['Age'], vdims=self.vdims))],
+                          kdims=['Gender'])
+        self.assertEqual(self.table.groupby(['Gender']).apply('sort'), grouped.apply('sort'))
+
+    def test_dataset_groupby_alias(self):
+        group1 = {'age':[10,16], 'weight':[15,18], 'height':[0.8,0.6]}
+        group2 = {'age':[12], 'weight':[10], 'height':[0.8]}
+        grouped = HoloMap([('M', Dataset(group1, kdims=[('age', 'Age')],
+                                         vdims=self.alias_vdims)),
+                           ('F', Dataset(group2, kdims=[('age', 'Age')],
+                                         vdims=self.alias_vdims))],
+                          kdims=[('gender', 'Gender')])
+        self.assertEqual(self.alias_table.groupby('Gender').apply('sort'),
+                         grouped)
+
+    def test_dataset_aggregate_ht(self):
+        aggregated = Dataset({'Gender':['M', 'F'], 'Weight':[16.5, 10], 'Height':[0.7, 0.8]},
+                             kdims=self.kdims[:1], vdims=self.vdims)
+        self.compare_dataset(self.table.aggregate(['Gender'], np.mean).sort(), aggregated.sort())
+
+    def test_dataset_aggregate_ht_alias(self):
+        aggregated = Dataset({'gender':['M', 'F'], 'weight':[16.5, 10], 'height':[0.7, 0.8]},
+                             kdims=self.alias_kdims[:1], vdims=self.alias_vdims)
+        self.compare_dataset(self.alias_table.aggregate('Gender', np.mean).sort(), aggregated.sort())
+
+    def test_dataset_2D_partial_reduce_ht(self):
+        dataset = Dataset({'x':self.xs, 'y':self.ys, 'z':self.zs},
+                          kdims=['x', 'y'], vdims=['z'])
+        reduced = Dataset({'x':self.xs, 'z':self.zs},
+                          kdims=['x'], vdims=['z'])
+        self.assertEqual(dataset.reduce(['y'], np.mean).sort(), reduced.sort())
+
+    def test_dataset_2D_aggregate_partial_ht(self):
+        dataset = Dataset({'x':self.xs, 'y':self.ys, 'z':self.zs},
+                          kdims=['x', 'y'], vdims=['z'])
+        reduced = Dataset({'x':self.xs, 'z':self.zs},
+                          kdims=['x'], vdims=['z'])
+        self.assertEqual(dataset.aggregate(['x'], np.mean).sort(), reduced.sort())
+
+    def test_dataset_2D_aggregate_partial_hm(self):
+        z_ints = [el**2 for el in self.y_ints]
+        dataset = Dataset({'x':self.xs, 'y':self.y_ints, 'z':z_ints},
+                          kdims=['x', 'y'], vdims=['z'])
+        self.assertEqual(dataset.aggregate(['x'], np.mean).sort(),
+                         Dataset({'x':self.xs, 'z':z_ints}, kdims=['x'], vdims=['z']).sort())
+
+    def test_dataset_reduce_ht(self):
+        reduced = Dataset({'Age':self.age, 'Weight':self.weight, 'Height':self.height},
+                          kdims=self.kdims[1:], vdims=self.vdims)
+        self.assertEqual(self.table.reduce(['Gender'], np.mean).sort(), reduced.sort())
+
+    def test_dataset_groupby_second_dim(self):
+        group1 = {'Gender':['M'], 'Weight':[15], 'Height':[0.8]}
+        group2 = {'Gender':['M'], 'Weight':[18], 'Height':[0.6]}
+        group3 = {'Gender':['F'], 'Weight':[10], 'Height':[0.8]}
+        grouped = HoloMap([(10, Dataset(group1, kdims=['Gender'], vdims=self.vdims)),
+                           (16, Dataset(group2, kdims=['Gender'], vdims=self.vdims)),
+                           (12, Dataset(group3, kdims=['Gender'], vdims=self.vdims))],
+                          kdims=['Age'])
+        
self.assertEqual(self.table.groupby(['Age']).apply('sort'), grouped)
+
+    def test_dataset_aggregate_string_types_size(self):
+        raise SkipTest("cuDF does not support size aggregation on string types")
diff --git a/holoviews/tests/operation/testdatashader.py b/holoviews/tests/operation/testdatashader.py
index 2f5d4d3b4a..47d7779c89 100644
--- a/holoviews/tests/operation/testdatashader.py
+++ b/holoviews/tests/operation/testdatashader.py
@@ -20,13 +20,19 @@
 except:
     raise SkipTest('Datashader not available')
 
+try:
+    import cudf
+    import cupy
+except:
+    cudf = None
+
 try:
     import spatialpandas
 except:
     spatialpandas = None
 
 spatialpandas_skip = skipIf(spatialpandas is None, "SpatialPandas not available")
-
+cudf_skip = skipIf(cudf is None, "cuDF not available")
 
 class DatashaderAggregateTests(ComparisonTestCase):
 
@@ -42,6 +48,17 @@ def test_aggregate_points(self):
                         vdims=['Count'])
         self.assertEqual(img, expected)
 
+    @cudf_skip
+    def test_aggregate_points_cudf(self):
+        points = Points([(0.2, 0.3), (0.4, 0.7), (0, 0.99)], datatype=['cuDF'])
+        self.assertIsInstance(points.data, cudf.DataFrame)
+        img = aggregate(points, dynamic=False, x_range=(0, 1), y_range=(0, 1),
+                        width=2, height=2)
+        expected = Image(([0.25, 0.75], [0.25, 0.75], [[1, 0], [2, 0]]),
+                         vdims=['Count'])
+        self.assertIsInstance(img.data.Count.data, cupy.ndarray)
+        self.assertEqual(img, expected)
+
     def test_aggregate_zero_range_points(self):
         p = Points([(0, 0), (1, 1)])
         agg = rasterize(p, x_range=(0, 0), y_range=(0, 1), expand=False, dynamic=False,
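A rough sketch of the workflow this patch enables, assuming a CUDA-capable environment with cudf and cupy installed (the column names and sizes below are illustrative, not part of the patch):

import cudf
import numpy as np
import holoviews as hv
from holoviews.operation.datashader import aggregate

hv.extension('bokeh')

# Construct a cuDF-backed Points element; cuDFInterface.applies()
# detects the cudf.DataFrame, so the data stays in GPU memory.
df = cudf.DataFrame({'x': np.random.rand(100000),
                     'y': np.random.rand(100000)})
points = hv.Points(df, kdims=['x', 'y'])

# Aggregating via datashader keeps the result on the GPU: the returned
# Image wraps an xarray whose values are a cupy array, which
# XArrayInterface.values() converts with cupy.asnumpy() only when the
# values are finally needed on the host (e.g. for plotting).
img = aggregate(points, width=400, height=400, dynamic=False)

As with the DaskInterface, most operations leave the data in its native container; a host copy is only made when values are explicitly requested.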