Add Apache Beam operators #12814
Conversation
I think this is a great idea. I did not review line by line, but it looks like it is in good shape.
One question: how would you differentiate between the Beam operator and the Dataflow operator? Would it make sense to document somewhere what the differences are and when to use one or the other?
Another point: it would be good to align parameters between the Beam and Dataflow operators so users could (if they want to) switch between the two operators easily.
GCS_INPUT = os.environ.get('APACHE_BEAM_PYTHON', 'gs://apache-beam-samples/shakespeare/kinglear.txt')
GCS_TMP = os.environ.get('APACHE_BEAM_GCS_TMP', 'gs://test-dataflow-example/temp/')
GCS_STAGING = os.environ.get('APACHE_BEAM_GCS_STAGING', 'gs://test-dataflow-example/staging/')
GCS_OUTPUT = os.environ.get('APACHE_BEAM_GCS_OUTPUT', 'gs://test-dataflow-example/output')
Curiosity question: does gs://test-dataflow-example/ exist as a publicly writable location?
It is not a publicly writable location. This example DAG is used in system tests (tests/providers/apache/beam/operators/test_beam_system.py), which are run by pytest with the --system flag, e.g. --system=google. When a system test is executed, the user should provide a GCP connection with proper permissions to the Storage and Dataflow resources. The environment variables with resources should be adjusted as well if necessary.
},
py_options=[],
job_name='{{task.task_id}}',
py_requirements=['apache-beam[gcp]==2.21.0'],
Use a newer version in the examples? 2.25.0 is the latest, and 2.26.0 is imminent.
Thanks. I will update to 2.25.0 for now.
Cool. (Btw 2.26 is available as of today.)
Thanks. I updated the version to the latest one :)
tags=['example'],
) as dag_native_python:

start_python_job_local_direct_runner = BeamRunPythonPipelineOperator(
I think this and start_python_job_direct_runner are a bit duplicative. They do more or less the same exact thing except for the location of the module to run.
Another role of example DAGs is to be part of the documentation; they can be made visible from the docs (example from a PR for Databricks).
I refactored it slightly so the local task is "fully" local, and in the second task both input and output are on GCS. My idea was to show a working example with different sources/outputs. Executing the system tests will also confirm that both tasks work.
py_system_site_packages=False,
)

start_python_job_local_direct_runner >> start_python_job_local_flink_runner
Are these two lines equivalent to [start_python_job_local_direct_runner, start_python_job_direct_runner] >> start_python_job_local_flink_runner?
You are right. It will be easier to follow the DAG; I made a small refactor of this part.
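For readers unfamiliar with Airflow's dependency syntax, a list on the left-hand side of `>>` fans in to the task on the right, so the single list form is equivalent to two separate statements. A minimal toy sketch (the `Task` class here is illustrative, not Airflow's real operator class):

```python
# Toy illustration of Airflow-style dependency chaining via >>.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # task_a >> task_b: task_a becomes upstream of task_b
        other.upstream.add(self.task_id)
        return other

    def __rrshift__(self, others):
        # [task_a, task_b] >> task_c: every list member becomes upstream
        for t in others:
            self.upstream.add(t.task_id)
        return self

a = Task("direct_runner")
b = Task("local_direct_runner")
c = Task("flink_runner")
d = Task("flink_runner_2")

# Two separate statements...
a >> c
b >> c
# ...produce the same upstream set as the single list form:
[a, b] >> d
assert c.upstream == d.upstream  # both are {"direct_runner", "local_direct_runner"}
```

The list form is usually preferred in example DAGs because the fan-in is visible in one line.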
for readable_fd in reads:
    self._process_fd(readable_fd)

self.log.info("Process exited with return code: %s", self._proc.returncode)
Is it possible to capture stdout or stderr? That usually has the real relevant errors.
Printing stdout/stderr to logs is handled by the _process_fd function two lines above :) I used the logs it produces extensively while debugging the example DAG, and I can confirm they provide helpful information about errors.
Nice!
wait_for_done.assert_called_once_with()


class TestDataflow(unittest.TestCase):
You can also run a real Beam pipeline with DirectRunner as another test.
I found there is a mistake in the title (it should be TestBeamRunner). Its purpose is to test providers.apache.beam.hooks.beam._BeamRunner, which is responsible for executing the command in a subprocess, printing stdout/stderr to logs, and handling the exit code. In other tests this class is mocked, and if the runner is Dataflow a slightly different command runner is used: airflow.providers.google.cloud.hooks.dataflow._DataflowRunner.
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*.
]
if variables:
    cmd.extend(self._options_to_args(variables))
_BeamRunner(cmd=cmd).wait_for_done()
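The snippet above appends pipeline options to the command line before handing it to the runner. A hypothetical sketch of what a helper like `_options_to_args` could do (the real Airflow implementation may differ in details):

```python
def options_to_args(variables):
    """Convert a dict of pipeline options into Beam-style CLI flags.

    Illustrative sketch only: maps True/None to bare flags, lists to
    repeated flags, and everything else to --name=value.
    """
    args = []
    for name, value in variables.items():
        if value is True or value is None:
            args.append(f"--{name}")                      # flag-style option
        elif isinstance(value, list):
            args.extend(f"--{name}={v}" for v in value)   # repeatable option
        else:
            args.append(f"--{name}={value}")
    return args

cmd = ["python", "pipeline.py"]
cmd.extend(options_to_args({"project": "my-proj", "streaming": True}))
# cmd is now ['python', 'pipeline.py', '--project=my-proj', '--streaming']
```

Keeping option handling in one helper is what lets the Beam and Dataflow operators share a parameter surface, as requested earlier in the review.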
Should we allow an async mode (operator + sensor), so the running pipeline does not block a worker slot? I suppose this may be hard, as the subprocess may be running on a worker other than the one the sensor is using. However, we may consider load-balancing tasks to make sure the sensor is scheduled on the proper worker. See #12519 (comment) for ideas.
Although, this is a bigger change than this PR.
)
self.py_file = tmp_gcs_file.name

if self.runner == "DataflowRunner":
While I agree that logging a warning or even using a UserWarning for DataflowRunner is a good thing, I would say that in the case of operator behaviour we should have only one, consistent behaviour. That's at least what I would expect as a user. Mind you, I'm not a Beam expert.
I am not sure I understood your comment correctly. AFAIK the behavior from the user's perspective will be the same; a user who wants to use more Dataflow-specific features available in the Dataflow operator will simply be informed about it. Could you elaborate more?
Correct me if I'm wrong, but we use two different hooks depending on whether it's Dataflow or not. So at least at this point the behaviour is not identical (unless the hooks implement exactly the same logic, but then why have two different hooks?). So I would be in favour of using only BeamHook and raising a warning if users use DataflowRunner. WDYT?
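The behaviour proposed here — keep working with any runner, but warn when DataflowRunner is chosen through the Beam operator — could be sketched like this. All names (`check_runner`, the runner set, the message) are illustrative, not Airflow's actual API:

```python
import warnings

# Illustrative set; Beam supports more runners than listed here.
SUPPORTED_RUNNERS = {"DirectRunner", "DataflowRunner", "FlinkRunner", "SparkRunner"}

def check_runner(runner):
    """Accept any supported runner, but warn when DataflowRunner is used.

    Hypothetical sketch of the review suggestion: the Beam operator keeps one
    consistent behaviour, and only points users at the Dataflow operators for
    Dataflow-specific features.
    """
    if runner not in SUPPORTED_RUNNERS:
        raise ValueError(f"Unsupported runner: {runner}")
    if runner == "DataflowRunner":
        warnings.warn(
            "Consider the Google Dataflow operators for Dataflow-specific "
            "features not exposed by the Beam operator.",
            UserWarning,
        )
    return runner
```

A UserWarning surfaces in task logs without changing the operator's control flow, which keeps the single-behaviour property the reviewer asked for.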
Force-pushed ed8f776 to 12a8cfe
Force-pushed 406afe7 to b55c76b
Force-pushed b55c76b to bb80626
Force-pushed bb80626 to 7580003
Force-pushed 7580003 to e341463
Force-pushed b654368 to 362b362
Force-pushed 362b362 to 779e44f
Force-pushed 779e44f to aa0414d
PTAL @aaltay @mik-laj @turbaszek
Force-pushed 1690a98 to 8e1d421
Thanks to all the reviewers! I rebased on the latest master and squashed the fixups.
Transient doc error :(
The Workflow run is cancelling this PR. Building images for the PR has failed. Follow the workflow link to check the reason.
* Refactor common logic
* Tests
Co-authored-by: Tomek Urbaszek <tomasz.urbaszek@polidea.com>
Force-pushed 8e1d421 to 1a4844e
Passing in my fork: https://github.com/potiuk/airflow/runs/1825273695 — merging it :)
from typing import Callable, List, Optional

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
@TobKed This raises a warning that spams the logs:
/opt/airflow/airflow/providers/apache/beam/hooks/beam.py:28: DeprecationWarning: This module is deprecated. Please use `airflow.hooks.base`.
from airflow.hooks.base_hook import BaseHook
Solved in #14554
(cherry picked from commit 1872d87)
Simple Apache Beam operators to run Python and Java pipelines with DirectRunner (DirectRunner is used by default, but other runners are supported as well).
This will allow testing DAGs with pipelines that use DirectRunner, for faster feedback than with e.g. the Dataflow operators.