diff --git a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py index bba1af0d7e518..3d7b3bf30d506 100644 --- a/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py +++ b/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py @@ -399,6 +399,10 @@ def execute(self, context: Context): if not self.beam_hook: raise AirflowException("Beam hook is not defined.") + if self.runner.lower() != BeamRunnerType.DataflowRunner.lower(): + # Links are rendered only for dataflow runner + self.operator_extra_links = () + if self.deferrable and not self.is_dataflow: self.defer( trigger=BeamPythonPipelineTrigger( @@ -577,6 +581,9 @@ def execute(self, context: Context): ) = self._init_pipeline_options() if not self.beam_hook: raise AirflowException("Beam hook is not defined.") + if self.runner.lower() != BeamRunnerType.DataflowRunner.lower(): + # Links are rendered only for dataflow runner + self.operator_extra_links = () if self.deferrable and not self.is_dataflow: self.defer( diff --git a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py index c517517c29963..d7b4e4d3125ce 100644 --- a/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py +++ b/providers/apache/beam/tests/unit/apache/beam/operators/test_beam.py @@ -192,6 +192,19 @@ def test_exec_direct_runner(self, gcs_hook, beam_hook_mock, py_options): py_system_site_packages=False, ) + @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook")) + @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook")) + def test_direct_runner_no_op_extra_links(self, gcs_hook, beam_hook_mock, py_options): + """Test there is no operator_extra_links when running pipeline with direct runner type.""" + start_python_hook = beam_hook_mock.return_value.start_python_pipeline + op = BeamRunPythonPipelineOperator(**self.default_op_kwargs) + op.execute({}) + + beam_hook_mock.assert_called_once_with(runner=DEFAULT_RUNNER) + start_python_hook.assert_called_once() + + assert not op.operator_extra_links + @mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist")) @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook")) @mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook")) @@ -403,6 +416,21 @@ def test_exec_direct_runner(self, gcs_hook, beam_hook_mock, default_options, pip job_class=JOB_CLASS, ) + @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook")) + @mock.patch(BEAM_OPERATOR_PATH.format("GCSHook")) + def test_direct_runner_no_op_extra_links( + self, gcs_hook, beam_hook_mock, default_options, pipeline_options + ): + """Test there is no operator_extra_links when running pipeline with direct runner type.""" + start_java_hook = beam_hook_mock.return_value.start_java_pipeline + op = BeamRunJavaPipelineOperator(**self.default_op_kwargs) + + op.execute({}) + + beam_hook_mock.assert_called_once_with(runner=DEFAULT_RUNNER) + start_java_hook.assert_called_once() + assert not op.operator_extra_links + @mock.patch(BEAM_OPERATOR_PATH.format("DataflowJobLink.persist")) @mock.patch(BEAM_OPERATOR_PATH.format("BeamHook")) @mock.patch(BEAM_OPERATOR_PATH.format("DataflowHook"))