Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions sdks/python/apache_beam/dataframe/transforms_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,11 @@ def create_animal_speed_input(p):
reshuffle=False)

def test_loc_filter(self):
with beam.Pipeline() as p:
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# monitoring_metrics property of the FnApiRunner which does not exist on
# other runners like Prism.
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
with beam.Pipeline('FnApiRunner') as p:
_ = (
self.create_animal_speed_input(p)
| transforms.DataframeTransform(lambda df: df[df.Speed > 10]))
Expand All @@ -383,7 +387,11 @@ def set_column(df, name, s):
df[name] = s
return df

with beam.Pipeline() as p:
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# monitoring_metrics property of the FnApiRunner which does not exist on
# other runners like Prism.
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
with beam.Pipeline('FnApiRunner') as p:
_ = (
self.create_animal_speed_input(p)
| transforms.DataframeTransform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@


def pardo_dofn_methods(test=None):
# Portable runners do not guarantee that teardown will be executed, so we
# use FnApiRunner instead of Prism.
runner = 'FnApiRunner'
# [START pardo_dofn_methods]
import apache_beam as beam

Expand All @@ -60,9 +63,13 @@ def finish_bundle(self):
)

def teardown(self):
# Teardown is best effort and not guaranteed to be executed by all
# runners in all cases (for example, it may be skipped if the pipeline
# can otherwise complete). It should be used for best effort resource
# cleanup.
print('teardown')

with beam.Pipeline() as pipeline:
with beam.Pipeline(runner) as pipeline:
results = (
pipeline
| 'Create inputs' >> beam.Create(['🍓', '🥕', '🍆', '🍅', '🥔'])
Expand Down
15 changes: 12 additions & 3 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,10 @@ def test_load_job_id_used(self):
validate=False,
load_job_project_id='loadJobProject')

with TestPipeline('DirectRunner') as p:
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
with TestPipeline('FnApiRunner') as p:
outputs = p | beam.Create(_ELEMENTS) | transform
jobs = outputs[bqfl.BigQueryBatchFileLoads.DESTINATION_JOBID_PAIRS] \
| "GetJobs" >> beam.Map(lambda x: x[1])
Expand Down Expand Up @@ -571,7 +574,10 @@ def test_load_job_id_use_for_copy_job(self):
bq_client.jobs.Insert.return_value = result_job
bq_client.tables.Delete.return_value = None

with TestPipeline('DirectRunner') as p:
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
with TestPipeline('FnApiRunner') as p:
outputs = (
p
| beam.Create(_ELEMENTS, reshuffle=False)
Expand Down Expand Up @@ -709,7 +715,10 @@ def test_multiple_partition_files(self):
bq_client.jobs.Insert.return_value = result_job
bq_client.tables.Delete.return_value = None

with TestPipeline('DirectRunner') as p:
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
with TestPipeline('FnApiRunner') as p:
outputs = (
p
| beam.Create(_ELEMENTS, reshuffle=False)
Expand Down
27 changes: 20 additions & 7 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,13 +446,14 @@ def test_temp_dataset_is_configurable(
])
def test_create_temp_dataset_exception(self, exception_type, error_message):

# Uses the FnApiRunner to ensure errors are mocked/passed through correctly
with mock.patch.object(bigquery_v2_client.BigqueryV2.JobsService,
'Insert'),\
mock.patch.object(BigQueryWrapper,
'get_or_create_dataset') as mock_insert, \
mock.patch('time.sleep'), \
self.assertRaises(Exception) as exc,\
beam.Pipeline() as p:
beam.Pipeline('FnApiRunner') as p:

mock_insert.side_effect = exception_type(error_message)

Expand All @@ -462,7 +463,7 @@ def test_create_temp_dataset_exception(self, exception_type, error_message):
gcs_location='gs://temp_location')

mock_insert.assert_called()
self.assertIn(error_message, exc.exception.args[0])
self.assertIn(error_message, str(exc.exception))

@parameterized.expand([
# read without exception
Expand Down Expand Up @@ -515,6 +516,9 @@ class DummySchema:
numBytes = 5
schema = DummySchema()

# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
with mock.patch('time.sleep'), \
mock.patch.object(bigquery_v2_client.BigqueryV2.TablesService,
'Get') as mock_get_table, \
Expand All @@ -526,7 +530,7 @@ class DummySchema:
'match'), \
mock.patch.object(FileSystems,
'delete'), \
beam.Pipeline() as p:
beam.Pipeline('FnApiRunner') as p:
call_counter = 0

def store_callback(unused_request):
Expand Down Expand Up @@ -676,6 +680,9 @@ def store_callback(unused_request):
])
def test_query_job_exception(self, exception_type, error_message):

# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# mocking, which Prism doesn't seem to fully handle correctly (mocks get
# mixed between test runs). Pinning to FnApiRunner for now.
with mock.patch.object(beam.io.gcp.bigquery._CustomBigQuerySource,
'estimate_size') as mock_estimate,\
mock.patch.object(BigQueryWrapper,
Expand All @@ -685,7 +692,7 @@ def test_query_job_exception(self, exception_type, error_message):
mock.patch.object(bigquery_v2_client.BigqueryV2.DatasetsService, 'Get'), \
mock.patch('time.sleep'), \
self.assertRaises(Exception) as exc, \
beam.Pipeline() as p:
beam.Pipeline('FnApiRunner') as p:

mock_estimate.return_value = None
mock_query_location.return_value = None
Expand Down Expand Up @@ -727,14 +734,17 @@ def test_read_export_exception(self, exception_type, error_message):
gcs_location="gs://temp_location")

mock_query_job.assert_called()
self.assertIn(error_message, exc.exception.args[0])
self.assertIn(error_message, str(exc.exception))

def test_read_direct_lineage(self):
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
with mock.patch.object(bigquery_tools.BigQueryWrapper,
'_bigquery_client'),\
mock.patch.object(bq_storage.BigQueryReadClient,
'create_read_session'),\
beam.Pipeline() as p:
beam.Pipeline('FnApiRunner') as p:

_ = p | ReadFromBigQuery(
method=ReadFromBigQuery.Method.DIRECT_READ,
Expand All @@ -744,8 +754,11 @@ def test_read_direct_lineage(self):
set(["bigquery:project.dataset.table"]))

def test_read_all_lineage(self):
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
with mock.patch.object(_BigQueryReadSplit, '_export_files') as export, \
beam.Pipeline() as p:
beam.Pipeline('FnApiRunner') as p:

export.return_value = (None, [])

Expand Down
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigtableio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,12 @@ def setUp(self):

def test_write(self):
direct_rows = [self.generate_row(i) for i in range(5)]
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
runner = 'FnApiRunner'
with patch.object(MutationsBatcher, 'mutate'), \
patch.object(MutationsBatcher, 'close'), TestPipeline() as p:
patch.object(MutationsBatcher, 'close'), TestPipeline(runner) as p:
_ = p | beam.Create(direct_rows) | bigtableio.WriteToBigTable(
self._PROJECT_ID, self._INSTANCE_ID, self._TABLE_ID)
self.assertSetEqual(
Expand Down
18 changes: 15 additions & 3 deletions sdks/python/apache_beam/io/gcp/experimental/spannerio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,11 @@ def test_spanner_write(self, mock_batch_snapshot_class, mock_batch_checkout):
[('1234', "mutations-inset-1233-updated")]),
]

p = TestPipeline()
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# metrics filtering which doesn't work on Prism yet because Prism renames
# steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
p = TestPipeline('FnApiRunner')
_ = (
p
| beam.Create(mutations)
Expand All @@ -475,7 +479,11 @@ def test_spanner_bundles_size(
WriteMutation.insert(
"roles", ("key", "rolename"), [('1234', "mutations-inset-1234")])
] * 50
p = TestPipeline()
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# metrics filtering which doesn't work on Prism yet because Prism renames
# steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
p = TestPipeline('FnApiRunner')
_ = (
p
| beam.Create(mutations)
Expand Down Expand Up @@ -514,7 +522,11 @@ def test_spanner_write_mutation_groups(
MutationGroup([WriteMutation.delete("roles", ks)])
]

p = TestPipeline()
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# metrics filtering which doesn't work on Prism yet because Prism renames
# steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
p = TestPipeline('FnApiRunner')
_ = (
p
| beam.Create(mutation_groups)
Expand Down
12 changes: 12 additions & 0 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,10 @@ def test_read_from_pubsub_no_overwrite(self, unused_mock):
]
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
options.view_as(StandardOptions).runner = 'FnApiRunner'
for test_case in ('topic', 'subscription'):
with TestPipeline(options=options) as p:
# Direct runner currently overwrites the whole ReadFromPubSub transform.
Expand Down Expand Up @@ -1009,6 +1013,10 @@ def test_write_to_pubsub_no_overwrite(self, unused_mock):

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
options.view_as(StandardOptions).runner = 'FnApiRunner'
with TestPipeline(options=options) as p:
pcoll = p | Create(payloads)
WriteToPubSub(
Expand All @@ -1025,6 +1033,10 @@ def test_write_to_pubsub_with_attributes_no_overwrite(self, unused_mock):

options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# lineage metrics which Prism doesn't seem to handle correctly. Defaulting
# to FnApiRunner instead.
options.view_as(StandardOptions).runner = 'FnApiRunner'
with TestPipeline(options=options) as p:
pcoll = p | Create(payloads)
# Avoid letting the direct runner overwrite WriteToPubSub.
Expand Down
12 changes: 10 additions & 2 deletions sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ def common_project_path(self, *args):
return 'test'

with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
p = TestPipeline()
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# metrics filtering which doesn't work on Prism yet because Prism renames
# steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
p = TestPipeline('FnApiRunner')
config = {
"deidentify_config": {
"info_type_transformations": {
Expand Down Expand Up @@ -125,7 +129,11 @@ def common_project_path(self, *args):
return 'test'

with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
p = TestPipeline()
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# metrics filtering which doesn't work on Prism yet because Prism renames
# steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
p = TestPipeline('FnApiRunner')
config = {"inspect_config": {"info_types": [{"name": "EMAIL_ADDRESS"}]}}
# pylint: disable=expression-not-assigned
(
Expand Down
26 changes: 21 additions & 5 deletions sdks/python/apache_beam/ml/inference/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,11 @@ def test_unexpected_inference_args_passed(self):

def test_increment_failed_batches_counter(self):
with self.assertRaises(ValueError):
with TestPipeline() as pipeline:
# TODO(https://github.com/apache/beam/issues/34549): This test relies on
# metrics filtering which doesn't work on Prism yet because Prism renames
# steps (e.g. "Do" becomes "ref_AppliedPTransform_Do_7").
# https://github.com/apache/beam/blob/5f9cd73b7c9a2f37f83971ace3a399d633201dd1/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py#L1590
with TestPipeline('FnApiRunner') as pipeline:
examples = [7]
pcoll = pipeline | 'start' >> beam.Create(examples)
_ = pcoll | base.RunInference(FakeModelHandlerExpectedInferenceArgs())
Expand Down Expand Up @@ -1226,7 +1230,10 @@ def process(self, element):
for e in element:
yield e

with TestPipeline() as pipeline:
# This test relies on poorly defined side input semantics which vary
# across runners (including Prism). Pinning to FnApiRunner which
# consistently guarantees output.
with TestPipeline('FnApiRunner') as pipeline:
side_input = (
pipeline
|
Expand Down Expand Up @@ -1324,7 +1331,10 @@ def process(self, element):
for e in element:
yield e

with TestPipeline() as pipeline:
# This test relies on poorly defined side input semantics which vary
# across runners (including Prism). Pinning to FnApiRunner which
# consistently guarantees output.
with TestPipeline('FnApiRunner') as pipeline:
side_input = (
pipeline
|
Expand Down Expand Up @@ -1425,7 +1435,10 @@ def process(self, element):
for e in element:
yield e

with TestPipeline() as pipeline:
# This test relies on poorly defined side input semantics which vary
# across runners (including Prism). Pinning to FnApiRunner which
# consistently guarantees output.
with TestPipeline('FnApiRunner') as pipeline:
side_input = (
pipeline
|
Expand Down Expand Up @@ -1500,7 +1513,10 @@ def process(self, element):
for e in element:
yield e

with TestPipeline() as pipeline:
# This test relies on poorly defined side input semantics which vary
# across runners (including Prism). Pinning to FnApiRunner which
# consistently guarantees output.
with TestPipeline('FnApiRunner') as pipeline:
side_input = (
pipeline
|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ def test_predict_numpy_with_batch_size(self):
model = _create_mult2_model()
model_path = os.path.join(self.tmpdir, f'mult2_{uuid.uuid4()}.keras')
tf.keras.models.save_model(model, model_path)
with TestPipeline() as pipeline:
# TODO(https://github.com/apache/beam/issues/34549): This test relies on a
# runner producing a single bundle or bundles of even size, neither of
# which Prism seems to do here.
with TestPipeline('FnApiRunner') as pipeline:

def fake_batching_inference_fn(
model: tf.Module,
Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,7 @@ class StandardOptions(PipelineOptions):
'apache_beam.runners.direct.direct_runner.SwitchingDirectRunner',
'apache_beam.runners.interactive.interactive_runner.InteractiveRunner',
'apache_beam.runners.portability.flink_runner.FlinkRunner',
'apache_beam.runners.portability.fn_api_runner.FnApiRunner',
'apache_beam.runners.portability.portable_runner.PortableRunner',
'apache_beam.runners.portability.prism_runner.PrismRunner',
'apache_beam.runners.portability.spark_runner.SparkRunner',
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,9 @@ def test_incompatible_submission_and_runtime_envs_fail_pipeline(self):
RuntimeError,
'Pipeline construction environment and pipeline runtime '
'environment are not compatible.'):
with TestPipeline() as p:
# TODO(https://github.com/apache/beam/issues/34549): Prism doesn't
# pass through capabilities as part of the ProcessBundleDescriptor.
with TestPipeline('FnApiRunner') as p:
_ = p | Create([None])


Expand Down
Loading
Loading