From ef8fd43a51143156efb7e3f437428b9ae9cc1e4b Mon Sep 17 00:00:00 2001 From: Anand Inguva <34158215+AnandInguva@users.noreply.github.com> Date: Thu, 13 Jul 2023 13:50:09 -0400 Subject: [PATCH] Revert "[Python] use get_buffer to fetch buffer when the buffer is None (#27373)" (#27491) This reverts commit 63d5171b70fd05ea8c79cc549d1ec94b3eb4c57d. --- CHANGES.md | 2 +- .../portability/fn_api_runner/fn_runner.py | 16 +++++----------- .../portability/fn_api_runner/fn_runner_test.py | 9 --------- 3 files changed, 6 insertions(+), 21 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d6d46507f8a0..7ed75c7ee05d 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -78,7 +78,7 @@ ## Bugfixes -* Fixed DirectRunner bug in Python SDK where GroupByKey gets empty PCollection and fails when pipeline option `direct_num_workers!=1`. ([#27373](https://github.com/apache/beam/pull/27373)) +* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). ## Known Issues diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index be7f99dc61f4..8d957068d08b 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -825,11 +825,10 @@ def _execute_bundle(self, buffers_to_clean = set() known_consumers = set() - for transform_id, buffer_id in ( - bundle_context_manager.stage_data_outputs.items()): - for (consuming_stage_name, consuming_transform - ) in runner_execution_context.buffer_id_to_consumer_pairs.get( - buffer_id, []): + for _, buffer_id in bundle_context_manager.stage_data_outputs.items(): + for (consuming_stage_name, consuming_transform) in \ + runner_execution_context.buffer_id_to_consumer_pairs.get(buffer_id, + []): buffer = runner_execution_context.pcoll_buffers.get(buffer_id, None) if (buffer_id in runner_execution_context.pcoll_buffers and @@ -841,11 +840,6 @@ def _execute_bundle(self, # so we create a copy of the buffer for every new stage. runner_execution_context.pcoll_buffers[buffer_id] = buffer.copy() buffer = runner_execution_context.pcoll_buffers[buffer_id] - # When the buffer is not in the pcoll_buffers, it means that the - # it could be an empty PCollection. In this case, get the buffer using - # the buffer id and transform id - if buffer is None: - buffer = bundle_context_manager.get_buffer(buffer_id, transform_id) # If the buffer has already been added to be consumed by # (stage, transform), then we don't need to add it again. This case @@ -860,7 +854,7 @@ def _execute_bundle(self, # MAX_TIMESTAMP for the downstream stage. runner_execution_context.queues.watermark_pending_inputs.enque( ((consuming_stage_name, timestamp.MAX_TIMESTAMP), - DataInput({consuming_transform: buffer}, {}))) + DataInput({consuming_transform: buffer}, {}))) # type: ignore for bid in buffers_to_clean: if bid in runner_execution_context.pcoll_buffers: diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py index b55c7162aea7..ed09bb8f2236 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py @@ -1831,15 +1831,6 @@ def create_pipeline(self, is_drain=False): p._options.view_as(DebugOptions).experiments.remove('beam_fn_api') return p - def test_group_by_key_with_empty_pcoll_elements(self): - with self.create_pipeline() as p: - res = ( - p - | beam.Create([('test_key', 'test_value')]) - | beam.Filter(lambda x: False) - | beam.GroupByKey()) - assert_that(res, equal_to([])) - def test_metrics(self): raise unittest.SkipTest("This test is for a single worker only.")