From 95c9e9ee8f75d48c3208aa135a2482dee2de7ed7 Mon Sep 17 00:00:00 2001 From: Jovan Veljanoski Date: Tue, 22 Feb 2022 18:44:20 +0100 Subject: [PATCH] feat(core): Add skew and kurtosis aggregators. --- packages/vaex-core/vaex/agg.py | 73 +++++++++++++++++++++++++++ packages/vaex-core/vaex/dataframe.py | 52 ++++++++++++++++++- packages/vaex-core/vaex/expression.py | 16 +++++- tests/agg_test.py | 34 +++++++++++++ 4 files changed, 172 insertions(+), 3 deletions(-) diff --git a/packages/vaex-core/vaex/agg.py b/packages/vaex-core/vaex/agg.py index 5e295d708d..d2c993e7ac 100644 --- a/packages/vaex-core/vaex/agg.py +++ b/packages/vaex-core/vaex/agg.py @@ -441,6 +441,69 @@ def finish(sum_moment, sum, count): return [task_sum_moment, task_sum, task_count], finish(task_sum_moment, task_sum, task_count) +class AggregatorDescriptorSkew(AggregatorDescriptorMulti): + def __init__(self, name, expression, short_name='skew', selection=None, edges=False): + super(AggregatorDescriptorSkew, self).__init__(name, expression, short_name, selection=selection, edges=edges) + + def add_tasks(self, df, binners, progress): + progressbar = vaex.utils.progressbars(progress, title=repr(self)) + expression = expression_sum = expression = df[str(self.expression)] + expression = expression_sum = expression.astype('float64') + + sum_moment1 = _sum_moment(str(expression_sum), 1, selection=self.selection, edges=self.edges) + sum_moment2 = _sum_moment(str(expression_sum), 2, selection=self.selection, edges=self.edges) + sum_moment3 = _sum_moment(str(expression_sum), 3, selection=self.selection, edges=self.edges) + count_ = count(str(expression), selection=self.selection, edges=self.edges) + + task_sum_moment1 = sum_moment1.add_tasks(df, binners, progress=progressbar)[0][0] + task_sum_moment2 = sum_moment2.add_tasks(df, binners, progress=progressbar)[0][0] + task_sum_moment3 = sum_moment3.add_tasks(df, binners, progress=progressbar)[0][0] + task_count = count_.add_tasks(df, binners, progress=progressbar)[0][0] + + @vaex.delayed + def finish(sum_moment1, sum_moment2, sum_moment3, count): + with np.errstate(divide='ignore', invalid='ignore'): + m1 = sum_moment1 / count + m2 = sum_moment2 / count + m3 = sum_moment3 / count + skew = (m3 - 3*m1*m2 + 2*m1**3) / (m2 - m1**2)**(3/2) + return self.finish(skew) + return [task_sum_moment1, task_sum_moment2, task_sum_moment3, task_count], finish(task_sum_moment1, task_sum_moment2, task_sum_moment3, task_count) + + +class AggregatorDescriptorKurtosis(AggregatorDescriptorMulti): + def __init__(self, name, expression, short_name='kurtosis', selection=None, edges=False): + super(AggregatorDescriptorKurtosis, self).__init__(name, expression, short_name, selection=selection, edges=edges) + + def add_tasks(self, df, binners, progress): + progressbar = vaex.utils.progressbars(progress, title=repr(self)) + expression = expression_sum = expression = df[str(self.expression)] + expression = expression_sum = expression.astype('float64') + + sum_moment1 = _sum_moment(str(expression_sum), 1, selection=self.selection, edges=self.edges) + sum_moment2 = _sum_moment(str(expression_sum), 2, selection=self.selection, edges=self.edges) + sum_moment3 = _sum_moment(str(expression_sum), 3, selection=self.selection, edges=self.edges) + sum_moment4 = _sum_moment(str(expression_sum), 4, selection=self.selection, edges=self.edges) + count_ = count(str(expression), selection=self.selection, edges=self.edges) + + task_sum_moment1 = sum_moment1.add_tasks(df, binners, progress=progressbar)[0][0] + task_sum_moment2 = sum_moment2.add_tasks(df, binners, progress=progressbar)[0][0] + task_sum_moment3 = sum_moment3.add_tasks(df, binners, progress=progressbar)[0][0] + task_sum_moment4 = sum_moment4.add_tasks(df, binners, progress=progressbar)[0][0] + task_count = count_.add_tasks(df, binners, progress=progressbar)[0][0] + + @vaex.delayed + def finish(sum_moment1, sum_moment2, sum_moment3, sum_moment4, count): + with np.errstate(divide='ignore', invalid='ignore'): + m1 = sum_moment1 / count + m2 = sum_moment2 / count + m3 = sum_moment3 / count + m4 = sum_moment4 / count + kurtosis = (m4 - 4*m1*m3 + 6*m1**2*m2 - 3*m1**4) / (m2 - m1**2)**2 -3.0 + return self.finish(kurtosis) + return [task_sum_moment1, task_sum_moment2, task_sum_moment3, task_sum_moment4, task_count], finish(task_sum_moment1, task_sum_moment2, task_sum_moment3, task_sum_moment4, task_count) + + class AggregatorDescriptorStd(AggregatorDescriptorVar): def finish(self, value): return value**0.5 @@ -508,6 +571,16 @@ def var(expression, ddof=0, selection=None, edges=False): '''Creates a variance aggregation''' return AggregatorDescriptorVar('var', expression, 'var', ddof=ddof, selection=selection, edges=edges) +@register +def skew(expression, selection=None, edges=False): + '''Create a skew aggregation.''' + return AggregatorDescriptorSkew('skew', expression, 'skew', selection=selection, edges=edges) + +@register +def kurtosis(expression, selection=None, edges=False): + '''Create a kurtosis aggregation.''' + return AggregatorDescriptorKurtosis('kurtosis', expression, 'kurtosis', selection=selection, edges=edges) + @register @docsubst def nunique(expression, dropna=False, dropnan=False, dropmissing=False, selection=None, edges=False): diff --git a/packages/vaex-core/vaex/dataframe.py b/packages/vaex-core/vaex/dataframe.py index 4df3d34ffd..db3cf09286 100644 --- a/packages/vaex-core/vaex/dataframe.py +++ b/packages/vaex-core/vaex/dataframe.py @@ -1171,8 +1171,6 @@ def var(self, expression, binby=[], limits=None, shape=default_shape, selection= array([ 15271.90481083, 7284.94713504, 3738.52239232, 1449.63418988]) >>> df.var("vz", binby=["(x**2+y**2)**0.5"], shape=4)**0.5 array([ 123.57954851, 85.35190177, 61.14345748, 38.0740619 ]) - >>> df.std("vz", binby=["(x**2+y**2)**0.5"], shape=4) - array([ 123.57954851, 85.35190177, 61.14345748, 38.0740619 ]) :param expression: {expression} :param binby: {binby} @@ -1187,6 +1185,56 @@ def var(self, expression, binby=[], limits=None, shape=default_shape, selection= edges = False return self._compute_agg('var', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type) + @docsubst + def skew(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None): + ''' + Calculate the skew for the given expression, possible on a grid defined by binby. + + Example: + + >>> df.skew("vz") + 0.02116528 + >>> df.skew("vz", binby=["E"], shape=4) + array([-0.069976 , -0.01003445, 0.05624177, -2.2444322 ]) + + :param expression: {expression} + :param binby: {binby} + :param limits: {limits} + :param shape: {shape} + :param selection: {selection} + :param delay: {delay} + :param progress: {progress} + :param array_type: {array_type} + :return: {return_stat_scalar} + ''' + edges=False + return self._compute_agg('skew', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type) + + @docsubst + def kurtosis(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None): + ''' + Calculate the kurtosis for the given expression, possible on a grid defined by binby. + + Example: + + >>> df.kurtosis('vz') + 0.33414303 + >>> df.kurtosis("vz", binby=["E"], shape=4) + array([0.35286113, 0.14455428, 0.52955107, 5.06716345]) + + :param expression: {expression} + :param binby: {binby} + :param limits: {limits} + :param shape: {shape} + :param selection: {selection} + :param delay: {delay} + :param progress: {progress} + :param array_type: {array_type} + :return: {return_stat_scalar} + ''' + edges=False + return self._compute_agg('kurtosis', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type) + @docsubst def covar(self, x, y, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None): """Calculate the covariance cov[x,y] between x and y, possibly on a grid defined by binby. diff --git a/packages/vaex-core/vaex/expression.py b/packages/vaex-core/vaex/expression.py index 38cd83125d..68b330ea10 100644 --- a/packages/vaex-core/vaex/expression.py +++ b/packages/vaex-core/vaex/expression.py @@ -605,7 +605,7 @@ def __getitem__(self, slicer): indices, fields = slicer else: raise NotImplementedError - + if indices != slice(None): expr = self.df[indices][self.expression] else: @@ -931,6 +931,20 @@ def var(self, binby=[], limits=None, shape=default_shape, selection=False, delay kwargs['expression'] = self.expression return self.ds.var(**kwargs) + def skew(self, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None): + '''Shortcut for df.skew(expression, ...), see `DataFrame.skew`''' + kwargs = dict(locals()) + del kwargs['self'] + kwargs['expression'] = self.expression + return self.df.skew(**kwargs) + + def kurtosis(self, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None): + '''Shortcut for df.kurtosis(expression, ...), see `DataFrame.kurtosis`''' + kwargs = dict(locals()) + del kwargs['self'] + kwargs['expression'] = self.expression + return self.df.kurtosis(**kwargs) + def minmax(self, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None): '''Shortcut for ds.minmax(expression, ...), see `Dataset.minmax`''' kwargs = dict(locals()) diff --git a/tests/agg_test.py b/tests/agg_test.py index 469883f6e0..4d78e9738d 100644 --- a/tests/agg_test.py +++ b/tests/agg_test.py @@ -620,3 +620,37 @@ def test_unique_2d(df_factory, binner1, binner2): assert dfg['x'].tolist() == [0, 0, 1, 1, 2, 2] assert dfg['y'].tolist() == [4, 5, 4, 5, 4, 5] assert dfg['c'].tolist() == [1, 1, 1, 2, 1, 3] + + +def test_skew(df_example): + df = df_example + pandas_df = df.to_pandas_df() + np.testing.assert_approx_equal(df.skew("x"), pandas_df.x.skew(), significant=5) + np.testing.assert_approx_equal(df.skew("Lz"), pandas_df.Lz.skew(), significant=5) + np.testing.assert_approx_equal(df.skew("E"), pandas_df.E.skew(), significant=5) + + +def test_groupby_skew(df_example): + df = df_example + pandas_df = df.to_pandas_df() + vaex_g = df.groupby("id", sort=True).agg({"skew": vaex.agg.skew("Lz")}) + pandas_g = pandas_df.groupby("id", sort=True).agg(skew=("Lz", "skew")) + np.testing.assert_almost_equal(vaex_g["skew"].values, pandas_g["skew"].values, decimal=4) + + +def test_kurtosis(df_example): + df = df_example + pandas_df = df.to_pandas_df() + np.testing.assert_approx_equal(df.kurtosis("x"), pandas_df.x.kurtosis(), significant=4) + np.testing.assert_approx_equal(df.kurtosis("Lz"), pandas_df.Lz.kurtosis(), significant=4) + np.testing.assert_approx_equal(df.kurtosis("E"), pandas_df.E.kurtosis(), significant=4) + + +def test_groupby_kurtosis(df_example): + import pandas as pd + + df = df_example + pandas_df = df.to_pandas_df() + vaex_g = df.groupby("id", sort=True).agg({"kurtosis": vaex.agg.kurtosis("Lz")}) + pandas_g = pandas_df.groupby("id", sort=True).agg(kurtosis=("Lz", pd.Series.kurtosis)) + np.testing.assert_almost_equal(vaex_g["kurtosis"].values, pandas_g["kurtosis"].values, decimal=3)