
Dataflow operator checks wrong project_id #15483

Closed
filippociceri opened this issue Apr 22, 2021 · 4 comments · Fixed by #24020
Labels: good first issue, kind:bug (This is a clearly a bug), provider:google (Google (including GCP) related issues)

Comments

@filippociceri

Apache Airflow version:
composer-1.16.1-airflow-1.10.15

Environment:

  • Cloud provider or hardware configuration: Google Composer

What happened:
First, a bit of context. We have a single Airflow instance in its own GCP project, which runs Dataflow jobs in other GCP projects.

Let's call the project that runs Airflow project A, and the project where the Dataflow jobs run project D.

We recently upgraded from 1.10.14 to 1.10.15 (composer-1.14.2-airflow-1.10.14 to composer-1.16.1-airflow-1.10.15) and noticed that jobs were completing successfully in the Dataflow console, but Airflow raised an error when the wait_for_done call was made to check whether a Dataflow job had ended. The Dataflow API returned a 403 when retrieving the job state:

{taskinstance.py:1152} ERROR - <HttpError 403 when requesting https://dataflow.googleapis.com/v1b3/projects/<PROJECT_A>/locations/us-east1/jobs/<JOB_NAME>?alt=json returned "(9549b560fdf4d2fe): Permission 'dataflow.jobs.get' denied on project: '<PROJECT_A>". Details: "(9549b560fdf4d2fe): Permission 'dataflow.jobs.get' denied on project: '<PROJECT_A>'">

What you expected to happen:

The 403 was thrown when looking up the job state in project A, while I expect this lookup to happen in project D (and consequently NOT to fail, since the associated service account has the correct permissions there: it managed to launch the job). I investigated a bit, and this looks like a regression introduced by the upgrade to composer-1.16.1-airflow-1.10.15.

This version uses an image that automatically installs apache-airflow-backport-providers-apache-beam==2021.3.13, which backports the Dataflow operator from Airflow 2. The previous version we were using installed apache-airflow-backport-providers-google==2020.11.23.

I checked the commits and changes, and noticed that this operator was last modified in 1872d87. Relevant lines from that commit are the following:

self.beam_hook.start_python_pipeline(
    variables=formatted_pipeline_options,
    py_file=self.py_file,
    py_options=self.py_options,
    py_interpreter=self.py_interpreter,
    py_requirements=self.py_requirements,
    py_system_site_packages=self.py_system_site_packages,
    process_line_callback=process_line_callback,
)
self.dataflow_hook.wait_for_done(  # pylint: disable=no-value-for-parameter
    job_name=job_name,
    location=self.location,
    job_id=self.job_id,
    multiple_jobs=False,
)

while these are from the previous version:

self.hook.start_python_dataflow(  # type: ignore[attr-defined]
    job_name=self.job_name,
    variables=formatted_options,
    dataflow=self.py_file,
    py_options=self.py_options,
    py_interpreter=self.py_interpreter,
    py_requirements=self.py_requirements,
    py_system_site_packages=self.py_system_site_packages,
    on_new_job_id_callback=set_current_job_id,
    project_id=self.project_id,
    location=self.location,
)

def _start_dataflow(
    self,
    variables: dict,
    name: str,
    command_prefix: List[str],
    project_id: str,
    multiple_jobs: bool = False,
    on_new_job_id_callback: Optional[Callable[[str], None]] = None,
    location: str = DEFAULT_DATAFLOW_LOCATION,
) -> None:
    cmd = command_prefix + [
        "--runner=DataflowRunner",
        f"--project={project_id}",
    ]
    if variables:
        cmd.extend(self._options_to_args(variables))
    runner = _DataflowRunner(cmd=cmd, on_new_job_id_callback=on_new_job_id_callback)
    job_id = runner.wait_for_done()
    job_controller = _DataflowJobsController(
        dataflow=self.get_conn(),
        project_number=project_id,
        name=name,
        location=location,
        poll_sleep=self.poll_sleep,
        job_id=job_id,
        num_retries=self.num_retries,
        multiple_jobs=multiple_jobs,
        drain_pipeline=self.drain_pipeline,
        cancel_timeout=self.cancel_timeout,
        wait_until_finished=self.wait_until_finished,
    )
    job_controller.wait_for_done()

self._start_dataflow(
    variables=variables,
    name=name,
    command_prefix=command_prefix,
    project_id=project_id,
    on_new_job_id_callback=on_new_job_id_callback,
    location=location,
)

In the previous version, the job was started by calling start_python_dataflow, which in turn called the _start_dataflow method, which created a local job_controller and used it to check whether the job had ended. Throughout this chain of calls, the project_id parameter was passed all the way from the initialization of the DataflowCreatePythonJobOperator down to the creation of that controller.

In the latest relevant commit, this behavior changed. The operator receives a project_id during initialization and creates the job using the start_python_pipeline method, which receives the project_id as part of the variables parameter. However, the completion of the job is checked by the dataflow_hook.wait_for_done call. The DataflowHook used here:

  • does not specify the project_id when it is initialized
  • does not specify the project_id as a parameter when making the call to check for completion (the wait_for_done call)

As a result, it falls back to the default GCP project ID (the one the Composer environment runs in, i.e. project A) rather than the one used to create the Dataflow job (project D). This explains why the job launches successfully while the operator fails.
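To illustrate the mechanism, here is a standalone sketch (NOT the actual Airflow source; FakeDataflowHook and the project names are made up). Hooks of this kind typically wrap their methods in a decorator that fills in a missing project_id with the default project of the GCP connection, which is presumably why the wait_for_done call above needs the # pylint: disable=no-value-for-parameter comment:

# Illustrative sketch only: a "fallback to default project" decorator,
# showing why omitting project_id makes the hook poll the wrong project.
from functools import wraps


def fallback_to_default_project_id(func):
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # When the caller does not pass project_id, use the hook's default
        # project (the one from the GCP connection, i.e. project A).
        if kwargs.get("project_id") is None:
            kwargs["project_id"] = self.default_project_id
        return func(self, *args, **kwargs)
    return wrapper


class FakeDataflowHook:
    default_project_id = "project-a"  # hypothetical: the Composer project

    @fallback_to_default_project_id
    def wait_for_done(self, job_name, location, job_id=None, project_id=None, multiple_jobs=False):
        print(f"Polling job '{job_name}' ({job_id}) in project '{project_id}', location '{location}'")


hook = FakeDataflowHook()
# The operator launched the job in project D, but polls without project_id:
hook.wait_for_done(job_name="my-job", location="us-east1", job_id="2021-04-22_example")
# -> polls project-a, which is what triggers the 403 against project A in the real API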

I think that specifying the project_id as a parameter in the wait_for_done call may solve the issue.
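For example, something along these lines (untested sketch based on the operator code quoted above; I am assuming the operator still stores the target project in self.project_id):

self.dataflow_hook.wait_for_done(
    job_name=job_name,
    location=self.location,
    job_id=self.job_id,
    multiple_jobs=False,
    project_id=self.project_id,  # assumption: the operator's target project, i.e. project D
)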

How to reproduce it:

  • Instantiate a Composer environment in a new GCP project.
  • Launch a simple Dataflow job in another project.

The Dataflow job will succeed (no errors appear in the GCP console), but an error will be thrown in the Airflow logs.

Note: I am reporting a 403 because the service account associated with Airflow does not have the required permissions on project A. I suspect that even with the correct permissions you may get another error (maybe a 404, since there will be no job running with that ID within that project), but I have no way to test this at the moment.
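For reference, a minimal DAG along these lines should trigger it (the bucket, pipeline file and project IDs below are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator

with DAG(
    dag_id="dataflow_cross_project_repro",
    start_date=datetime(2021, 4, 1),
    schedule_interval=None,
) as dag:
    # Launch the pipeline in project D while Airflow/Composer runs in project A.
    DataflowCreatePythonJobOperator(
        task_id="launch_job",
        py_file="gs://example-bucket/wordcount.py",  # placeholder pipeline file
        job_name="cross-project-test",
        project_id="project-d",  # NOT the Composer project
        location="us-east1",
        options={"temp_location": "gs://example-bucket/tmp"},
    )

The job itself starts in project D, but the operator then polls project A and fails as described above.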

Anything else we need to know:

This problem occurs every time I launch a Dataflow job in a project other than the one where Composer is running.

@filippociceri added the kind:bug label on Apr 22, 2021
@boring-cyborg

boring-cyborg bot commented Apr 22, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@eladkal added the provider:google label on Apr 22, 2021
@lwyszomi
Contributor

@potiuk @eladkal can you assign this issue to me? I already verified the problem, and it also exists in the latest version of the operators. It is not directly related to the Google provider, because the Dataflow job operators were moved to the Apache Beam package and the Google operators are deprecated.

@uranusjr
Member

Done

@lwyszomi
Contributor

I sent a PR with the fix.
