diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 69129f344e27a..e93abeb063754 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -590,6 +590,14 @@ repos: pass_filenames: false require_serial: true additional_dependencies: ['rich'] + - id: check-system-tests + name: Check if system tests have required segments of code + entry: ./scripts/ci/pre_commit/pre_commit_check_system_tests.py + language: python + files: ^tests/system/.*/example_[^/]*.py$ + exclude: ^tests/system/providers/google/bigquery/example_bigquery_queries\.py$ + pass_filenames: true + additional_dependencies: ['rich'] - id: markdownlint name: Run markdownlint description: Checks the style of Markdown files. diff --git a/BREEZE.rst b/BREEZE.rst index 13f10214be116..d1364392770af 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -2293,9 +2293,9 @@ This is the current syntax for `./breeze <./breeze>`_: changelog-duplicates check-apache-license check-builtin-literals check-executables-have-shebangs check-extras-order check-hooks-apply check-integrations check-merge-conflict check-ti-run-id-in-providers check-xml - daysago-import-check debug-statements detect-private-key docstring-params doctoc - dont-use-safe-filter end-of-file-fixer fix-encoding-pragma flake8 flynt - forbidden-xcom-get-value codespell forbid-tabs helm-lint identity + check-system-tests daysago-import-check debug-statements detect-private-key + docstring-params doctoc dont-use-safe-filter end-of-file-fixer fix-encoding-pragma + flake8 flynt forbidden-xcom-get-value codespell forbid-tabs helm-lint identity incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters lint-dockerfile lint-openapi markdownlint mermaid migration-reference mixed-line-ending mypy mypy-helm no-providers-in-core-examples no-relative-imports diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst index dc13c3cc6dc57..b12a8ef60defa 100644 --- a/STATIC_CODE_CHECKS.rst +++ b/STATIC_CODE_CHECKS.rst @@ -168,6 +168,8 @@ require Breeze Docker images to be installed locally. 
------------------------------------ ---------------------------------------------------------------- ------------ ``check-xml`` Checks XML files with xmllint ------------------------------------ ---------------------------------------------------------------- ------------ +``check-system-tests`` Check if system tests have required segments of code +------------------------------------ ---------------------------------------------------------------- ------------ ``daysago-import-check`` Checks if daysago is properly imported ------------------------------------ ---------------------------------------------------------------- ------------ ``debug-statements`` Detects accidentally committed debug statements diff --git a/breeze-complete b/breeze-complete index 15359e3d43187..ad8172ac4405e 100644 --- a/breeze-complete +++ b/breeze-complete @@ -99,6 +99,7 @@ check-integrations check-merge-conflict check-ti-run-id-in-providers check-xml +check-system-tests daysago-import-check debug-statements detect-private-key diff --git a/dev/breeze/src/airflow_breeze/pre_commit_ids.py b/dev/breeze/src/airflow_breeze/pre_commit_ids.py index 208b5a1b3a83c..26724db9ab7cf 100644 --- a/dev/breeze/src/airflow_breeze/pre_commit_ids.py +++ b/dev/breeze/src/airflow_breeze/pre_commit_ids.py @@ -40,6 +40,7 @@ 'check-hooks-apply', 'check-integrations', 'check-merge-conflict', + 'check-system-tests', 'check-ti-run-id-in-providers', 'check-xml', 'codespell', diff --git a/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst b/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst index bcf947f4164a5..5cab46e37b6d6 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/bigquery.rst @@ -29,7 +29,7 @@ data. Prerequisite Tasks ^^^^^^^^^^^^^^^^^^ -.. include::/operators/_partials/prerequisite_tasks.rst +.. include:: ../_partials/prerequisite_tasks.rst Manage datasets ^^^^^^^^^^^^^^^ @@ -42,7 +42,7 @@ Create dataset To create an empty dataset in a BigQuery database you can use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_dataset.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_create_dataset] @@ -58,7 +58,7 @@ To get the details of an existing dataset you can use This operator returns a `Dataset Resource `__. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_dataset.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_get_dataset] @@ -72,7 +72,7 @@ List tables in dataset To retrieve the list of tables in a given dataset use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. 
exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_get_dataset_tables] @@ -89,7 +89,7 @@ To update a table in BigQuery you can use The update method replaces the entire Table resource, whereas the patch method only replaces fields that are provided in the submitted Table resource. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_update_table] @@ -106,7 +106,7 @@ To update a dataset in BigQuery you can use The update method replaces the entire dataset resource, whereas the patch method only replaces fields that are provided in the submitted dataset resource. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_dataset.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_update_dataset] @@ -120,7 +120,7 @@ Delete dataset To delete an existing dataset from a BigQuery database you can use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_dataset.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_delete_dataset] @@ -143,7 +143,7 @@ ways. You may either directly pass the schema fields in, or you may point the operator to a Google Cloud Storage object name. The object in Google Cloud Storage must be a JSON file with the schema fields in it. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_create_table] @@ -151,7 +151,7 @@ Storage must be a JSON file with the schema fields in it. You can use this operator to create a view on top of an existing table. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_create_view] @@ -160,7 +160,7 @@ You can use this operator to create a view on top of an existing table. You can also use this operator to create a materialized view that periodically cache results of a query for increased performance and efficiency. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_create_materialized_view] @@ -177,15 +177,22 @@ you can use Similarly to :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator` -you may either directly pass the schema fields in, or you may point the operator -to a Google Cloud Storage object name. +you can directly pass the schema fields in. -.. 
exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_operations.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_create_external_table] :end-before: [END howto_operator_bigquery_create_external_table] +Or you may point the operator to a Google Cloud Storage object name where the schema is stored. + +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_bigquery_create_table_schema_json] + :end-before: [END howto_operator_bigquery_create_table_schema_json] + .. _howto/operator:BigQueryGetDataOperator: Fetch data from table @@ -201,7 +208,7 @@ returned list will be equal to the number of rows fetched. Each element in the list will again be a list where elements would represent the column values for that row. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_queries.py :language: python :dedent: 8 :start-after: [START howto_operator_bigquery_get_data] @@ -218,7 +225,7 @@ To upsert a table you can use This operator either updates the existing table or creates a new, empty table in the given dataset. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_upsert_table] @@ -235,7 +242,7 @@ To update the schema of a table you can use This operator updates the schema field values supplied, while leaving the rest unchanged. This is useful for instance to set new field descriptions on an existing table schema. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_update_table_schema] @@ -249,7 +256,7 @@ Delete table To delete an existing table you can use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_delete_table] @@ -257,7 +264,7 @@ To delete an existing table you can use You can also use this operator to delete a view. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_delete_view] @@ -265,7 +272,7 @@ You can also use this operator to delete a view. You can also use this operator to delete a materialized view. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +.. 
exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_tables.py :language: python :dedent: 4 :start-after: [START howto_operator_bigquery_delete_materialized_view] @@ -278,7 +285,7 @@ Execute BigQuery jobs Let's say you would like to execute the following query. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_queries.py :language: python :dedent: 0 :start-after: [START howto_operator_bigquery_query] @@ -288,7 +295,7 @@ To execute the SQL query in a specific BigQuery database you can use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator` with proper query job configuration that can be Jinja templated. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_queries.py :language: python :dedent: 8 :start-after: [START howto_operator_bigquery_insert_job] @@ -300,7 +307,7 @@ For more information on types of BigQuery job please check If you want to include some files in your configuration you can use ``include`` clause of Jinja template language as follow: -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_queries.py :language: python :dedent: 8 :start-after: [START howto_operator_bigquery_select_job] @@ -329,7 +336,7 @@ This operator expects a sql query that will return a single row. Each value on that first row is evaluated using python ``bool`` casting. If any of the values return ``False`` the check is failed and errors out. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_queries.py :language: python :dedent: 8 :start-after: [START howto_operator_bigquery_check] @@ -347,7 +354,7 @@ This operator expects a sql query that will return a single row. Each value on that first row is evaluated against ``pass_value`` which can be either a string or numeric value. If numeric, you can also specify ``tolerance``. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_queries.py :language: python :dedent: 8 :start-after: [START howto_operator_bigquery_value_check] @@ -362,7 +369,7 @@ To check that the values of metrics given as SQL expressions are within a certai tolerance of the ones from ``days_back`` before you can use :class:`~airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckOperator`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_queries.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_queries.py :language: python :dedent: 8 :start-after: [START howto_operator_bigquery_interval_check] @@ -380,7 +387,7 @@ use the ``{{ ds_nodash }}`` macro as the table name suffix. :class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTableExistenceSensor`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py +.. 
exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_sensors.py :language: python :dedent: 4 :start-after: [START howto_sensor_bigquery_table] :end-before: [END howto_sensor_bigquery_table] @@ -392,7 +399,7 @@ Check that a Table Partition exists To check that a table exists and has a partition you can use. :class:`~airflow.providers.google.cloud.sensors.bigquery.BigQueryTablePartitionExistenceSensor`. -.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py +.. exampleinclude:: /../../tests/system/providers/google/bigquery/example_bigquery_sensors.py :language: python :dedent: 4 :start-after: [START howto_sensor_bigquery_table_partition] diff --git a/docs/apache-airflow-providers-google/operators/cloud/data_loss_prevention.rst b/docs/apache-airflow-providers-google/operators/cloud/data_loss_prevention.rst index d0092d1984664..520b45b7987a0 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/data_loss_prevention.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/data_loss_prevention.rst @@ -297,7 +297,7 @@ Configuration information defines how you want the sensitive data de-identified. This config can either be saved and persisted in de-identification templates or defined in a :class:`~google.cloud.dlp_v2.types.DeidentifyConfig` object: -.. literalinclude:: /../../airflow/providers/google/cloud/example_dags/example_dlp.py +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dlp.py :language: python :start-after: [START dlp_deidentify_config_example] :end-before: [END dlp_deidentify_config_example] diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index d62020319704a..d4606548d3dd4 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -247,18 +247,26 @@ each parameter by following the links): Example of watcher pattern with trigger rules --------------------------------------------- -The watcher pattern is how we call a DAG with a task that is "watching" the states of the other tasks. It's primary purpose is to fail a DAG Run when any other task fail. +The watcher pattern is how we call a DAG with a task that is "watching" the states of the other tasks. +Its primary purpose is to fail a DAG Run when any other task fails. The need came from the Airflow system tests that are DAGs with different tasks (similarly like a test containing steps). -Normally, when any task fails, all other tasks are not executed and the whole DAG Run gets failed status too. But when we use trigger rules, we can disrupt the normal flow of running tasks and the whole DAG may represent different status that we expect. -For example, we can have a teardown task (with trigger rule set to ``"all_done"``) that will be executed regardless of the state of the other tasks (e.g. to clean up the resources). In such situation, the DAG would always run this task and the DAG Run will get the status of this particular task, so we can potentially lose the information about failing tasks. -If we want to ensure that the DAG with teardown task would fail if any task fails, we need to use the watcher pattern. -The watcher task is a task that will always fail if triggered, but it needs to be triggered only if any other task fails. It needs to have a trigger rule set to ``"one_failed"`` and it needs also to be a downstream task for all other tasks in the DAG.
-Thanks to this, if every other task will pass, the watcher will be skipped, but when something fails, the watcher task will be executed and fail making the DAG Run fail too. +Normally, when any task fails, all other tasks are not executed and the whole DAG Run gets a failed status too. But +when we use trigger rules, we can disrupt the normal flow of running tasks and the whole DAG may represent a different +status than we expect. For example, we can have a teardown task (with trigger rule set to ``TriggerRule.ALL_DONE``) +that will be executed regardless of the state of the other tasks (e.g. to clean up the resources). In such a +situation, the DAG would always run this task and the DAG Run will get the status of this particular task, so we can +potentially lose the information about failing tasks. If we want to ensure that the DAG with a teardown task would fail +if any task fails, we need to use the watcher pattern. The watcher task is a task that will always fail if +triggered, but it needs to be triggered only if any other task fails. It needs to have a trigger rule set to +``TriggerRule.ONE_FAILED`` and it also needs to be a downstream task for all other tasks in the DAG. Thanks to +this, if every other task passes, the watcher will be skipped, but when something fails, the watcher task will be +executed and fail, making the DAG Run fail too. .. note:: - Be aware that trigger rules only rely on the direct upstream (parent) tasks, e.g. ``one_failed`` will ignore any failed (or ``upstream_failed``) tasks that are not a direct parent of the parameterized task. + Be aware that trigger rules only rely on the direct upstream (parent) tasks, e.g. ``TriggerRule.ONE_FAILED`` + will ignore any failed (or ``upstream_failed``) tasks that are not a direct parent of the parameterized task. It's easier to grab the concept with an example. Let's say that we have the following DAG: @@ -270,9 +278,10 @@ It's easier to grab the concept with an example. Let's say that we have the foll from airflow.exceptions import AirflowException from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator + from airflow.utils.trigger_rule import TriggerRule - @task(trigger_rule="one_failed", retries=0) + @task(trigger_rule=TriggerRule.ONE_FAILED, retries=0) def watcher(): raise AirflowException("Failing task because one or more upstream tasks failed.") @@ -290,7 +299,9 @@ It's easier to grab the concept with an example. 
Let's say that we have the foll task_id="passing_task", bash_command="echo passing_task" ) teardown = BashOperator( - task_id="teardown", bash_command="echo teardown", trigger_rule="all_done" + task_id="teardown", + bash_command="echo teardown", + trigger_rule=TriggerRule.ALL_DONE, ) failing_task >> passing_task >> teardown diff --git a/docs/exts/docs_build/lint_checks.py b/docs/exts/docs_build/lint_checks.py index a1af22ef5fe8e..d80419d7ebd62 100644 --- a/docs/exts/docs_build/lint_checks.py +++ b/docs/exts/docs_build/lint_checks.py @@ -230,13 +230,13 @@ def find_modules(deprecated_only: bool = False) -> Set[str]: def check_exampleinclude_for_example_dags() -> List[DocBuildError]: - """Checks all exampleincludes for example dags.""" - all_docs_files = glob(f"${DOCS_DIR}/**/*.rst", recursive=True) + """Checks all exampleincludes for example dags.""" + all_docs_files = glob(f"{DOCS_DIR}/**/*.rst", recursive=True) build_errors = [] for doc_file in all_docs_files: build_error = assert_file_not_contains( file_path=doc_file, - pattern=r"literalinclude::.+example_dags", + pattern=r"literalinclude::.+(?:example_dags|tests/system/)", message=( "literalinclude directive is prohibited for example DAGs. \n" "You should use the exampleinclude directive to include example DAGs." @@ -265,12 +265,18 @@ def check_enforce_code_block() -> List[DocBuildError]: return build_errors +def find_example_dags(provider_dir): + system_tests_dir = provider_dir.replace(f"{ROOT_PACKAGE_DIR}/", "") + yield from glob(f"{provider_dir}/**/*example_dags", recursive=True) + yield from glob(f"{ROOT_PROJECT_DIR}/tests/system/{system_tests_dir}/*/", recursive=True) + + def check_example_dags_in_provider_tocs() -> List[DocBuildError]: """Checks that each documentation for provider packages has a link to example DAGs in the TOC.""" build_errors = [] for provider in ALL_PROVIDER_YAMLS: - example_dags_dirs = list(glob(f"{provider['package-dir']}/**/example_dags", recursive=True)) + example_dags_dirs = list(find_example_dags(provider['package-dir'])) if not example_dags_dirs: continue doc_file_path = f"{DOCS_DIR}/{provider['package-name']}/index.rst" diff --git a/pytest.ini b/pytest.ini index 26e748291ed39..53004a28436f1 100644 --- a/pytest.ini +++ b/pytest.ini @@ -35,3 +35,4 @@ filterwarnings = markers = need_serialized_dag asyncio_mode = strict +python_files = test_*.py example_*.py diff --git a/scripts/ci/pre_commit/pre_commit_check_system_tests.py b/scripts/ci/pre_commit/pre_commit_check_system_tests.py new file mode 100755 index 0000000000000..8fc162a421105 --- /dev/null +++ b/scripts/ci/pre_commit/pre_commit_check_system_tests.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+import re +import sys +from pathlib import Path +from typing import List + +from rich.console import Console + +if __name__ not in ("__main__", "__mp_main__"): + raise SystemExit( + "This file is intended to be executed as an executable program. You cannot use it as a module." + f"To execute this script, run ./{__file__} [FILE] ..." + ) + + +console = Console(color_system="standard", width=200) + +errors: List[str] = [] + +WATCHER_APPEND_INSTRUCTION = "list(dag.tasks) >> watcher()" + +PYTEST_FUNCTION = """ +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) +""" +PYTEST_FUNCTION_PATTERN = re.compile( + r"from tests\.system\.utils import get_test_run(?: # noqa: E402)?\s+" + r"(?:# .+\))?\s+" + r"test_run = get_test_run\(dag\)" +) + + +def _check_file(file: Path): + content = file.read_text() + if "from tests.system.utils.watcher import watcher" in content: + index = content.find(WATCHER_APPEND_INSTRUCTION) + if index == -1: + errors.append( + f"[red]The example {file} imports tests.system.utils.watcher " + f"but does not use it properly![/]\n\n" + "[yellow]Make sure you have:[/]\n\n" + f" {WATCHER_APPEND_INSTRUCTION}\n\n" + "[yellow]as the last instruction in your example DAG.[/]\n" + ) + else: + operator_leftshift_index = content.find("<<", index + len(WATCHER_APPEND_INSTRUCTION)) + operator_rightshift_index = content.find(">>", index + len(WATCHER_APPEND_INSTRUCTION)) + if operator_leftshift_index != -1 or operator_rightshift_index != -1: + errors.append( + f"[red]In the example {file} " + f"watcher is not the last instruction in your DAG " + f"(there are << or >> operators after it)![/]\n\n" + "[yellow]Make sure you have:[/]\n" + f" {WATCHER_APPEND_INSTRUCTION}\n\n" + "[yellow]as the last instruction in your example DAG.[/]\n" + ) + if not PYTEST_FUNCTION_PATTERN.search(content): + errors.append( + f"[yellow]The example {file} missed the pytest function at the end.[/]\n\n" + "All example tests should have this function added:\n\n" + PYTEST_FUNCTION + "\n\n" + "[yellow]Automatically adding it now![/]\n" + ) + file.write_text(content + "\n" + PYTEST_FUNCTION) + + +if __name__ == '__main__': + for file in sys.argv[1:]: + _check_file(Path(file)) + if errors: + console.print("[red]There were some errors in the example files[/]\n") + for error in errors: + console.print(error) + sys.exit(1) diff --git a/tests/always/test_example_dags.py b/tests/always/test_example_dags.py index a21056dcfc087..058e342e30aea 100644 --- a/tests/always/test_example_dags.py +++ b/tests/always/test_example_dags.py @@ -16,9 +16,10 @@ # under the License. 
import os -import unittest from glob import glob +import pytest + from airflow.models import DagBag from tests.test_utils.asserts import assert_queries_count @@ -29,33 +30,38 @@ NO_DB_QUERY_EXCEPTION = ["/airflow/example_dags/example_subdag_operator.py"] -class TestExampleDags(unittest.TestCase): - def test_should_be_importable(self): - example_dags = list(glob(f"{ROOT_FOLDER}/airflow/**/example_dags/example_*.py", recursive=True)) - assert 0 != len(example_dags) - for filepath in example_dags: - relative_filepath = os.path.relpath(filepath, ROOT_FOLDER) - with self.subTest(f"File {relative_filepath} should contain dags"): - dagbag = DagBag( - dag_folder=filepath, - include_examples=False, - ) - assert 0 == len(dagbag.import_errors), f"import_errors={str(dagbag.import_errors)}" - assert len(dagbag.dag_ids) >= 1 - - def test_should_not_do_database_queries(self): - example_dags = glob(f"{ROOT_FOLDER}/airflow/**/example_dags/example_*.py", recursive=True) - example_dags = [ - dag_file - for dag_file in example_dags - if any(not dag_file.endswith(e) for e in NO_DB_QUERY_EXCEPTION) - ] - assert 0 != len(example_dags) - for filepath in example_dags: - relative_filepath = os.path.relpath(filepath, ROOT_FOLDER) - with self.subTest(f"File {relative_filepath} shouldn't do database queries"): - with assert_queries_count(0): - DagBag( - dag_folder=filepath, - include_examples=False, - ) +def example_dags(): + example_dirs = ["airflow/**/example_dags/example_*.py", "tests/system/providers/**/example_*.py"] + for example_dir in example_dirs: + yield from glob(f"{ROOT_FOLDER}/{example_dir}", recursive=True) + + +def example_dags_except_db_exception(): + return [ + dag_file + for dag_file in example_dags() + if any(not dag_file.endswith(e) for e in NO_DB_QUERY_EXCEPTION) + ] + + +def relative_path(path): + return os.path.relpath(path, ROOT_FOLDER) + + +@pytest.mark.parametrize("example", example_dags(), ids=relative_path) +def test_should_be_importable(example): + dagbag = DagBag( + dag_folder=example, + include_examples=False, + ) + assert len(dagbag.import_errors) == 0, f"import_errors={str(dagbag.import_errors)}" + assert len(dagbag.dag_ids) >= 1 + + +@pytest.mark.parametrize("example", example_dags_except_db_exception(), ids=relative_path) +def test_should_not_do_database_queries(example): + with assert_queries_count(0): + DagBag( + dag_folder=example, + include_examples=False, + ) diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 9c019f418744f..6cf994ebe8bbe 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -153,12 +153,12 @@ def get_classes_from_file(filepath: str): class TestGoogleProviderProjectStructure(unittest.TestCase): MISSING_EXAMPLE_DAGS = { - ('cloud', 'adls_to_gcs'), - ('cloud', 'sql_to_gcs'), - ('cloud', 'bigquery_to_mysql'), - ('cloud', 'cassandra_to_gcs'), - ('suite', 'drive'), - ('ads', 'ads_to_gcs'), + 'adls_to_gcs', + 'sql_to_gcs', + 'bigquery_to_mysql', + 'cassandra_to_gcs', + 'drive', + 'ads_to_gcs', } # Those operators are deprecated and we do not need examples for them @@ -192,6 +192,7 @@ class TestGoogleProviderProjectStructure(unittest.TestCase): # Please at the examples to those operators at the earliest convenience :) MISSING_EXAMPLES_FOR_OPERATORS = { + 'airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator', 'airflow.providers.google.cloud.operators.mlengine.MLEngineTrainingCancelJobOperator', 
'airflow.providers.google.cloud.operators.dlp.CloudDLPGetStoredInfoTypeOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPReidentifyContentOperator', @@ -217,87 +218,36 @@ class TestGoogleProviderProjectStructure(unittest.TestCase): 'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor', } - def test_example_dags(self): - operators_modules = itertools.chain( - *(self.find_resource_files(resource_type=d) for d in ["operators", "sensors", "transfers"]) - ) - example_dags_files = self.find_resource_files(resource_type="example_dags") - # Generate tuple of department and service e.g. ('marketing_platform', 'display_video') - operator_sets = [(f.split("/")[-3], f.split("/")[-1].rsplit(".")[0]) for f in operators_modules] - example_sets = [ - (f.split("/")[-3], f.split("/")[-1].rsplit(".")[0].replace("example_", "", 1)) - for f in example_dags_files - ] - - def has_example_dag(operator_set): - for e in example_sets: - if e[0] != operator_set[0]: - continue - if e[1].startswith(operator_set[1]): - return True - - return False - - with self.subTest("Detect missing example dags"): - missing_example = {s for s in operator_sets if not has_example_dag(s)} - missing_example -= self.MISSING_EXAMPLE_DAGS - assert set() == missing_example - - with self.subTest("Keep update missing example dags list"): - new_example_dag = set(example_sets).intersection(set(self.MISSING_EXAMPLE_DAGS)) - if new_example_dag: - new_example_dag_text = '\n'.join(str(f) for f in new_example_dag) - self.fail( - "You've added a example dag currently listed as missing:\n" - f"{new_example_dag_text}" - "\n" - "Thank you very much.\n" - "Can you remove it from the list of missing example, please?" - ) - - with self.subTest("Remove extra elements"): - extra_example_dags = set(self.MISSING_EXAMPLE_DAGS) - set(operator_sets) - if extra_example_dags: - new_example_dag_text = '\n'.join(str(f) for f in extra_example_dags) - self.fail( - "You've added a example dag currently listed as missing:\n" - f"{new_example_dag_text}" - "\n" - "Thank you very much.\n" - "Can you remove it from the list of missing example, please?" - ) - def test_missing_example_for_operator(self): - missing_operators = [] - + """ + Assert that all operators defined under operators, sensors and transfers directories + are used in any of the example dags + """ + all_operators = set() + services = set() for resource_type in ["operators", "sensors", "transfers"]: operator_files = set( self.find_resource_files(top_level_directory="airflow", resource_type=resource_type) ) for filepath in operator_files: service_name = os.path.basename(filepath)[: -(len(".py"))] - example_dags = list( - glob.glob( - f"{ROOT_FOLDER}/airflow/providers/google/*/example_dags/example_{service_name}*.py" - ) - ) - if not example_dags: - # Ignore. We have separate tests that detect this. + if service_name in self.MISSING_EXAMPLE_DAGS: continue - example_paths = { - path for example_dag in example_dags for path in get_imports_from_file(example_dag) - } - example_paths = { - path for path in example_paths if f'.{resource_type}.{service_name}.' 
in path - } - print("example_paths=", example_paths) + services.add(service_name) operators_paths = set(get_classes_from_file(f"{ROOT_FOLDER}/{filepath}")) - missing_operators.extend(operators_paths - example_paths) - full_set = set() - full_set.update(self.MISSING_EXAMPLES_FOR_OPERATORS) - full_set.update(self.DEPRECATED_OPERATORS) - full_set.update(self.BASE_OPERATORS) - assert set(missing_operators) == full_set + all_operators.update(operators_paths) + + for service in services: + example_dags = self.examples_for_service(service) + example_paths = { + path for example_dag in example_dags for path in get_imports_from_file(example_dag) + } + all_operators -= example_paths + + all_operators -= self.MISSING_EXAMPLES_FOR_OPERATORS + all_operators -= self.DEPRECATED_OPERATORS + all_operators -= self.BASE_OPERATORS + assert set() == all_operators @parameterized.expand( itertools.product(["_system.py", "_system_helper.py"], ["operators", "sensors", "transfers"]) @@ -330,6 +280,13 @@ def find_resource_files( resource_files = (f for f in resource_files if not f.endswith("__init__.py")) return resource_files + @staticmethod + def examples_for_service(service_name): + yield from glob.glob( + f"{ROOT_FOLDER}/airflow/providers/google/*/example_dags/example_{service_name}*.py" + ) + yield from glob.glob(f"{ROOT_FOLDER}/tests/system/providers/google/{service_name}/example_*.py") + class TestOperatorsHooks(unittest.TestCase): def test_no_illegal_suffixes(self): diff --git a/tests/providers/google/cloud/operators/test_bigquery_system.py b/tests/providers/google/cloud/operators/test_bigquery_system.py deleted file mode 100644 index adb0cc71dfbf3..0000000000000 --- a/tests/providers/google/cloud/operators/test_bigquery_system.py +++ /dev/null @@ -1,63 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -"""System tests for Google Cloud Build operators""" -import pytest - -from airflow.providers.google.cloud.example_dags.example_bigquery_transfer import DATA_EXPORT_BUCKET_NAME -from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY -from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context - - -@pytest.mark.backend("mysql", "postgres") -@pytest.mark.system("google.cloud") -@pytest.mark.credential_file(GCP_BIGQUERY_KEY) -class BigQueryExampleDagsSystemTest(GoogleSystemTest): - """ - System tests for Google BigQuery operators - It use a real service. 
- """ - - @provide_gcp_context(GCP_BIGQUERY_KEY) - def setUp(self): - super().setUp() - self.create_gcs_bucket(DATA_EXPORT_BUCKET_NAME) - - @provide_gcp_context(GCP_BIGQUERY_KEY) - def test_run_example_dag_operations(self): - self.run_dag('example_bigquery_operations', CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_BIGQUERY_KEY) - def test_run_example_dag_operations_location(self): - self.run_dag('example_bigquery_operations_location', CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_BIGQUERY_KEY) - def test_run_example_dag_queries(self): - self.run_dag('example_bigquery_queries', CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_BIGQUERY_KEY) - def test_run_example_dag_sensors(self): - self.run_dag('example_bigquery_sensors', CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_BIGQUERY_KEY) - def test_run_example_dag_queries_location(self): - self.run_dag('example_bigquery_queries_location', CLOUD_DAG_FOLDER) - - @provide_gcp_context(GCP_BIGQUERY_KEY) - def tearDown(self): - self.delete_gcs_bucket(DATA_EXPORT_BUCKET_NAME) - super().tearDown() diff --git a/tests/system/README.md b/tests/system/README.md new file mode 100644 index 0000000000000..912bfc248ce0d --- /dev/null +++ b/tests/system/README.md @@ -0,0 +1,104 @@ + + +# Airflow System Tests + +- [How to run system tests](#how_to_run) + - [Running via Airflow](#run_via_airflow) + - [Running via Pytest](#run_via_pytest) + - [Running via Airflow CLI](#run_via_airflow_cli) +- [How to write system tests](#how_to_write) + +System tests verify the correctness of Airflow Operators by running them in DAGs and allowing to communicate with +external services. A system test tries to look as close to a regular DAG as possible, and it generally checks the +"happy path" (a scenario featuring no errors) ensuring that the Operator works as expected. + +The purpose of these tests is to: + +- assure high quality of providers and their integration with Airflow core, +- avoid regression in providers when doing changes to the Airflow, +- autogenerate documentation for Operators from code, +- provide runnable example DAGs with use cases for different Operators, +- serve both as examples and test files. + +> This is the new design of system tests which temporarily exists along with the old one documented at +> [TESTING.rst](../../TESTING.rst) and soon will completely replace it. The new design is based on the +> [AIP-47](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-47+New+design+of+Airflow+System+Tests). +> Please use it and write any new system tests according to this documentation. + +## How to run system tests + +There are multiple ways of running system tests. Each system test is a self-contained DAG, so it can be run as any +other DAG. Some tests may require access to external services, enabled APIs or specific permissions. Make sure to +prepare your environment correctly, depending on the system tests you want to run - some may require additional +configuration which should be documented by the relevant providers in their subdirectory +`tests/system/providers//README.md`. + +### Running via Airflow + +If you have a working Airflow environment with a scheduler and a webserver, you can import system test files into +your Airflow instance and they will be automatically triggered. If the setup of the environment is correct +(depending on the type of tests you want to run), they should be executed without any issues. The instructions on +how to set up the environment is documented in each provider's system tests directory. 
Make sure that all resources +required by the tests are also imported. + +### Running via Pytest + +Running system tests with pytest is easiest with Breeze. Thanks to it, you don't need to bother with setting up +an environment that is able to execute the tests. +You can either run them using your IDE (if you have installed a plugin/widget supporting pytest) or using the following +example command: + +```commandline +# pytest --system [provider_name] [path_to_test(s)] +pytest --system google tests/system/providers/google/bigquery/example_bigquery_queries.py +``` + +You can specify several `--system` flags if you want to execute tests for several providers: + +```commandline +pytest --system google --system aws tests/system +``` + +### Running via Airflow CLI + +It is possible to run system tests using the Airflow CLI. To execute a specific system test, you need to provide the +`dag_id` of the test to be run, an `execution_date` (preferably a date in the past) and a `-S/--subdir` option +followed by the path where the tests are stored (the command by default looks into `$AIRFLOW_HOME/dags`): + +```commandline +# airflow dags test -S [path_to_tests] [dag_id] [execution date] +airflow dags test -S tests/system bigquery_dataset 2022-01-01 +``` + +> Some additional setup may be required to use the Airflow CLI. Please refer +> [here](https://airflow.apache.org/docs/apache-airflow/stable/usage-cli.html) for the documentation. + + +## How to write system tests + +If you are going to implement new system tests, it is recommended to familiarize yourself with the content of +[AIP-47](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-47+New+design+of+Airflow+System+Tests). There are +many changes in comparison to the old design documented at [TESTING.rst](../../TESTING.rst), so you need to be +aware of them and be compliant with the new design. + +To make it easier to migrate old system tests or write new ones, we +documented the whole **process of migration in detail** (which can be found +[here](https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-47+New+design+of+Airflow+System+Tests#AIP47NewdesignofAirflowSystemTests-Processofmigrationindetails)) +and also prepared an example of a test (located just below the migration details). diff --git a/tests/system/conftest.py b/tests/system/conftest.py new file mode 100644 index 0000000000000..d0e82187e1c50 --- /dev/null +++ b/tests/system/conftest.py @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +import os +import re +from itertools import chain +from pathlib import Path +from unittest import mock + +import pytest + +REQUIRED_ENV_VARS = ("SYSTEM_TESTS_ENV_ID",) + + +@pytest.fixture(scope="package", autouse=True) +def use_debug_executor(): + with mock.patch.dict("os.environ", AIRFLOW__CORE__EXECUTOR="DebugExecutor"): + yield + + +@pytest.fixture() +def provider_env_vars(): + """Override this fixture in provider's conftest.py""" + return () + + +@pytest.fixture(autouse=True) +def skip_if_env_var_not_set(provider_env_vars): + for env in chain(REQUIRED_ENV_VARS, provider_env_vars): + if env not in os.environ: + pytest.skip(f"Missing required environment variable {env}") + return + + +def pytest_collection_modifyitems(config, items): + """Add @pytest.mark.system(provider_name) for every system test.""" + rootdir = Path(config.rootdir) + for item in items: + rel_path = Path(item.fspath).relative_to(rootdir) + match = re.match(".+/providers/([^/]+)", str(rel_path)) + if not match: + continue + provider = match.group(1) + item.add_marker(pytest.mark.system(provider)) diff --git a/tests/system/providers/google/README.md b/tests/system/providers/google/README.md new file mode 100644 index 0000000000000..35d07c8e1e72d --- /dev/null +++ b/tests/system/providers/google/README.md @@ -0,0 +1,99 @@ + + +# Google provider system tests + +## Tests structure + +All Google-related system tests are located inside this subdirectory of system tests which is +`tests/system/providers/google/`. They are grouped in directories by the related service name, e.g. all BigQuery +tests are stored inside the `tests/system/providers/google/bigquery/` directory. In each directory you will find test files +as self-contained DAGs (one DAG per file). Each test may require some additional resources which should be placed in +the `resources` directory found at the same level as the tests. Each test file should start with the prefix `example_*`. If there +is anything more needed for the test to be executed, it should be documented in the docstrings. + +Example file structure: + +``` +tests/system/providers/google +├── bigquery +│ ├── resources +│ │ ├── example_bigquery_query.sql +│ │ └── us-states.csv +│ ├── example_bigquery_queries.py +│ ├── example_bigquery_operations.py +. . +│ └── example_bigquery_*.py +├── dataflow +├── gcs +. +└── * +``` + +## Initial configuration + +Each test requires some environment variables. Check how to set them up on your operating system, but on UNIX-based +OSes this should work: + +```commandline +export NAME_OF_ENV_VAR=value +``` + +To confirm that it is set up correctly, run `echo $NAME_OF_ENV_VAR` which will display its value. + +### Required environment variables + +- `SYSTEM_TESTS_GCP_PROJECT` - GCP project name that will be used to run system tests (this can be checked on the UI + dashboard of GCP or by running `gcloud config list`). + +- `SYSTEM_TESTS_ENV_ID` - environment ID that is unique across different executions of system tests (if they + are run in parallel). The need for this variable comes from the possibility that the tests may be run on various + versions of Airflow at the same time in a CI environment. If this is the case, the value of this variable ensures + that resources created during the tests will not interfere with other resources in the same project + that may be created at the same time. 
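The package-level `conftest.py` shown above defines a `provider_env_vars` fixture that returns an empty tuple and is meant to be overridden in each provider's own `conftest.py`. As a hedged illustration (the file path and the variable list below are assumptions drawn from this README, not part of this change), such an override could look like:

```python
# Hypothetical tests/system/providers/google/conftest.py -- illustrative sketch only.
import pytest


@pytest.fixture()
def provider_env_vars():
    # Overriding the package-level fixture lets `skip_if_env_var_not_set`
    # skip Google system tests when this variable is not exported.
    return ("SYSTEM_TESTS_GCP_PROJECT",)
```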
+ + If you run your tests in parallel, make sure to set this variable to a randomly generated value, preferably no + longer than 8 characters (which should be fine to avoid collisions). Otherwise, please put whatever value you want, + but use only lowercase letters a-z and digits 0-9. + + The value of this environment variable is commonly attached to other variables like bucket names, so it needs to + follow the same guidelines as bucket names, which can be found + [here](https://cloud.google.com/storage/docs/naming-buckets#requirements). + +### GCP Project setup + +System tests are executed in a working environment, in this case a GCP project that is funded with credits. Such a +project is prepared and dedicated to the exclusive use of the Airflow Community, especially for the CI-integrated +system tests. If you want to run the tests manually, you need to have your own GCP project and configure it properly. + +Tests need to know which GCP project to use to execute them. If your environment is not already configured to use a +particular GCP project (like Airflow running in GCP Composer), you need to configure it using the +[gcloud CLI](https://cloud.google.com/sdk/gcloud). Use your project ID to execute this command: + +```commandline +gcloud config set project <project_id> +``` + +Keep in mind that some additional commands may be required. + + +## Settings for specific tests + +Some tests may require extra setup. If this is the case, the steps should be documented inside the docstring of the +related test file. diff --git a/tests/system/providers/google/bigquery/example_bigquery_dataset.py b/tests/system/providers/google/bigquery/example_bigquery_dataset.py new file mode 100644 index 0000000000000..f5e6d7f044020 --- /dev/null +++ b/tests/system/providers/google/bigquery/example_bigquery_dataset.py @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG for Google BigQuery service testing dataset operations. 
+""" +import os +from datetime import datetime + +from airflow import models +from airflow.operators.bash import BashOperator +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + BigQueryDeleteDatasetOperator, + BigQueryGetDatasetOperator, + BigQueryUpdateDatasetOperator, +) +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "bigquery_dataset" + +DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" + + +with models.DAG( + DAG_ID, + schedule_interval="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "bigquery"], +) as dag: + # [START howto_operator_bigquery_create_dataset] + create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME) + # [END howto_operator_bigquery_create_dataset] + + # [START howto_operator_bigquery_update_dataset] + update_dataset = BigQueryUpdateDatasetOperator( + task_id="update_dataset", + dataset_id=DATASET_NAME, + dataset_resource={"description": "Updated dataset"}, + ) + # [END howto_operator_bigquery_update_dataset] + + # [START howto_operator_bigquery_get_dataset] + get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME) + # [END howto_operator_bigquery_get_dataset] + + get_dataset_result = BashOperator( + task_id="get_dataset_result", + bash_command="echo \"{{ task_instance.xcom_pull('get-dataset')['id'] }}\"", + ) + + # [START howto_operator_bigquery_delete_dataset] + delete_dataset = BigQueryDeleteDatasetOperator( + task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True + ) + # [END howto_operator_bigquery_delete_dataset] + delete_dataset.trigger_rule = TriggerRule.ALL_DONE + + ( + # TEST BODY + create_dataset + >> update_dataset + >> get_dataset + >> get_dataset_result + # TEST TEARDOWN + >> delete_dataset + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/bigquery/example_bigquery_operations.py b/tests/system/providers/google/bigquery/example_bigquery_operations.py new file mode 100644 index 0000000000000..c2320f96e6278 --- /dev/null +++ b/tests/system/providers/google/bigquery/example_bigquery_operations.py @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG for Google BigQuery service local file upload and external table creation. 
+""" +import os +from datetime import datetime +from pathlib import Path + +from airflow import models +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + BigQueryCreateExternalTableOperator, + BigQueryDeleteDatasetOperator, +) +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "bigquery_operations" + +DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" +DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +DATA_SAMPLE_GCS_OBJECT_NAME = "bigquery/us-states/us-states.csv" +CSV_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / "us-states.csv") + + +with models.DAG( + DAG_ID, + schedule_interval="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "bigquery"], +) as dag: + create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME) + + create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME) + + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file_to_bucket", + src=CSV_FILE_LOCAL_PATH, + dst=DATA_SAMPLE_GCS_OBJECT_NAME, + bucket=DATA_SAMPLE_GCS_BUCKET_NAME, + ) + + # [START howto_operator_bigquery_create_external_table] + create_external_table = BigQueryCreateExternalTableOperator( + task_id="create_external_table", + destination_project_dataset_table=f"{DATASET_NAME}.external_table", + bucket=DATA_SAMPLE_GCS_BUCKET_NAME, + source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME], + schema_fields=[ + {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, + ], + ) + # [END howto_operator_bigquery_create_external_table] + + delete_dataset = BigQueryDeleteDatasetOperator( + task_id="delete_dataset", + dataset_id=DATASET_NAME, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=DATA_SAMPLE_GCS_BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + ( + # TEST SETUP + [create_bucket, create_dataset] + # TEST BODY + >> upload_file + >> create_external_table + # TEST TEARDOWN + >> delete_dataset + >> delete_bucket + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/bigquery/example_bigquery_operations_location.py b/tests/system/providers/google/bigquery/example_bigquery_operations_location.py new file mode 100644 index 0000000000000..391a336bfdfbd --- /dev/null +++ b/tests/system/providers/google/bigquery/example_bigquery_operations_location.py @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Example Airflow DAG for Google BigQuery service testing data structures with location. +""" +import os +from datetime import datetime + +from airflow import models +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + BigQueryCreateEmptyTableOperator, + BigQueryDeleteDatasetOperator, +) +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "bigquery_operations_location" + +BQ_LOCATION = "europe-north1" +DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" + + +with models.DAG( + DAG_ID, + schedule_interval="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "bigquery"], +) as dag: + create_dataset_with_location = BigQueryCreateEmptyDatasetOperator( + task_id="create_dataset_with_location", + dataset_id=DATASET_NAME, + location=BQ_LOCATION, + ) + + create_table_with_location = BigQueryCreateEmptyTableOperator( + task_id="create_table_with_location", + dataset_id=DATASET_NAME, + table_id="test_table", + schema_fields=[ + {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, + ], + ) + + delete_dataset_with_location = BigQueryDeleteDatasetOperator( + task_id="delete_dataset_with_location", + dataset_id=DATASET_NAME, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + # TEST BODY + create_dataset_with_location + >> create_table_with_location + # TEST TEARDOWN + >> delete_dataset_with_location + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/google/cloud/example_dags/example_bigquery_queries.py b/tests/system/providers/google/bigquery/example_bigquery_queries.py similarity index 79% rename from airflow/providers/google/cloud/example_dags/example_bigquery_queries.py rename to tests/system/providers/google/bigquery/example_bigquery_queries.py index f4e435518dc2a..be34df79ef79e 100644 --- a/airflow/providers/google/cloud/example_dags/example_bigquery_queries.py +++ b/tests/system/providers/google/bigquery/example_bigquery_queries.py @@ -21,6 +21,7 @@ """ import os from datetime import datetime +from pathlib import Path from airflow import models from airflow.operators.bash import BashOperator @@ -34,10 +35,12 @@ BigQueryIntervalCheckOperator, BigQueryValueCheckOperator, ) +from airflow.utils.trigger_rule import TriggerRule -PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") -DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset") -LOCATION = "southamerica-east1" +ENV_ID = 
os.environ.get("SYSTEM_TESTS_ENV_ID") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") +LOCATION = "us-east1" +QUERY_SQL_PATH = str(Path(__file__).parent / "resources" / "example_bigquery_query.sql") TABLE_1 = "table1" TABLE_2 = "table2" @@ -48,29 +51,31 @@ {"name": "ds", "type": "DATE", "mode": "NULLABLE"}, ] +DAGS_LIST = [] locations = [None, LOCATION] for index, location in enumerate(locations, 1): - dag_id = "example_bigquery_queries_location" if location else "example_bigquery_queries" - DATASET = DATASET_NAME + str(index) + DAG_ID = "bigquery_queries_location" if location else "bigquery_queries" + DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" + DATASET = f"{DATASET_NAME}{index}" INSERT_DATE = datetime.now().strftime("%Y-%m-%d") # [START howto_operator_bigquery_query] INSERT_ROWS_QUERY = ( f"INSERT {DATASET}.{TABLE_1} VALUES " - f"(42, 'monthy python', '{INSERT_DATE}'), " + f"(42, 'monty python', '{INSERT_DATE}'), " f"(42, 'fishy fish', '{INSERT_DATE}');" ) # [END howto_operator_bigquery_query] with models.DAG( - dag_id, - schedule_interval='@once', # Override to match your needs + DAG_ID, + schedule_interval="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example"], - user_defined_macros={"DATASET": DATASET, "TABLE": TABLE_1}, - ) as dag_with_locations: + tags=["example", "bigquery"], + user_defined_macros={"DATASET": DATASET, "TABLE": TABLE_1, "QUERY_SQL_PATH": QUERY_SQL_PATH}, + ) as dag: create_dataset = BigQueryCreateEmptyDatasetOperator( - task_id="create-dataset", + task_id="create_dataset", dataset_id=DATASET, location=location, ) @@ -91,12 +96,6 @@ location=location, ) - create_dataset >> [create_table_1, create_table_2] - - delete_dataset = BigQueryDeleteDatasetOperator( - task_id="delete_dataset", dataset_id=DATASET, delete_contents=True - ) - # [START howto_operator_bigquery_insert_job] insert_query_job = BigQueryInsertJobOperator( task_id="insert_query_job", @@ -115,7 +114,7 @@ task_id="select_query_job", configuration={ "query": { - "query": "{% include 'example_bigquery_query.sql' %}", + "query": "{% include QUERY_SQL_PATH %}", "useLegacySql": False, } }, @@ -134,17 +133,6 @@ location=location, ) - bigquery_execute_multi_query = BigQueryInsertJobOperator( - task_id="execute_multi_query", - configuration={ - "query": { - "query": f"SELECT * FROM {DATASET}.{TABLE_2};SELECT COUNT(*) FROM {DATASET}.{TABLE_2}", - "useLegacySql": False, - } - }, - location=location, - ) - execute_query_save = BigQueryInsertJobOperator( task_id="execute_query_save", configuration={ @@ -161,6 +149,20 @@ location=location, ) + bigquery_execute_multi_query = BigQueryInsertJobOperator( + task_id="execute_multi_query", + configuration={ + "query": { + "query": [ + f"SELECT * FROM {DATASET}.{TABLE_2}", + f"SELECT COUNT(*) FROM {DATASET}.{TABLE_2}", + ], + "useLegacySql": False, + } + }, + location=location, + ) + # [START howto_operator_bigquery_get_data] get_data = BigQueryGetDataOperator( task_id="get_data", @@ -207,11 +209,32 @@ ) # [END howto_operator_bigquery_interval_check] - [create_table_1, create_table_2] >> insert_query_job >> select_query_job + delete_dataset = BigQueryDeleteDatasetOperator( + task_id="delete_dataset", + dataset_id=DATASET, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) - insert_query_job >> execute_insert_query + # TEST SETUP + create_dataset >> [create_table_1, create_table_2] + # TEST BODY + [create_table_1, create_table_2] >> insert_query_job >> [select_query_job, execute_insert_query] execute_insert_query >> get_data >> 
get_data_result >> delete_dataset execute_insert_query >> execute_query_save >> bigquery_execute_multi_query >> delete_dataset execute_insert_query >> [check_count, check_value, check_interval] >> delete_dataset - globals()[dag_id] = dag_with_locations + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + DAGS_LIST.append(dag) + globals()[DAG_ID] = dag + +for dag in DAGS_LIST: + from tests.system.utils import get_test_run + + # Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) + test_run = get_test_run(dag) diff --git a/airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py b/tests/system/providers/google/bigquery/example_bigquery_sensors.py similarity index 74% rename from airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py rename to tests/system/providers/google/bigquery/example_bigquery_sensors.py index 28a86fc4017c1..c51e7c905d73f 100644 --- a/airflow/providers/google/cloud/example_dags/example_bigquery_sensors.py +++ b/tests/system/providers/google/bigquery/example_bigquery_sensors.py @@ -33,9 +33,13 @@ BigQueryTableExistenceSensor, BigQueryTablePartitionExistenceSensor, ) +from airflow.utils.trigger_rule import TriggerRule -PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") -DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_sensors_dataset") +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "bigquery_sensors" + +DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "") TABLE_NAME = "partitioned_table" INSERT_DATE = datetime.now().strftime("%Y-%m-%d") @@ -49,19 +53,19 @@ {"name": "ds", "type": "DATE", "mode": "NULLABLE"}, ] -dag_id = "example_bigquery_sensors" with models.DAG( - dag_id, - schedule_interval='@once', # Override to match your needs + DAG_ID, + schedule_interval="@once", start_date=datetime(2021, 1, 1), catchup=False, - tags=["example"], + tags=["example", "bigquery"], user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_NAME}, default_args={"project_id": PROJECT_ID}, -) as dag_with_locations: +) as dag: + create_dataset = BigQueryCreateEmptyDatasetOperator( - task_id="create-dataset", dataset_id=DATASET_NAME, project_id=PROJECT_ID + task_id="create_dataset", dataset_id=DATASET_NAME, project_id=PROJECT_ID ) create_table = BigQueryCreateEmptyTableOperator( @@ -101,12 +105,25 @@ # [END howto_sensor_bigquery_table_partition] delete_dataset = BigQueryDeleteDatasetOperator( - task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True + task_id="delete_dataset", + dataset_id=DATASET_NAME, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, ) create_dataset >> create_table - create_table >> check_table_exists - create_table >> execute_insert_query + create_table >> [check_table_exists, execute_insert_query] execute_insert_query >> check_table_partition_exists - check_table_exists >> delete_dataset - check_table_partition_exists >> delete_dataset + [check_table_exists, check_table_partition_exists] >> delete_dataset + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest 
(see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py b/tests/system/providers/google/bigquery/example_bigquery_tables.py similarity index 58% rename from airflow/providers/google/cloud/example_dags/example_bigquery_operations.py rename to tests/system/providers/google/bigquery/example_bigquery_tables.py index d12a749474fe3..be7e909d55155 100644 --- a/airflow/providers/google/cloud/example_dags/example_bigquery_operations.py +++ b/tests/system/providers/google/bigquery/example_bigquery_tables.py @@ -17,53 +17,60 @@ # under the License. """ -Example Airflow DAG for Google BigQuery service. +Example Airflow DAG for Google BigQuery service testing tables. """ import os import time from datetime import datetime -from urllib.parse import urlparse +from pathlib import Path from airflow import models -from airflow.operators.bash import BashOperator from airflow.providers.google.cloud.operators.bigquery import ( BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, - BigQueryCreateExternalTableOperator, BigQueryDeleteDatasetOperator, BigQueryDeleteTableOperator, - BigQueryGetDatasetOperator, BigQueryGetDatasetTablesOperator, BigQueryUpdateDatasetOperator, BigQueryUpdateTableOperator, BigQueryUpdateTableSchemaOperator, BigQueryUpsertTableOperator, ) +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator +from airflow.utils.trigger_rule import TriggerRule -START_DATE = datetime(2021, 1, 1) +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "bigquery_tables" -PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project") -BQ_LOCATION = "europe-north1" - -DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset_operations") -LOCATION_DATASET_NAME = f"{DATASET_NAME}_location" -DATA_SAMPLE_GCS_URL = os.environ.get( - "GCP_BIGQUERY_DATA_GCS_URL", - "gs://INVALID BUCKET NAME/bigquery/us-states/us-states.csv", -) - -DATA_SAMPLE_GCS_URL_PARTS = urlparse(DATA_SAMPLE_GCS_URL) -DATA_SAMPLE_GCS_BUCKET_NAME = DATA_SAMPLE_GCS_URL_PARTS.netloc -DATA_SAMPLE_GCS_OBJECT_NAME = DATA_SAMPLE_GCS_URL_PARTS.path[1:] +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") +SCHEMA_JSON_LOCAL_SRC = str(Path(__file__).parent / "resources" / "update_table_schema.json") +SCHEMA_JSON_DESTINATION = "update_table_schema.json" +GCS_PATH_TO_SCHEMA_JSON = f"gs://{BUCKET_NAME}/{SCHEMA_JSON_DESTINATION}" with models.DAG( - "example_bigquery_operations", - schedule_interval='@once', # Override to match your needs - start_date=START_DATE, + DAG_ID, + schedule_interval="@once", + start_date=datetime(2021, 1, 1), catchup=False, - tags=["example"], + tags=["example", "bigquery"], ) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID + ) + + upload_schema_json = LocalFilesystemToGCSOperator( + task_id="upload_schema_json", + src=SCHEMA_JSON_LOCAL_SRC, + dst=SCHEMA_JSON_DESTINATION, + bucket=BUCKET_NAME, + ) + + create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME) + # [START howto_operator_bigquery_create_table] create_table = BigQueryCreateEmptyTableOperator( task_id="create_table", @@ -76,25 +83,6 @@ ) # [END 
howto_operator_bigquery_create_table] - # [START howto_operator_bigquery_update_table_schema] - update_table_schema = BigQueryUpdateTableSchemaOperator( - task_id="update_table_schema", - dataset_id=DATASET_NAME, - table_id="test_table", - schema_fields_updates=[ - {"name": "emp_name", "description": "Name of employee"}, - {"name": "salary", "description": "Monthly salary in USD"}, - ], - ) - # [END howto_operator_bigquery_update_table_schema] - - # [START howto_operator_bigquery_delete_table] - delete_table = BigQueryDeleteTableOperator( - task_id="delete_table", - deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table", - ) - # [END howto_operator_bigquery_delete_table] - # [START howto_operator_bigquery_create_view] create_view = BigQueryCreateEmptyTableOperator( task_id="create_view", @@ -107,56 +95,38 @@ ) # [END howto_operator_bigquery_create_view] - # [START howto_operator_bigquery_delete_view] - delete_view = BigQueryDeleteTableOperator( - task_id="delete_view", deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view" - ) - # [END howto_operator_bigquery_delete_view] - # [START howto_operator_bigquery_create_materialized_view] create_materialized_view = BigQueryCreateEmptyTableOperator( task_id="create_materialized_view", dataset_id=DATASET_NAME, table_id="test_materialized_view", materialized_view={ - "query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`", + "query": f"SELECT SUM(salary) AS sum_salary " f"FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`", "enableRefresh": True, "refreshIntervalMs": 2000000, }, ) # [END howto_operator_bigquery_create_materialized_view] - # [START howto_operator_bigquery_delete_materialized_view] - delete_materialized_view = BigQueryDeleteTableOperator( - task_id="delete_materialized_view", - deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view", + # [START howto_operator_bigquery_delete_view] + delete_view = BigQueryDeleteTableOperator( + task_id="delete_view", + deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view", ) - # [END howto_operator_bigquery_delete_materialized_view] + # [END howto_operator_bigquery_delete_view] - # [START howto_operator_bigquery_create_external_table] - create_external_table = BigQueryCreateExternalTableOperator( - task_id="create_external_table", + # [START howto_operator_bigquery_update_table] + update_table = BigQueryUpdateTableOperator( + task_id="update_table", + dataset_id=DATASET_NAME, + table_id="test_table", + fields=["friendlyName", "description"], table_resource={ - "tableReference": { - "projectId": PROJECT_ID, - "datasetId": DATASET_NAME, - "tableId": "external_table", - }, - "schema": { - "fields": [ - {"name": "name", "type": "STRING"}, - {"name": "post_abbr", "type": "STRING"}, - ] - }, - "externalDataConfiguration": { - "sourceFormat": "CSV", - "compression": "NONE", - "csvOptions": {"skipLeadingRows": 1}, - "sourceUris": [DATA_SAMPLE_GCS_URL], - }, + "friendlyName": "Updated Table", + "description": "Updated Table", }, ) - # [END howto_operator_bigquery_create_external_table] + # [END howto_operator_bigquery_update_table] # [START howto_operator_bigquery_upsert_table] upsert_table = BigQueryUpsertTableOperator( @@ -169,98 +139,95 @@ ) # [END howto_operator_bigquery_upsert_table] - # [START howto_operator_bigquery_create_dataset] - create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME) - # [END howto_operator_bigquery_create_dataset] - - # [START 
howto_operator_bigquery_get_dataset_tables] - get_dataset_tables = BigQueryGetDatasetTablesOperator( - task_id="get_dataset_tables", dataset_id=DATASET_NAME + # [START howto_operator_bigquery_update_table_schema] + update_table_schema = BigQueryUpdateTableSchemaOperator( + task_id="update_table_schema", + dataset_id=DATASET_NAME, + table_id="test_table", + schema_fields_updates=[ + {"name": "emp_name", "description": "Name of employee"}, + {"name": "salary", "description": "Monthly salary in USD"}, + ], ) - # [END howto_operator_bigquery_get_dataset_tables] + # [END howto_operator_bigquery_update_table_schema] - # [START howto_operator_bigquery_get_dataset] - get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME) - # [END howto_operator_bigquery_get_dataset] + # [START howto_operator_bigquery_create_table_schema_json] + update_table_schema_json = BigQueryCreateEmptyTableOperator( + task_id="update_table_schema_json", + dataset_id=DATASET_NAME, + table_id="test_table", + gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON, + ) + # [END howto_operator_bigquery_create_table_schema_json] - get_dataset_result = BashOperator( - task_id="get_dataset_result", - bash_command="echo \"{{ task_instance.xcom_pull('get-dataset')['id'] }}\"", + # [START howto_operator_bigquery_delete_materialized_view] + delete_materialized_view = BigQueryDeleteTableOperator( + task_id="delete_materialized_view", + deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view", ) + # [END howto_operator_bigquery_delete_materialized_view] - # [START howto_operator_bigquery_update_table] - update_table = BigQueryUpdateTableOperator( - task_id="update_table", - dataset_id=DATASET_NAME, - table_id="test_table", - fields=["friendlyName", "description"], - table_resource={ - "friendlyName": "Updated Table", - "description": "Updated Table", - }, + # [START howto_operator_bigquery_get_dataset_tables] + get_dataset_tables = BigQueryGetDatasetTablesOperator( + task_id="get_dataset_tables", dataset_id=DATASET_NAME ) - # [END howto_operator_bigquery_update_table] + # [END howto_operator_bigquery_get_dataset_tables] - # [START howto_operator_bigquery_update_dataset] update_dataset = BigQueryUpdateDatasetOperator( task_id="update_dataset", dataset_id=DATASET_NAME, dataset_resource={"description": "Updated dataset"}, ) - # [END howto_operator_bigquery_update_dataset] - # [START howto_operator_bigquery_delete_dataset] + # [START howto_operator_bigquery_delete_table] + delete_table = BigQueryDeleteTableOperator( + task_id="delete_table", + deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table", + ) + # [END howto_operator_bigquery_delete_table] + delete_dataset = BigQueryDeleteDatasetOperator( task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True ) - # [END howto_operator_bigquery_delete_dataset] + delete_dataset.trigger_rule = TriggerRule.ALL_DONE - create_dataset >> update_dataset >> get_dataset >> get_dataset_result >> delete_dataset + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) ( - update_dataset + # TEST SETUP + create_bucket + >> create_dataset + >> upload_schema_json + # TEST BODY + >> update_dataset >> create_table >> create_view >> create_materialized_view - >> update_table >> [ get_dataset_tables, delete_view, ] + >> update_table >> upsert_table >> update_table_schema + >> update_table_schema_json >> delete_materialized_view >> delete_table + # TEST TEARDOWN + >> delete_bucket 
>> delete_dataset ) - update_dataset >> create_external_table >> delete_dataset -with models.DAG( - "example_bigquery_operations_location", - schedule_interval='@once', # Override to match your needs - start_date=START_DATE, - catchup=False, - tags=["example"], -) as dag_with_location: - create_dataset_with_location = BigQueryCreateEmptyDatasetOperator( - task_id="create_dataset_with_location", - dataset_id=LOCATION_DATASET_NAME, - location=BQ_LOCATION, - ) + from tests.system.utils.watcher import watcher - create_table_with_location = BigQueryCreateEmptyTableOperator( - task_id="create_table_with_location", - dataset_id=LOCATION_DATASET_NAME, - table_id="test_table", - schema_fields=[ - {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, - {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, - ], - ) + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() - delete_dataset_with_location = BigQueryDeleteDatasetOperator( - task_id="delete_dataset_with_location", - dataset_id=LOCATION_DATASET_NAME, - delete_contents=True, - ) - create_dataset_with_location >> create_table_with_location >> delete_dataset_with_location + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/bigquery/example_bigquery_to_bigquery.py b/tests/system/providers/google/bigquery/example_bigquery_to_bigquery.py new file mode 100644 index 0000000000000..b0358046c59df --- /dev/null +++ b/tests/system/providers/google/bigquery/example_bigquery_to_bigquery.py @@ -0,0 +1,104 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Airflow System Test DAG that verifies BigQueryToBigQueryOperator. 
+""" +import os +from datetime import datetime + +from airflow import models +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + BigQueryCreateEmptyTableOperator, + BigQueryDeleteDatasetOperator, +) +from airflow.providers.google.cloud.transfers.bigquery_to_bigquery import BigQueryToBigQueryOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "bigquery_to_bigquery" + +DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" +ORIGIN = "origin" +TARGET = "target" + + +with models.DAG( + DAG_ID, + schedule_interval="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "bigquery"], +) as dag: + create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME) + + create_origin_table = BigQueryCreateEmptyTableOperator( + task_id="create_origin_table", + dataset_id=DATASET_NAME, + table_id="origin", + schema_fields=[ + {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, + ], + ) + + create_target_table = BigQueryCreateEmptyTableOperator( + task_id="create_target_table", + dataset_id=DATASET_NAME, + table_id="target", + schema_fields=[ + {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, + ], + ) + + copy_selected_data = BigQueryToBigQueryOperator( + task_id="copy_selected_data", + source_project_dataset_tables=f"{DATASET_NAME}.{ORIGIN}", + destination_project_dataset_table=f"{DATASET_NAME}.{TARGET}", + ) + + delete_dataset = BigQueryDeleteDatasetOperator( + task_id="delete_dataset", + dataset_id=DATASET_NAME, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + # TEST SETUP + create_dataset + >> [create_origin_table, create_target_table] + # TEST BODY + >> copy_selected_data + # TEST TEARDOWN + >> delete_dataset + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/tests/system/providers/google/bigquery/example_bigquery_to_gcs.py b/tests/system/providers/google/bigquery/example_bigquery_to_gcs.py new file mode 100644 index 0000000000000..e5275d91ba281 --- /dev/null +++ b/tests/system/providers/google/bigquery/example_bigquery_to_gcs.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Airflow System Test DAG that verifies BigQueryToGCSOperator. 
+""" +import os +from datetime import datetime + +from airflow import models +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + BigQueryCreateEmptyTableOperator, + BigQueryDeleteDatasetOperator, +) +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "bigquery_to_gcs" + +DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}" +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") +BUCKET_FILE = "test.csv" +TABLE = "test" + + +with models.DAG( + DAG_ID, + schedule_interval="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "bigquery"], +) as dag: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID + ) + + create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME) + + create_table = BigQueryCreateEmptyTableOperator( + task_id="create_table", + dataset_id=DATASET_NAME, + table_id=TABLE, + schema_fields=[ + {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, + {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, + ], + ) + + bigquery_to_gcs = BigQueryToGCSOperator( + task_id="bigquery_to_gcs", + source_project_dataset_table=f"{DATASET_NAME}.{TABLE}", + destination_cloud_storage_uris=[f"gs://{BUCKET_NAME}/{BUCKET_FILE}"], + ) + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + delete_dataset = BigQueryDeleteDatasetOperator( + task_id="delete_dataset", + dataset_id=DATASET_NAME, + delete_contents=True, + trigger_rule=TriggerRule.ALL_DONE, + ) + + ( + # TEST SETUP + [create_bucket, create_dataset] + >> create_table + # TEST BODY + >> bigquery_to_gcs + # TEST TEARDOWN + >> [delete_bucket, delete_dataset] + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) diff --git a/airflow/providers/google/cloud/example_dags/example_bigquery_query.sql b/tests/system/providers/google/bigquery/resources/example_bigquery_query.sql similarity index 100% rename from airflow/providers/google/cloud/example_dags/example_bigquery_query.sql rename to tests/system/providers/google/bigquery/resources/example_bigquery_query.sql diff --git a/tests/system/providers/google/bigquery/resources/update_table_schema.json b/tests/system/providers/google/bigquery/resources/update_table_schema.json new file mode 100644 index 0000000000000..f28d55f7996a7 --- /dev/null +++ b/tests/system/providers/google/bigquery/resources/update_table_schema.json @@ -0,0 +1,12 @@ +[ + { + "mode": "NULLABLE", + "name": "friendlyName", + "type": "STRING" + }, + { + "mode": "NULLABLE", + "name": "description", + "type": "STRING" + } +] diff --git a/tests/system/providers/google/bigquery/resources/us-states.csv b/tests/system/providers/google/bigquery/resources/us-states.csv new file mode 100644 index 0000000000000..52cc001884b5a --- 
+++ b/tests/system/providers/google/bigquery/resources/us-states.csv
@@ -0,0 +1,51 @@
+name,post_abbr
+Alabama,AL
+Alaska,AK
+Arizona,AZ
+Arkansas,AR
+California,CA
+Colorado,CO
+Connecticut,CT
+Delaware,DE
+Florida,FL
+Georgia,GA
+Hawaii,HI
+Idaho,ID
+Illinois,IL
+Indiana,IN
+Iowa,IA
+Kansas,KS
+Kentucky,KY
+Louisiana,LA
+Maine,ME
+Maryland,MD
+Massachusetts,MA
+Michigan,MI
+Minnesota,MN
+Mississippi,MS
+Missouri,MO
+Montana,MT
+Nebraska,NE
+Nevada,NV
+New Hampshire,NH
+New Jersey,NJ
+New Mexico,NM
+New York,NY
+North Carolina,NC
+North Dakota,ND
+Ohio,OH
+Oklahoma,OK
+Oregon,OR
+Pennsylvania,PA
+Rhode Island,RI
+South Carolina,SC
+South Dakota,SD
+Tennessee,TN
+Texas,TX
+Utah,UT
+Vermont,VT
+Virginia,VA
+Washington,WA
+West Virginia,WV
+Wisconsin,WI
+Wyoming,WY
diff --git a/tests/system/providers/google/conftest.py b/tests/system/providers/google/conftest.py
new file mode 100644
index 0000000000000..bb3aa4f981f38
--- /dev/null
+++ b/tests/system/providers/google/conftest.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+REQUIRED_ENV_VARS = ("SYSTEM_TESTS_GCP_PROJECT",)
+
+
+@pytest.fixture()
+def provider_env_vars():
+    return REQUIRED_ENV_VARS
diff --git a/tests/system/utils/__init__.py b/tests/system/utils/__init__.py
new file mode 100644
index 0000000000000..31b0672b907d9
--- /dev/null
+++ b/tests/system/utils/__init__.py
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.utils.state import State
+
+
+def get_test_run(dag):
+    def test_run():
+        dag.clear(dag_run_state=State.NONE)
+        dag.run()
+
+    return test_run
diff --git a/tests/system/utils/watcher.py b/tests/system/utils/watcher.py
new file mode 100644
index 0000000000000..2fa6d14bd85cc
--- /dev/null
+++ b/tests/system/utils/watcher.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.decorators import task
+from airflow.exceptions import AirflowException
+from airflow.utils.trigger_rule import TriggerRule
+
+
+@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
+def watcher():
+    """Watcher task that raises an AirflowException; it is used to 'watch' the other tasks
+    for failures and propagate a failed status to the whole DAG run."""
+    raise AirflowException("Failing task because one or more upstream tasks failed.")
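
How the pieces above fit together: every example module ends with `test_run = get_test_run(dag)`, so pytest collects the module-level `test_run` callable as an ordinary test, and the `watcher()` task (wired in via `list(dag.tasks) >> watcher()`) turns any upstream failure into a failed DAG run even though the teardown tasks use `TriggerRule.ALL_DONE`. The snippet below is an illustrative sketch only, not part of this diff; it assumes the environment variables used by these DAGs (`SYSTEM_TESTS_ENV_ID`, `SYSTEM_TESTS_GCP_PROJECT`) are exported and that `tests` is importable from the repository root (see tests/system/README.md#run_via_pytest for the canonical instructions).

    # Hypothetical shell session (paths and env values are examples):
    #   export SYSTEM_TESTS_ENV_ID=my-env SYSTEM_TESTS_GCP_PROJECT=my-project
    #   pytest tests/system/providers/google/bigquery/example_bigquery_dataset.py
    #
    # Equivalent direct call of the generated test callable:
    from tests.system.providers.google.bigquery.example_bigquery_dataset import test_run

    test_run()  # clears any previous DagRun state for the DAG, then executes it end to end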