Add a GroupByChunked which accepts chunked arrays and use that in pyarrow
westonpace committed Jan 4, 2023
1 parent 66817b0 commit 5f71dd9
Showing 6 changed files with 66 additions and 34 deletions.
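At the user level, this commit changes how pyarrow's Table.group_by handles tables whose columns have multiple chunks: instead of aggregating each record batch separately and concatenating the partial results, the whole table now flows through a single grouped aggregation. A minimal end-to-end sketch of the behavior, using only public pyarrow API (it mirrors the new test added at the bottom of this commit):

import pyarrow as pa

table = pa.table({'keys': ['a', 'b', 'a', 'b', 'a', 'b'], 'values': range(6)})
# Force multiple chunks per column so the chunked code path is exercised.
chunked = pa.Table.from_batches(table.to_batches(max_chunksize=3))
assert chunked.column('values').num_chunks == 2

result = chunked.group_by('keys').aggregate([('values', 'sum')])
print(result.to_pydict())  # one row per key: 'a' -> 0+2+4 = 6, 'b' -> 1+3+5 = 9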
47 changes: 34 additions & 13 deletions cpp/src/arrow/compute/exec/groupby.cc
@@ -39,20 +39,20 @@ namespace compute {
 
 namespace {
 
-std::shared_ptr<Schema> SimpleSchemaForBatch(const ExecBatch& batch) {
+std::shared_ptr<Schema> SimpleSchemaForColumns(
+    const std::vector<std::shared_ptr<ChunkedArray>>& columns) {
   std::vector<std::shared_ptr<Field>> fields;
-  for (int i = 0; i < batch.num_values(); i++) {
-    fields.push_back(
-        field("key_" + ::arrow::internal::ToChars(i), batch.values[i].type()));
+  for (int i = 0; i < static_cast<int>(columns.size()); i++) {
+    fields.push_back(field("key_" + ::arrow::internal::ToChars(i), columns[i]->type()));
   }
   return schema(std::move(fields));
 }
 
 }  // namespace
 
-Result<std::shared_ptr<Table>> GroupBy(
-    const std::vector<std::shared_ptr<Array>>& arguments,
-    const std::vector<std::shared_ptr<Array>>& keys,
+Result<std::shared_ptr<Table>> GroupByChunked(
+    const std::vector<std::shared_ptr<ChunkedArray>>& arguments,
+    const std::vector<std::shared_ptr<ChunkedArray>>& keys,
     const std::vector<SimpleAggregate>& aggregates, bool use_threads, ExecContext* ctx) {
   if (arguments.size() != aggregates.size()) {
     return Status::Invalid("arguments and aggregates must be the same size");
@@ -62,7 +62,7 @@ Result<std::shared_ptr<Table>> GroupBy(
     return Table::MakeEmpty(schema({}));
   }
 
-  std::vector<Datum> all_columns;
+  std::vector<std::shared_ptr<ChunkedArray>> all_columns;
   int64_t length = 0;
   for (const auto& key : keys) {
     if (length == 0) {
@@ -84,11 +84,9 @@
     }
     all_columns.emplace_back(argument);
   }
-  ExecBatch input_batch(std::move(all_columns), length);
-  std::shared_ptr<Schema> batch_schema = SimpleSchemaForBatch(input_batch);
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> rb,
-                        input_batch.ToRecordBatch(std::move(batch_schema)));
-  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Table> table, Table::FromRecordBatches({rb}));
+  std::shared_ptr<Schema> batch_schema = SimpleSchemaForColumns(all_columns);
+  std::shared_ptr<Table> table =
+      Table::Make(std::move(batch_schema), std::move(all_columns));
 
   std::vector<FieldRef> key_refs;
   for (int i = 0; i < static_cast<int>(keys.size()); i++) {
@@ -127,5 +125,28 @@
   return DeclarationToTable(plan);
 }
 
+namespace {
+
+std::vector<std::shared_ptr<ChunkedArray>> ToChunked(
+    const std::vector<std::shared_ptr<Array>>& arrays) {
+  std::vector<std::shared_ptr<ChunkedArray>> chunked;
+  chunked.reserve(arrays.size());
+  for (const auto& array : arrays) {
+    chunked.push_back(std::make_shared<ChunkedArray>(array));
+  }
+  return chunked;
+}
+
+}  // namespace
+
+Result<std::shared_ptr<Table>> GroupBy(
+    const std::vector<std::shared_ptr<Array>>& arguments,
+    const std::vector<std::shared_ptr<Array>>& keys,
+    const std::vector<SimpleAggregate>& aggregates, bool use_threads, ExecContext* ctx) {
+  std::vector<std::shared_ptr<ChunkedArray>> chunked_args = ToChunked(arguments);
+  std::vector<std::shared_ptr<ChunkedArray>> chunked_keys = ToChunked(keys);
+  return GroupByChunked(chunked_args, chunked_keys, aggregates, use_threads, ctx);
+}
+
 }  // namespace compute
 }  // namespace arrow
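The array-based GroupBy is now a thin shim: each Array is wrapped in a single-chunk ChunkedArray and forwarded to GroupByChunked, so one implementation serves both input kinds. The same wrapping, sketched with public pyarrow API for illustration:

import pyarrow as pa

arr = pa.array([1, 2, 3])
chunked = pa.chunked_array([arr])  # a ChunkedArray holding one chunk
assert chunked.num_chunks == 1
assert chunked.chunk(0).equals(arr)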
8 changes: 8 additions & 0 deletions cpp/src/arrow/compute/exec/groupby.h
@@ -65,5 +65,13 @@ Result<std::shared_ptr<Table>> GroupBy(
     const std::vector<SimpleAggregate>& aggregates, bool use_threads = false,
     ExecContext* ctx = default_exec_context());
 
+/// \see GroupBy
+ARROW_EXPORT
+Result<std::shared_ptr<Table>> GroupByChunked(
+    const std::vector<std::shared_ptr<ChunkedArray>>& arguments,
+    const std::vector<std::shared_ptr<ChunkedArray>>& keys,
+    const std::vector<SimpleAggregate>& aggregates, bool use_threads = false,
+    ExecContext* ctx = default_exec_context());
+
 }  // namespace compute
 }  // namespace arrow
17 changes: 7 additions & 10 deletions python/pyarrow/_compute.pyx
@@ -2191,23 +2191,20 @@ class RankOptions(_RankOptions):
         self._set_options(sort_keys, null_placement, tiebreaker)
 
 
-cdef _pack_groupby_args(object values, vector[shared_ptr[CArray]]* out):
+cdef _pack_groupby_args(object values, vector[shared_ptr[CChunkedArray]]* out):
     for val in values:
-        if isinstance(val, (list, np.ndarray)):
-            val = lib.asarray(val)
-
-        if isinstance(val, Array):
-            out.push_back((<Array> val).sp_array)
+        if isinstance(val, ChunkedArray):
+            out.push_back((<ChunkedArray> val).sp_chunked_array)
             continue
 
         raise TypeError(f"Got unexpected argument type {type(val)} "
-                        "for group_by function, expected Array")
+                        "for group_by function, expected ChunkedArray")
 
 
 def _group_by(args, keys, aggregations):
     cdef:
-        vector[shared_ptr[CArray]] c_args
-        vector[shared_ptr[CArray]] c_keys
+        vector[shared_ptr[CChunkedArray]] c_args
+        vector[shared_ptr[CChunkedArray]] c_keys
         vector[CSimpleAggregate] c_aggregations
         CSimpleAggregate c_aggr
         shared_ptr[CTable] sp_table
@@ -2225,7 +2222,7 @@ def _group_by(args, keys, aggregations):
 
     with nogil:
         sp_table = GetResultValue(
-            GroupBy(c_args, c_keys, c_aggregations)
+            GroupByChunked(c_args, c_keys, c_aggregations)
         )
 
     return pyarrow_wrap_table(sp_table)
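The Cython layer can now insist on ChunkedArray because that is exactly what Table column access produces; a quick check of that invariant with public pyarrow API:

import pyarrow as pa

table = pa.table({'keys': ['a', 'b'], 'values': [1, 2]})
assert isinstance(table['values'], pa.ChunkedArray)  # true even for a single chunk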
7 changes: 4 additions & 3 deletions python/pyarrow/includes/libarrow.pxd
@@ -2490,9 +2490,10 @@ cdef extern from "arrow/compute/exec/groupby.h" namespace \
         c_string function
         shared_ptr[CFunctionOptions] options
 
-    CResult[shared_ptr[CTable]] GroupBy(const vector[shared_ptr[CArray]]& arguments,
-                                        const vector[shared_ptr[CArray]]& keys,
-                                        const vector[CSimpleAggregate]& aggregates)
+    CResult[shared_ptr[CTable]] GroupByChunked(
+        const vector[shared_ptr[CChunkedArray]]& arguments,
+        const vector[shared_ptr[CChunkedArray]]& keys,
+        const vector[CSimpleAggregate]& aggregates)
 
 
 cdef extern from * namespace "arrow::compute":
13 changes: 5 additions & 8 deletions python/pyarrow/table.pxi
@@ -5384,11 +5384,8 @@ list[tuple(str, str, FunctionOptions)]
         ] + self.keys
 
-        agg_tables = []
-        for batch in self._table.to_batches():
-            agg_tables.append(_pc()._group_by(
-                [batch[c] for c in columns],
-                [batch[k] for k in self.keys],
-                group_by_aggrs
-            ))
-        return concat_tables(agg_tables).rename_columns(column_names)
+        return _pc()._group_by(
+            [self._table[c] for c in columns],
+            [self._table[k] for k in self.keys],
+            group_by_aggrs
+        ).rename_columns(column_names)
 
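The deleted loop aggregated each record batch independently and concatenated the partial tables, so a key spanning several batches could appear once per batch in the output; the new code hands the table's chunked columns to a single GroupByChunked call and gets one row per key. A sketch of the difference, reproducing the old shape of the computation with public pyarrow API only:

import pyarrow as pa

table = pa.table({'keys': ['a', 'b', 'a', 'b', 'a', 'b'], 'values': range(6)})
chunked = pa.Table.from_batches(table.to_batches(max_chunksize=3))

# Old shape: per-batch partial aggregates, then concatenation.
partials = [
    pa.Table.from_batches([batch]).group_by('keys').aggregate([('values', 'sum')])
    for batch in chunked.to_batches()
]
print(pa.concat_tables(partials).num_rows)  # 4 -- 'a' and 'b' once per batch

# New shape: a single pass over the chunked columns.
print(chunked.group_by('keys').aggregate([('values', 'sum')]).num_rows)  # 2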
8 changes: 8 additions & 0 deletions python/pyarrow/tests/test_table.py
@@ -2024,6 +2024,14 @@ def sorted_by_keys(d):
"values_count": [1]
}

table = pa.table({'keys': ['a', 'b', 'a', 'b', 'a', 'b'], 'values': range(6)})
table_with_chunks = pa.Table.from_batches(table.to_batches(max_chunksize=3))
r = table_with_chunks.group_by('keys').aggregate([('values', 'sum')])
print(r)
assert sorted_by_keys(r.to_pydict()) == {
"keys": ["a", "b"],
"values_sum": [6, 9]
}

def test_table_to_recordbatchreader():
table = pa.Table.from_pydict({'x': [1, 2, 3]})
