From 7e7508731521862097c6e6230ac4bb6c53477965 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Sat, 3 Aug 2024 22:06:12 -0700 Subject: [PATCH] Update top_wikipedia_sessions to be more idiomatic with beam.Map. (#32041) * Update top_wikipedia_sessions to be more idiomatic with beam.Map. * lint --- .../complete/top_wikipedia_sessions.py | 34 ++++++++----------- .../complete/top_wikipedia_sessions_test.py | 2 ++ 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index 7064a5add13c7..50b026edf2402 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py @@ -61,14 +61,13 @@ MAX_TIMESTAMP = 0x7fffffffffffffff -class ExtractUserAndTimestampDoFn(beam.DoFn): +def extract_user_and_timestamp(element): """Extracts user and timestamp representing a Wikipedia edit.""" - def process(self, element): - table_row = json.loads(element) - if 'contributor_username' in table_row: - user_name = table_row['contributor_username'] - timestamp = table_row['timestamp'] - yield TimestampedValue(user_name, timestamp) + table_row = json.loads(element) + if 'contributor_username' in table_row: + user_name = table_row['contributor_username'] + timestamp = table_row['timestamp'] + return TimestampedValue(user_name, timestamp) class ComputeSessions(beam.PTransform): @@ -98,19 +97,15 @@ def expand(self, pcoll): without_defaults()) -class SessionsToStringsDoFn(beam.DoFn): +def sessions_to_strings(element, window=beam.DoFn.WindowParam): """Adds the session information to be part of the key.""" - def process(self, element, window=beam.DoFn.WindowParam): - yield (element[0] + ' : ' + str(window), element[1]) + return (element[0] + ' : ' + str(window), element[1]) -class FormatOutputDoFn(beam.DoFn): +def format_output(element, window=beam.DoFn.WindowParam): """Formats a string containing the user, count, and session.""" - def process(self, element, window=beam.DoFn.WindowParam): - for kv in element: - session = kv[0] - count = kv[1] - yield session + ' : ' + str(count) + ' : ' + str(window) + for session, count in element: + yield session + ' : ' + str(count) + ' : ' + str(window) class ComputeTopSessions(beam.PTransform): @@ -124,14 +119,13 @@ def __init__(self, sampling_threshold): def expand(self, pcoll): return ( pcoll - | - 'ExtractUserAndTimestamp' >> beam.ParDo(ExtractUserAndTimestampDoFn()) + | 'ExtractUserAndTimestamp' >> beam.Map(extract_user_and_timestamp) | beam.Filter( lambda x: (abs(hash(x)) <= MAX_TIMESTAMP * self.sampling_threshold)) | ComputeSessions() - | 'SessionsToStrings' >> beam.ParDo(SessionsToStringsDoFn()) + | 'SessionsToStrings' >> beam.Map(sessions_to_strings) | TopPerMonth() - | 'FormatOutput' >> beam.ParDo(FormatOutputDoFn())) + | 'FormatOutput' >> beam.FlatMap(format_output)) def run(argv=None): diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py index 3c171664e45d2..92d1d196fe055 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py @@ -28,6 +28,8 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to +# TODO: Unit test top_wikipedia_sessions.extract_user_and_timestamp, etc. + class ComputeTopSessionsTest(unittest.TestCase):