From 1581944f9e7d4b33a0b76e3b17b8beea8d0cb0b9 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 9 Jun 2025 16:45:43 -0400 Subject: [PATCH 1/2] Force FnApiRunner in cases where prism can't handle use case --- .../apache_beam/dataframe/transforms_test.py | 12 +++++++-- .../elementwise/pardo_dofn_methods.py | 9 ++++++- .../io/gcp/bigquery_file_loads_test.py | 15 ++++++++--- .../apache_beam/io/gcp/bigquery_test.py | 27 ++++++++++++++----- .../apache_beam/io/gcp/bigtableio_test.py | 6 ++++- .../io/gcp/experimental/spannerio_test.py | 18 ++++++++++--- sdks/python/apache_beam/io/gcp/pubsub_test.py | 12 +++++++++ .../apache_beam/ml/gcp/cloud_dlp_test.py | 12 +++++++-- .../apache_beam/ml/inference/base_test.py | 26 ++++++++++++++---- .../ml/inference/tensorflow_inference_test.py | 5 +++- .../apache_beam/options/pipeline_options.py | 1 + sdks/python/apache_beam/pipeline_test.py | 4 ++- .../runners/direct/direct_runner_test.py | 13 ++++++--- .../non_interactive_runner_test.py | 4 ++- .../python/apache_beam/runners/runner_test.py | 5 ++-- .../apache_beam/transforms/trigger_test.py | 4 ++- .../apache_beam/typehints/typecheck_test.py | 3 ++- sdks/python/apache_beam/yaml/readme_test.py | 4 ++- 18 files changed, 144 insertions(+), 36 deletions(-) diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py index 6b070090c624..a2ca2f9d3879 100644 --- a/sdks/python/apache_beam/dataframe/transforms_test.py +++ b/sdks/python/apache_beam/dataframe/transforms_test.py @@ -372,7 +372,11 @@ def create_animal_speed_input(p): reshuffle=False) def test_loc_filter(self): - with beam.Pipeline() as p: + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # monitoring_metrics property of the FnApiRunner which does not exist on + # other runners like Prism. + # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590 + with beam.Pipeline('FnApiRunner') as p: _ = ( self.create_animal_speed_input(p) | transforms.DataframeTransform(lambda df: df[df.Speed > 10])) @@ -383,7 +387,11 @@ def set_column(df, name, s): df[name] = s return df - with beam.Pipeline() as p: + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # monitoring_metrics property of the FnApiRunner which does not exist on + # other runners like Prism. + # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590 + with beam.Pipeline('FnApiRunner') as p: _ = ( self.create_animal_speed_input(p) | transforms.DataframeTransform( diff --git a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_dofn_methods.py b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_dofn_methods.py index 868519602569..46d4f5955b0c 100644 --- a/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_dofn_methods.py +++ b/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_dofn_methods.py @@ -34,6 +34,9 @@ def pardo_dofn_methods(test=None): + # Portable runners do not guarantee that teardown will be executed, so we + # use FnApiRunner instead of prism. 
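+  # (FnApiRunner runs the whole pipeline in-process and invokes every DoFn
+  # lifecycle method, so the 'teardown' line this snippet prints is stable.)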
+ runner = 'FnApiRunner' # [START pardo_dofn_methods] import apache_beam as beam @@ -60,9 +63,13 @@ def finish_bundle(self): ) def teardown(self): + # Teardown is best effort and not guaranteed to be executed by all + # runners in all cases (for example, it may be skipped if the pipeline + # can otherwise complete). It should be used for best effort resource + # cleanup. print('teardown') - with beam.Pipeline() as pipeline: + with beam.Pipeline(runner) as pipeline: results = ( pipeline | 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔']) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py index 84e8ecfc486e..6400365918d2 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py @@ -541,7 +541,10 @@ def test_load_job_id_used(self): validate=False, load_job_project_id='loadJobProject') - with TestPipeline('DirectRunner') as p: + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. + with TestPipeline('FnApiRunner') as p: outputs = p | beam.Create(_ELEMENTS) | transform jobs = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS] \ | "GetJobs" >> beam.Map(lambda x: x[1]) @@ -571,7 +574,10 @@ def test_load_job_id_use_for_copy_job(self): bq_client.jobs.Insert.return_value = result_job bq_client.tables.Delete.return_value = None - with TestPipeline('DirectRunner') as p: + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. + with TestPipeline('FnApiRunner') as p: outputs = ( p | beam.Create(_ELEMENTS, reshuffle=False) @@ -709,7 +715,10 @@ def test_multiple_partition_files(self): bq_client.jobs.Insert.return_value = result_job bq_client.tables.Delete.return_value = None - with TestPipeline('DirectRunner') as p: + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. 
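+    # (Lineage is reported via string-set metrics and read back with
+    # Lineage.query(result.metrics(), ...) from apache_beam.metrics.metric.)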
+ with TestPipeline('FnApiRunner') as p: outputs = ( p | beam.Create(_ELEMENTS, reshuffle=False) diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py index 3931b0822595..182e5359ef83 100644 --- a/sdks/python/apache_beam/io/gcp/bigquery_test.py +++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py @@ -446,13 +446,14 @@ def test_temp_dataset_is_configurable( ]) def test_create_temp_dataset_exception(self, exception_type, error_message): + # Uses the FnApiRunner to ensure errors are mocked/passed through correctly with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService, 'Insert'),\ mock.patch.object(BigQueryWrapper, 'get_or_create_dataset') as mock_insert, \ mock.patch('time.sleep'), \ self.assertRaises(Exception) as exc,\ - beam.Pipeline() as p: + beam.Pipeline('FnApiRunner') as p: mock_insert.side_effect = exception_type(error_message) @@ -462,7 +463,7 @@ def test_create_temp_dataset_exception(self, exception_type, error_message): gcs_location='gs://temp_location') mock_insert.assert_called() - self.assertIn(error_message, exc.exception.args[0]) + self.assertIn(error_message, str(exc.exception)) @parameterized.expand([ # read without exception @@ -515,6 +516,9 @@ class DummySchema: numBytes = 5 schema = DummySchema() + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. with mock.patch('time.sleep'), \ mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService, 'Get') as mock_get_table, \ @@ -526,7 +530,7 @@ class DummySchema: 'match'), \ mock.patch.object(FileSystems, 'delete'), \ - beam.Pipeline() as p: + beam.Pipeline('FnApiRunner') as p: call_counter = 0 def store_callback(unused_request): @@ -676,6 +680,9 @@ def store_callback(unused_request): ]) def test_query_job_exception(self, exception_type, error_message): + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # mocking which prism doesn't seem to fully handle correctly (mocks get + # mixed between test runs). Pinning to FnApiRunner for now. with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource, 'estimate_size') as mock_estimate,\ mock.patch.object(BigQueryWrapper, @@ -685,7 +692,7 @@ def test_query_job_exception(self, exception_type, error_message): mock.patch.object(bigquery_v2_client.BigqueryV2.DatasetsService, 'Get'), \ mock.patch('time.sleep'), \ self.assertRaises(Exception) as exc, \ - beam.Pipeline() as p: + beam.Pipeline('FnApiRunner') as p: mock_estimate.return_value = None mock_query_location.return_value = None @@ -727,14 +734,17 @@ def test_read_export_exception(self, exception_type, error_message): gcs_location="gs://temp_location") mock_query_job.assert_called() - self.assertIn(error_message, exc.exception.args[0]) + self.assertIn(error_message, str(exc.exception)) def test_read_direct_lineage(self): + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. 
with mock.patch.object(bigquery_tools.BigQueryWrapper, '_bigquery_client'),\ mock.patch.object(bq_storage.BigQueryReadClient, 'create_read_session'),\ - beam.Pipeline() as p: + beam.Pipeline('FnApiRunner') as p: _ = p | ReadFromBigQuery( method=ReadFromBigQuery.Method.DIRECT_READ, @@ -744,8 +754,11 @@ def test_read_direct_lineage(self): set(["bigquery:project.dataset.table"])) def test_read_all_lineage(self): + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. with mock.patch.object(_BigQueryReadSplit, '_export_files') as export, \ - beam.Pipeline() as p: + beam.Pipeline('FnApiRunner') as p: export.return_value = (None, []) diff --git a/sdks/python/apache_beam/io/gcp/bigtableio_test.py b/sdks/python/apache_beam/io/gcp/bigtableio_test.py index 130f9a714129..3a9d2ac6f7cb 100644 --- a/sdks/python/apache_beam/io/gcp/bigtableio_test.py +++ b/sdks/python/apache_beam/io/gcp/bigtableio_test.py @@ -271,8 +271,12 @@ def setUp(self): def test_write(self): direct_rows = [self.generate_row(i) for i in range(5)] + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. + runner = 'FnApiRunner' with patch.object(MutationsBatcher, 'mutate'), \ - patch.object(MutationsBatcher, 'close'), TestPipeline() as p: + patch.object(MutationsBatcher, 'close'), TestPipeline(runner) as p: _ = p | beam.Create(direct_rows) | bigtableio.WriteToBigTable( self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID) self.assertSetEqual( diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py index 0e22041dbea4..4e391900eaa7 100644 --- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py +++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py @@ -448,7 +448,11 @@ def test_spanner_write(self, mock_batch_snapshot_class, mock_batch_checkout): [('1234', "mutations-inset-1233-updated")]), ] - p = TestPipeline() + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # metrics filtering which doesn't work on Prism yet because Prism renames + # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7"). + # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590 + p = TestPipeline('FnApiRunner') _ = ( p | beam.Create(mutations) @@ -475,7 +479,11 @@ def test_spanner_bundles_size( WriteMutation.insert( "roles", ("key", "rolename"), [('1234', "mutations-inset-1234")]) ] * 50 - p = TestPipeline() + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # metrics filtering which doesn't work on Prism yet because Prism renames + # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7"). + # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590 + p = TestPipeline('FnApiRunner') _ = ( p | beam.Create(mutations) @@ -514,7 +522,11 @@ def test_spanner_write_mutation_groups( MutationGroup([WriteMutation.delete("roles", ks)]) ] - p = TestPipeline() + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # metrics filtering which doesn't work on Prism yet because Prism renames + # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7"). 
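+    # The assertions below locate counters via MetricsFilter, which only
+    # matches when the runner keeps user-visible transform labels intact.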
+ # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590 + p = TestPipeline('FnApiRunner') _ = ( p | beam.Create(mutation_groups) diff --git a/sdks/python/apache_beam/io/gcp/pubsub_test.py b/sdks/python/apache_beam/io/gcp/pubsub_test.py index fadc49461a3c..e3fb07a17625 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_test.py @@ -834,6 +834,10 @@ def test_read_from_pubsub_no_overwrite(self, unused_mock): ] options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. + options.view_as(StandardOptions).runner = 'FnApiRunner' for test_case in ('topic', 'subscription'): with TestPipeline(options=options) as p: # Direct runner currently overwrites the whole ReadFromPubSub transform. @@ -1009,6 +1013,10 @@ def test_write_to_pubsub_no_overwrite(self, unused_mock): options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. + options.view_as(StandardOptions).runner = 'FnApiRunner' with TestPipeline(options=options) as p: pcoll = p | Create(payloads) WriteToPubSub( @@ -1025,6 +1033,10 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock): options = PipelineOptions([]) options.view_as(StandardOptions).streaming = True + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # lineage metrics which Prism doesn't seem to handle correctly. Defaulting + # to FnApiRunner instead. + options.view_as(StandardOptions).runner = 'FnApiRunner' with TestPipeline(options=options) as p: pcoll = p | Create(payloads) # Avoid direct runner overwrites WriteToPubSub diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py index d4153e5b3fe9..51916eaaf6c7 100644 --- a/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py +++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py @@ -72,7 +72,11 @@ def common_project_path(self, *args): return 'test' with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock): - p = TestPipeline() + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # metrics filtering which doesn't work on Prism yet because Prism renames + # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7"). + # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590 + p = TestPipeline('FnApiRunner') config = { "deidentify_config": { "info_type_transformations": { @@ -125,7 +129,11 @@ def common_project_path(self, *args): return 'test' with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock): - p = TestPipeline() + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # metrics filtering which doesn't work on Prism yet because Prism renames + # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7"). 
+ # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590 + p = TestPipeline('FnApiRunner') config = {"inspect_config": {"info_types": [{"name": "EMAIL_ADDRESS"}]}} # pylint: disable=expression-not-assigned ( diff --git a/sdks/python/apache_beam/ml/inference/base_test.py b/sdks/python/apache_beam/ml/inference/base_test.py index 6497de3fe9d5..29ac0ad4247d 100644 --- a/sdks/python/apache_beam/ml/inference/base_test.py +++ b/sdks/python/apache_beam/ml/inference/base_test.py @@ -950,7 +950,11 @@ def test_unexpected_inference_args_passed(self): def test_increment_failed_batches_counter(self): with self.assertRaises(ValueError): - with TestPipeline() as pipeline: + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # metrics filtering which doesn't work on Prism yet because Prism renames + # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7"). + # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590 + with TestPipeline('FnApiRunner') as pipeline: examples = [7] pcoll = pipeline | 'start' >> beam.Create(examples) _ = pcoll | base.RunInference(FakeModelHandlerExpectedInferenceArgs()) @@ -1226,7 +1230,10 @@ def process(self, element): for e in element: yield e - with TestPipeline() as pipeline: + # This test relies on poorly defined side input semantics which vary + # across runners (including prism). Pinning to FnApiRunner which + # consistently guarantees output. + with TestPipeline('FnApiRunner') as pipeline: side_input = ( pipeline | @@ -1324,7 +1331,10 @@ def process(self, element): for e in element: yield e - with TestPipeline() as pipeline: + # This test relies on poorly defined side input semantics which vary + # across runners (including prism). Pinning to FnApiRunner which + # consistently guarantees output. + with TestPipeline('FnApiRunner') as pipeline: side_input = ( pipeline | @@ -1425,7 +1435,10 @@ def process(self, element): for e in element: yield e - with TestPipeline() as pipeline: + # This test relies on poorly defined side input semantics which vary + # across runners (including prism). Pinning to FnApiRunner which + # consistently guarantees output. + with TestPipeline('FnApiRunner') as pipeline: side_input = ( pipeline | @@ -1500,7 +1513,10 @@ def process(self, element): for e in element: yield e - with TestPipeline() as pipeline: + # This test relies on poorly defined side input semantics which vary + # across runners (including prism). Pinning to FnApiRunner which + # consistently guarantees output. 
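+    # (In particular, when a refreshed side input value becomes visible to
+    # the main input stream is runner-dependent, so observed outputs differ.)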
+ with TestPipeline('FnApiRunner') as pipeline: side_input = ( pipeline | diff --git a/sdks/python/apache_beam/ml/inference/tensorflow_inference_test.py b/sdks/python/apache_beam/ml/inference/tensorflow_inference_test.py index 9b23963723d1..75f15c87f5ce 100644 --- a/sdks/python/apache_beam/ml/inference/tensorflow_inference_test.py +++ b/sdks/python/apache_beam/ml/inference/tensorflow_inference_test.py @@ -221,7 +221,10 @@ def test_predict_numpy_with_batch_size(self): model = _create_mult2_model() model_path = os.path.join(self.tmpdir, f'mult2_{uuid.uuid4()}.keras') tf.keras.models.save_model(model, model_path) - with TestPipeline() as pipeline: + # TODO(https://github.com/apache/beam/issues/34549): This test relies on a + # runner producing a single bundle or bundles of even size, neither of + # which prism seems to do here + with TestPipeline('FnApiRunner') as pipeline: def fake_batching_inference_fn( model: tf.Module, diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 867a5bc24f24..3554e4cfe959 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -607,6 +607,7 @@ class StandardOptions(PipelineOptions): 'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner', 'apache_beam.runners.interactive.interactive_runner.InteractiveRunner', 'apache_beam.runners.portability.flink_runner.FlinkRunner', + 'apache_beam.runners.portability.fn_api_runner.FnApiRunner', 'apache_beam.runners.portability.portable_runner.PortableRunner', 'apache_beam.runners.portability.prism_runner.PrismRunner', 'apache_beam.runners.portability.spark_runner.SparkRunner', diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 0bbd14b6afc7..4389f174cb92 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -752,7 +752,9 @@ def test_incompatible_submission_and_runtime_envs_fail_pipeline(self): RuntimeError, 'Pipeline construction environment and pipeline runtime ' 'environment are not compatible.'): - with TestPipeline() as p: + # TODO(https://github.com/apache/beam/issues/34549): Prism doesn't + # pass through capabilities as part of the ProcessBundleDescriptor. + with TestPipeline('FnApiRunner') as p: _ = p | Create([None]) diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py index 92116c665b64..008a1bd47215 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner_test.py +++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py @@ -56,9 +56,10 @@ class DirectPipelineResultTest(unittest.TestCase): def test_waiting_on_result_stops_executor_threads(self): pre_test_threads = set(t.ident for t in threading.enumerate()) - for runner in ['DirectRunner', - 'BundleBasedDirectRunner', - 'SwitchingDirectRunner']: + for runner in [ + 'BundleBasedDirectRunner', + 'apache_beam.runners.portability.fn_api_runner.fn_runner.FnApiRunner' + ]: pipeline = test_pipeline.TestPipeline(runner=runner) _ = (pipeline | beam.Create([{'foo': 'bar'}])) result = pipeline.run() @@ -91,7 +92,11 @@ def process(self, element): ("a", "b", str(element % 4))) return [element] - p = Pipeline(DirectRunner()) + # TODO(https://github.com/apache/beam/issues/34549): This test relies on + # metrics filtering which doesn't work on Prism yet because Prism renames + # steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7"). 
+ # https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590 + p = Pipeline('FnApiRunner') pcoll = ( p | beam.Create([1, 2, 3, 4, 5], reshuffle=False) | 'Do' >> beam.ParDo(MyDoFn())) diff --git a/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py index f7fd052fecc4..82298f5def09 100644 --- a/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/non_interactive_runner_test.py @@ -76,7 +76,9 @@ class NonInteractiveRunnerTest(unittest.TestCase): @unittest.skipIf(sys.platform == "win32", "[BEAM-10627]") def test_basic(self): clear_side_effect() - p = beam.Pipeline(direct_runner.DirectRunner()) + # This test relies on the pipeline cache being populated. Prism doesn't + # consistently populate this cache, forcing FnApiRunner + p = beam.Pipeline('FnApiRunner') # Initial collection runs the pipeline. pcoll1 = p | beam.Create(['a', 'b', 'c']) | beam.Map(cause_side_effect) diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index 61fe400997dd..0593be40465f 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -30,6 +30,7 @@ from apache_beam.metrics.metric import Metrics from apache_beam.runners import DirectRunner from apache_beam.runners import create_runner +from apache_beam.runners.portability.fn_api_runner import FnApiRunner class RunnerTest(unittest.TestCase): @@ -55,7 +56,7 @@ def test_create_runner_shorthand(self): def test_run_api(self): my_metric = Metrics.counter('namespace', 'my_metric') - runner = DirectRunner() + runner = FnApiRunner() result = runner.run( beam.Create([1, 10, 100]) | beam.Map(lambda x: my_metric.inc(x))) result.wait_until_finish() @@ -72,7 +73,7 @@ def fn(start): | beam.Create([1, 10, 100]) | beam.Map(lambda x: my_metric.inc(x))) - runner = DirectRunner() + runner = FnApiRunner() result = runner.run(fn) result.wait_until_finish() # Use counters to assert the pipeline actually ran. diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 2cad624272ba..79fd3151c083 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -700,7 +700,9 @@ def test_after_count_streaming(self): }.items()))) def test_always(self): - with TestPipeline() as p: + # Pin to FnApiRunner since portable runner could trigger differently if + # using bundle sizes of greater than 1. + with TestPipeline('FnApiRunner') as p: def construct_timestamped(k, t): return TimestampedValue((k, t), t) diff --git a/sdks/python/apache_beam/typehints/typecheck_test.py b/sdks/python/apache_beam/typehints/typecheck_test.py index 32307c5202e9..ce16e8e2f5d6 100644 --- a/sdks/python/apache_beam/typehints/typecheck_test.py +++ b/sdks/python/apache_beam/typehints/typecheck_test.py @@ -84,7 +84,8 @@ def process(self, element: int, *args, **kwargs) -> int: class RuntimeTypeCheckTest(unittest.TestCase): def setUp(self): - self.p = TestPipeline( + # Use FnApiRunner since it guarantees all lifecycle methods will be called. 
+ self.p = TestPipeline('FnApiRunner', options=PipelineOptions( runtime_type_check=True, performance_runtime_type_check=False)) diff --git a/sdks/python/apache_beam/yaml/readme_test.py b/sdks/python/apache_beam/yaml/readme_test.py index ce9d6269e545..c05039cb703e 100644 --- a/sdks/python/apache_beam/yaml/readme_test.py +++ b/sdks/python/apache_beam/yaml/readme_test.py @@ -260,7 +260,9 @@ def test(self): with mock.patch( 'apache_beam.yaml.yaml_provider.ExternalProvider.create_transform', lambda *args, **kwargs: _Fakes.SomeTransform(*args, **kwargs)): - p = beam.Pipeline(options=PipelineOptions(**options)) + # Uses the FnApiRunner to ensure errors are mocked/passed through + # correctly + p = beam.Pipeline('FnApiRunner', options=PipelineOptions(**options)) yaml_transform.expand_pipeline( p, modified_yaml, yaml_provider.merge_providers([test_provider])) if test_type == 'BUILD': From b83ef8d41330b11128b37c61ce228530132e6c38 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 9 Jun 2025 17:12:58 -0400 Subject: [PATCH 2/2] yapf --- sdks/python/apache_beam/typehints/typecheck_test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/typehints/typecheck_test.py b/sdks/python/apache_beam/typehints/typecheck_test.py index ce16e8e2f5d6..bafb21c3dc17 100644 --- a/sdks/python/apache_beam/typehints/typecheck_test.py +++ b/sdks/python/apache_beam/typehints/typecheck_test.py @@ -85,7 +85,8 @@ def process(self, element: int, *args, **kwargs) -> int: class RuntimeTypeCheckTest(unittest.TestCase): def setUp(self): # Use FnApiRunner since it guarantees all lifecycle methods will be called. - self.p = TestPipeline('FnApiRunner', + self.p = TestPipeline( + 'FnApiRunner', options=PipelineOptions( runtime_type_check=True, performance_runtime_type_check=False))
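For context, the step-name-sensitive lookup most of these tests share is the
MetricsFilter query sketched below. This is a minimal, self-contained
illustration with made-up names ('Do', CountingDoFn), not code from this
patch; it assumes the 'FnApiRunner' shorthand registered in
pipeline_options.py above.

    import apache_beam as beam
    from apache_beam.metrics import Metrics
    from apache_beam.metrics.metric import MetricsFilter

    class CountingDoFn(beam.DoFn):
      """Illustrative DoFn that bumps a user counter per element."""
      def process(self, element):
        Metrics.counter(self.__class__, 'elements').inc()
        yield element

    with beam.Pipeline('FnApiRunner') as p:
      _ = p | beam.Create([1, 2, 3]) | 'Do' >> beam.ParDo(CountingDoFn())

    # Querying by the user-visible step label only works when the runner
    # reports that label unchanged; a runner that rewrites 'Do' to something
    # like 'ref_AppliedPTransform_Do_7' returns no results for this filter.
    counters = p.result.metrics().query(
        MetricsFilter().with_step('Do'))['counters']
    assert counters and counters[0].committed == 3

Pinning these suites to FnApiRunner keeps that lookup stable until the
renaming issue tracked in https://github.com/apache/beam/issues/34549 is
resolved.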