Skip to content

Commit

Permalink
Add key param support for helper Combiners (#25895)
Browse files Browse the repository at this point in the history
Co-authored-by: tvalentyn <tvalentyn@users.noreply.github.com>
  • Loading branch information
harrisonlimh and tvalentyn authored Mar 29, 2023
1 parent 9c61455 commit 9f582e5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@

* If a main session fails to load, the pipeline will now fail at worker startup. ([#25401](https://github.com/apache/beam/issues/25401)).
* Python pipeline options will now ignore unparsed command line flags prefixed with a single dash. ([#25943](https://github.com/apache/beam/issues/25943)).
* The SmallestPerKey combiner now requires keyword-only arguments for specifying optional parameters, such as `key` and `reverse`. ([#25888](https://github.com/apache/beam/issues/25888)).

## Deprecations

Expand Down
20 changes: 10 additions & 10 deletions sdks/python/apache_beam/transforms/combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,33 +293,33 @@ def expand(self, pcoll):

@staticmethod
@ptransform.ptransform_fn
def Largest(pcoll, n, has_defaults=True):
def Largest(pcoll, n, has_defaults=True, key=None):
"""Obtain a list of the greatest N elements in a PCollection."""
if has_defaults:
return pcoll | Top.Of(n)
return pcoll | Top.Of(n, key)
else:
return pcoll | Top.Of(n).without_defaults()
return pcoll | Top.Of(n, key).without_defaults()

@staticmethod
@ptransform.ptransform_fn
def Smallest(pcoll, n, has_defaults=True):
def Smallest(pcoll, n, has_defaults=True, key=None):
"""Obtain a list of the least N elements in a PCollection."""
if has_defaults:
return pcoll | Top.Of(n, reverse=True)
return pcoll | Top.Of(n, key, reverse=True)
else:
return pcoll | Top.Of(n, reverse=True).without_defaults()
return pcoll | Top.Of(n, key, reverse=True).without_defaults()

@staticmethod
@ptransform.ptransform_fn
def LargestPerKey(pcoll, n):
def LargestPerKey(pcoll, n, key=None):
"""Identifies the N greatest elements associated with each key."""
return pcoll | Top.PerKey(n)
return pcoll | Top.PerKey(n, key)

@staticmethod
@ptransform.ptransform_fn
def SmallestPerKey(pcoll, n, reverse=True):
def SmallestPerKey(pcoll, n, *, key=None, reverse=None):
"""Identifies the N least elements associated with each key."""
return pcoll | Top.PerKey(n, reverse=True)
return pcoll | Top.PerKey(n, key, reverse=True)


@with_input_types(T)
Expand Down
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/transforms/combiners_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,20 @@ def test_top_key(self):
| combine.Top.Of(3, key=len, reverse=True),
[['c', 'aa', 'bbb']])

self.assertEqual(['xc', 'zb', 'yd', 'wa']
| combine.Top.Largest(3, key=lambda x: x[-1]),
[['yd', 'xc', 'zb']])
self.assertEqual(['xc', 'zb', 'yd', 'wa']
| combine.Top.Smallest(3, key=lambda x: x[-1]),
[['wa', 'zb', 'xc']])

self.assertEqual([('a', x) for x in [1, 2, 3, 4, 1, 1]]
| combine.Top.LargestPerKey(3, key=lambda x: -x),
[('a', [1, 1, 1])])
self.assertEqual([('a', x) for x in [1, 2, 3, 4, 1, 1]]
| combine.Top.SmallestPerKey(3, key=lambda x: -x),
[('a', [4, 3, 2])])

def test_sharded_top_combine_fn(self):
def test_combine_fn(combine_fn, shards, expected):
accumulators = [
Expand Down

0 comments on commit 9f582e5

Please sign in to comment.