Skip to content

Commit

Permalink
Source Google Analytics Data API: common improvements
Browse files Browse the repository at this point in the history
- Added validation for reports defined by user to avoid errors like KeyError 'name'
- Added 429 error handling: return empty data and keep going
- Streams without a 'date' dimension are considered to be full refresh streams
  • Loading branch information
roman-yermilov-gl committed Dec 23, 2022
1 parent 416767d commit b0c57b1
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,16 @@ class GoogleAnalyticsDataApiAbstractStream(HttpStream, ABC):
def __init__(self, config: Mapping[str, Any], *args, **kwargs):
    """Initialize the stream, keeping the connector config and enabling HTTP-error raising by default.

    :param config: connector configuration mapping shared by all streams
    """
    super().__init__(*args, **kwargs)
    # Starts as True; should_retry() clears it on HTTP 429 so the sync degrades
    # to empty data instead of failing the whole run.
    self._raise_on_http_errors = True
    self._config = config

@property
def config(self):
    """Read-only access to the connector configuration passed at construction time."""
    return self._config

@property
def raise_on_http_errors(self):
    # Exposes the mutable per-stream flag; should_retry() sets it to False on HTTP 429
    # so a quota error yields empty data instead of an exception.
    # NOTE(review): presumably overrides a static property on the base HttpStream — confirm in the CDK.
    return self._raise_on_http_errors


class GoogleAnalyticsDataApiBaseStream(GoogleAnalyticsDataApiAbstractStream):
row_limit = 100000
Expand Down Expand Up @@ -213,6 +218,9 @@ def parse_response(
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> Iterable[Mapping]:
if not response.ok:
return {}

r = response.json()

dimensions = [h["name"] for h in r["dimensionHeaders"]]
Expand All @@ -235,6 +243,35 @@ def parse_response(

yield r

def should_retry(self, response: requests.Response) -> bool:
    """Decide whether a failed request should be retried.

    HTTP 429 (quota exceeded) is NOT retried: the stream logs the quota message,
    disables raise_on_http_errors and returns no data, letting the sync continue.
    All other statuses defer to the parent implementation.

    :param response: the HTTP response being evaluated
    :return: True when the request should be retried
    """
    if response.status_code == 429:
        # The 429 body is usually JSON with error.message, but guard against a
        # non-JSON or differently-shaped body so quota handling itself cannot crash.
        try:
            message = response.json()["error"]["message"]
        except (ValueError, KeyError):
            message = "Google Analytics Data API quota exceeded (HTTP 429)"
        self.logger.info(
            "%s. More info: https://developers.google.com/analytics/devguides/reporting/data/v1/quotas",
            message,
        )
        # Downgrade the failure: parse_response() will see response.ok == False and emit nothing.
        self._raise_on_http_errors = False
        return False
    return super().should_retry(response)


class FullRefreshGoogleAnalyticsDataApi(GoogleAnalyticsDataApiBaseStream, ABC):
    """Report stream without a 'date' dimension: every sync requests the full window
    from the configured start date up to today (full-refresh semantics)."""

    def request_body_json(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> Optional[Mapping]:
        """Build the runReport request body covering date_ranges_start_date..today."""
        window_start = utils.string_to_date(self.config["date_ranges_start_date"])
        window_end = datetime.datetime.now().date()
        date_range = {
            "startDate": utils.date_to_string(window_start),
            "endDate": utils.date_to_string(window_end),
        }
        metrics = [{"name": metric} for metric in self.config["metrics"]]
        dimensions = [{"name": dimension} for dimension in self.config["dimensions"]]
        return {"metrics": metrics, "dimensions": dimensions, "dateRanges": [date_range]}


class IncrementalGoogleAnalyticsDataApiStream(GoogleAnalyticsDataApiBaseStream, IncrementalMixin, ABC):
_date_format: str = "%Y-%m-%d"
Expand Down Expand Up @@ -282,7 +319,7 @@ def request_body_json(
return {
"metrics": [{"name": m} for m in self.config["metrics"]],
"dimensions": [{"name": d} for d in self.config["dimensions"]],
"dateRanges": [stream_slice],
"dateRanges": [stream_slice]
}

def read_records(
Expand All @@ -296,7 +333,7 @@ def read_records(
return []
records = super().read_records(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
for record in records:
for row in record["records"]:
for row in record.get("records", []):
self._cursor_value: str = max(self._cursor_value, row[self.cursor_field]) if self._cursor_value else row[self.cursor_field]
yield row

Expand Down Expand Up @@ -370,16 +407,78 @@ def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) ->
return False, str(e)
return True, None

def _validate_custom_reports(self, custom_reports):
"""
Load, validate and return custom reports. Expect custom reports to be a json string of the following format:
[{"name": "<report name>", "dimensions": ["<dimension-name>", ...], "metrics": ["<metric-name>", ...]}, ...]
:param custom_reports: custom reports to be validated
:return: custom reports
"""
# Custom report root object is of type list
if not isinstance(custom_reports, list):
raise TypeError(f"Expected Custom Reports to be a list of objects. Got {type(custom_reports)}.")

# Report items are of type dict
incorrect_report_item_types = [(i, type(report)) for i, report in enumerate(custom_reports) if type(report) is not dict]
if any(incorrect_report_item_types):
raise TypeError("Expected Report item to be an object. " + ", ".join([f"Got {t} at position {i}" for i, t in incorrect_report_item_types]))

def validate_name(report):
"""Report name is defined as a non-empty string. Returns key name if any problems"""
if not (
"name" in report
and report["name"]
and type(report['name']) is str
):
return "name"

def validate_dimensions(report):
"""Dimensions are defined as a non-empty list of strings. Returns key dimensions if any problems"""
if not (
"dimensions" in report
and report["dimensions"]
and isinstance(report["dimensions"], list)
and all(type(d) is str and d for d in report["dimensions"])
):
return "dimensions"

def validate_metrics(report):
"""Metrics are defined as a non-empty list of strings. Returns key metrics if any problems"""
if not (
"metrics" in report
and report["metrics"]
and isinstance(report["metrics"], list)
and all(type(m) is str and m for m in report["metrics"])
):
return "metrics"

# Collect all invalid reports with their positions and invalid keys
incorrect_report_item_fields = [
(i, *filter(lambda x: x, (validate_name(report), validate_dimensions(report), validate_metrics(report))))
for i, report in enumerate(custom_reports)
if any([validate_name(report), validate_dimensions(report), validate_metrics(report)])
]
# Raise an error if any invalid reports provided
if any(incorrect_report_item_fields):
msg = 'Report format: [{"name": "<report name>", "dimensions": ["<dimension-name>", ...], "metrics": ["<metric-name>", ...]}, ...]'
errors = ", ".join([
f"Check {missing_fields} at position {position + 1}"
for position, *missing_fields in incorrect_report_item_fields
])
raise TypeError(f'{msg}.\n {errors}')

return custom_reports

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
authenticator = self.get_authenticator(config)

reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json"))
if "custom_reports" in config:
custom_reports = json.loads(config["custom_reports"])
custom_reports = self._validate_custom_reports(json.loads(config["custom_reports"]))
reports += custom_reports

return [
type(report["name"], (GoogleAnalyticsDataApiGenericStream,), {})(
type(report["name"], (GoogleAnalyticsDataApiGenericStream if "date" in report["dimensions"] else FullRefreshGoogleAnalyticsDataApi,), {})(
config=dict(**config, metrics=report["metrics"], dimensions=report["dimensions"]), authenticator=authenticator
)
for report in reports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ def test_http_method(patch_base_class):
[
(HTTPStatus.OK, False),
(HTTPStatus.BAD_REQUEST, False),
(HTTPStatus.TOO_MANY_REQUESTS, True),
(HTTPStatus.TOO_MANY_REQUESTS, False),
(HTTPStatus.INTERNAL_SERVER_ERROR, True),
],
)
Expand Down

0 comments on commit b0c57b1

Please sign in to comment.