|
1 | 1 | import abc |
2 | 2 | import math |
3 | | -from typing import TYPE_CHECKING, Any, Callable, List, Optional |
| 3 | +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional |
4 | 4 |
|
5 | 5 | import numpy as np |
6 | 6 | import pyarrow.compute as pc |
@@ -889,6 +889,88 @@ def _to_set(x): |
889 | 889 | return {x} |
890 | 890 |
|
891 | 891 |
|
@PublicAPI
class ValueCounter(AggregateFnV2):
    """Tally how often each distinct value occurs in a column.

    The aggregation is comparable to pandas' `value_counts()`. Its result is a
    dictionary holding two parallel lists: "values" lists the distinct values
    seen in the column and "counts" gives, position by position, how many
    times each of those values occurred.

    Example:

        .. testcode::

            import ray
            from ray.data.aggregate import ValueCounter

            # Create a dataset with repeated values
            ds = ray.data.from_items([
                {"category": "A"}, {"category": "B"}, {"category": "A"},
                {"category": "C"}, {"category": "A"}, {"category": "B"}
            ])

            # Count occurrences of each category
            result = ds.aggregate(ValueCounter(on="category"))
            # result: {'value_counter(category)': {'values': ['A', 'B', 'C'], 'counts': [3, 2, 1]}}

            # Using with groupby
            ds = ray.data.from_items([
                {"group": "X", "category": "A"}, {"group": "X", "category": "B"},
                {"group": "Y", "category": "A"}, {"group": "Y", "category": "A"}
            ])
            result = ds.groupby("group").aggregate(ValueCounter(on="category")).take_all()
            # result: [{'group': 'X', 'value_counter(category)': {'values': ['A', 'B'], 'counts': [1, 1]}},
            #          {'group': 'Y', 'value_counter(category)': {'values': ['A'], 'counts': [2]}}]

    Args:
        on: Name of the column whose values are counted. Required.
        alias_name: Name to give the output column. When omitted (or empty),
            the name "value_counter({column_name})" is used.
    """

    def __init__(
        self,
        on: str,
        alias_name: Optional[str] = None,
    ):
        # A falsy alias (None or "") falls back to the generated name.
        if alias_name:
            output_name = alias_name
        else:
            output_name = f"value_counter({str(on)})"

        super().__init__(
            output_name,
            on=on,
            ignore_nulls=True,
            # Each call yields a fresh pair of lists, so accumulators never
            # share list objects through the zero value.
            zero_factory=lambda: {"values": [], "counts": []},
        )

    def aggregate_block(self, block: Block) -> Dict[str, List]:
        """Compute the per-block value counts for the target column.

        The work is delegated to the column accessor's `value_counts()`;
        `combine` expects its result to carry "values" and "counts" entries.
        """
        target_column = block[self._target_col_name]
        accessor = BlockColumnAccessor.for_column(target_column)
        return accessor.value_counts()

    def combine(
        self,
        current_accumulator: Dict[str, List],
        new_accumulator: Dict[str, List],
    ) -> Dict[str, List]:
        """Fold `new_accumulator` into `current_accumulator` and return it.

        `current_accumulator` is updated in place: counts of values it already
        holds are incremented at their existing positions, while values seen
        for the first time are appended in the order they are encountered.
        """
        merged_values = current_accumulator["values"]
        merged_counts = current_accumulator["counts"]

        # Position of every value already present, built once up front so
        # each incoming value needs only a single dictionary lookup.
        position_of = {}
        for position, known_value in enumerate(merged_values):
            position_of[known_value] = position

        incoming_pairs = zip(new_accumulator["values"], new_accumulator["counts"])
        for incoming_value, incoming_count in incoming_pairs:
            # Positions are ints, so None unambiguously means "not seen yet".
            position = position_of.get(incoming_value)
            if position is None:
                position_of[incoming_value] = len(merged_values)
                merged_values.append(incoming_value)
                merged_counts.append(incoming_count)
            else:
                merged_counts[position] += incoming_count

        return current_accumulator
| 972 | + |
| 973 | + |
892 | 974 | def _null_safe_zero_factory(zero_factory, ignore_nulls: bool): |
893 | 975 | """NOTE: PLEASE READ CAREFULLY BEFORE CHANGING |
894 | 976 |
|
|
0 commit comments