From 43ed75a956b1f8c0d331f1801d45b1804df9600f Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Fri, 2 Dec 2022 22:30:16 -0800 Subject: [PATCH] Disallow using the JRH with Python streaming pipelines This cleans-up a lot of logic related to flag setting with respect to streaming and runner v2 experiments There is also some dead code removed for configurations not possible any more (e.g. side input override, JRH Create, ...) --- ...ob_PerformanceTests_PubsubIO_Python.groovy | 6 +- ...mit_Python_ValidatesRunner_Dataflow.groovy | 1 - CHANGES.md | 34 ++ .../apache_beam/io/gcp/pubsub_io_perf_test.py | 1 - .../apache_beam/options/pipeline_options.py | 7 - .../runners/dataflow/dataflow_runner.py | 293 ++++++++---------- .../runners/dataflow/dataflow_runner_test.py | 208 ++++++++----- .../runners/dataflow/internal/apiclient.py | 84 +---- .../dataflow/internal/apiclient_test.py | 125 +------- .../runners/dataflow/ptransform_overrides.py | 60 +--- .../apache_beam/runners/portability/stager.py | 8 - .../testing/load_tests/build.gradle | 8 - sdks/python/scripts/run_integration_test.sh | 12 - .../python/test-suites/dataflow/common.gradle | 46 +-- 14 files changed, 308 insertions(+), 585 deletions(-) diff --git a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy index 262eda3fd909..c488e5989dea 100644 --- a/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy +++ b/.test-infra/jenkins/job_PerformanceTests_PubsubIO_Python.groovy @@ -24,10 +24,6 @@ import static java.util.UUID.randomUUID def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC')) -def final JOB_SPECIFIC_SWITCHES = [ - '-PwithDataflowWorkerJar="true"' -] - def psio_test = [ title : 'PubsubIO Write Performance Test Python 2GB', test : 'apache_beam.io.gcp.pubsub_io_perf_test', @@ -58,7 +54,7 @@ def executeJob = { scope, testConfig -> commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240) loadTestsBuilder.loadTest(scope, testConfig.title, testConfig.runner, - CommonTestProperties.SDK.PYTHON, testConfig.pipelineOptions, testConfig.test, JOB_SPECIFIC_SWITCHES) + CommonTestProperties.SDK.PYTHON, testConfig.pipelineOptions, testConfig.test) } PhraseTriggeringPostCommitBuilder.postCommitJob( diff --git a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow.groovy index 82e1ed173622..574d5493af7e 100644 --- a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow.groovy +++ b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow.groovy @@ -37,7 +37,6 @@ PostcommitJobBuilder.postCommitJob('beam_PostCommit_Py_VR_Dataflow', 'Run Python gradle { rootBuildScriptDir(commonJobProperties.checkoutDir) tasks(':sdks:python:test-suites:dataflow:validatesRunnerBatchTests') - tasks(':sdks:python:test-suites:dataflow:validatesRunnerStreamingTests') switches('-PdisableRunnerV2') commonJobProperties.setGradleSwitches(delegate) } diff --git a/CHANGES.md b/CHANGES.md index 47d170dd905d..f737862ca2f3 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -44,10 +44,44 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). + ## Known Issues * ([#X](https://github.com/apache/beam/issues/X)). --> +# [2.45.0] - Unreleased + +## Highlights + +* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)). 
+* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)). + +## I/Os + +* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). + +## New Features / Improvements + +* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). + +## Breaking Changes + +* Python streaming pipelines and portable Python batch pipelines on Dataflow are required to + use Runner V2. The `disable_runner_v2`, `disable_runner_v2_until_2023`, `disable_prime_runner_v2` + experiments will raise an error during pipeline construction. Note that non-portable Python + batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515)) + +## Deprecations + +* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)). + +## Bugfixes + +* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). + +## Known Issues + +* ([#X](https://github.com/apache/beam/issues/X)). # [2.44.0] - Unreleased diff --git a/sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py b/sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py index c932628b6dd4..a91b07018c8d 100644 --- a/sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py +++ b/sdks/python/apache_beam/io/gcp/pubsub_io_perf_test.py @@ -34,7 +34,6 @@ --publish_to_big_query= --metrics_dataset= --metrics_table= - --dataflow_worker_jar= --input_options='{ \"num_records\": \"key_size\": 1 diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 02d8b02c18ed..596dbfec405c 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -1079,13 +1079,6 @@ def _add_argparse_args(cls, parser): dest='min_cpu_platform', type=str, help='GCE minimum CPU platform. Default is determined by GCP.') - parser.add_argument( - '--dataflow_worker_jar', - dest='dataflow_worker_jar', - type=str, - help='Dataflow worker jar file. If specified, the jar file is staged ' - 'in GCS, then gets loaded by workers. End users usually ' - 'should not use this feature.') def validate(self, validator): errors = [] diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index aecbc29425c2..39d5e727bff1 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -105,7 +105,6 @@ class DataflowRunner(PipelineRunner): # TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride from apache_beam.runners.dataflow.ptransform_overrides import CombineValuesPTransformOverride from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride - from apache_beam.runners.dataflow.ptransform_overrides import JrhReadPTransformOverride from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride from apache_beam.runners.dataflow.ptransform_overrides import NativeReadPTransformOverride @@ -116,10 +115,6 @@ class DataflowRunner(PipelineRunner): NativeReadPTransformOverride(), ] # type: List[PTransformOverride] - _JRH_PTRANSFORM_OVERRIDES = [ - JrhReadPTransformOverride(), - ] # type: List[PTransformOverride] - # These overrides should be applied after the proto representation of the # graph is created. 
_NON_PORTABLE_PTRANSFORM_OVERRIDES = [ @@ -138,7 +133,7 @@ def is_fnapi_compatible(self): return False def apply(self, transform, input, options): - self._maybe_add_unified_worker_missing_options(options) + _check_and_add_missing_options(options) return super().apply(transform, input, options) def _get_unique_step_name(self): @@ -267,10 +262,7 @@ def _only_element(iterable): return element @staticmethod - def side_input_visitor( - use_unified_worker=False, - use_fn_api=False, - deterministic_key_coders=True): + def side_input_visitor(is_runner_v2=False, deterministic_key_coders=True): # Imported here to avoid circular dependencies. # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.pipeline import PipelineVisitor @@ -285,37 +277,16 @@ class SideInputVisitor(PipelineVisitor): def visit_transform(self, transform_node): if isinstance(transform_node.transform, ParDo): new_side_inputs = [] - for ix, side_input in enumerate(transform_node.side_inputs): + for side_input in transform_node.side_inputs: access_pattern = side_input._side_input_data().access_pattern if access_pattern == common_urns.side_inputs.ITERABLE.urn: - if use_unified_worker or not use_fn_api: - # TODO(https://github.com/apache/beam/issues/20043): Stop - # patching up the access pattern to appease Dataflow when - # using the UW and hardcode the output type to be Any since - # the Dataflow JSON and pipeline proto can differ in coders - # which leads to encoding/decoding issues within the runner. - side_input.pvalue.element_type = typehints.Any - new_side_input = _DataflowIterableSideInput(side_input) - else: - # Add a map to ('', value) as Dataflow currently only handles - # keyed side inputs when using the JRH. - pipeline = side_input.pvalue.pipeline - new_side_input = _DataflowIterableAsMultimapSideInput( - side_input) - new_side_input.pvalue = beam.pvalue.PCollection( - pipeline, - element_type=typehints.KV[bytes, - side_input.pvalue.element_type], - is_bounded=side_input.pvalue.is_bounded) - parent = transform_node.parent or pipeline._root_transform() - map_to_void_key = beam.pipeline.AppliedPTransform( - parent, - beam.Map(lambda x: (b'', x)), - transform_node.full_label + '/MapToVoidKey%s' % ix, - {'input': side_input.pvalue}) - new_side_input.pvalue.producer = map_to_void_key - map_to_void_key.add_output(new_side_input.pvalue, None) - parent.add_part(map_to_void_key) + # TODO(https://github.com/apache/beam/issues/20043): Stop + # patching up the access pattern to appease Dataflow when + # using the UW and hardcode the output type to be Any since + # the Dataflow JSON and pipeline proto can differ in coders + # which leads to encoding/decoding issues within the runner. + side_input.pvalue.element_type = typehints.Any + new_side_input = _DataflowIterableSideInput(side_input) elif access_pattern == common_urns.side_inputs.MULTIMAP.urn: # Ensure the input coder is a KV coder and patch up the # access pattern to appease Dataflow. 
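With the JRH-only branch deleted above, ITERABLE side inputs now follow a single path whether or not Runner V2 is in use. A minimal sketch of that path (the helper name _wrap_iterable_side_input is illustrative only; typehints and _DataflowIterableSideInput are names already defined in this module):

def _wrap_iterable_side_input(side_input):
    # Widen the element type to Any because the Dataflow JSON and the
    # pipeline proto can disagree on coders (see the TODO in the hunk above),
    # then wrap the side input in the Dataflow-compatible form.
    side_input.pvalue.element_type = typehints.Any
    return _DataflowIterableSideInput(side_input)
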
@@ -329,7 +300,7 @@ def visit_transform(self, transform_node): 'Unsupported access pattern for %r: %r' % (transform_node.full_label, access_pattern)) new_side_inputs.append(new_side_input) - if use_fn_api: + if is_runner_v2: transform_node.side_inputs = new_side_inputs transform_node.transform.side_inputs = new_side_inputs @@ -416,44 +387,34 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): 'Google Cloud Dataflow runner not available, ' 'please install apache_beam[gcp]') - debug_options = options.view_as(DebugOptions) if pipeline_proto or pipeline.contains_external_transforms: - if debug_options.lookup_experiment('disable_runner_v2'): + if _is_runner_v2_disabled(options): raise ValueError( 'This pipeline contains cross language transforms, ' - 'which require runner v2.') - if not apiclient._use_unified_worker(options): + 'which requires Runner V2.') + if not _is_runner_v2(options): _LOGGER.info( - 'Automatically enabling Dataflow Runner v2 since the ' + 'Automatically enabling Dataflow Runner V2 since the ' 'pipeline used cross-language transforms.') - # This has to be done before any Fn API specific setup. - debug_options.add_experiment("use_runner_v2") - # Dataflow multi-language pipelines require portable job submission. - if not debug_options.lookup_experiment('use_portable_job_submission'): - debug_options.add_experiment("use_portable_job_submission") - - self._maybe_add_unified_worker_missing_options(options) - use_fnapi = apiclient._use_fnapi(options) - - if not use_fnapi: + is_runner_v2 = _is_runner_v2(options) + if not is_runner_v2: self._check_for_unsupported_features_on_non_portable_worker(pipeline) # Convert all side inputs into a form acceptable to Dataflow. if pipeline: pipeline.visit( self.side_input_visitor( - apiclient._use_unified_worker(options), - apiclient._use_fnapi(options), + _is_runner_v2(options), deterministic_key_coders=not options.view_as( TypeOptions).allow_non_deterministic_key_coders)) - # Performing configured PTransform overrides. Note that this is currently + # Performing configured PTransform overrides. Note that this is currently # done before Runner API serialization, since the new proto needs to # contain any added PTransforms. 
pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES) - if debug_options.lookup_experiment('use_legacy_bq_sink'): + if options.view_as(DebugOptions).lookup_experiment('use_legacy_bq_sink'): warnings.warn( "Native sinks no longer implemented; " "ignoring use_legacy_bq_sink.") @@ -462,9 +423,6 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): pipeline.replace_all( [GroupIntoBatchesWithShardedKeyPTransformOverride(self, options)]) - if use_fnapi and not apiclient._use_unified_worker(options): - pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES) - if pipeline_proto: self.proto_pipeline = pipeline_proto @@ -480,7 +438,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): self._default_environment.container_image) else: artifacts = environments.python_sdk_dependencies(options) - if artifacts and apiclient._use_fnapi(options): + if artifacts and _is_runner_v2(options): _LOGGER.info( "Pipeline has additional dependencies to be installed " "in SDK worker container, consider using the SDK " @@ -532,7 +490,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): known_runner_urns=frozenset(), partial=True) - if not use_fnapi: + if not is_runner_v2: # Performing configured PTransform overrides which should not be reflected # in the proto representation of the graph. pipeline.replace_all(DataflowRunner._NON_PORTABLE_PTRANSFORM_OVERRIDES) @@ -552,36 +510,6 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): debug_options.add_experiment( 'min_cpu_platform=' + worker_options.min_cpu_platform) - # Elevate "enable_streaming_engine" to pipeline option, but using the - # existing experiment. - google_cloud_options = options.view_as(GoogleCloudOptions) - if google_cloud_options.enable_streaming_engine: - debug_options.add_experiment("enable_windmill_service") - debug_options.add_experiment("enable_streaming_engine") - elif (apiclient._use_fnapi(options) and - apiclient._use_unified_worker(options) and - options.view_as(StandardOptions).streaming): - debug_options.add_experiment("enable_windmill_service") - debug_options.add_experiment("enable_streaming_engine") - else: - if (debug_options.lookup_experiment("enable_windmill_service") or - debug_options.lookup_experiment("enable_streaming_engine")): - raise ValueError( - """Streaming engine both disabled and enabled: - --enable_streaming_engine flag is not set, but - enable_windmill_service - and/or enable_streaming_engine experiments are present. - It is recommended you only set the --enable_streaming_engine flag.""") - - dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None) - if dataflow_worker_jar is not None: - if not apiclient._use_fnapi(options): - _LOGGER.warning( - 'Typical end users should not use this worker jar feature. ' - 'It can only be used when FnAPI is enabled.') - else: - debug_options.add_experiment('use_staged_dataflow_worker_jar') - self.job = apiclient.Job(options, self.proto_pipeline) # TODO: Consider skipping these for all use_portable_job_submission jobs. @@ -618,27 +546,6 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): result.metric_results = self._metrics return result - def _maybe_add_unified_worker_missing_options(self, options): - debug_options = options.view_as(DebugOptions) - # Streaming is always portable, default to runner v2. 
- if options.view_as(StandardOptions).streaming: - if debug_options.lookup_experiment('disable_runner_v2_until_2023'): - debug_options.add_experiment('disable_runner_v2') - elif debug_options.lookup_experiment('disable_runner_v2'): - raise ValueError( - 'disable_runner_v2 no longer supported for Beam Python %s, please ' - 'use disable_runner_v2_until_2023' % beam.version.__version__) - else: - debug_options.add_experiment('beam_fn_api') - debug_options.add_experiment('use_runner_v2') - debug_options.add_experiment('use_portable_job_submission') - # set default beam_fn_api experiment if use unified - # worker experiment flag exists, no-op otherwise. - from apache_beam.runners.dataflow.internal import apiclient - if apiclient._use_unified_worker(options): - if not debug_options.lookup_experiment('beam_fn_api'): - debug_options.add_experiment('beam_fn_api') - def _get_typehint_based_encoding(self, typehint, window_coder): """Returns an encoding based on a typehint object.""" return self._get_cloud_encoding( @@ -795,36 +702,21 @@ def _add_singleton_step( return step def run_Impulse(self, transform_node, options): - standard_options = options.view_as(StandardOptions) - debug_options = options.view_as(DebugOptions) - use_fn_api = ( - debug_options.experiments and - 'beam_fn_api' in debug_options.experiments) - use_streaming_engine = ( - debug_options.experiments and - 'enable_streaming_engine' in debug_options.experiments and - 'enable_windmill_service' in debug_options.experiments) - step = self._add_step( TransformNames.READ, transform_node.full_label, transform_node) - if (standard_options.streaming and - (not use_fn_api or not use_streaming_engine)): - step.add_property(PropertyNames.FORMAT, 'pubsub') - step.add_property(PropertyNames.PUBSUB_SUBSCRIPTION, '_starting_signal/') + step.add_property(PropertyNames.FORMAT, 'impulse') + encoded_impulse_element = coders.WindowedValueCoder( + coders.BytesCoder(), + coders.coders.GlobalWindowCoder()).get_impl().encode_nested( + window.GlobalWindows.windowed_value(b'')) + if _is_runner_v2(options): + encoded_impulse_as_str = self.byte_array_to_json_string( + encoded_impulse_element) else: - step.add_property(PropertyNames.FORMAT, 'impulse') - encoded_impulse_element = coders.WindowedValueCoder( - coders.BytesCoder(), - coders.coders.GlobalWindowCoder()).get_impl().encode_nested( - window.GlobalWindows.windowed_value(b'')) - - if use_fn_api: - encoded_impulse_as_str = self.byte_array_to_json_string( - encoded_impulse_element) - else: - encoded_impulse_as_str = base64.b64encode( - encoded_impulse_element).decode('ascii') - step.add_property(PropertyNames.IMPULSE_ELEMENT, encoded_impulse_as_str) + encoded_impulse_as_str = base64.b64encode(encoded_impulse_element).decode( + 'ascii') + + step.add_property(PropertyNames.IMPULSE_ELEMENT, encoded_impulse_as_str) step.encoding = self._get_encoded_output_coder(transform_node) step.add_property( @@ -987,13 +879,9 @@ def run_ParDo(self, transform_node, options): ('/{}'.format(transform_name) if transform_node.side_inputs else ''), transform_node, transform_node.transform.output_tags) - # Import here to avoid adding the dependency for local running scenarios. 
- # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.runners.dataflow.internal import apiclient transform_proto = self.proto_context.transforms.get_proto(transform_node) transform_id = self.proto_context.transforms.get_id(transform_node) - use_fnapi = apiclient._use_fnapi(options) - use_unified_worker = apiclient._use_unified_worker(options) + is_runner_v2 = _is_runner_v2(options) # Patch side input ids to be unique across a given pipeline. if (label_renames and transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn): @@ -1015,10 +903,8 @@ def run_ParDo(self, transform_node, options): self.proto_pipeline.components.transforms[transform_id].CopyFrom( transform_proto)) # The data transmitted in SERIALIZED_FN is different depending on whether - # this is a fnapi pipeline or not. - if (use_fnapi and - (transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or - use_unified_worker)): + # this is a runner v2 pipeline or not. + if is_runner_v2: serialized_data = transform_id else: serialized_data = pickler.dumps( @@ -1080,7 +966,6 @@ def run_ParDo(self, transform_node, options): step.add_property(PropertyNames.OUTPUT_INFO, outputs) # Add the restriction encoding if we are a splittable DoFn - # and are using the Fn API on the unified worker. restriction_coder = transform.get_restriction_coder() if restriction_coder: step.add_property( @@ -1128,10 +1013,8 @@ def run_CombineValuesReplacement(self, transform_node, options): transform_id = self.proto_context.transforms.get_id(transform_node.parent) # The data transmitted in SERIALIZED_FN is different depending on whether - # this is a fnapi pipeline or not. - from apache_beam.runners.dataflow.internal import apiclient - use_fnapi = apiclient._use_fnapi(options) - if use_fnapi: + # this is a runner v2 pipeline or not. + if _is_runner_v2(options): # Fnapi pipelines send the transform ID of the CombineValues transform's # parent composite because Dataflow expects the ID of a CombinePerKey # transform. @@ -1433,18 +1316,92 @@ def _side_input_data(self): return self._data -class _DataflowIterableAsMultimapSideInput(_DataflowSideInput): - """Wraps an iterable side input as dataflow-compatible side input.""" - def __init__(self, side_input): - # pylint: disable=protected-access - side_input_data = side_input._side_input_data() - assert ( - side_input_data.access_pattern == common_urns.side_inputs.ITERABLE.urn) - iterable_view_fn = side_input_data.view_fn - self._data = beam.pvalue.SideInputData( - common_urns.side_inputs.MULTIMAP.urn, - side_input_data.window_mapping_fn, - lambda multimap: iterable_view_fn(multimap[b''])) +def _check_and_add_missing_options(options): + # Type: (PipelineOptions) -> None + + """Validates and adds missing pipeline options depending on options set. + + :param options: PipelineOptions for this pipeline. 
+ """ + def _add_runner_v2_missing_options(options): + debug_options = options.view_as(DebugOptions) + debug_options.add_experiment('beam_fn_api') + debug_options.add_experiment('use_unified_worker') + debug_options.add_experiment('use_runner_v2') + debug_options.add_experiment('use_portable_job_submission') + + debug_options = options.view_as(DebugOptions) + dataflow_service_options = options.view_as( + GoogleCloudOptions).dataflow_service_options or [] + options.view_as( + GoogleCloudOptions).dataflow_service_options = dataflow_service_options + + # Ensure that prime is specified as an experiment if specified as a dataflow + # service option + if 'enable_prime' in dataflow_service_options: + debug_options.add_experiment('enable_prime') + elif debug_options.lookup_experiment('enable_prime'): + dataflow_service_options.append('enable_prime') + + # Streaming only supports using runner v2 (aka unified worker). + # Runner v2 only supports using streaming engine (aka windmill service) + if options.view_as(StandardOptions).streaming: + google_cloud_options = options.view_as(GoogleCloudOptions) + if _is_runner_v2_disabled(options): + raise ValueError( + 'Disabling Runner V2 no longer supported for streaming pipeline ' + 'using Beam Python %s.' % beam.version.__version__) + + if (not google_cloud_options.enable_streaming_engine and + (debug_options.lookup_experiment("enable_windmill_service") or + debug_options.lookup_experiment("enable_streaming_engine"))): + raise ValueError( + """Streaming engine both disabled and enabled: + --enable_streaming_engine flag is not set, but + enable_windmill_service and/or enable_streaming_engine experiments + are present. It is recommended you only set the + --enable_streaming_engine flag.""") + + # Ensure that if we detected a streaming pipeline that streaming specific + # options and experiments. 
+ options.view_as(StandardOptions).streaming = True + google_cloud_options.enable_streaming_engine = True + debug_options.add_experiment("enable_streaming_engine") + debug_options.add_experiment("enable_windmill_service") + _add_runner_v2_missing_options(debug_options) + elif (debug_options.lookup_experiment('enable_prime') or + debug_options.lookup_experiment('beam_fn_api') or + debug_options.lookup_experiment('use_unified_worker') or + debug_options.lookup_experiment('use_runner_v2') or + debug_options.lookup_experiment('use_portable_job_submission')): + if _is_runner_v2_disabled(options): + raise ValueError( + """Runner V2 both disabled and enabled: at least one of + ['enable_prime', 'beam_fn_api', 'use_unified_worker', 'use_runner_v2', + 'use_portable_job_submission'] is set and also one of + ['disable_runner_v2', 'disable_runner_v2_until_2023', + 'disable_prime_runner_v2'] is set.""") + _add_runner_v2_missing_options(debug_options) + + +def _is_runner_v2(options): + # Type: (PipelineOptions) -> bool + + """Returns true if runner v2 is enabled.""" + _check_and_add_missing_options(options) + return options.view_as(DebugOptions).lookup_experiment( + 'use_runner_v2', default=False) + + +def _is_runner_v2_disabled(options): + # Type: (PipelineOptions) -> bool + + """Returns true if runner v2 is disabled.""" + debug_options = options.view_as(DebugOptions) + return ( + debug_options.lookup_experiment('disable_runner_v2') or + debug_options.lookup_experiment('disable_runner_v2_until_2023') or + debug_options.lookup_experiment('disable_prime_runner_v2')) class _DataflowIterableSideInput(_DataflowSideInput): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index c91737d9d177..aca1def59422 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -22,6 +22,7 @@ import json import unittest from datetime import datetime +from itertools import product import mock from parameterized import param @@ -30,6 +31,7 @@ import apache_beam as beam import apache_beam.transforms as ptransform from apache_beam.options.pipeline_options import DebugOptions +from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import AppliedPTransform from apache_beam.pipeline import Pipeline @@ -44,6 +46,8 @@ from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException from apache_beam.runners.dataflow.dataflow_runner import PropertyNames +from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2 +from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.runners.runner import PipelineState from apache_beam.testing.extra_assertions import ExtraAssertionsMixin @@ -253,22 +257,6 @@ def test_remote_runner_translation(self): | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - def test_streaming_create_translation(self): - remote_runner = DataflowRunner() - self.default_properties.append("--streaming") - self.default_properties.append("--experiments=disable_runner_v2_until_2023") - with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p: - p | ptransform.Create([1]) # 
pylint: disable=expression-not-assigned - job_dict = json.loads(str(remote_runner.job)) - self.assertEqual(len(job_dict[u'steps']), 3) - - self.assertEqual(job_dict[u'steps'][0][u'kind'], u'ParallelRead') - self.assertEqual( - job_dict[u'steps'][0][u'properties'][u'pubsub_subscription'], - '_starting_signal/') - self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo') - self.assertEqual(job_dict[u'steps'][2][u'kind'], u'ParallelDo') - def test_remote_runner_display_data(self): remote_runner = DataflowRunner() p = Pipeline( @@ -419,12 +407,14 @@ def test_side_input_visitor(self): beam.pvalue.AsMultiMap(pc)) applied_transform = AppliedPTransform(None, transform, "label", {'pc': pc}) DataflowRunner.side_input_visitor( - use_fn_api=True).visit_transform(applied_transform) + is_runner_v2=True).visit_transform(applied_transform) self.assertEqual(2, len(applied_transform.side_inputs)) - for side_input in applied_transform.side_inputs: - self.assertEqual( - common_urns.side_inputs.MULTIMAP.urn, - side_input._side_input_data().access_pattern) + self.assertEqual( + common_urns.side_inputs.ITERABLE.urn, + applied_transform.side_inputs[0]._side_input_data().access_pattern) + self.assertEqual( + common_urns.side_inputs.MULTIMAP.urn, + applied_transform.side_inputs[1]._side_input_data().access_pattern) def test_min_cpu_platform_flag_is_propagated_to_experiments(self): remote_runner = DataflowRunner() @@ -462,32 +452,6 @@ def test_upload_graph_experiment(self): remote_runner.job.options.view_as(DebugOptions).experiments) self.assertIn('upload_graph', experiments_for_job) - def test_dataflow_worker_jar_flag_non_fnapi_noop(self): - remote_runner = DataflowRunner() - self.default_properties.append('--experiment=some_other_experiment') - self.default_properties.append('--dataflow_worker_jar=test.jar') - - with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p: - p | ptransform.Create([1]) # pylint: disable=expression-not-assigned - - experiments_for_job = ( - remote_runner.job.options.view_as(DebugOptions).experiments) - self.assertIn('some_other_experiment', experiments_for_job) - self.assertNotIn('use_staged_dataflow_worker_jar', experiments_for_job) - - def test_dataflow_worker_jar_flag_adds_use_staged_worker_jar_experiment(self): - remote_runner = DataflowRunner() - self.default_properties.append('--experiment=beam_fn_api') - self.default_properties.append('--dataflow_worker_jar=test.jar') - - with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p: - p | ptransform.Create([1]) # pylint: disable=expression-not-assigned - - experiments_for_job = ( - remote_runner.job.options.view_as(DebugOptions).experiments) - self.assertIn('beam_fn_api', experiments_for_job) - self.assertIn('use_staged_dataflow_worker_jar', experiments_for_job) - def test_use_fastavro_experiment_is_not_added_when_use_avro_is_present(self): remote_runner = DataflowRunner() self.default_properties.append('--experiment=use_avro') @@ -714,40 +678,6 @@ def test_group_into_batches_translation_non_sharded(self): self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties) self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties) - def test_group_into_batches_translation_non_se(self): - with self.assertRaisesRegex( - ValueError, - 'Runner determined sharding not available in Dataflow for ' - 'GroupIntoBatches for non-Streaming-Engine jobs'): - _ = self._run_group_into_batches_and_get_step_properties( - True, ['--experiments=use_runner_v2']) - - def 
test_group_into_batches_translation_non_unified_worker(self): - # non-portable - with self.assertRaisesRegex( - ValueError, - 'Runner determined sharding not available in Dataflow for ' - 'GroupIntoBatches for jobs not using Runner V2'): - _ = self._run_group_into_batches_and_get_step_properties( - True, - [ - '--enable_streaming_engine', - '--experiments=disable_runner_v2_until_2023' - ]) - - # JRH - with self.assertRaisesRegex( - ValueError, - 'Runner determined sharding not available in Dataflow for ' - 'GroupIntoBatches for jobs not using Runner V2'): - _ = self._run_group_into_batches_and_get_step_properties( - True, - [ - '--enable_streaming_engine', - '--experiments=beam_fn_api', - '--experiments=disable_runner_v2' - ]) - def test_pack_combiners(self): class PackableCombines(beam.PTransform): def annotations(self): @@ -804,6 +734,122 @@ def test_resource_hints_translation(self, memory_hint): 'type%3Anvidia-tesla-k80%3Bcount%3A1%3Binstall-nvidia-drivers' }) + @parameterized.expand([ + ( + "%s_%s" % (enable_option, disable_option), + enable_option, + disable_option) + for (enable_option, + disable_option) in product([ + False, + 'enable_prime', + 'beam_fn_api', + 'use_unified_worker', + 'use_runner_v2', + 'use_portable_job_submission' + ], + [ + False, + 'disable_runner_v2', + 'disable_runner_v2_until_2023', + 'disable_prime_runner_v2' + ]) + ]) + def test_batch_is_runner_v2(self, name, enable_option, disable_option): + options = PipelineOptions( + (['--experiments=%s' % enable_option] if enable_option else []) + + (['--experiments=%s' % disable_option] if disable_option else [])) + if (enable_option and disable_option): + with self.assertRaisesRegex(ValueError, + 'Runner V2 both disabled and enabled'): + _is_runner_v2(options) + elif enable_option: + self.assertTrue(_is_runner_v2(options)) + self.assertFalse(_is_runner_v2_disabled(options)) + for expected in ['beam_fn_api', + 'use_unified_worker', + 'use_runner_v2', + 'use_portable_job_submission']: + self.assertTrue( + options.view_as(DebugOptions).lookup_experiment(expected, False)) + if enable_option == 'enable_prime': + self.assertIn( + 'enable_prime', + options.view_as(GoogleCloudOptions).dataflow_service_options) + elif disable_option: + self.assertFalse(_is_runner_v2(options)) + self.assertTrue(_is_runner_v2_disabled(options)) + else: + self.assertFalse(_is_runner_v2(options)) + + @parameterized.expand([ + ( + "%s_%s" % (enable_option, disable_option), + enable_option, + disable_option) + for (enable_option, + disable_option) in product([ + False, + 'enable_prime', + 'beam_fn_api', + 'use_unified_worker', + 'use_runner_v2', + 'use_portable_job_submission' + ], + [ + False, + 'disable_runner_v2', + 'disable_runner_v2_until_2023', + 'disable_prime_runner_v2' + ]) + ]) + def test_streaming_is_runner_v2(self, name, enable_option, disable_option): + options = PipelineOptions( + ['--streaming'] + + (['--experiments=%s' % enable_option] if enable_option else []) + + (['--experiments=%s' % disable_option] if disable_option else [])) + if disable_option: + with self.assertRaisesRegex( + ValueError, + 'Disabling Runner V2 no longer supported for streaming pipeline'): + _is_runner_v2(options) + else: + self.assertTrue(_is_runner_v2(options)) + for expected in ['beam_fn_api', + 'use_unified_worker', + 'use_runner_v2', + 'use_portable_job_submission', + 'enable_windmill_service', + 'enable_streaming_engine']: + self.assertTrue( + options.view_as(DebugOptions).lookup_experiment(expected, False)) + if enable_option == 'enable_prime': + 
self.assertIn( + 'enable_prime', + options.view_as(GoogleCloudOptions).dataflow_service_options) + + def test_dataflow_service_options_enable_prime_sets_runner_v2(self): + options = PipelineOptions(['--dataflow_service_options=enable_prime']) + self.assertTrue(_is_runner_v2(options)) + for expected in ['beam_fn_api', + 'use_unified_worker', + 'use_runner_v2', + 'use_portable_job_submission']: + self.assertTrue( + options.view_as(DebugOptions).lookup_experiment(expected, False)) + + options = PipelineOptions( + ['--streaming', '--dataflow_service_options=enable_prime']) + self.assertTrue(_is_runner_v2(options)) + for expected in ['beam_fn_api', + 'use_unified_worker', + 'use_runner_v2', + 'use_portable_job_submission', + 'enable_windmill_service', + 'enable_streaming_engine']: + self.assertTrue( + options.view_as(DebugOptions).lookup_experiment(expected, False)) + if __name__ == '__main__': unittest.main() diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 0528901363e4..9ac2ceb98ba9 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -150,6 +150,7 @@ def __init__( environment_version, proto_pipeline_staged_url, proto_pipeline=None): + from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2 self.standard_options = options.view_as(StandardOptions) self.google_cloud_options = options.view_as(GoogleCloudOptions) self.worker_options = options.view_as(WorkerOptions) @@ -189,7 +190,7 @@ def __init__( if self.standard_options.streaming: job_type = 'FNAPI_STREAMING' else: - if _use_fnapi(options): + if _is_runner_v2(options): job_type = 'FNAPI_BATCH' else: job_type = 'PYTHON_BATCH' @@ -203,17 +204,6 @@ def __init__( if job_type.startswith('FNAPI_'): self.debug_options.experiments = self.debug_options.experiments or [] - if self.debug_options.lookup_experiment( - 'runner_harness_container_image') or _use_unified_worker(options): - # Default image is not used if user provides a runner harness image. - # Default runner harness image is selected by the service for unified - # worker. - pass - else: - runner_harness_override = (get_runner_harness_container_image()) - if runner_harness_override: - self.debug_options.add_experiment( - 'runner_harness_container_image=' + runner_harness_override) debug_options_experiments = self.debug_options.experiments # Add use_multiple_sdk_containers flag if it's not already present. Do not # add the flag if 'no_use_multiple_sdk_containers' is present. 
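The job-type selection in the Environment hunk above reduces to a small decision: streaming jobs are always FnAPI (Runner V2), while batch jobs split on _is_runner_v2. A hedged restatement as a standalone helper (the name _job_type is illustrative only; StandardOptions and _is_runner_v2 are the names this module already imports):

def _job_type(options):
    # Streaming pipelines now always run on Runner V2, so they are always
    # FnAPI jobs.
    if options.view_as(StandardOptions).streaming:
        return 'FNAPI_STREAMING'
    # Batch keeps both paths: Runner V2 batch is FnAPI, while the legacy
    # worker stays on the non-portable PYTHON_BATCH job type.
    return 'FNAPI_BATCH' if _is_runner_v2(options) else 'PYTHON_BATCH'
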
@@ -305,7 +295,7 @@ def __init__( container_image.capabilities.append(capability) pool.sdkHarnessContainerImages.append(container_image) - if not _use_fnapi(options) or not pool.sdkHarnessContainerImages: + if not _is_runner_v2(options) or not pool.sdkHarnessContainerImages: pool.workerHarnessContainerImage = ( get_container_image_from_options(options)) elif len(pool.sdkHarnessContainerImages) == 1: @@ -553,7 +543,8 @@ def __init__(self, options, root_staging_location=None): self._root_staging_location = ( root_staging_location or self.google_cloud_options.staging_location) - if _use_fnapi(options): + from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2 + if _is_runner_v2(options): self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION else: self.environment_version = _LEGACY_ENVIRONMENT_MAJOR_VERSION @@ -762,10 +753,6 @@ def create_job(self, job): job.options.view_as(GoogleCloudOptions).template_location) if job.options.view_as(DebugOptions).lookup_experiment('upload_graph'): - # For Runner V2, also set portable job submission. - if _use_unified_worker(job.options): - job.options.view_as(DebugOptions).add_experiment( - 'use_portable_job_submission') self.stage_file( job.options.view_as(GoogleCloudOptions).staging_location, "dataflow_graph.json", @@ -1183,37 +1170,6 @@ def translate_value(value, metric_update_proto): metric_update_proto.integer = to_split_int(value) -def _use_fnapi(pipeline_options): - standard_options = pipeline_options.view_as(StandardOptions) - debug_options = pipeline_options.view_as(DebugOptions) - - return standard_options.streaming or ( - debug_options.experiments and 'beam_fn_api' in debug_options.experiments) - - -def _use_unified_worker(pipeline_options): - debug_options = pipeline_options.view_as(DebugOptions) - use_unified_worker_flag = 'use_unified_worker' - use_runner_v2_flag = 'use_runner_v2' - enable_prime_flag = 'enable_prime' - - if (debug_options.lookup_experiment(use_runner_v2_flag) and - not debug_options.lookup_experiment(use_unified_worker_flag)): - debug_options.add_experiment(use_unified_worker_flag) - - dataflow_service_options = pipeline_options.view_as( - GoogleCloudOptions).dataflow_service_options or [] - if ((debug_options.lookup_experiment(enable_prime_flag) or - enable_prime_flag in dataflow_service_options) and - not any([debug_options.lookup_experiment('disable_prime_runner_v2'), - debug_options.lookup_experiment('disable_runner_v2')])): - debug_options.add_experiment(use_runner_v2_flag) - debug_options.add_experiment(use_unified_worker_flag) - debug_options.add_experiment(enable_prime_flag) - - return debug_options.lookup_experiment(use_unified_worker_flag) - - def _get_container_image_tag(): base_version = pkg_resources.parse_version( beam_version.__version__).base_version @@ -1235,13 +1191,13 @@ def get_container_image_from_options(pipeline_options): Returns: str: Container image for remote execution. """ + from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2 worker_options = pipeline_options.view_as(WorkerOptions) if worker_options.sdk_container_image: return worker_options.sdk_container_image - use_fnapi = _use_fnapi(pipeline_options) # TODO(tvalentyn): Use enumerated type instead of strings for job types. 
- if use_fnapi: + if _is_runner_v2(pipeline_options): fnapi_suffix = '-fnapi' else: fnapi_suffix = '' @@ -1252,22 +1208,22 @@ def get_container_image_from_options(pipeline_options): version_suffix=version_suffix, fnapi_suffix=fnapi_suffix) - image_tag = _get_required_container_version(use_fnapi) + image_tag = _get_required_container_version(_is_runner_v2(pipeline_options)) return image_name + ':' + image_tag -def _get_required_container_version(use_fnapi): +def _get_required_container_version(is_runner_v2): """For internal use only; no backwards-compatibility guarantees. Args: - use_fnapi (bool): True, if pipeline is using FnAPI, False otherwise. + is_runner_v2 (bool): True if and only if pipeline is using runner v2. Returns: str: The tag of worker container images in GCR that corresponds to current version of the SDK. """ if 'dev' in beam_version.__version__: - if use_fnapi: + if is_runner_v2: return names.BEAM_FNAPI_CONTAINER_VERSION else: return names.BEAM_CONTAINER_VERSION @@ -1275,24 +1231,6 @@ def _get_required_container_version(use_fnapi): return _get_container_image_tag() -def get_runner_harness_container_image(): - """For internal use only; no backwards-compatibility guarantees. - - Returns: - str: Runner harness container image that shall be used by default - for current SDK version or None if the runner harness container image - bundled with the service shall be used. - """ - # Pin runner harness for released versions of the SDK. - if 'dev' not in beam_version.__version__: - return ( - names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/' + 'harness' + ':' + - _get_container_image_tag()) - # Don't pin runner harness for dev versions so that we can notice - # potential incompatibility between runner and sdk harnesses. - return None - - def get_response_encoding(): """Encoding to use to decode HTTP response from Google APIs.""" return 'utf8' diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 5b5d713fba9a..1192a18fc4e1 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -573,26 +573,7 @@ def test_number_of_worker_harness_threads(self): 'apache_beam.runners.dataflow.internal.apiclient.' 'beam_version.__version__', '2.2.0') - def test_harness_override_default_in_released_sdks(self): - pipeline_options = PipelineOptions( - ['--temp_location', 'gs://any-location/temp', '--streaming']) - override = ''.join([ - 'runner_harness_container_image=', - names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY, - '/harness:2.2.0' - ]) - env = apiclient.Environment( - [], #packages - pipeline_options, - '2.0.0', #any environment version - FAKE_PIPELINE_URL) - self.assertIn(override, env.proto.experiments) - - @mock.patch( - 'apache_beam.runners.dataflow.internal.apiclient.' - 'beam_version.__version__', - '2.2.0') - def test_harness_override_absent_in_released_sdks_with_runner_v2(self): + def test_harness_override_absent_with_runner_v2(self): pipeline_options = PipelineOptions([ '--temp_location', 'gs://any-location/temp', @@ -612,32 +593,7 @@ def test_harness_override_absent_in_released_sdks_with_runner_v2(self): 'apache_beam.runners.dataflow.internal.apiclient.' 
'beam_version.__version__', '2.2.0') - def test_harness_override_custom_in_released_sdks(self): - pipeline_options = PipelineOptions([ - '--temp_location', - 'gs://any-location/temp', - '--streaming', - '--experiments=runner_harness_container_image=fake_image' - ]) - env = apiclient.Environment( - [], #packages - pipeline_options, - '2.0.0', #any environment version - FAKE_PIPELINE_URL) - self.assertEqual( - 1, - len([ - x for x in env.proto.experiments - if x.startswith('runner_harness_container_image=') - ])) - self.assertIn( - 'runner_harness_container_image=fake_image', env.proto.experiments) - - @mock.patch( - 'apache_beam.runners.dataflow.internal.apiclient.' - 'beam_version.__version__', - '2.2.0') - def test_harness_override_custom_in_released_sdks_with_runner_v2(self): + def test_custom_harness_override_present_with_runner_v2(self): pipeline_options = PipelineOptions([ '--temp_location', 'gs://any-location/temp', @@ -659,41 +615,6 @@ def test_harness_override_custom_in_released_sdks_with_runner_v2(self): self.assertIn( 'runner_harness_container_image=fake_image', env.proto.experiments) - @mock.patch( - 'apache_beam.runners.dataflow.internal.apiclient.' - 'beam_version.__version__', - '2.2.0.rc1') - def test_harness_override_uses_base_version_in_rc_releases(self): - pipeline_options = PipelineOptions( - ['--temp_location', 'gs://any-location/temp', '--streaming']) - override = ''.join([ - 'runner_harness_container_image=', - names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY, - '/harness:2.2.0' - ]) - env = apiclient.Environment( - [], #packages - pipeline_options, - '2.0.0', #any environment version - FAKE_PIPELINE_URL) - self.assertIn(override, env.proto.experiments) - - @mock.patch( - 'apache_beam.runners.dataflow.internal.apiclient.' - 'beam_version.__version__', - '2.2.0.dev') - def test_harness_override_absent_in_unreleased_sdk(self): - pipeline_options = PipelineOptions( - ['--temp_location', 'gs://any-location/temp', '--streaming']) - env = apiclient.Environment( - [], #packages - pipeline_options, - '2.0.0', #any environment version - FAKE_PIPELINE_URL) - if env.proto.experiments: - for experiment in env.proto.experiments: - self.assertNotIn('runner_harness_container_image=', experiment) - @mock.patch( 'apache_beam.runners.dataflow.internal.apiclient.' 
'beam_version.__version__', @@ -1044,48 +965,6 @@ def test_interpreter_version_check_fails_on_not_yet_supported_version(self): apiclient._verify_interpreter_version_is_supported, pipeline_options) - def test_use_unified_worker(self): - pipeline_options = PipelineOptions([]) - self.assertFalse(apiclient._use_unified_worker(pipeline_options)) - - pipeline_options = PipelineOptions(['--experiments=beam_fn_api']) - self.assertFalse(apiclient._use_unified_worker(pipeline_options)) - - pipeline_options = PipelineOptions(['--experiments=use_unified_worker']) - self.assertTrue(apiclient._use_unified_worker(pipeline_options)) - - pipeline_options = PipelineOptions( - ['--experiments=use_unified_worker', '--experiments=beam_fn_api']) - self.assertTrue(apiclient._use_unified_worker(pipeline_options)) - - pipeline_options = PipelineOptions( - ['--experiments=use_runner_v2', '--experiments=beam_fn_api']) - self.assertTrue(apiclient._use_unified_worker(pipeline_options)) - - pipeline_options = PipelineOptions(['--experiments=enable_prime']) - self.assertTrue(apiclient._use_unified_worker(pipeline_options)) - - pipeline_options = PipelineOptions( - ['--dataflow_service_options=enable_prime']) - self.assertTrue(apiclient._use_unified_worker(pipeline_options)) - - pipeline_options = PipelineOptions([ - '--dataflow_service_options=enable_prime', - '--experiments=disable_prime_runner_v2' - ]) - self.assertFalse(apiclient._use_unified_worker(pipeline_options)) - - pipeline_options = PipelineOptions( - ['--experiments=enable_prime', '--experiments=disable_prime_runner_v2']) - self.assertFalse(apiclient._use_unified_worker(pipeline_options)) - - pipeline_options = PipelineOptions([ - '--experiments=use_unified_worker', - '--experiments=use_runner_v2', - '--experiments=beam_fn_api' - ]) - self.assertTrue(apiclient._use_unified_worker(pipeline_options)) - def test_get_response_encoding(self): encoding = apiclient.get_response_encoding() diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py index 1f215fd02bdd..1012a7d36240 100644 --- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py +++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py @@ -19,7 +19,6 @@ # pytype: skip-file -from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.pipeline import PTransformOverride @@ -30,13 +29,7 @@ def matches(self, applied_ptransform): # Imported here to avoid circular dependencies. 
# pylint: disable=wrong-import-order, wrong-import-position from apache_beam import Create - from apache_beam.runners.dataflow.internal import apiclient - - if isinstance(applied_ptransform.transform, Create): - return not apiclient._use_fnapi( - applied_ptransform.outputs[None].pipeline._options) - else: - return False + return isinstance(applied_ptransform.transform, Create) def get_replacement_transform_for_applied_ptransform( self, applied_ptransform): @@ -86,42 +79,6 @@ def expand(self, pbegin): transform.get_type_hints().simple_output_type('Read')) -class JrhReadPTransformOverride(PTransformOverride): - """A ``PTransformOverride`` for ``Read(BoundedSource)``""" - def matches(self, applied_ptransform): - from apache_beam.io import Read - from apache_beam.io.iobase import BoundedSource - return ( - isinstance(applied_ptransform.transform, Read) and - isinstance(applied_ptransform.transform.source, BoundedSource)) - - def get_replacement_transform_for_applied_ptransform( - self, applied_ptransform): - from apache_beam.io import Read - from apache_beam.transforms import core - from apache_beam.transforms import util - # Make this a local to narrow what's captured in the closure. - source = applied_ptransform.transform.source - - class JrhRead(core.PTransform): - def expand(self, pbegin): - return ( - pbegin - | core.Impulse() - | 'Split' >> core.FlatMap( - lambda _: source.split( - Read.get_desired_chunk_size(source.estimate_size()))) - | util.Reshuffle() - | 'ReadSplits' >> core.FlatMap( - lambda split: split.source.read( - split.source.get_range_tracker( - split.start_position, split.stop_position)))) - - return JrhRead().with_output_types( - applied_ptransform.transform.get_type_hints().simple_output_type( - 'Read')) - - class CombineValuesPTransformOverride(PTransformOverride): """A ``PTransformOverride`` for ``CombineValues``. @@ -221,21 +178,6 @@ def matches(self, applied_ptransform): standard_options = self.options.view_as(StandardOptions) if not standard_options.streaming: return False - google_cloud_options = self.options.view_as(GoogleCloudOptions) - if not google_cloud_options.enable_streaming_engine: - raise ValueError( - 'Runner determined sharding not available in Dataflow for ' - 'GroupIntoBatches for non-Streaming-Engine jobs. In order to use ' - 'runner determined sharding, please use ' - '--streaming --enable_streaming_engine --experiments=use_runner_v2') - - from apache_beam.runners.dataflow.internal import apiclient - if not apiclient._use_unified_worker(self.options): - raise ValueError( - 'Runner determined sharding not available in Dataflow for ' - 'GroupIntoBatches for jobs not using Runner V2. 
In order to use ' - 'runner determined sharding, please use ' - '--streaming --enable_streaming_engine --experiments=use_runner_v2') self.dataflow_runner.add_pcoll_with_auto_sharding(applied_ptransform) return True diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 6ebdd3f2854a..b31136b82db7 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -374,14 +374,6 @@ def create_job_resources(options, # type: PipelineOptions Stager._create_file_stage_to_artifact( pickled_session_file, names.PICKLED_MAIN_SESSION_FILE)) - worker_options = options.view_as(WorkerOptions) - dataflow_worker_jar = getattr(worker_options, 'dataflow_worker_jar', None) - if dataflow_worker_jar is not None: - jar_staged_filename = 'dataflow-worker.jar' - resources.append( - Stager._create_file_stage_to_artifact( - dataflow_worker_jar, jar_staged_filename)) - return resources def stage_job_resources(self, diff --git a/sdks/python/apache_beam/testing/load_tests/build.gradle b/sdks/python/apache_beam/testing/load_tests/build.gradle index 902488724e92..144f7d12ba6c 100644 --- a/sdks/python/apache_beam/testing/load_tests/build.gradle +++ b/sdks/python/apache_beam/testing/load_tests/build.gradle @@ -32,7 +32,6 @@ def mainClass = project.findProperty(mainClassProperty) def loadTestArgsProperty = "loadTest.args" def runnerProperty = "runner" -def withDataflowWorkerJarProperty = "withDataflowWorkerJar" def requirementsTxtFileProperty = "loadTest.requirementsTxtFile" @@ -45,13 +44,6 @@ task run(type: Exec, dependsOn: installGcpTest) { loadTestArgs +=" --sdk_location=${files(configurations.distTarBall.files).singleFile}" } - String withDataflowWorkerJarArg = project.findProperty(withDataflowWorkerJarProperty).toString() ?: "" - if (withDataflowWorkerJarArg.toLowerCase() == 'true') { - evaluationDependsOn(':runners:google-cloud-dataflow-java:worker') - dependsOn ':runners:google-cloud-dataflow-java:worker:shadowJar' - loadTestArgs +=" --dataflow_worker_jar=${project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath}" - } - String requirementsTxtFileArg = project.findProperty(requirementsTxtFileProperty) ?: null if (requirementsTxtFileArg != null) { doFirst { diff --git a/sdks/python/scripts/run_integration_test.sh b/sdks/python/scripts/run_integration_test.sh index 5492b6363c4d..d38f1bf3baa2 100755 --- a/sdks/python/scripts/run_integration_test.sh +++ b/sdks/python/scripts/run_integration_test.sh @@ -37,7 +37,6 @@ # num_workers -> Number of workers. # sleep_secs -> Number of seconds to wait before verification. # streaming -> True if a streaming job. -# worker_jar -> Customized worker jar for dataflow runner. # kms_key_name -> Name of Cloud KMS encryption key to use in some tests. # pipeline_opts -> List of space separated pipeline options. If this # flag is specified, all above flag will be ignored. 
@@ -75,7 +74,6 @@ SDK_LOCATION=build/apache-beam.tar.gz NUM_WORKERS=1 SLEEP_SECS=20 STREAMING=false -WORKER_JAR="" KMS_KEY_NAME="projects/apache-beam-testing/locations/global/keyRings/beam-it/cryptoKeys/test" SUITE="" COLLECT_MARKERS= @@ -135,11 +133,6 @@ case $key in shift # past argument shift # past value ;; - --worker_jar) - WORKER_JAR="$2" - shift # past argument - shift # past value - ;; --runner_v2) RUNNER_V2="$2" shift # past argument @@ -251,11 +244,6 @@ if [[ -z $PIPELINE_OPTS ]]; then opts+=("--streaming") fi - # Add --dataflow_worker_jar if provided - if [[ ! -z "$WORKER_JAR" ]]; then - opts+=("--dataflow_worker_jar=$WORKER_JAR") - fi - # Add --runner_v2 if provided if [[ "$RUNNER_V2" = true ]]; then opts+=("--experiments=use_runner_v2") diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle index 0b06d6ad62b1..5e2fa3d7f7f9 100644 --- a/sdks/python/test-suites/dataflow/common.gradle +++ b/sdks/python/test-suites/dataflow/common.gradle @@ -43,9 +43,6 @@ def preCommitIT(String runScriptsDir, String envdir, Boolean streaming, Boolean task "preCommitIT${suffix}" { dependsOn 'installGcpTest' dependsOn ':sdks:python:sdist' - dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar" - - def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath doLast { // Basic integration tests to run in PreCommit @@ -64,17 +61,14 @@ def preCommitIT(String runScriptsDir, String envdir, Boolean streaming, Boolean def argMap = [ "test_opts" : testOpts, "sdk_location": files(configurations.distTarBall.files).singleFile, - "worker_jar" : dataflowWorkerJar, "suite" : "preCommitIT-df${pythonSuffix}", ] - if (runnerV2) { - argMap.put("runner_v2", "true") - // KMS is not supported for streaming engine. 
- argMap.put("kms_key_name", "\"\"") - } if (streaming){ argMap.put("streaming", "true") + argMap.put("runner_v2", "true") + } else if (runnerV2) { + argMap.put("runner_v2", "true") } def cmdArgs = mapToArgString(argMap) @@ -104,16 +98,12 @@ task preCommitIT_V2{ task postCommitIT { dependsOn 'installGcpTest' dependsOn ':sdks:python:sdist' - dependsOn ':runners:google-cloud-dataflow-java:worker:shadowJar' - - def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath doLast { def testOpts = basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"] def argMap = [ "test_opts": testOpts, "sdk_location": files(configurations.distTarBall.files).singleFile, - "worker_jar": dataflowWorkerJar, "suite": "postCommitIT-df${pythonVersionSuffix}", "collect": "it_postcommit" ] @@ -148,16 +138,12 @@ task postCommitSickbay { task spannerioIT { dependsOn 'installGcpTest' dependsOn ':sdks:python:sdist' - dependsOn ':runners:google-cloud-dataflow-java:worker:shadowJar' - - def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath doLast { def testOpts = basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"] def argMap = [ "test_opts": testOpts, "sdk_location": files(configurations.distTarBall.files).singleFile, - "worker_jar": dataflowWorkerJar, "suite": "postCommitIT-df${pythonVersionSuffix}", "collect": "spannerio_it" ] @@ -210,12 +196,9 @@ task examples { task validatesRunnerBatchTests { dependsOn 'installGcpTest' dependsOn ':sdks:python:sdist' - dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar" - def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath def argMap = [ "test_opts" : basicPytestOpts + ["--numprocesses=8"], - "worker_jar" : dataflowWorkerJar, "sdk_location": files(configurations.distTarBall.files).singleFile, "suite" : "validatesRunnerBatchTests-df${pythonVersionSuffix}", "collect": "it_validatesrunner and not no_sickbay_batch" @@ -223,8 +206,6 @@ task validatesRunnerBatchTests { if (project.hasProperty('useRunnerV2')) { argMap.put("runner_v2", "true") - // KMS is not supported for streaming engine. - argMap.put("kms_key_name", "\"\"") } if (project.hasProperty('disableRunnerV2')) { @@ -243,9 +224,6 @@ task validatesRunnerBatchTests { task validatesRunnerStreamingTests { dependsOn 'installGcpTest' dependsOn ':sdks:python:sdist' - dependsOn ":runners:google-cloud-dataflow-java:worker:shadowJar" - - def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath // TODO(BEAM-3544,https://github.com/apache/beam/issues/19012): Disable tests with 'sickbay-streaming' tag. // Execute tests with xdists @@ -254,15 +232,10 @@ task validatesRunnerStreamingTests { "test_opts": basicPytestOpts + ["--numprocesses=8"], "streaming": "true", "sdk_location": files(configurations.distTarBall.files).singleFile, - "worker_jar": dataflowWorkerJar, "suite": "validatesRunnerStreamingTests-df${pythonVersionSuffix}-xdist", - "collect": "it_validatesrunner and not no_sickbay_streaming and not no_xdist" + "collect": "it_validatesrunner and not no_sickbay_streaming and not no_xdist", + "runner_v2": "true", ] - if (project.hasProperty('useRunnerV2')) { - argMap.put("runner_v2", "true") - // KMS is not supported for streaming engine. 
- argMap.put("kms_key_name", "\"\"") - } def cmdArgs = mapToArgString(argMap) exec { @@ -277,15 +250,10 @@ task validatesRunnerStreamingTests { "test_opts": basicPytestOpts, "streaming": "true", "sdk_location": files(configurations.distTarBall.files).singleFile, - "worker_jar": dataflowWorkerJar, "suite": "validatesRunnerStreamingTests-df${pythonVersionSuffix}-noxdist", - "collect": "it_validatesrunner and not no_sickbay_streaming and no_xdist" + "collect": "it_validatesrunner and not no_sickbay_streaming and no_xdist", + "runner_v2": "true", ] - if (project.hasProperty('useRunnerV2')) { - argMap.put("runner_v2", "true") - // KMS is not supported for streaming engine. - argMap.put("kms_key_name", "\"\"") - } def cmdArgs = mapToArgString(argMap) exec {
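Taken together, the changes above mean that --streaming now implies Runner V2, portable job submission, and Streaming Engine, and opting out fails at pipeline construction. A small sketch of that behavior, mirroring test_streaming_is_runner_v2 (note that _is_runner_v2 is a private helper, used here purely for illustration):

from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions
from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2

# --streaming alone fills in the Runner V2 and Streaming Engine experiments.
opts = PipelineOptions(['--streaming'])
assert _is_runner_v2(opts)
assert opts.view_as(DebugOptions).lookup_experiment('enable_streaming_engine')

# Keeping streaming on the legacy worker is no longer possible.
bad = PipelineOptions(['--streaming', '--experiments=disable_runner_v2'])
try:
    _is_runner_v2(bad)
except ValueError as err:
    print(err)  # Disabling Runner V2 no longer supported for streaming pipeline ...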