Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: add skew and kurtosis aggregators. #1946

Merged
merged 1 commit into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions packages/vaex-core/vaex/agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,69 @@ def finish(sum_moment, sum, count):
return [task_sum_moment, task_sum, task_count], finish(task_sum_moment, task_sum, task_count)


class AggregatorDescriptorSkew(AggregatorDescriptorMulti):
    """Skewness aggregator built from three moment sums and a count.

    The result is ``(m3 - 3*m1*m2 + 2*m1**3) / (m2 - m1**2)**(3/2)`` where
    ``mk`` is the k-th moment sum divided by the count.
    """

    def __init__(self, name, expression, short_name='skew', selection=None, edges=False):
        super(AggregatorDescriptorSkew, self).__init__(name, expression, short_name, selection=selection, edges=edges)

    def add_tasks(self, df, binners, progress):
        """Schedule the helper aggregations on ``df``.

        :param df: dataframe on which the helper aggregations are scheduled
        :param binners: passed through unchanged to each helper aggregation
        :param progress: used to create the progress bar shared by the helpers
        :return: ``(tasks, result)``: the list of the four scheduled tasks
            (moment sums of order 1, 2, 3 and the count) and the delayed skew value
        """
        progressbar = vaex.utils.progressbars(progress, title=repr(self))
        # Every helper works on the float64 cast of the expression.
        column = str(df[str(self.expression)].astype('float64'))
        common = dict(selection=self.selection, edges=self.edges)

        # NOTE(review): _sum_moment(column, k) presumably sums column**k -- confirm against its definition.
        descriptors = [_sum_moment(column, order, **common) for order in (1, 2, 3)]
        descriptors.append(count(column, **common))
        tasks = [d.add_tasks(df, binners, progress=progressbar)[0][0] for d in descriptors]

        @vaex.delayed
        def finish(total1, total2, total3, n):
            # Division by zero / invalid values are silenced here instead of warning.
            with np.errstate(divide='ignore', invalid='ignore'):
                mean1, mean2, mean3 = total1 / n, total2 / n, total3 / n
                central3 = mean3 - 3*mean1*mean2 + 2*mean1**3
                variance = mean2 - mean1**2
                skew = central3 / variance**(3/2)
            return self.finish(skew)

        return tasks, finish(*tasks)


class AggregatorDescriptorKurtosis(AggregatorDescriptorMulti):
    """Kurtosis aggregator built from four moment sums and a count.

    The result is
    ``(m4 - 4*m1*m3 + 6*m1**2*m2 - 3*m1**4) / (m2 - m1**2)**2 - 3.0`` where
    ``mk`` is the k-th moment sum divided by the count.
    """

    def __init__(self, name, expression, short_name='kurtosis', selection=None, edges=False):
        super(AggregatorDescriptorKurtosis, self).__init__(name, expression, short_name, selection=selection, edges=edges)

    def add_tasks(self, df, binners, progress):
        """Schedule the helper aggregations on ``df``.

        :param df: dataframe on which the helper aggregations are scheduled
        :param binners: passed through unchanged to each helper aggregation
        :param progress: used to create the progress bar shared by the helpers
        :return: ``(tasks, result)``: the list of the five scheduled tasks
            (moment sums of order 1 to 4 and the count) and the delayed kurtosis value
        """
        progressbar = vaex.utils.progressbars(progress, title=repr(self))
        # Every helper works on the float64 cast of the expression.
        column = str(df[str(self.expression)].astype('float64'))
        common = dict(selection=self.selection, edges=self.edges)

        # NOTE(review): _sum_moment(column, k) presumably sums column**k -- confirm against its definition.
        descriptors = [_sum_moment(column, order, **common) for order in (1, 2, 3, 4)]
        descriptors.append(count(column, **common))
        tasks = [d.add_tasks(df, binners, progress=progressbar)[0][0] for d in descriptors]

        @vaex.delayed
        def finish(total1, total2, total3, total4, n):
            # Division by zero / invalid values are silenced here instead of warning.
            with np.errstate(divide='ignore', invalid='ignore'):
                mean1, mean2, mean3, mean4 = total1 / n, total2 / n, total3 / n, total4 / n
                central4 = mean4 - 4*mean1*mean3 + 6*mean1**2*mean2 - 3*mean1**4
                variance = mean2 - mean1**2
                kurtosis = central4 / variance**2 - 3.0
            return self.finish(kurtosis)

        return tasks, finish(*tasks)


class AggregatorDescriptorStd(AggregatorDescriptorVar):
    """Variant of ``AggregatorDescriptorVar`` whose ``finish`` takes a square root."""

    def finish(self, value):
        """Return ``value`` raised to the power 0.5.

        NOTE(review): ``value`` is presumably the variance produced by the
        parent class -- confirm against ``AggregatorDescriptorVar``.
        """
        root = value ** 0.5
        return root
Expand Down Expand Up @@ -508,6 +571,16 @@ def var(expression, ddof=0, selection=None, edges=False):
'''Creates a variance aggregation'''
return AggregatorDescriptorVar('var', expression, 'var', ddof=ddof, selection=selection, edges=edges)

@register
def skew(expression, selection=None, edges=False):
    '''Create a skew aggregation.

    Builds an ``AggregatorDescriptorSkew`` named ``'skew'`` for ``expression``;
    ``selection`` and ``edges`` are handed to the descriptor unchanged.
    '''
    return AggregatorDescriptorSkew(
        name='skew',
        expression=expression,
        short_name='skew',
        selection=selection,
        edges=edges,
    )

@register
def kurtosis(expression, selection=None, edges=False):
    '''Create a kurtosis aggregation.

    Builds an ``AggregatorDescriptorKurtosis`` named ``'kurtosis'`` for
    ``expression``; ``selection`` and ``edges`` are handed to the descriptor
    unchanged.
    '''
    return AggregatorDescriptorKurtosis(
        name='kurtosis',
        expression=expression,
        short_name='kurtosis',
        selection=selection,
        edges=edges,
    )

@register
@docsubst
def nunique(expression, dropna=False, dropnan=False, dropmissing=False, selection=None, edges=False):
Expand Down
52 changes: 50 additions & 2 deletions packages/vaex-core/vaex/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,8 +1171,6 @@ def var(self, expression, binby=[], limits=None, shape=default_shape, selection=
array([ 15271.90481083, 7284.94713504, 3738.52239232, 1449.63418988])
>>> df.var("vz", binby=["(x**2+y**2)**0.5"], shape=4)**0.5
array([ 123.57954851, 85.35190177, 61.14345748, 38.0740619 ])
>>> df.std("vz", binby=["(x**2+y**2)**0.5"], shape=4)
array([ 123.57954851, 85.35190177, 61.14345748, 38.0740619 ])

:param expression: {expression}
:param binby: {binby}
Expand All @@ -1187,6 +1185,56 @@ def var(self, expression, binby=[], limits=None, shape=default_shape, selection=
edges = False
return self._compute_agg('var', expression, binby, limits, shape, selection, delay, edges, progress, array_type=array_type)

@docsubst
def skew(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None):
    '''
    Calculate the skew for the given expression, possibly on a grid defined by binby.

    Example:

    >>> df.skew("vz")
    0.02116528
    >>> df.skew("vz", binby=["E"], shape=4)
    array([-0.069976 , -0.01003445, 0.05624177, -2.2444322 ])

    :param expression: {expression}
    :param binby: {binby}
    :param limits: {limits}
    :param shape: {shape}
    :param selection: {selection}
    :param delay: {delay}
    :param progress: {progress}
    :param edges: Currently ignored: False is always passed to the aggregation.
    :param array_type: {array_type}
    :return: {return_stat_scalar}
    '''
    # The caller-supplied `edges` is not forwarded; False is passed in its place.
    return self._compute_agg('skew', expression, binby, limits, shape, selection, delay, False, progress, array_type=array_type)

@docsubst
def kurtosis(self, expression, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None, edges=False, array_type=None):
    '''
    Calculate the kurtosis for the given expression, possibly on a grid defined by binby.

    Example:

    >>> df.kurtosis('vz')
    0.33414303
    >>> df.kurtosis("vz", binby=["E"], shape=4)
    array([0.35286113, 0.14455428, 0.52955107, 5.06716345])

    :param expression: {expression}
    :param binby: {binby}
    :param limits: {limits}
    :param shape: {shape}
    :param selection: {selection}
    :param delay: {delay}
    :param progress: {progress}
    :param edges: Currently ignored: False is always passed to the aggregation.
    :param array_type: {array_type}
    :return: {return_stat_scalar}
    '''
    # The caller-supplied `edges` is not forwarded; False is passed in its place.
    return self._compute_agg('kurtosis', expression, binby, limits, shape, selection, delay, False, progress, array_type=array_type)

@docsubst
def covar(self, x, y, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None):
"""Calculate the covariance cov[x,y] between x and y, possibly on a grid defined by binby.
Expand Down
16 changes: 15 additions & 1 deletion packages/vaex-core/vaex/expression.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,7 @@ def __getitem__(self, slicer):
indices, fields = slicer
else:
raise NotImplementedError

if indices != slice(None):
expr = self.df[indices][self.expression]
else:
Expand Down Expand Up @@ -931,6 +931,20 @@ def var(self, binby=[], limits=None, shape=default_shape, selection=False, delay
kwargs['expression'] = self.expression
return self.ds.var(**kwargs)

def skew(self, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None):
    '''Shortcut for df.skew(expression, ...), see `DataFrame.skew`'''
    # locals() has to be captured first, while it holds only the call arguments.
    forwarded = dict(locals())
    forwarded.pop('self')
    forwarded['expression'] = self.expression
    return self.df.skew(**forwarded)

def kurtosis(self, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None):
    '''Shortcut for df.kurtosis(expression, ...), see `DataFrame.kurtosis`'''
    # locals() has to be captured first, while it holds only the call arguments.
    forwarded = dict(locals())
    forwarded.pop('self')
    forwarded['expression'] = self.expression
    return self.df.kurtosis(**forwarded)

def minmax(self, binby=[], limits=None, shape=default_shape, selection=False, delay=False, progress=None):
'''Shortcut for ds.minmax(expression, ...), see `Dataset.minmax`'''
kwargs = dict(locals())
Expand Down
34 changes: 34 additions & 0 deletions tests/agg_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,3 +620,37 @@ def test_unique_2d(df_factory, binner1, binner2):
assert dfg['x'].tolist() == [0, 0, 1, 1, 2, 2]
assert dfg['y'].tolist() == [4, 5, 4, 5, 4, 5]
assert dfg['c'].tolist() == [1, 1, 1, 2, 1, 3]


def test_skew(df_example):
    """df.skew must match pandas' Series.skew to 5 significant digits."""
    df = df_example
    pandas_df = df.to_pandas_df()
    for column in ("x", "Lz", "E"):
        np.testing.assert_approx_equal(df.skew(column), pandas_df[column].skew(), significant=5)


def test_groupby_skew(df_example):
    """Per-group vaex.agg.skew must match pandas' grouped skew to 4 decimals."""
    df = df_example
    pandas_df = df.to_pandas_df()
    actual = df.groupby("id", sort=True).agg({"skew": vaex.agg.skew("Lz")})
    expected = pandas_df.groupby("id", sort=True).agg(skew=("Lz", "skew"))
    np.testing.assert_almost_equal(
        actual["skew"].values,
        expected["skew"].values,
        decimal=4,
    )


def test_kurtosis(df_example):
    """df.kurtosis must match pandas' Series.kurtosis to 4 significant digits."""
    df = df_example
    pandas_df = df.to_pandas_df()
    for column in ("x", "Lz", "E"):
        np.testing.assert_approx_equal(df.kurtosis(column), pandas_df[column].kurtosis(), significant=4)


def test_groupby_kurtosis(df_example):
    """Per-group vaex.agg.kurtosis must match pandas' grouped kurtosis to 3 decimals."""
    import pandas as pd

    df = df_example
    pandas_df = df.to_pandas_df()
    actual = df.groupby("id", sort=True).agg({"kurtosis": vaex.agg.kurtosis("Lz")})
    expected = pandas_df.groupby("id", sort=True).agg(kurtosis=("Lz", pd.Series.kurtosis))
    np.testing.assert_almost_equal(
        actual["kurtosis"].values,
        expected["kurtosis"].values,
        decimal=3,
    )