Add example DAG for demonstrating usage of GCS sensors #22808
The following GCS sensor examples are provided as part of this change: 1. GCSUploadSessionCompleteSensor 2. GCSObjectUpdateSensor
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Looks good to me
airflow/example_dags/example_gcs.py
Outdated
    tags=["example", "gcs"],
) as dag:

    # [START howto_sensor_gcs_upload_session_complete_task]
Should we add this in https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/gcs.html ?
You can add it in https://github.com/apache/airflow/blob/main/docs/apache-airflow-providers-google/operators/cloud/gcs.rst, which will show up on the page I linked above when a new version of the provider is released.
You can also build docs locally as described in https://github.com/apache/airflow/blob/main/BREEZE.rst#building-the-documentation
./breeze build-docs -- --package-filter apache-airflow-providers-google
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
The commit does the following: 1. Deletes the newly created top-level example_gcs.py, as it was the wrong place for the sensors. 2. Adds the intended sensors of the PR to the existing example_gcs.py file located in the airflow/cloud/example_dags directory.
The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.
I recommend installing and using pre-commit hook so this will be automatically checked when you run "git commit" -> this is explained in https://github.com/apache/airflow/blob/main/STATIC_CODE_CHECKS.rst#installing-pre-commit-hooks I ❤️ pre-commit hooks and I am sure you will love that framework too, it is very handy :)
Awesome work, congrats on your first merged pull request!
Well done. Congratulations on your first merged PR 👏
"GCP_GCS_PATH_TO_SAVED_FILE", os.path.join(temp_dir_path, "test-gcs-example-download.txt") | ||
)

BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1]

# Upload 'test-gcs-manual-example-upload.txt' manually in the <BUCKET_1> after triggering the DAG.
Tests shouldn't depend on manual steps - if the file is required we should:
- store it in resources
- upload using operator, for example LocalFilesystemToGCSOperator
- After the tests, remove any resources created during tests (usually it's enough to remove Bucket)
Hi @bhirsz, I wanted to upload the file programmatically, like the other chain does. However, the sensor task GCSUploadSessionCompleteSensor waits for a change in the number of files in the bucket. If we put the upload task before the GCSUploadSessionCompleteSensor task, the sensor won't detect any change. On the other hand, if we put the upload task after the GCSUploadSessionCompleteSensor task, the upload is blocked until the sensor completes, which does not solve the problem either. I am unsure how to express the dependency between such tasks. Any suggestions?
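To make the ordering problem above concrete, here is a minimal pure-Python sketch. `CountChangeSensor` is a hypothetical toy that only models the behaviour described in this comment (waiting for the object count to change); it is not the provider's actual GCSUploadSessionCompleteSensor logic, and the helper function names are made up for illustration.

```python
class CountChangeSensor:
    """Toy stand-in (assumption) for a sensor that waits for the object count to change."""

    def __init__(self):
        self.baseline = None  # object count recorded on the first poke

    def poke(self, bucket):
        count = len(bucket)
        if self.baseline is None:
            # The first poke only records the starting state.
            self.baseline = count
            return False
        return count != self.baseline


def run_upload_then_sensor(max_pokes=5):
    """Upload finishes before the sensor starts: no change is ever observed."""
    bucket = {"test-gcs-manual-example-upload.txt"}
    sensor = CountChangeSensor()
    return any(sensor.poke(bucket) for _ in range(max_pokes))


def run_sensor_then_upload(max_pokes=5):
    """Sensor is already watching when the upload lands: the change is observed."""
    bucket = set()
    sensor = CountChangeSensor()
    sensor.poke(bucket)  # baseline taken on the empty bucket
    bucket.add("test-gcs-manual-example-upload.txt")
    return any(sensor.poke(bucket) for _ in range(max_pokes))


print(run_upload_then_sensor())
print(run_sensor_then_upload())
```

In a single linear chain the second ordering cannot happen, because a downstream upload task would not start until the sensor had already succeeded; the upload has to come from somewhere outside the sensor's own upstream/downstream path.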
It's tricky in this case, indeed. How about starting sensors in parallel with upload_file?
Example:
chain(
# TEST SETUP
create_bucket,
upload_file,
# TEST BODY
[gcs_object_exists, gcs_object_with_prefix_exists],
# TEST TEARDOWN
delete_bucket,
)
chain(
create_bucket,
# TEST BODY
[gcs_upload_session_complete, gcs_update_object_exists],
delete_bucket
)
We start the sensors and, in the meantime, upload the file, and the sensors detect it.
I'm not sure, though, whether it will be flaky in some cases - running this in CI will show. I will update my PR with this change.
Right. Since we are starting them in parallel, the upload task may be picked up before the sensor task begins, in which case the sensor may not detect the change as expected. Hence, I added the comment to upload the file manually. :)
Also, gcs_object_update_sensor_task needs to run after gcs_upload_session_complete_task (and not in parallel with it): the upload-session sensor is expected to detect the object first, and the object-update task then confirms that the manual upload has happened.
We can alleviate it a bit with sleep (yes, the old, ugly sleep ;)). Run the sensors separately and, in the meantime, trigger a sleep (5s will suffice) followed by the upload task:
chain(
# TEST SETUP
create_bucket,
sleep,
upload_file,
# TEST BODY
[gcs_object_exists, gcs_object_with_prefix_exists],
# TEST TEARDOWN
delete_bucket,
)
chain(
create_bucket,
# TEST BODY
gcs_upload_session_complete,
gcs_update_object_exists,
delete_bucket
)
And of course put some explanation in comments why we're doing it.
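As a rough illustration of why the sleep helps but does not fully remove the race, the sketch below simulates the two parallel chains on a discrete timeline. Everything here is an assumption made for illustration: `sensor_detects_upload`, the tick-based clock, and the count-change behaviour are a toy model, not Airflow or provider code.

```python
def sensor_detects_upload(sensor_start, upload_time, timeout=10):
    """Return True if a toy count-change sensor observes the upload.

    sensor_start: tick of the sensor's first poke (models scheduling lag).
    upload_time:  tick at which the upload task adds the file
                  (equal to the sleep duration in the setup chain).
    """
    baseline = None
    for tick in range(sensor_start, sensor_start + timeout):
        # The bucket holds one object once the upload has happened.
        count = 1 if tick >= upload_time else 0
        if baseline is None:
            baseline = count  # first poke only records the starting state
            continue
        if count != baseline:
            return True
    return False


# No sleep: the upload lands before the sensor's first poke, so it is missed.
print(sensor_detects_upload(sensor_start=2, upload_time=0))
# Sleep longer than the scheduling lag: the sensor is watching when the file arrives.
print(sensor_detects_upload(sensor_start=2, upload_time=5))
# Sleep shorter than the scheduling lag: still missed, so the sleep only reduces flakiness.
print(sensor_detects_upload(sensor_start=7, upload_time=5))
```

The third case is the reason the sleep should be treated as a mitigation: if the sensor is scheduled later than the sleep duration, the upload is missed again.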
Manual steps are unfortunately a big no for us, since the plan is to run system tests in CI in the community. We can't expect people to perform manual tasks while running them, so I'm looking for an automatic solution.
Okay, I was not aware that we're migrating these example DAGs to tests. I was under the impression that the example DAGs serve as references for our community on how to implement DAGs for certain operators. I understand and totally agree with your fair point that tests should not need manual intervention.
Okay, so we can try introducing sleep if we feel it is right. Also, I am new to contributing to Airflow :)
cc: @kaxil
Yeah, we had two goals with the migration: to make writing system tests easier and to actually ensure that our example DAGs are runnable (what would be the point of an example that doesn't work, which is actually often the case ;)). In the old design, system tests "wrapped" and ran the examples, but not all example DAGs were used by system tests. In the new design we still use examples as system tests, but without splitting them into separate files: the example DAG is now the system test itself.
Any change is highly welcome and it's good to see PRs like yours - congrats! We only missed the notification on the PR (sadly GitHub doesn't allow for an advanced notification system), so we were not able to have this discussion before the PR was merged.