Skip to content

Commit

Permalink
Implement new Google Search 360 operators
Browse files Browse the repository at this point in the history
Remove the decommissioned operators

# Conflicts:
#	tests/providers/google/marketing_platform/operators/test_search_ads.py
  • Loading branch information
molcay committed Sep 16, 2024
1 parent f3b238a commit 7ef2b3b
Show file tree
Hide file tree
Showing 10 changed files with 658 additions and 542 deletions.
137 changes: 111 additions & 26 deletions airflow/providers/google/marketing_platform/hooks/search_ads.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,73 +19,158 @@

from __future__ import annotations

from typing import Any, Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Any

from googleapiclient.discovery import build

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook

if TYPE_CHECKING:
from googleapiclient.discovery import Resource


class GoogleSearchAdsHook(GoogleBaseHook):
"""Hook for Google Search Ads 360."""
"""Hook for the Google Search Ads 360 Reporting API."""

_conn: build | None = None
default_api_version: str = "v0"

def __init__(
self,
api_version: str = "v2",
api_version: str | None = None,
gcp_conn_id: str = "google_cloud_default",
delegate_to: str | None = None,
impersonation_chain: str | Sequence[str] | None = None,
) -> None:
super().__init__(
gcp_conn_id=gcp_conn_id,
delegate_to=delegate_to,
impersonation_chain=impersonation_chain,
)
self.api_version = api_version
self.api_version = api_version or self.default_api_version

def get_conn(self):
"""Retrieve connection to Google SearchAds."""
def get_conn(self) -> Resource:
if not self._conn:
http_authorized = self._authorize()
self._conn = build(
"doubleclicksearch",
"searchads360",
self.api_version,
http=http_authorized,
cache_discovery=False,
)
return self._conn

def insert_report(self, report: dict[str, Any]) -> Any:
@cached_property
def customer_service(self):
return self.get_conn().customers()

@cached_property
def fields_service(self):
return self.get_conn().searchAds360Fields()

def search(
self,
customer_id: str,
query: str,
page_token: str | None = None,
page_size: int = 10000,
return_total_results_count: bool = False,
summary_row_setting: str | None = None,
validate_only: bool = False,
):
"""
Insert a report request into the reporting system.
Search and download the report. Use pagination to download entire report.
:param report: Report to be generated.
:param customer_id: The ID of the customer being queried.
:param query: The query to execute.
:param page_token: Token of the page to retrieve. If not specified, the first page of results will be
returned. Use the value obtained from `next_page_token` in the previous response
in order to request the next page of results.
:param page_size: Number of elements to retrieve in a single page. When too large a page is requested,
the server may decide to further limit the number of returned resources.
Default is 10000.
:param return_total_results_count: If true, the total number of results that match the query ignoring
the LIMIT clause will be included in the response. Default is false.
:param summary_row_setting: Determines whether a summary row will be returned. By default,
summary row is not returned. If requested, the summary row will be sent
in a response by itself after all others query results are returned.
:param validate_only: If true, the request is validated but not executed. Default is false.
"""
response = self.get_conn().reports().request(body=report).execute(num_retries=self.num_retries)
params: dict[str, Any] = {
"query": query,
"pageSize": page_size,
"returnTotalResultsCount": return_total_results_count,
"validateOnly": validate_only,
}
if page_token is not None:
params.update({"pageToken": page_token})
if summary_row_setting is not None:
params.update({"summaryRowSetting": summary_row_setting})

response = (
self.customer_service.searchAds360()
.search(customerId=customer_id, body=params)
.execute(num_retries=self.num_retries)
)
self.log.info("Search response: %s", response)
return response

def get(self, report_id: str) -> Any:
def get_custom_column(self, customer_id: str, custom_column_id: str):
"""
Poll for the status of a report request.
Retrieve the requested custom column in full detail.
:param report_id: ID of the report request being polled.
:param customer_id: The customer id
:param custom_column_id: The custom column id
"""
response = self.get_conn().reports().get(reportId=report_id).execute(num_retries=self.num_retries)
resource_name = f"customers/{customer_id}/customColumns/{custom_column_id}"
response = (
self.customer_service.custom_columns()
.get(resourceName=resource_name)
.execute(num_retries=self.num_retries)
)
self.log.info("Retrieved custom column: %s", response)
return response

def get_file(self, report_fragment: int, report_id: str) -> Any:
def list_custom_columns(self, customer_id: str):
"""
Download a report file encoded in UTF-8.
Retrieve all the custom columns associated with the customer in full detail.
:param report_fragment: The index of the report fragment to download.
:param report_id: ID of the report.
:param customer_id: The customer id
"""
response = (
self.get_conn()
.reports()
.getFile(reportFragment=report_fragment, reportId=report_id)
self.customer_service.custom_columns()
.list(customerId=customer_id)
.execute(num_retries=self.num_retries)
)
self.log.info("Listing the custom columns: %s", response)
return response

def get_field(self, field_name: str):
"""
Retrieve the requested field details.
:param field_name: The name of the field.
"""
resource_name = f"searchAds360Fields/{field_name}"
response = self.fields_service.get(resourceName=resource_name).execute(num_retries=self.num_retries)
self.log.info("Retrieved field: %s", response)
return response

def search_fields(self, query: str, page_token: str | None = None, page_size: int | None = 10000):
"""
Retrieve all the fields that match with the given search.
:param query: The query string to execute.
:param page_token: Token of the page to retrieve. If not specified, the first page of results will be
returned. Use the value obtained from `next_page_token` in the previous response
in order to request the next page of results.
:param page_size: Number of elements to retrieve in a single page. When too large a page is requested,
the server may decide to further limit the number of returned resources.
Default 10000.
"""
params: dict[str, Any] = {
"query": query,
"pageSize": page_size,
}
if page_token is not None:
params.update({"pageToken": page_token})
response = self.fields_service.search(body=params).execute(num_retries=self.num_retries)
self.log.info("Retrieved fields: %s", response)
return response
Loading

0 comments on commit 7ef2b3b

Please sign in to comment.