diff --git a/providers/google/src/airflow/providers/google/ads/hooks/ads.py b/providers/google/src/airflow/providers/google/ads/hooks/ads.py
index 2f734aa8a9c1d..cd3e019ad9d1b 100644
--- a/providers/google/src/airflow/providers/google/ads/hooks/ads.py
+++ b/providers/google/src/airflow/providers/google/ads/hooks/ads.py
@@ -32,10 +32,10 @@
 from airflow.providers.google.common.hooks.base_google import get_field
 
 if TYPE_CHECKING:
-    from google.ads.googleads.v19.services.services.customer_service import CustomerServiceClient
-    from google.ads.googleads.v19.services.services.google_ads_service import GoogleAdsServiceClient
-    from google.ads.googleads.v19.services.services.google_ads_service.pagers import SearchPager
-    from google.ads.googleads.v19.services.types.google_ads_service import GoogleAdsRow
+    from google.ads.googleads.v20.services.services.customer_service import CustomerServiceClient
+    from google.ads.googleads.v20.services.services.google_ads_service import GoogleAdsServiceClient
+    from google.ads.googleads.v20.services.services.google_ads_service.pagers import SearchPager
+    from google.ads.googleads.v20.services.types.google_ads_service import GoogleAdsRow
 
 
 class GoogleAdsHook(BaseHook):
diff --git a/providers/google/tests/system/google/ads/example_ads.py b/providers/google/tests/system/google/ads/example_ads.py
index cd622b6b7c4fc..eecc52191acbd 100644
--- a/providers/google/tests/system/google/ads/example_ads.py
+++ b/providers/google/tests/system/google/ads/example_ads.py
@@ -17,32 +17,53 @@
 # under the License.
 """
 Example Airflow DAG that shows how to use GoogleAdsToGcsOperator.
+
+Before running this test, make sure you have completed the following steps:
+1. In your GCP project, create a service account that will be used to operate on Google Ads.
+The name should be in the format `google-ads-service-account@{PROJECT_ID}.iam.gserviceaccount.com`.
+2. Generate a key for this service account and store it in Secret Manager
+under the name `google_ads_service_account_key`.
+3. Give this service account Editor permissions.
+4. Make sure the Google Ads API is enabled in your GCP project.
+5. Log in to https://ads.google.com
+6. In the Admin section, go to Access and Security and give your GCP service account Admin permissions.
+7. Store your client ID and developer token in Secret Manager under the names `google_ads_client_id`
+and `google_ads_developer_token`.
 """
 
 from __future__ import annotations
 
+import json
+import logging
 import os
 from datetime import datetime
 
+from google.cloud.exceptions import NotFound
+
+from airflow.decorators import task
 from airflow.models.dag import DAG
 from airflow.providers.google.ads.operators.ads import GoogleAdsListAccountsOperator
 from airflow.providers.google.ads.transfers.ads_to_gcs import GoogleAdsToGcsOperator
+from airflow.providers.google.cloud.hooks.secret_manager import GoogleCloudSecretManagerHook
 from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.utils.trigger_rule import TriggerRule
 
-from system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+from tests_common.test_utils.api_client_helpers import create_airflow_connection, delete_airflow_connection
 
 # [START howto_google_ads_env_variables]
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
-PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
-API_VERSION = "v19"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_PROJECT_ID", "default")
+API_VERSION = "v20"
 
-DAG_ID = "example_google_ads"
+DAG_ID = "google_ads"
+
+GOOGLE_ADS_CLIENT_ID = "google_ads_client_id"
+GOOGLE_ADS_SERVICE_ACCOUNT_KEY = "google_ads_service_account_key"
+GOOGLE_ADS_DEVELOPER_TOKEN = "google_ads_developer_token"
 
 BUCKET_NAME = f"bucket_ads_{ENV_ID}"
-CLIENT_IDS = ["1111111111", "2222222222"]
-GCS_OBJ_PATH = "folder_name/google-ads-api-results.csv"
-GCS_ACCOUNTS_CSV = "folder_name/accounts.csv"
+GCS_OBJ_PATH = "google-ads-api-results.csv"
+GCS_ACCOUNTS_CSV = "accounts.csv"
 QUERY = """
     SELECT
         segments.date,
@@ -61,6 +82,9 @@
         segments.date >= '2020-02-01'
         AND segments.date <= '2020-02-29'
     """
+CONNECTION_GLOUD_ID = f"connection_cloud_{DAG_ID}_{ENV_ID}"
+CONNECTION_ADS_ID = "google_ads_default"
+CONNECTION_TYPE = "google_cloud_platform"
 
 FIELDS_TO_EXTRACT = [
     "segments.date.value",
@@ -76,47 +100,151 @@
 ]
 # [END howto_google_ads_env_variables]
 
+log = logging.getLogger(__name__)
+
+
+def get_secret(secret_id: str) -> str:
+    """Return the decoded payload of the given secret; raise ``NotFound`` if it does not exist."""
+    hook = GoogleCloudSecretManagerHook()
+    if hook.secret_exists(secret_id=secret_id):
+        return hook.access_secret(secret_id=secret_id).payload.data.decode()
+    raise NotFound(f"The secret '{secret_id}' not found")
+
+
 with DAG(
     DAG_ID,
     schedule="@once",
     start_date=datetime(2021, 1, 1),
     catchup=False,
     tags=["example", "ads"],
+    render_template_as_native_obj=True,
 ) as dag:
+
+    @task
+    def get_google_ads_client_id():
+        return get_secret(secret_id=GOOGLE_ADS_CLIENT_ID).strip()
+
+    get_google_ads_client_id_task = get_google_ads_client_id()
+
+    @task
+    def get_google_ads_service_account_key():
+        return get_secret(secret_id=GOOGLE_ADS_SERVICE_ACCOUNT_KEY)
+
+    get_google_ads_service_account_key_task = get_google_ads_service_account_key()
+
+    @task
+    def get_google_ads_developer_token():
+        return get_secret(secret_id=GOOGLE_ADS_DEVELOPER_TOKEN).strip()
+
+    get_google_ads_developer_token_task = get_google_ads_developer_token()
+
+    @task
+    def create_connection_gcloud_for_ads(connection_id: str, key) -> None:
+        conn_extra_json = json.dumps(
+            {
+                "keyfile_dict": key,
+                "project": PROJECT_ID,
+                "scope": "https://www.googleapis.com/auth/adwords, https://www.googleapis.com/auth/cloud-platform",
+            }
+        )
+        create_airflow_connection(
+            connection_id=connection_id,
+            connection_conf={"conn_type": CONNECTION_TYPE, "extra": conn_extra_json},
+        )
+
+    create_connection_gcloud_for_ads_task = create_connection_gcloud_for_ads(
+        connection_id=CONNECTION_GLOUD_ID, key=get_google_ads_service_account_key_task
+    )
+
+    @task
+    def create_connection_ads(connection_id: str, token) -> None:
+        conn_extra_json = json.dumps(
+            {
+                "google_ads_client": {
+                    "developer_token": token,
+                    # this parameter is required to be not None, but the actual content will be overwritten, so can be some dummy string
+                    "json_key_file_path": "some_string",
+                    "impersonated_email": f"google-ads-service-account@{PROJECT_ID}.iam.gserviceaccount.com",
+                    "use_proto_plus": False,
+                },
+                "project": PROJECT_ID,
+                "scope": "https://www.googleapis.com/auth/adwords, https://www.googleapis.com/auth/cloud-platform",
+            }
+        )
+        create_airflow_connection(
+            connection_id=connection_id,
+            connection_conf={"conn_type": CONNECTION_TYPE, "extra": conn_extra_json},
+        )
+
+    create_connection_ads_task = create_connection_ads(
+        connection_id=CONNECTION_ADS_ID, token=get_google_ads_developer_token_task
+    )
+
     create_bucket = GCSCreateBucketOperator(
-        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+        task_id="create_bucket",
+        bucket_name=BUCKET_NAME,
+        project_id=PROJECT_ID,
+        gcp_conn_id=CONNECTION_GLOUD_ID,
     )
 
     # [START howto_google_ads_to_gcs_operator]
     run_operator = GoogleAdsToGcsOperator(
-        client_ids=CLIENT_IDS,
+        client_ids=[get_google_ads_client_id_task],
         query=QUERY,
         attributes=FIELDS_TO_EXTRACT,
         obj=GCS_OBJ_PATH,
         bucket=BUCKET_NAME,
         task_id="run_operator",
         api_version=API_VERSION,
+        gcp_conn_id=CONNECTION_GLOUD_ID,
     )
     # [END howto_google_ads_to_gcs_operator]
 
     # [START howto_ads_list_accounts_operator]
     list_accounts = GoogleAdsListAccountsOperator(
-        task_id="list_accounts", bucket=BUCKET_NAME, object_name=GCS_ACCOUNTS_CSV
+        task_id="list_accounts",
+        bucket=BUCKET_NAME,
+        object_name=GCS_ACCOUNTS_CSV,
+        api_version=API_VERSION,
+        gcp_conn_id=CONNECTION_GLOUD_ID,
     )
     # [END howto_ads_list_accounts_operator]
 
     delete_bucket = GCSDeleteBucketOperator(
-        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+        task_id="delete_bucket",
+        bucket_name=BUCKET_NAME,
+        gcp_conn_id=CONNECTION_GLOUD_ID,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
 
+    @task(task_id="delete_connection_gloud")
+    def delete_connection_gloud(connection_id: str) -> None:
+        delete_airflow_connection(connection_id=connection_id)
+
+    delete_connection_gloud_task = delete_connection_gloud(connection_id=CONNECTION_GLOUD_ID)
+
+    @task(task_id="delete_connection_ads")
+    def delete_connection_ads(connection_id: str) -> None:
+        delete_airflow_connection(connection_id=connection_id)
+
+    delete_connection_ads_task = delete_connection_ads(connection_id=CONNECTION_ADS_ID)
+
     (
         # TEST SETUP
-        create_bucket
+        [
+            get_google_ads_client_id_task,
+            get_google_ads_service_account_key_task,
+            get_google_ads_developer_token_task,
+        ]
+        >> create_connection_gcloud_for_ads_task  # type: ignore
+        >> create_connection_ads_task
+        >> create_bucket
         # TEST BODY
         >> run_operator
         >> list_accounts
         # TEST TEARDOWN
         >> delete_bucket
+        >> [delete_connection_gloud_task, delete_connection_ads_task]
     )
 
     from tests_common.test_utils.watcher import watcher