diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 9d8c8f28fef6..67b0aa13a014 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -622,7 +622,7 @@
 - name: Google Analytics 4 (GA4)
   sourceDefinitionId: 3cc2eafd-84aa-4dca-93af-322d9dfeec1a
   dockerRepository: airbyte/source-google-analytics-data-api
-  dockerImageTag: 0.0.3
+  dockerImageTag: 0.1.0
   documentationUrl: https://docs.airbyte.com/integrations/sources/google-analytics-v4
   icon: google-analytics.svg
   sourceType: api
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index 3fc745170a08..f8bc18391712 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -5397,7 +5397,7 @@
       oauthFlowOutputParameters:
       - - "access_token"
       - - "refresh_token"
-- dockerImage: "airbyte/source-google-analytics-data-api:0.0.3"
+- dockerImage: "airbyte/source-google-analytics-data-api:0.1.0"
   spec:
     documentationUrl: "https://docs.airbyte.com/integrations/sources/google-analytics-v4"
     connectionSpecification:
@@ -5476,9 +5476,11 @@
           airbyte_secret: true
       date_ranges_start_date:
         type: "string"
-        title: "Date Range Start Date"
-        description: "The start date. One of the values Ndaysago, yesterday, today\
-          \ or in the format YYYY-MM-DD"
+        title: "Start Date"
+        description: "The start date from which to replicate report data in the\
+          \ format YYYY-MM-DD. Data generated before this date will not be included\
+          \ in the report."
+        format: "date"
         order: 2
       custom_reports:
         order: 3
@@ -5499,7 +5501,7 @@
           \ causing inaccuracies in the returned results. We recommend setting this\
          \ to 1 unless you have a hard requirement to make the sync faster at the\
          \ expense of accuracy. The minimum allowed value for this field is 1,\
-          \ and the maximum is 364. "
+          \ and the maximum is 364."
         examples:
         - 30
         - 60
@@ -5507,6 +5509,8 @@
         - 120
         - 200
         - 364
+        minimum: 1
+        maximum: 364
         default: 1
         order: 4
   supportsNormalization: false
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile b/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile
index e3e6b791c953..25ea59c934e4 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/Dockerfile
@@ -28,5 +28,5 @@ COPY source_google_analytics_data_api ./source_google_analytics_data_api
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.0.3
+LABEL io.airbyte.version=0.1.0
 LABEL io.airbyte.name=airbyte/source-google-analytics-data-api
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml b/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml
index 3ceab93b6499..9e15ef3ee92c 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/acceptance-test-config.yml
@@ -1,31 +1,43 @@
 # See [Source Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/source-acceptance-tests-reference)
 # for more information about how to configure these tests
 connector_image: airbyte/source-google-analytics-data-api:dev
-tests:
+acceptance_tests:
   spec:
-  - spec_path: "source_google_analytics_data_api/spec.json"
+    tests:
+      - spec_path: "source_google_analytics_data_api/spec.json"
+        backward_compatibility_tests_config:
+          disable_for_version: "0.0.3"
   connection:
-  - config_path: "secrets/config.json"
-    status: "succeed"
-  - config_path: "integration_tests/invalid_config.json"
-    status: "failed"
+    tests:
+      - config_path: "secrets/config.json"
+        status: "succeed"
+      - config_path: "integration_tests/invalid_config.json"
+        status: "failed"
   discovery:
-  - config_path: "secrets/config.json"
-    backward_compatibility_tests_config:
-      disable_for_version: "0.0.2"
+    tests:
+      - config_path: "secrets/config.json"
+        backward_compatibility_tests_config:
+          disable_for_version: "0.0.2"
   basic_read:
-  - config_path: "secrets/config.json"
-    configured_catalog_path: "integration_tests/configured_catalog.json"
-    empty_streams: []
+    tests:
+      - config_path: "secrets/config.json"
+        configured_catalog_path: "integration_tests/configured_catalog.json"
   full_refresh:
-  - config_path: "secrets/config.json"
-    configured_catalog_path: "integration_tests/configured_catalog.json"
-    ignored_fields:
-      "daily_active_users": ["uuid"]
-      "weekly_active_users": ["uuid"]
-      "four_weekly_active_users": ["uuid"]
-      "devices": ["uuid"]
-      "locations": ["uuid"]
-      "pages": ["uuid"]
-      "traffic_sources": ["uuid"]
-      "website_overview": ["uuid"]
+    tests:
+      - config_path: "secrets/config.json"
+        configured_catalog_path: "integration_tests/configured_catalog.json"
+        ignored_fields:
+          "daily_active_users": ["uuid"]
+          "weekly_active_users": ["uuid"]
+          "four_weekly_active_users": ["uuid"]
+          "devices": ["uuid"]
+          "locations": ["uuid"]
+          "pages": ["uuid"]
+          "traffic_sources": ["uuid"]
+          "website_overview": ["uuid"]
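+  # The incremental tests below replay a sync from integration_tests/abnormal_state.json,
+  # whose far-future cursor ("20990101") is expected to yield no new records.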
+  incremental:
+    tests:
+      - config_path: "secrets/config.json"
+        configured_catalog_path: "integration_tests/configured_catalog.json"
+        future_state:
+          future_state_path: "integration_tests/abnormal_state.json"
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/abnormal_state.json
new file mode 100644
index 000000000000..cd3adac6a364
--- /dev/null
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/abnormal_state.json
@@ -0,0 +1,90 @@
+[
+  {
+    "type": "STREAM",
+    "stream": {
+      "stream_descriptor": {
+        "name": "daily_active_users"
+      },
+      "stream_state": {
+        "date": "20990101"
+      }
+    }
+  },
+  {
+    "type": "STREAM",
+    "stream": {
+      "stream_descriptor": {
+        "name": "weekly_active_users"
+      },
+      "stream_state": {
+        "date": "20990101"
+      }
+    }
+  },
+  {
+    "type": "STREAM",
+    "stream": {
+      "stream_descriptor": {
+        "name": "four_weekly_active_users"
+      },
+      "stream_state": {
+        "date": "20990101"
+      }
+    }
+  },
+  {
+    "type": "STREAM",
+    "stream": {
+      "stream_descriptor": {
+        "name": "devices"
+      },
+      "stream_state": {
+        "date": "20990101"
+      }
+    }
+  },
+  {
+    "type": "STREAM",
+    "stream": {
+      "stream_descriptor": {
+        "name": "locations"
+      },
+      "stream_state": {
+        "date": "20990101"
+      }
+    }
+  },
+  {
+    "type": "STREAM",
+    "stream": {
+      "stream_descriptor": {
+        "name": "pages"
+      },
+      "stream_state": {
+        "date": "20990101"
+      }
+    }
+  },
+  {
+    "type": "STREAM",
+    "stream": {
+      "stream_descriptor": {
+        "name": "traffic_sources"
+      },
+      "stream_state": {
+        "date": "20990101"
+      }
+    }
+  },
+  {
+    "type": "STREAM",
+    "stream": {
+      "stream_descriptor": {
+        "name": "website_overview"
+      },
+      "stream_state": {
+        "date": "20990101"
+      }
+    }
+  }
+]
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/configured_catalog.json
index 0b74613f76d3..0900aa6dbfed 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/configured_catalog.json
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/integration_tests/configured_catalog.json
@@ -4,9 +4,10 @@
       "stream": {
         "name": "daily_active_users",
         "json_schema": {},
-        "supported_sync_modes": ["incremental"],
-        "source_defined_cursor": false,
-        "default_cursor_field": ["date"]
+        "supported_sync_modes": ["full_refresh", "incremental"],
+        "source_defined_cursor": true,
+        "default_cursor_field": ["date"],
+        "source_defined_primary_key": [["uuid"]]
       },
       "sync_mode": "incremental",
       "destination_sync_mode": "overwrite"
@@ -15,9 +16,10 @@
       "stream": {
         "name": "weekly_active_users",
         "json_schema": {},
-        "supported_sync_modes": ["incremental"],
-        "source_defined_cursor": false,
-        "default_cursor_field": ["date"]
+        "supported_sync_modes": ["full_refresh", "incremental"],
+        "source_defined_cursor": true,
+        "default_cursor_field": ["date"],
+        "source_defined_primary_key": [["uuid"]]
       },
       "sync_mode": "incremental",
       "destination_sync_mode": "overwrite"
@@ -26,9 +28,10 @@
       "stream": {
         "name": "four_weekly_active_users",
         "json_schema": {},
-        "supported_sync_modes": ["incremental"],
-        "source_defined_cursor": false,
-        "default_cursor_field": ["date"]
+        "supported_sync_modes": ["full_refresh", "incremental"],
+        "source_defined_cursor": true,
+        "default_cursor_field": ["date"],
+        "source_defined_primary_key": [["uuid"]]
      },
       "sync_mode": "incremental",
       "destination_sync_mode": "overwrite"
@@ -37,9 +40,10 @@
       "stream": {
         "name": "devices",
         "json_schema": {},
-        "supported_sync_modes": ["incremental"],
-        "source_defined_cursor": false,
"default_cursor_field": ["date"] + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["date"], + "source_defined_primary_key": [["uuid"]] }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" @@ -48,9 +52,10 @@ "stream": { "name": "locations", "json_schema": {}, - "supported_sync_modes": ["incremental"], - "source_defined_cursor": false, - "default_cursor_field": ["date"] + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["date"], + "source_defined_primary_key": [["uuid"]] }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" @@ -59,9 +64,10 @@ "stream": { "name": "pages", "json_schema": {}, - "supported_sync_modes": ["incremental"], - "source_defined_cursor": false, - "default_cursor_field": ["date"] + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["date"], + "source_defined_primary_key": [["uuid"]] }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" @@ -70,9 +76,10 @@ "stream": { "name": "traffic_sources", "json_schema": {}, - "supported_sync_modes": ["incremental"], - "source_defined_cursor": false, - "default_cursor_field": ["date"] + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["date"], + "source_defined_primary_key": [["uuid"]] }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" @@ -81,9 +88,10 @@ "stream": { "name": "website_overview", "json_schema": {}, - "supported_sync_modes": ["incremental"], - "source_defined_cursor": false, - "default_cursor_field": ["date"] + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["date"], + "source_defined_primary_key": [["uuid"]] }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/defaults/custom_reports_schema.json b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/defaults/custom_reports_schema.json new file mode 100644 index 000000000000..01753bf4bcdc --- /dev/null +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/defaults/custom_reports_schema.json @@ -0,0 +1,26 @@ +{ + "type": "array", + "items": { + "type": "object", + "required": ["name", "dimensions", "metrics"], + "properties": { + "name": { + "type": "string" + }, + "dimensions": { + "type": "array", + "minItems": 1, + "items": { + "type": "string" + } + }, + "metrics": { + "type": "array", + "minItems": 1, + "items": { + "type": "string" + } + } + } + } +} diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py index 430697749452..2983b2dd19be 100644 --- a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py +++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/source.py @@ -2,22 +2,23 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+    "Service": (GoogleServiceKeyAuthenticator, lambda credentials: {"credentials": credentials["credentials_json"]}),
     "Client": (
         auth.Oauth2Authenticator,
         lambda credentials: {
@@ -80,12 +81,8 @@ def get_dimensions_type(d: str) -> str:
     }
 
 
-def get_authenticator(credentials):
-    try:
-        authenticator_class, get_credentials = authenticator_class_map[credentials["auth_type"]]
-    except KeyError as e:
-        raise e
-    return authenticator_class(**get_credentials(credentials))
+class ConfigurationError(Exception):
+    pass
 
 
 class MetadataDescriptor:
@@ -94,17 +91,10 @@ def __init__(self):
 
     def __get__(self, instance, owner):
         if not self._metadata:
-            authenticator = (
-                instance.authenticator
-                if not isinstance(instance.authenticator, auth.NoAuth)
-                else get_authenticator(instance.config["credentials"])
-            )
-            stream = GoogleAnalyticsDataApiTestConnectionStream(config=instance.config, authenticator=authenticator)
-            try:
-                metadata = next(iter(stream.read_records(sync_mode=SyncMode.full_refresh)))
-            except Exception as e:
-                raise e
-
+            stream = GoogleAnalyticsDataApiMetadataStream(config=instance.config, authenticator=instance.config["authenticator"])
+            metadata = next(stream.read_records(sync_mode=SyncMode.full_refresh), None)
+            if not metadata:
+                raise Exception("failed to get metadata, over quota, try later")
             self._metadata = {
                 "dimensions": {m["apiName"]: m for m in metadata["dimensions"]},
                 "metrics": {m["apiName"]: m for m in metadata["metrics"]},
@@ -117,23 +107,39 @@ class GoogleAnalyticsDataApiAbstractStream(HttpStream, ABC):
     url_base = "https://analyticsdata.googleapis.com/v1beta/"
     http_method = "POST"
 
-    def __init__(self, config: Mapping[str, Any], *args, **kwargs):
-        super().__init__(*args, **kwargs)
+    def __init__(self, *, config: Mapping[str, Any], **kwargs):
+        super().__init__(**kwargs)
         self._config = config
+        self._stop_iteration = False
 
     @property
     def config(self):
         return self._config
 
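+    # A 429 response means the API quota is exhausted. Retrying is pointless, so
+    # read_records swallows the error and sets _stop_iteration, letting the sync
+    # finish gracefully with the records fetched so far.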
+    def should_retry(self, response: requests.Response) -> bool:
+        if response.status_code == 429:
+            return False
+        return super().should_retry(response)
+
+    def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]:
+        try:
+            yield from super().read_records(**kwargs)
+        except requests.exceptions.HTTPError as e:
+            self._stop_iteration = True
+            if e.response.status_code != 429:
+                raise e
+
 
 class GoogleAnalyticsDataApiBaseStream(GoogleAnalyticsDataApiAbstractStream):
-    row_limit = 100000
+    """
+    https://developers.google.com/analytics/devguides/reporting/data/v1/rest/v1beta/properties/runReport
+    """
 
-    metadata = MetadataDescriptor()
+    _record_date_format = "%Y%m%d"
+    primary_key = "uuid"
+    cursor_field = "date"
 
-    @property
-    def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]:
-        return "uuid"
+    metadata = MetadataDescriptor()
 
     @staticmethod
     def add_primary_key() -> dict:
@@ -219,47 +225,19 @@ def parse_response(
         metrics = [h["name"] for h in r["metricHeaders"]]
         metrics_type_map = {h["name"]: h["type"] for h in r["metricHeaders"]}
 
-        rows = []
         for row in r.get("rows", []):
-            rows.append(
-                collections.ChainMap(
-                    *[
-                        self.add_primary_key(),
-                        self.add_property_id(self.config["property_id"]),
-                        self.add_dimensions(dimensions, row),
-                        self.add_metrics(metrics, metrics_type_map, row),
-                    ]
-                )
-            )
-        r["records"] = rows
-
-        yield r
-
-
-class IncrementalGoogleAnalyticsDataApiStream(GoogleAnalyticsDataApiBaseStream, IncrementalMixin, ABC):
-    _date_format = "%Y-%m-%d"
-
-    def __init__(self, *args, **kwargs):
-        super(IncrementalGoogleAnalyticsDataApiStream, self).__init__(*args, **kwargs)
-        self._cursor_value = None
-
-
-class GoogleAnalyticsDataApiGenericStream(IncrementalGoogleAnalyticsDataApiStream):
-    _default_window_in_days = 1
-    _record_date_format = "%Y%m%d"
-
-    @property
-    def cursor_field(self) -> Union[str, List[str]]:
-        return "date"
-
-    @property
-    def state(self) -> MutableMapping[str, Any]:
-        return {self.cursor_field: self._cursor_value or utils.string_to_date(self.config["date_ranges_start_date"], self._date_format)}
-
-    @state.setter
-    def state(self, value):
-        self._cursor_value = utils.string_to_date(value[self.cursor_field], self._date_format) + datetime.timedelta(days=1)
+            yield self.add_primary_key() | self.add_property_id(self.config["property_id"]) | self.add_dimensions(
+                dimensions, row
+            ) | self.add_metrics(metrics, metrics_type_map, row)
+
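+    # State is persisted in the record date format (YYYYMMDD). Older connector
+    # versions stored YYYY-MM-DD, which string_to_date still accepts through the
+    # old_format fallback, so existing connections resume without a state reset.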
+    def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]):
+        updated_state = utils.string_to_date(latest_record[self.cursor_field], self._record_date_format)
+        stream_state_value = current_stream_state.get(self.cursor_field)
+        if stream_state_value:
+            stream_state_value = utils.string_to_date(stream_state_value, self._record_date_format, old_format=DATE_FORMAT)
+            updated_state = max(updated_state, stream_state_value)
+        current_stream_state[self.cursor_field] = updated_state.strftime(self._record_date_format)
+        return current_stream_state
 
     def request_body_json(
         self,
@@ -273,50 +251,39 @@ def request_body_json(
             "dateRanges": [stream_slice],
         }
 
-    def read_records(
-        self,
-        sync_mode: SyncMode,
-        cursor_field: List[str] = None,
-        stream_slice: Mapping[str, Any] = None,
-        stream_state: Mapping[str, Any] = None,
-    ) -> Iterable[Mapping[str, Any]]:
-        if not stream_slice:
-            return []
-        records = super().read_records(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state)
-        for record in records:
-            for row in record["records"]:
-                next_cursor_value = utils.string_to_date(row[self.cursor_field], self._record_date_format)
-                self._cursor_value = max(self._cursor_value, next_cursor_value) if self._cursor_value else next_cursor_value
-                yield row
-
     def stream_slices(
         self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
     ) -> Iterable[Optional[Mapping[str, Any]]]:
-        dates = []
         today: datetime.date = datetime.date.today()
-        start_date: datetime.date = self.state[self.cursor_field]
-        timedelta: int = self.config["window_in_days"] or self._default_window_in_days
+        start_date = stream_state and stream_state.get(self.cursor_field)
+        if start_date:
+            start_date = utils.string_to_date(start_date, self._record_date_format, old_format=DATE_FORMAT)
+            start_date = max(start_date, self.config["date_ranges_start_date"])
+        else:
+            start_date = self.config["date_ranges_start_date"]
+
+        timedelta: int = self.config["window_in_days"]
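+        # Walk from the effective start date up to today in window_in_days-sized
+        # slices, stopping early if a previous slice exhausted the API quota.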
+ ".".join(map(str, e.path)) + raise ConfigurationError(f"{key_path}: {e.message}") + + existing_names = {r["name"] for r in config["custom_reports"]} & report_names + if existing_names: + existing_names = ", ".join(existing_names) + raise ConfigurationError(f"custom_reports: {existing_names} already exist as a default report(s).") + + for report in config["custom_reports"]: + # "date" dimension is mandatory because it's cursor_field + if "date" not in report["dimensions"]: + report["dimensions"].append("date") + + if "credentials_json" in config["credentials"]: + try: + config["credentials"]["credentials_json"] = json.loads(config["credentials"]["credentials_json"]) + except ValueError: + raise ConfigurationError("credentials.credentials_json is not valid JSON") + + try: + config["date_ranges_start_date"] = utils.string_to_date(config["date_ranges_start_date"]) + except ValueError as e: + raise ConfigurationError(str(e)) + + if not config.get("window_in_days"): + source_spec = self.spec(logging.getLogger("airbyte")) + config["window_in_days"] = source_spec.connectionSpecification["properties"]["window_in_days"]["default"] + + return config def get_authenticator(self, config: Mapping[str, Any]): - if not self._authenticator: - self._authenticator = get_authenticator(config["credentials"]) - return self._authenticator + credentials = config["credentials"] + authenticator_class, get_credentials = authenticator_class_map[credentials["auth_type"]] + return authenticator_class(**get_credentials(credentials)) def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: - authenticator = self.get_authenticator(config) - stream = GoogleAnalyticsDataApiTestConnectionStream(config=config, authenticator=authenticator) + reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json")) try: - next(iter(stream.read_records(sync_mode=SyncMode.full_refresh))) - except Exception as e: + config = self._validate_and_transform(config, report_names={r["name"] for r in reports}) + except ConfigurationError as e: return False, str(e) + config["authenticator"] = self.get_authenticator(config) + + stream = GoogleAnalyticsDataApiMetadataStream(config=config, authenticator=config["authenticator"]) + metadata = next(stream.read_records(sync_mode=SyncMode.full_refresh), None) + if not metadata: + return False, "failed to get metadata, over quota, try later" + + dimensions = {d["apiName"] for d in metadata["dimensions"]} + metrics = {d["apiName"] for d in metadata["metrics"]} + + for report in config["custom_reports"]: + invalid_dimensions = set(report["dimensions"]) - dimensions + if invalid_dimensions: + invalid_dimensions = ", ".join(invalid_dimensions) + return False, f"custom_reports: invalid dimension(s): {invalid_dimensions} for the custom report: {report['name']}" + invalid_metrics = set(report["metrics"]) - metrics + if invalid_metrics: + invalid_metrics = ", ".join(invalid_metrics) + return False, f"custom_reports: invalid metric(s): {invalid_metrics} for the custom report: {report['name']}" return True, None def streams(self, config: Mapping[str, Any]) -> List[Stream]: - authenticator = self.get_authenticator(config) - reports = json.loads(pkgutil.get_data("source_google_analytics_data_api", "defaults/default_reports.json")) - if "custom_reports" in config: - custom_reports = json.loads(config["custom_reports"]) - reports += custom_reports + config = self._validate_and_transform(config, report_names={r["name"] for r in 
         return [
-            type(report["name"], (GoogleAnalyticsDataApiGenericStream,), {})(
-                config=dict(**config, metrics=report["metrics"], dimensions=report["dimensions"]), authenticator=authenticator
+            type(report["name"], (GoogleAnalyticsDataApiBaseStream,), {})(
+                config=dict(**config, metrics=report["metrics"], dimensions=report["dimensions"]), authenticator=config["authenticator"]
             )
-            for report in reports
+            for report in reports + config["custom_reports"]
         ]
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json
index 29856c48a765..9ff3ddc21cdb 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/spec.json
@@ -84,8 +84,9 @@
     },
     "date_ranges_start_date": {
       "type": "string",
-      "title": "Date Range Start Date",
-      "description": "The start date. One of the values Ndaysago, yesterday, today or in the format YYYY-MM-DD",
+      "title": "Start Date",
+      "description": "The start date from which to replicate report data in the format YYYY-MM-DD. Data generated before this date will not be included in the report.",
+      "format": "date",
       "order": 2
     },
     "custom_reports": {
@@ -97,8 +98,10 @@
     "window_in_days": {
       "type": "integer",
      "title": "Data request time increment in days",
-      "description": "The time increment used by the connector when requesting data from the Google Analytics API. More information is available in the the docs. The bigger this value is, the faster the sync will be, but the more likely that sampling will be applied to your data, potentially causing inaccuracies in the returned results. We recommend setting this to 1 unless you have a hard requirement to make the sync faster at the expense of accuracy. The minimum allowed value for this field is 1, and the maximum is 364. ",
+      "description": "The time increment used by the connector when requesting data from the Google Analytics API. More information is available in the docs. The bigger this value is, the faster the sync will be, but the more likely that sampling will be applied to your data, potentially causing inaccuracies in the returned results. We recommend setting this to 1 unless you have a hard requirement to make the sync faster at the expense of accuracy. The minimum allowed value for this field is 1, and the maximum is 364.",
       "examples": [30, 60, 90, 120, 200, 364],
+      "minimum": 1,
+      "maximum": 364,
       "default": 1,
       "order": 4
     }
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py
index cad009238bde..302410bb3e26 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/source_google_analytics_data_api/utils.py
@@ -5,14 +5,23 @@
 import calendar
 import datetime
 
+DATE_FORMAT = "%Y-%m-%d"
+
 
 def datetime_to_secs(dt: datetime.datetime) -> int:
     return calendar.timegm(dt.utctimetuple())
 
 
-def string_to_date(d: str, f: str = "%Y-%m-%d") -> datetime.date:
+def string_to_date(d: str, f: str = DATE_FORMAT, old_format=None) -> datetime.date:
+    # The additional `old_format` param lets us parse the old STATE date format "YYYY-MM-DD"
+    # alongside the new format "YYYYMMDD". Once all current cloud syncs have been converted
+    # to the new format, this dual-format support can be removed.
+    if old_format:
+        try:
+            return datetime.datetime.strptime(d, old_format).date()
+        except ValueError:
+            pass
     return datetime.datetime.strptime(d, f).date()
 
 
-def date_to_string(d: datetime.date, f: str = "%Y-%m-%d") -> str:
+def date_to_string(d: datetime.date, f: str = DATE_FORMAT) -> str:
     return d.strftime(f)
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_source.py b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_source.py
index 16cead27e553..dfd58fead420 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_source.py
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_source.py
@@ -3,11 +3,12 @@
 #
 
 import datetime
+import json
+from copy import deepcopy
 from unittest.mock import MagicMock
 
 import pytest
 from airbyte_cdk.models import AirbyteConnectionStatus, Status
-from airbyte_cdk.sources.streams.http import HttpStream
 from source_google_analytics_data_api import SourceGoogleAnalyticsDataApi
 
 json_credentials = """
@@ -15,7 +16,7 @@
   "type": "service_account",
   "project_id": "unittest-project-id",
   "private_key_id": "9qf98e52oda52g5ne23al6evnf13649c2u077162c",
-  "private_key": "",
+  "private_key": "-----BEGIN PRIVATE KEY-----\\nMIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEA3slcXL+dA36ESmOi\\n1xBhZmp5Hn0WkaHDtW4naba3plva0ibloBNWhFhjQOh7Ff01PVjhT4D5jgqXBIgc\\nz9Gv3QIDAQABAkEArlhYPoD5SB2/O1PjwHgiMPrL1C9B9S/pr1cH4vPJnpY3VKE3\\n5hvdil14YwRrcbmIxMkK2iRLi9lM4mJmdWPy4QIhAPsRFXZSGx0TZsDxD9V0ZJmZ\\n0AuDCj/NF1xB5KPLmp7pAiEA4yoFox6w7ql/a1pUVaLt0NJkDfE+22pxYGNQaiXU\\nuNUCIQCsFLaIJZiN4jlgbxlyLVeya9lLuqIwvqqPQl6q4ad12QIgS9gG48xmdHig\\n8z3IdIMedZ8ZCtKmEun6Cp1+BsK0wDUCIF0nHfSuU+eTQ2qAON2SHIrJf8UeFO7N\\nzdTN1IwwQqjI\\n-----END PRIVATE KEY-----\\n",
   "client_email": "google-analytics-access@unittest-project-id.iam.gserviceaccount.com",
   "client_id": "213243192021686092537",
   "auth_uri": "https://accounts.google.com/o/oauth2/auth",
@@ -27,7 +28,7 @@
 
 
 @pytest.fixture
-def patch_base_class(mocker):
+def patch_base_class():
     return {
         "config": {
             "property_id": "108176369",
@@ -37,15 +38,53 @@ def patch_base_class(mocker):
     }
 
 
-def test_check_connection(mocker, patch_base_class):
-    source = SourceGoogleAnalyticsDataApi()
-    record = MagicMock()
+@pytest.fixture
+def config():
+    return {
+        "property_id": "108176369",
+        "credentials": {"auth_type": "Service", "credentials_json": json_credentials},
+        "date_ranges_start_date": datetime.datetime.strftime((datetime.datetime.now() - datetime.timedelta(days=1)), "%Y-%m-%d"),
+        "custom_reports": json.dumps([{
+            "name": "report1",
+            "dimensions": ["date", "country"],
+            "metrics": ["totalUsers", "screenPageViews"]
+        }]),
+    }
 
-    logger_mock, config_mock = MagicMock(), MagicMock()
-    config_mock.__getitem__.side_effect = patch_base_class["config"].__getitem__
 
-    mocker.patch.object(HttpStream, "read_records", return_value=[record])
 
-    assert source.check(logger_mock, config_mock) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
+@pytest.fixture
+def config_gen(config):
+    def inner(**kwargs):
+        new_config = deepcopy(config)
+        # WARNING: nested dictionaries are not merged; top-level keys are replaced wholesale
+        new_config.update(kwargs)
+        return {k: v for k, v in new_config.items() if v is not ...}
+
+    return inner
+
+
+def test_check(requests_mock, config_gen):
+    requests_mock.register_uri("POST", "https://oauth2.googleapis.com/token", json={"access_token": "access_token", "expires_in": 3600, "token_type": "Bearer"})
+    requests_mock.register_uri("GET", "https://analyticsdata.googleapis.com/v1beta/properties/108176369/metadata", json={
+        "dimensions": [{"apiName": "date"}, {"apiName": "country"}],
+        "metrics": [{"apiName": "totalUsers"}, {"apiName": "screenPageViews"}],
+    })
+
+    source = SourceGoogleAnalyticsDataApi()
+    logger = MagicMock()
+
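+    # Each assertion below feeds a mutated config through check(); the expected
+    # Status.FAILED messages mirror the ConfigurationError strings raised in
+    # source.py, so the two must stay in sync.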
+    assert source.check(logger, config_gen()) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
+    assert source.check(logger, config_gen(custom_reports=...)) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
+    assert source.check(logger, config_gen(custom_reports="[]")) == AirbyteConnectionStatus(status=Status.SUCCEEDED)
+    assert source.check(logger, config_gen(custom_reports="invalid")) == AirbyteConnectionStatus(status=Status.FAILED, message="'custom_reports is not valid JSON'")
+    assert source.check(logger, config_gen(custom_reports="{}")) == AirbyteConnectionStatus(status=Status.FAILED, message='"custom_reports: {} is not of type \'array\'"')
+    assert source.check(logger, config_gen(custom_reports="[{}]")) == AirbyteConnectionStatus(status=Status.FAILED, message='"custom_reports.0: \'name\' is a required property"')
+    assert source.check(logger, config_gen(custom_reports='[{"name": "name"}]')) == AirbyteConnectionStatus(status=Status.FAILED, message='"custom_reports.0: \'dimensions\' is a required property"')
+    assert source.check(logger, config_gen(custom_reports='[{"name": "name", "dimensions": [], "metrics": []}]')) == AirbyteConnectionStatus(status=Status.FAILED, message="'custom_reports.0.dimensions: [] is too short'")
+    assert source.check(logger, config_gen(custom_reports='[{"name": "daily_active_users", "dimensions": ["date"], "metrics": ["totalUsers"]}]')) == AirbyteConnectionStatus(status=Status.FAILED, message="'custom_reports: daily_active_users already exist as a default report(s).'")
+    assert source.check(logger, config_gen(custom_reports='[{"name": "name", "dimensions": ["unknown"], "metrics": ["totalUsers"]}]')) == AirbyteConnectionStatus(status=Status.FAILED, message="'custom_reports: invalid dimension(s): unknown for the custom report: name'")
+    assert source.check(logger, config_gen(custom_reports='[{"name": "name", "dimensions": ["date"], "metrics": ["unknown"]}]')) == AirbyteConnectionStatus(status=Status.FAILED, message="'custom_reports: invalid metric(s): unknown for the custom report: name'")
+    assert source.check(logger, config_gen(credentials={"auth_type": "Service", "credentials_json": "invalid"})) == AirbyteConnectionStatus(status=Status.FAILED, message="'credentials.credentials_json is not valid JSON'")
+    assert source.check(logger, config_gen(date_ranges_start_date="2022-20-20")) == AirbyteConnectionStatus(status=Status.FAILED, message='"time data \'2022-20-20\' does not match format \'%Y-%m-%d\'"')
 
 
 def test_streams(mocker, patch_base_class):
diff --git a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py
index e872259c5fc2..3cd7d0881e2b 100644
--- a/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py
+++ b/airbyte-integrations/connectors/source-google-analytics-data-api/unit_tests/test_streams.py
@@ -2,7 +2,6 @@
 # Copyright (c) 2022 Airbyte, Inc., all rights reserved.
 #
 
-import copy
 import datetime
 import random
 from http import HTTPStatus
@@ -10,7 +9,7 @@
 from unittest.mock import MagicMock
 
 import pytest
-from source_google_analytics_data_api.source import GoogleAnalyticsDataApiGenericStream
+from source_google_analytics_data_api.source import GoogleAnalyticsDataApiBaseStream
 
 json_credentials = """
 {
@@ -31,9 +30,9 @@
 @pytest.fixture
 def patch_base_class(mocker):
     # Mock abstract methods to enable instantiating abstract class
-    mocker.patch.object(GoogleAnalyticsDataApiGenericStream, "path", f"{random.randint(100000000, 999999999)}:runReport")
-    mocker.patch.object(GoogleAnalyticsDataApiGenericStream, "primary_key", "test_primary_key")
-    mocker.patch.object(GoogleAnalyticsDataApiGenericStream, "__abstractmethods__", set())
+    mocker.patch.object(GoogleAnalyticsDataApiBaseStream, "path", f"{random.randint(100000000, 999999999)}:runReport")
+    mocker.patch.object(GoogleAnalyticsDataApiBaseStream, "primary_key", "test_primary_key")
+    mocker.patch.object(GoogleAnalyticsDataApiBaseStream, "__abstractmethods__", set())
 
     return {
         "config": {
@@ -57,7 +56,7 @@ def patch_base_class(mocker):
 
 def test_request_params(patch_base_class):
     assert (
-        GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]).request_params(
+        GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"]).request_params(
             stream_state=MagicMock(), stream_slice=MagicMock(), next_page_token=MagicMock()
         )
         == {}
@@ -87,12 +86,12 @@ def test_request_body_json(patch_base_class):
         "dateRanges": [request_body_params["stream_slice"]],
     }
 
-    request_body_json = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"]).request_body_json(**request_body_params)
+    request_body_json = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"]).request_body_json(**request_body_params)
 
     assert request_body_json == expected_body_json
 
 
 def test_next_page_token_equal_chunk(patch_base_class):
-    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
     response = MagicMock()
     response.json.side_effect = [
         {"limit": 100000, "offset": 0, "rowCount": 200000},
@@ -118,7 +117,7 @@ def test_next_page_token_equal_chunk(patch_base_class):
 
 
 def test_next_page_token(patch_base_class):
-    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
     response = MagicMock()
     response.json.side_effect = [
         {"limit": 100000, "offset": 0, "rowCount": 250000},
@@ -149,7 +148,7 @@ def test_next_page_token(patch_base_class):
 
 
 def test_parse_response(patch_base_class):
-    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
 
     response_data = {
         "dimensionHeaders": [{"name": "date"}, {"name": "deviceCategory"}, {"name": "operatingSystem"}, {"name": "browser"}],
@@ -196,8 +195,7 @@ def test_parse_response(patch_base_class):
         "kind": "analyticsData#runReport",
     }
 
-    expected_data = copy.deepcopy(response_data)
-    expected_data["records"] = [
+    expected_data = [
         {
             "property_id": "496180525",
             "date": "20220731",
@@ -233,21 +231,21 @@ def test_parse_response(patch_base_class):
     response = MagicMock()
     response.json.return_value = response_data
     inputs = {"response": response, "stream_state": {}}
-    actual_records: Mapping[str, Any] = next(iter(stream.parse_response(**inputs)))
-    for record in actual_records["records"]:
+    actual_records: Mapping[str, Any] = list(stream.parse_response(**inputs))
+    for record in actual_records:
         del record["uuid"]
     assert actual_records == expected_data
 
 
 def test_request_headers(patch_base_class):
-    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
     inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None}
     expected_headers = {}
     assert stream.request_headers(**inputs) == expected_headers
 
 
 def test_http_method(patch_base_class):
-    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
     expected_method = "POST"
     assert stream.http_method == expected_method
 
@@ -257,19 +255,19 @@ def test_http_method(patch_base_class):
     [
         (HTTPStatus.OK, False),
         (HTTPStatus.BAD_REQUEST, False),
-        (HTTPStatus.TOO_MANY_REQUESTS, True),
+        (HTTPStatus.TOO_MANY_REQUESTS, False),
         (HTTPStatus.INTERNAL_SERVER_ERROR, True),
     ],
 )
 def test_should_retry(patch_base_class, http_status, should_retry):
     response_mock = MagicMock()
     response_mock.status_code = http_status
-    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
     assert stream.should_retry(response_mock) == should_retry
 
 
 def test_backoff_time(patch_base_class):
     response_mock = MagicMock()
-    stream = GoogleAnalyticsDataApiGenericStream(config=patch_base_class["config"])
+    stream = GoogleAnalyticsDataApiBaseStream(authenticator=MagicMock(), config=patch_base_class["config"])
     expected_backoff_time = None
     assert stream.backoff_time(response_mock) == expected_backoff_time
diff --git a/docs/integrations/sources/google-analytics-v4.md b/docs/integrations/sources/google-analytics-v4.md
index e4d56c1009b3..5043ed1a2c2a 100644
--- a/docs/integrations/sources/google-analytics-v4.md
+++ b/docs/integrations/sources/google-analytics-v4.md
@@ -70,6 +70,7 @@ added by default to any report. There are 8 default reports. To add more reports
 
 | Version | Date       | Pull Request                                             | Subject                                             |
 |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------|
+| 0.1.0   | 2023-01-08 | [20889](https://github.com/airbytehq/airbyte/pull/20889) | Improved config validation, SAT                     |
 | 0.0.3   | 2022-08-15 | [15229](https://github.com/airbytehq/airbyte/pull/15229) | Source Google Analytics Data Api: code refactoring  |
 | 0.0.2   | 2022-07-27 | [15087](https://github.com/airbytehq/airbyte/pull/15087) | fix documentationUrl                                |
 | 0.0.1   | 2022-05-09 | [12701](https://github.com/airbytehq/airbyte/pull/12701) | Introduce Google Analytics Data API source          |