Skip to content

Commit

Permalink
Replace ComboAgg with Aggregate/AggregateAll
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Mar 22, 2023
1 parent 20da642 commit de48976
Show file tree
Hide file tree
Showing 12 changed files with 557 additions and 406 deletions.
12 changes: 6 additions & 6 deletions py/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,22 @@ table = table1.join(table2, on=["Group"])
session.bind_table(name="my_table", table=table)
```

## Use a combo aggregation on a table
## Perform aggregations on a table

Combined aggregations can be executed on tables in the Python client. This example creates a combo aggregation that averages the `Count` column of a table, and aggregates it by the `Group` column.
Aggregations can be applied on tables in the Python client. This example creates an aggregation that averages
the `Count` column of a table, and aggregates it by the `Group` column.

```
from pydeephaven import Session, ComboAggregation
from pydeephaven import Session, agg
session = Session()
table = session.empty_table(10)
table = table.update(["Count = i", "Group = i % 2"])
my_agg = ComboAggregation()
my_agg = my_agg.avg(["Count"])
my_agg = agg.avg(["Count"])
table = table.agg_by(my_agg, ["Group"])
table = table.agg_by(aggs=[my_agg], by=["Group"])
session.bind_table(name="my_table", table=table)
```

Expand Down
13 changes: 6 additions & 7 deletions py/client/docs/source/examples.rst
Original file line number Diff line number Diff line change
Expand Up @@ -135,24 +135,23 @@ Join 2 tables

session.bind_table(name="my_table", table=table)

Use a combo aggregation on a table
Perform aggregations on a table
##################################

Combined aggregations can be executed on tables in the Python client. This example creates a combo aggregation that averages the `Count` column of a table, and aggregates it by the `Group` column:
Aggregations can be applied on tables in the Python client. This example creates an aggregation that
averages the `Count` column of a table, and aggregates it by the `Group` column:

from pydeephaven import Session, ComboAggregation
from pydeephaven import Session, agg

session = Session()

table = session.empty_table(10)

table = table.update(["Count = i", "Group = i % 2"])

my_agg = ComboAggregation()
my_agg = agg.avg(["Count"])

my_agg = my_agg.avg(["Count"])

table = table.agg_by(my_agg, ["Group"])
table = table.agg_by(aggs=[my_agg], by=["Group"])

session.bind_table(name="my_table", table=table)

Expand Down
1 change: 0 additions & 1 deletion py/client/pydeephaven/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
from .table import Table
from .session import Session
from .dherror import DHError
from .combo_agg import ComboAggregation
from .constants import SortDirection, MatchRule
from .query import Query

Expand Down
24 changes: 0 additions & 24 deletions py/client/pydeephaven/_constants.py

This file was deleted.

60 changes: 36 additions & 24 deletions py/client/pydeephaven/_table_interface.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
# Copyright (c) 2016-2023 Deephaven Data Labs and Patent Pending
#

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import List, Any

from pydeephaven._constants import AggType
from pydeephaven import agg
from pydeephaven._table_ops import UpdateOp, LazyUpdateOp, ViewOp, UpdateViewOp, SelectOp, DropColumnsOp, \
SelectDistinctOp, SortOp, UnstructuredFilterOp, HeadOp, TailOp, HeadByOp, TailByOp, UngroupOp, NaturalJoinOp, \
ExactJoinOp, CrossJoinOp, AsOfJoinOp, DedicatedAggOp, ComboAggOp, UpdateByOp, SnapshotTableOp, SnapshotWhenTableOp
from pydeephaven.combo_agg import ComboAggregation
ExactJoinOp, CrossJoinOp, AsOfJoinOp, UpdateByOp, SnapshotTableOp, SnapshotWhenTableOp, AggregateOp, AggregateAllOp
from pydeephaven.agg import Aggregation, _AggregationColumns
from pydeephaven.constants import MatchRule, SortDirection
from pydeephaven.dherror import DHError
from pydeephaven.updateby import UpdateByOperation


Expand Down Expand Up @@ -347,7 +348,7 @@ def group_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.GROUP, column_names=by)
table_op = AggregateAllOp(agg=agg.group(), by=by)
return self.table_op_handler(table_op)

def ungroup(self, cols: List[str] = [], null_fill: bool = True):
Expand Down Expand Up @@ -381,7 +382,7 @@ def first_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.FIRST, column_names=by)
table_op = AggregateAllOp(agg=agg.first(), by=by)
return self.table_op_handler(table_op)

def last_by(self, by: List[str] = []):
Expand All @@ -397,7 +398,7 @@ def last_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.LAST, column_names=by)
table_op = AggregateAllOp(agg=agg.last(), by=by)
return self.table_op_handler(table_op)

def sum_by(self, by: List[str] = []):
Expand All @@ -413,7 +414,7 @@ def sum_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.SUM, column_names=by)
table_op = AggregateAllOp(agg=agg.sum_(), by=by)
return self.table_op_handler(table_op)

def avg_by(self, by: List[str] = []):
Expand All @@ -429,7 +430,7 @@ def avg_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.AVG, column_names=by)
table_op = AggregateAllOp(agg=agg.avg(), by=by)
return self.table_op_handler(table_op)

def std_by(self, by: List[str] = []):
Expand All @@ -445,7 +446,7 @@ def std_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.STD, column_names=by)
table_op = AggregateAllOp(agg=agg.std(), by=by)
return self.table_op_handler(table_op)

def var_by(self, by: List[str] = []):
Expand All @@ -461,7 +462,7 @@ def var_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.VAR, column_names=by)
table_op = AggregateAllOp(agg=agg.var(), by=by)
return self.table_op_handler(table_op)

def median_by(self, by: List[str] = []):
Expand All @@ -477,7 +478,7 @@ def median_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.MEDIAN, column_names=by)
table_op = AggregateAllOp(agg=agg.median(), by=by)
return self.table_op_handler(table_op)

def min_by(self, by: List[str] = []):
Expand All @@ -493,7 +494,7 @@ def min_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.MIN, column_names=by)
table_op = AggregateAllOp(agg=agg.min_(), by=by)
return self.table_op_handler(table_op)

def max_by(self, by: List[str] = []):
Expand All @@ -509,7 +510,7 @@ def max_by(self, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.MAX, column_names=by)
table_op = AggregateAllOp(agg=agg.max_(), by=by)
return self.table_op_handler(table_op)

def count_by(self, col: str, by: List[str] = []):
Expand All @@ -526,30 +527,38 @@ def count_by(self, col: str, by: List[str] = []):
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.COUNT, column_names=by, count_column=col)
table_op = AggregateOp(aggs=[agg.count_(col=col)], by=by)
return self.table_op_handler(table_op)

def count(self, col: str):
""" Count the number of values in the specified column on the table and return the result in a table with one row
and one column.
def agg_by(self, aggs: List[Aggregation], by: List[str]):
""" Perform an Aggregate operation on the table and return the result table.
Args:
col (str): the name of the column whose values to be counted
aggs (List[Aggregation]): the aggregations to be applied
by (List[str]): the group-by column names
Returns:
a Table object
Raises:
DHError
"""
table_op = DedicatedAggOp(AggType.COUNT, count_column=col)
# Reject any aggregation that exposes a `cols` attribute but has it empty.
for agg in aggs:
    if hasattr(agg, 'cols') and not agg.cols:
        # The f prefix is required so that {agg} is interpolated into the message;
        # without it the literal text "{agg}" would be reported to the user.
        raise DHError(message=f"No columns specified for the aggregation operation {agg}.")


table_op = AggregateOp(aggs=aggs, by=by)
return self.table_op_handler(table_op)

def agg_by(self, agg: ComboAggregation, by: List[str]):
""" Perform a Combined Aggregation operation on the table and return the result table.
def agg_all_by(self, agg: Aggregation, by: List[str]):
""" Perform an AggregateAll operation on the table and return the result table.
Note, because agg_all_by applies the aggregation to all the columns of the table, it will ignore
any column names specified for the aggregation.
Args:
agg (ComboAggregation): the combined aggregation definition
agg (Aggregation): the aggregation to be applied
by (List[str]): the group-by column names
Returns:
Expand All @@ -558,7 +567,10 @@ def agg_by(self, agg: ComboAggregation, by: List[str]):
Raises:
DHError
"""
table_op = ComboAggOp(column_names=by, combo_aggregation=agg)
if not isinstance(agg, _AggregationColumns):
raise DHError(f"unsupported aggregation {agg}.")

table_op = AggregateAllOp(agg=agg, by=by)
return self.table_op_handler(table_op)

def update_by(self, ops: List[UpdateByOperation], by: List[str]):
Expand Down
Loading

0 comments on commit de48976

Please sign in to comment.