Add key param support for helper Combiners #25895

Merged (7 commits) on Mar 29, 2023
Changes from 6 commits
CHANGES.md (3 changes: 2 additions & 1 deletion)
@@ -67,13 +67,14 @@
 * Schema'd PTransforms can now be directly applied to Beam dataframes just like PCollections.
   (Note that when doing multiple operations, it may be more efficient to explicitly chain the operations
   like `df | (Transform1 | Transform2 | ...)` to avoid excessive conversions.)
-* The Go SDK adds new transforms periodic.Impulse and periodic.Sequence that extends support
+* The Go SDK adds new transforms periodic.Impulse and periodic.Sequence that extends support
   for slowly updating side input patterns. ([#23106](https://github.com/apache/beam/issues/23106))

 ## Breaking Changes

 * If a main session fails to load, the pipeline will now fail at worker startup. ([#25401](https://github.com/apache/beam/issues/25401)).
 * Python pipeline options will now ignore unparsed command line flags prefixed with a single dash. ([#25943](https://github.com/apache/beam/issues/25943)).
+* SmallestPerKey combiner requires specifying keyword-only arguments 'key' and 'reverse'. ([#25888](https://github.com/apache/beam/issues/25888)).

 ## Deprecations
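To illustrate the breaking change noted above, here is a minimal sketch of the new keyword-only call style. The element data is made up, and the snippet assumes a Beam build that includes this PR:

```python
import apache_beam as beam
from apache_beam.transforms import combiners

with beam.Pipeline() as p:
  (p
   | beam.Create([('a', 3), ('a', 1), ('a', 2), ('b', 5)])
   # 'key' and 'reverse' must now be passed by keyword; a positional call
   # such as SmallestPerKey(2, lambda v: v) raises a TypeError.
   | combiners.Top.SmallestPerKey(2, key=lambda v: v)
   | beam.Map(print))  # -> ('a', [1, 2]) and ('b', [5])
```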
sdks/python/apache_beam/transforms/combiners.py (20 changes: 10 additions & 10 deletions)
@@ -293,33 +293,33 @@ def expand(self, pcoll):

   @staticmethod
   @ptransform.ptransform_fn
-  def Largest(pcoll, n, has_defaults=True):
+  def Largest(pcoll, n, has_defaults=True, key=None):
     """Obtain a list of the greatest N elements in a PCollection."""
     if has_defaults:
-      return pcoll | Top.Of(n)
+      return pcoll | Top.Of(n, key)
     else:
-      return pcoll | Top.Of(n).without_defaults()
+      return pcoll | Top.Of(n, key).without_defaults()

   @staticmethod
   @ptransform.ptransform_fn
-  def Smallest(pcoll, n, has_defaults=True):
+  def Smallest(pcoll, n, has_defaults=True, key=None):
     """Obtain a list of the least N elements in a PCollection."""
     if has_defaults:
-      return pcoll | Top.Of(n, reverse=True)
+      return pcoll | Top.Of(n, key, reverse=True)
     else:
-      return pcoll | Top.Of(n, reverse=True).without_defaults()
+      return pcoll | Top.Of(n, key, reverse=True).without_defaults()

   @staticmethod
   @ptransform.ptransform_fn
-  def LargestPerKey(pcoll, n):
+  def LargestPerKey(pcoll, n, key=None):
     """Identifies the N greatest elements associated with each key."""
-    return pcoll | Top.PerKey(n)
+    return pcoll | Top.PerKey(n, key)

   @staticmethod
   @ptransform.ptransform_fn
-  def SmallestPerKey(pcoll, n, reverse=True):
+  def SmallestPerKey(pcoll, n, *, key=None, reverse=None):
     """Identifies the N least elements associated with each key."""
-    return pcoll | Top.PerKey(n, reverse=True)
+    return pcoll | Top.PerKey(n, key, reverse=True)


 @with_input_types(T)
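A short usage sketch of the new `key` parameter on the helper combiners, with illustrative element values (assumes a Beam build with this change):

```python
import apache_beam as beam
from apache_beam.transforms import combiners

with beam.Pipeline() as p:
  words = p | beam.Create(['aa', 'bbbb', 'c', 'ddd'])
  # Rank by length instead of the default lexicographic ordering.
  (words
   | 'Longest' >> combiners.Top.Largest(2, key=len)
   | 'PrintLongest' >> beam.Map(print))   # -> ['bbbb', 'ddd']
  (words
   | 'Shortest' >> combiners.Top.Smallest(2, key=len)
   | 'PrintShortest' >> beam.Map(print))  # -> ['c', 'aa']
```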
sdks/python/apache_beam/transforms/combiners_test.py (14 changes: 14 additions & 0 deletions)
@@ -191,6 +191,20 @@ def test_top_key(self):
                      | combine.Top.Of(3, key=len, reverse=True),
                      [['c', 'aa', 'bbb']])

+    self.assertEqual(['xc', 'zb', 'yd', 'wa']
+                     | combine.Top.Largest(3, key=lambda x: x[-1]),
+                     [['yd', 'xc', 'zb']])
+    self.assertEqual(['xc', 'zb', 'yd', 'wa']
+                     | combine.Top.Smallest(3, key=lambda x: x[-1]),
+                     [['wa', 'zb', 'xc']])
+
+    self.assertEqual([('a', x) for x in [1, 2, 3, 4, 1, 1]]
+                     | combine.Top.LargestPerKey(3, key=lambda x: -x),
+                     [('a', [1, 1, 1])])
+    self.assertEqual([('a', x) for x in [1, 2, 3, 4, 1, 1]]
+                     | combine.Top.SmallestPerKey(3, key=lambda x: -x),
+                     [('a', [4, 3, 2])])
+
   def test_sharded_top_combine_fn(self):
     def test_combine_fn(combine_fn, shards, expected):
       accumulators = [
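One note on the PerKey assertions above: `key` is evaluated against each value, so negating it inverts the ranking, which is why `LargestPerKey(3, key=lambda x: -x)` returns the numerically smallest values. A minimal sketch of the same idea as a standalone pipeline (assumes a Beam build with this change):

```python
import apache_beam as beam
from apache_beam.transforms import combiners

with beam.Pipeline() as p:
  scores = p | beam.Create([('a', x) for x in [1, 2, 3, 4, 1, 1]])
  # The key is applied to each value; "largest by -x" therefore selects
  # the numerically smallest values of x for each key.
  (scores
   | combiners.Top.LargestPerKey(3, key=lambda x: -x)
   | beam.Map(print))  # -> ('a', [1, 1, 1])
```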