
GH-14866: [C++] Remove internal GroupBy implementation #14867

Merged

Conversation

@westonpace (Member) commented Dec 7, 2022

@github-actions bot commented Dec 7, 2022

⚠️ GitHub issue #14866 has been automatically assigned in GitHub to PR creator.

@github-actions bot commented Dec 7, 2022

⚠️ GitHub issue #14866 has no components, please add labels for components.

@westonpace (Member, Author)

This may be interesting to @lidavidm, @bkietz, and @amol-

result_batch = []
# each value in the C++ ExecBatch is a Datum; wrap it as a Python object
for c_column in c_batch.values:
    result_batch.append(wrap_datum(c_column))
result_batches.append(result_batch)

Member:

Could this use ExecBatch::ToRecordBatch to return a list of record batches instead? (That seems simpler to work with, and would also simplify the code here.)
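
For context, here is roughly what that suggestion looks like. A minimal sketch, assuming the caller already has a schema whose fields match the batch's values; the helper name ToBatch is hypothetical, not code from this PR:

    #include <memory>
    #include <utility>

    #include "arrow/compute/exec.h"
    #include "arrow/record_batch.h"
    #include "arrow/result.h"

    // ExecBatch::ToRecordBatch materializes each Datum value in the batch
    // as a column of a RecordBatch under the given schema.
    arrow::Result<std::shared_ptr<arrow::RecordBatch>> ToBatch(
        const arrow::compute::ExecBatch& batch,
        std::shared_ptr<arrow::Schema> schema) {
      return batch.ToRecordBatch(std::move(schema));
    }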

Member Author:

Yes, I was a bit torn on this one. The simplest thing might be for arrow::compute::GroupBy to return std::shared_ptr<RecordBatch>. However, I don't have column names, so I would be making those up. Also, the inputs are datums, so it seemed like a mismatch to receive datums (not arrays) and return a record batch (not an exec batch). So I ended up with a list of lists of arrays, which is unpleasant too.

I could return a list of record batches, but then I would have to copy the CreateSimpleSchema method, which invents names for these columns. Since the caller of this function has those names, and this function is private, I figured it best to leave that work to the caller.

That being said, after sleeping on this a bit, maybe a better change would be for arrow::compute::GroupBy to receive arrays (not datums); then returning a record batch wouldn't be inconsistent.

Member Author:

Ok, I ended up promoting arrow::compute::GroupBy to a "proper" convenience function. It now accepts arrays, returns a table, is a bit friendlier with field names, checks for invalid input, is added to the api.h file, and has unit tests.
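
For illustration, a usage sketch of the promoted function. The exact signature and the aggregate spelling below are assumptions inferred from the description above (arrays in, table out), not code quoted from the PR:

    #include <memory>

    #include "arrow/compute/api.h"  // the PR adds GroupBy to api.h

    // Assumed call shape: one "hash_sum" aggregate over `values`, grouped by
    // `keys`, returning a Table with one row per distinct key.
    arrow::Result<std::shared_ptr<arrow::Table>> SumByKey(
        std::shared_ptr<arrow::Array> values,
        std::shared_ptr<arrow::Array> keys) {
      return arrow::compute::GroupBy({values}, {keys},
                                     {{"hash_sum", /*options=*/nullptr}});
    }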

@@ -185,7 +184,8 @@ Result<Datum> GroupByUsingExecPlan(const BatchesWithSchema& input,
Result<Datum> GroupByUsingExecPlan(const std::vector<Datum>& arguments,
Contributor:

Perhaps this could just be called GroupBy now.

Member Author:

I'm a little hesitant to use GroupBy, since that would mean the GroupBy function calls the GroupBy function, which is a little confusing. I changed it to RunGroupBy.

Comment on the following lines:
    if (io_executor == NULLPTR) {
      io_executor = plan->exec_context()->executor();
    }
Contributor:

Is this removal intended?

Member Author:

It's not terribly relevant. I initially wanted to use exec_batch_source here but ran into a problem because I was not transferring off the background generator. Incidentally, I think it may be close to time to fix the background generator to remove this limitation, but I have enough on my plate for the moment.

This change was simply because, a few lines down (on line 316), we have a very similar if condition:

    if (io_executor == NULLPTR) {
      io_executor = io::internal::GetIOThreadPool();
    }

I don't think we need both of these statements, and defaulting to the I/O thread pool seemed like the better default. If that is not the correct default, or there is some subtlety I am missing, let me know and I can revert this.

@rtpsw (Contributor) commented Dec 22, 2022

@jorisvandenbossche, it looks like this PR is pending your review. Could you take a look? I have another PR, which is important to me, that is waiting for this one.

@jorisvandenbossche (Member)

> Ok, I ended up promoting arrow::compute::GroupBy to a "proper" convenience function. It now accepts arrays, returns a table, is a bit friendlier with field names, checks for invalid input, is added to the api.h file, and has unit tests.

Sorry for the slow reply. I don't fully understand how the above can work correctly (working on arrays instead of on chunked arrays), because you now calculate the groupby batch by batch? But then those results should be "merged" somehow, not just concatenated?

@jorisvandenbossche (Member)

To illustrate what I mean, using a small example (and using this branch):

In [7]: table = pa.table({'key': ['a', 'b', 'a', 'b', 'a', 'b'], 'col': range(6)})

In [8]: table = pa.Table.from_batches(table.to_batches(max_chunksize=3))

In [9]: table.to_pandas()
Out[9]: 
  key  col
0   a    0
1   b    1
2   a    2
3   b    3
4   a    4
5   b    5

In [10]: table.group_by('key').aggregate([('col', 'sum')]).to_pandas()
Out[10]: 
   col_sum key
0        2   a
1        1   b
2        8   b
3        4   a

I created a table consisting of multiple chunks, and then the result is incorrect: it is the concatenation of the individual results of each chunk. (The correct result has a single row per key: a sum of 6 for 'a' and 9 for 'b'.)

@rtpsw (Contributor) commented Dec 22, 2022

@westonpace, though I didn't review the code carefully, it looks like this PR removes code that is refactored and used in my ordered/segmented aggregation PR. Given this, and the correctness problem @jorisvandenbossche is pointing out in this PR, might it make sense to hold off on the removal until my PR is merged? Or to include the removal in my PR?

@westonpace (Member, Author)

@jorisvandenbossche it should be using an exec plan internally with an aggregate node. The aggregate node knows how to maintain state from batch to batch. However, I agree your example is pointing out a bug in my code. I'll take a look.

@rtpsw I will try merging this with your branch (and then create a third PR) just to make sure it works. I don't know if I can get to it before tomorrow morning. Either way, if there is concern about this approach, we can merge yours and clean up with mine.

The basic idea is that we have kernel functions for arrays / single batches and exec plans for multiple batches (which should include chunked arrays).

I don't see any value in maintaining a third path for chunked arrays when they should just be a special case of multiple batches / exec plans.
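
To sketch that point (illustrative only, not code from this PR): a chunked input is already consumable as a stream of record batches, which is exactly the unit of work an exec plan processes:

    #include <memory>

    #include "arrow/record_batch.h"
    #include "arrow/status.h"
    #include "arrow/table.h"

    // Iterate a (possibly multi-chunk) Table as record batches, the same
    // granularity at which an exec plan's source node emits data.
    arrow::Status VisitAsBatches(const arrow::Table& table) {
      arrow::TableBatchReader reader(table);
      std::shared_ptr<arrow::RecordBatch> batch;
      while (true) {
        ARROW_RETURN_NOT_OK(reader.ReadNext(&batch));
        if (batch == nullptr) break;  // end of stream
        // ... feed `batch` into the plan here ...
      }
      return arrow::Status::OK();
    }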

@jorisvandenbossche (Member)

> it should be using an exec plan internally with an aggregate node.

But I assumed that it would be the purpose of the GroupBy(..) helper to do this? (its doc comment says "The result will be calculated using an exec plan with an aggregate node")

@westonpace (Member, Author)

Yes. I see the problem now. The group by helper needs an overload that accepts chunked arrays, converts them to a table, and then uses that as input.
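
A sketch of what such an overload could do; the helper name CombineForGroupBy, the placeholder field names, and the overall shape are illustrative assumptions, not the PR's actual code:

    #include <memory>
    #include <string>
    #include <vector>

    #include "arrow/chunked_array.h"
    #include "arrow/table.h"
    #include "arrow/type.h"

    // Gather the chunked inputs into one Table so the table-based GroupBy
    // path can run a single exec plan over all chunks, merging (rather than
    // concatenating) per-chunk results.
    std::shared_ptr<arrow::Table> CombineForGroupBy(
        const std::vector<std::shared_ptr<arrow::ChunkedArray>>& arguments,
        const std::vector<std::shared_ptr<arrow::ChunkedArray>>& keys) {
      std::vector<std::shared_ptr<arrow::ChunkedArray>> columns = arguments;
      columns.insert(columns.end(), keys.begin(), keys.end());
      std::vector<std::shared_ptr<arrow::Field>> fields;
      for (size_t i = 0; i < columns.size(); ++i) {
        // Placeholder names; the real helper would derive friendlier ones.
        fields.push_back(
            arrow::field("f" + std::to_string(i), columns[i]->type()));
      }
      // The resulting table would then be fed to the table-based GroupBy.
      return arrow::Table::Make(arrow::schema(fields), columns);
    }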

@westonpace (Member, Author)

@rtpsw 0f2b458 is an example of layering this PR on top of your ordered groupby changes. I couldn't get the aggregate node working because I don't think it works at the moment and didn't want to dive too deep into that problem just yet. However, I don't see any reason your ordered aggregation won't work with this PR. Also, once things are working, we should be able to go further and:

  • remove chunked array from datum
  • remove all grouper.cc/grouper.h changes which use spans and instead only use batches

@westonpace (Member, Author)

I'm going to rebase this and address the problem Joris raised.

@westonpace force-pushed the feature/14866--remove-internal-groupby branch from 1b407e2 to 3a799ac on December 23, 2022 at 17:21
@westonpace (Member, Author)

@jorisvandenbossche Thanks for pointing out that problem. I think I've addressed your concerns (and I've added your example as a test case).

@westonpace force-pushed the feature/14866--remove-internal-groupby branch from 9baee4b to 412a32d on January 4, 2023 at 13:57
@westonpace (Member, Author)

@jorisvandenbossche friendly ping now that the holidays are over.

@jorisvandenbossche (Member) left a comment

Looks good! (I only looked at the Cython code again, and at the expected behaviour.)

Just some small clean-ups are needed.

python/pyarrow/table.pxi (review comments: outdated, resolved)
python/pyarrow/tests/test_table.py (review comments: outdated, resolved)
@westonpace force-pushed the feature/14866--remove-internal-groupby branch from 412a32d to c0cc97a on January 19, 2023 at 00:37
@westonpace requested a review from AlenkaF as a code owner on January 19, 2023 at 00:37
@westonpace (Member, Author)

I've rebased this and will merge if CI is still passing.

… directly instead of emulating one

apacheGH-14866: converted GroupBy into a proper convenience function, accepting arrays and returning table, with unit tests
@westonpace (Member, Author)

closes #34238

@jorisvandenbossche (Member)

@westonpace BTW, if you want the issue to be closed automatically, you need to add the "closes #34238" to the top comment, not only in a comment like the one above (our tooling still won't automatically assign and milestone the issue, though; it's only GitHub that will close it automatically).

@westonpace (Member, Author)

@jorisvandenbossche Oh! Thanks for catching that. I hadn't realized that (I think this is the first PR I've done that closed multiple issues).

@ursabot commented Feb 22, 2023

Benchmark runs are scheduled for baseline = a988302 and contender = 92d91f5. 92d91f5 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.49% ⬆️0.0%] test-mac-arm
[Finished ⬇️0.0% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.48% ⬆️0.03%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 92d91f53 ec2-t3-xlarge-us-east-2
[Finished] 92d91f53 test-mac-arm
[Finished] 92d91f53 ursa-i9-9960x
[Finished] 92d91f53 ursa-thinkcentre-m75q
[Finished] a9883024 ec2-t3-xlarge-us-east-2
[Failed] a9883024 test-mac-arm
[Finished] a9883024 ursa-i9-9960x
[Finished] a9883024 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

fatemehp pushed a commit to fatemehp/arrow that referenced this pull request Feb 24, 2023
…14867)

* Closes: apache#14866

Authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>