From f6cc98fd6e413207a103f5a65dba230dba59ed5d Mon Sep 17 00:00:00 2001 From: Artem Inzhyyants <36314070+artem1205@users.noreply.github.com> Date: Wed, 4 Jan 2023 21:36:24 +0100 Subject: [PATCH] Source Monday: fix schema loader; delete old files (#20996) * Source Monday: fix schema loader; delete old files * Source Monday: fix schema loader; delete old files * Source Monday: fix tests * auto-bump connector version Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com> --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-monday/Dockerfile | 2 +- .../source-monday/source_monday/__init__.py | 5 +- .../graphql_request_options_provider.py | 10 +- .../source-monday/source_monday/monday.yaml | 5 +- .../source-monday/source_monday/source.py | 185 +----------------- .../source-monday/source_monday/source_lc.py | 17 -- .../source-monday/unit_tests/conftest.py | 13 -- .../source-monday/unit_tests/test_source.py | 21 -- docs/integrations/sources/monday.md | 5 +- 11 files changed, 24 insertions(+), 243 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-monday/source_monday/source_lc.py delete mode 100644 airbyte-integrations/connectors/source-monday/unit_tests/conftest.py delete mode 100644 airbyte-integrations/connectors/source-monday/unit_tests/test_source.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index e1878fa79a31..f0770408ed69 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -982,7 +982,7 @@ - name: Monday sourceDefinitionId: 80a54ea2-9959-4040-aac1-eee42423ec9b dockerRepository: airbyte/source-monday - dockerImageTag: 0.2.1 + dockerImageTag: 0.2.2 documentationUrl: https://docs.airbyte.com/integrations/sources/monday icon: monday.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e8193a79216a..cc626b1fbc27 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -8530,7 +8530,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-monday:0.2.1" +- dockerImage: "airbyte/source-monday:0.2.2" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/monday" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-monday/Dockerfile b/airbyte-integrations/connectors/source-monday/Dockerfile index 505eddafbf71..aa731be04e8f 100644 --- a/airbyte-integrations/connectors/source-monday/Dockerfile +++ b/airbyte-integrations/connectors/source-monday/Dockerfile @@ -34,5 +34,5 @@ COPY source_monday ./source_monday ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.1 +LABEL io.airbyte.version=0.2.2 LABEL io.airbyte.name=airbyte/source-monday diff --git a/airbyte-integrations/connectors/source-monday/source_monday/__init__.py b/airbyte-integrations/connectors/source-monday/source_monday/__init__.py index 433e7ff4d9b0..6b7b7e1b43a8 100644 --- a/airbyte-integrations/connectors/source-monday/source_monday/__init__.py +++ b/airbyte-integrations/connectors/source-monday/source_monday/__init__.py @@ -2,11 +2,8 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # - from .dpath_string_extractor import DpathStringExtractor from .graphql_request_options_provider import GraphQLRequestOptionsProvider - -# from .source import SourceMonday -from .source_lc import SourceMonday +from .source import SourceMonday __all__ = ["SourceMonday", "GraphQLRequestOptionsProvider", "DpathStringExtractor"] diff --git a/airbyte-integrations/connectors/source-monday/source_monday/graphql_request_options_provider.py b/airbyte-integrations/connectors/source-monday/source_monday/graphql_request_options_provider.py index bb255c248baa..7bf8acfc96f9 100644 --- a/airbyte-integrations/connectors/source-monday/source_monday/graphql_request_options_provider.py +++ b/airbyte-integrations/connectors/source-monday/source_monday/graphql_request_options_provider.py @@ -2,13 +2,12 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # -import json -import os from dataclasses import dataclass from typing import Any, Mapping, MutableMapping, Optional, Type, Union from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.requesters.request_options import InterpolatedRequestOptionsProvider +from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader from airbyte_cdk.sources.declarative.types import StreamSlice, StreamState @@ -33,10 +32,9 @@ def _ensure_type(self, t: Type, o: Any): raise TypeError(f"{type(o)} {o} is not of type {t}") def _get_schema_root_properties(self): - schema_path = os.path.join(os.path.abspath(os.curdir), "source_monday", f"schemas/{self.name}.json") - with open(schema_path) as f: - schema_dict = json.load(f) - return schema_dict["properties"] + schema_loader = JsonFileSchemaLoader(config=self.config, options={"name": self.name}) + schema = schema_loader.get_json_schema() + return schema["properties"] def _get_object_arguments(self, **object_arguments) -> str: return ",".join([f"{argument}:{value}" for argument, value in object_arguments.items() if value is not None]) diff --git a/airbyte-integrations/connectors/source-monday/source_monday/monday.yaml b/airbyte-integrations/connectors/source-monday/source_monday/monday.yaml index 2e058e23f7d5..fec85441ed83 100644 --- a/airbyte-integrations/connectors/source-monday/source_monday/monday.yaml +++ b/airbyte-integrations/connectors/source-monday/source_monday/monday.yaml @@ -2,7 +2,7 @@ version: "0.1.0" definitions: schema_loader: - type: JsonSchema + type: JsonFileSchemaLoader file_path: "./source_monday/schemas/{{ options['name'] }}.json" selector: type: RecordSelector @@ -18,6 +18,7 @@ definitions: type: BearerAuthenticator api_token: "{{ config['credentials']['api_token'] if config['credentials']['auth_type'] == 'api_token' else config['credentials']['access_token'] if config['credentials']['auth_type'] == 'oauth2.0' else config.get('api_token', '') }}" request_options_provider: + type: InterpolatedRequestOptionsProvider class_name: "source_monday.GraphQLRequestOptionsProvider" limit: "{{ options['items_per_page'] }}" error_handler: @@ -59,7 +60,7 @@ definitions: record_selector: type: RecordSelector extractor: - type: DpathExtractor + type: CustomRecordExtractor field_pointer: "/data/boards/*/items/*" class_name: "source_monday.DpathStringExtractor" paginator: diff --git a/airbyte-integrations/connectors/source-monday/source_monday/source.py b/airbyte-integrations/connectors/source-monday/source_monday/source.py index 026b29203cef..932f5206a4b6 100644 --- a/airbyte-integrations/connectors/source-monday/source_monday/source.py +++ b/airbyte-integrations/connectors/source-monday/source_monday/source.py @@ -2,181 +2,16 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource -import json -import os -from abc import ABC -from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple +""" +This file provides the necessary constructs to interpret a provided declarative YAML configuration file into +source connector. +WARNING: Do not modify this file. +""" -import requests -from airbyte_cdk.sources import AbstractSource -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http import HttpStream -from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator -from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer - -# Basic full refresh stream -class MondayStream(HttpStream, ABC): - url_base: str = "https://api.monday.com/v2" - primary_key: str = "id" - page: int = 1 - limit: Optional[int] = None - transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - json_response = response.json().get("data", {}) - records = json_response.get(self.name.lower(), []) - self.page += 1 - if records: - return {"page": self.page} - - def load_schema(self): - """ - Load schema from file and make a GraphQL query - """ - script_dir = os.path.dirname(__file__) - schema_path = os.path.join(script_dir, f"schemas/{self.name.lower()}.json") - with open(schema_path) as f: - schema_dict = json.load(f) - schema = schema_dict["properties"] - graphql_schema = [] - for col in schema: - if "properties" in schema[col]: - nested_ids = ",".join(schema[col]["properties"]) - graphql_schema.append(f"{col}{{{nested_ids}}}") - else: - graphql_schema.append(col) - return ",".join(graphql_schema) - - def should_retry(self, response: requests.Response) -> bool: - # Monday API return code 200 with and errors key if complexity is too high. - # https://api.developer.monday.com/docs/complexity-queries - is_complex_query = response.json().get("errors") - if is_complex_query: - self.logger.error(response.text) - return response.status_code == 429 or 500 <= response.status_code < 600 or is_complex_query - - @property - def retry_factor(self) -> int: - return 15 - - def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - graphql_params = {} - if self.limit: - graphql_params["limit"] = self.limit - if next_page_token: - graphql_params.update(next_page_token) - - graphql_query = ",".join([f"{k}:{v}" for k, v in graphql_params.items()]) - # Monday uses a query string to pass in environments - params = {"query": f"query {{ {self.name.lower()} ({graphql_query}) {{ {self.load_schema()} }} }}"} - return params - - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - json_response = response.json().get("data", {}) - records = json_response.get(self.name.lower(), []) - yield from records - - def path( - self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None - ) -> str: - return "" - - -class Items(MondayStream): - """ - API Documentation: https://api.developer.monday.com/docs/items-queries - """ - - limit = 100 - - @property - def retry_factor(self) -> int: - # this stream has additional rate limits, please see https://api.developer.monday.com/docs/items-queries#additional-rate-limit - return 30 - - -class Boards(MondayStream): - """ - API Documentation: https://api.developer.monday.com/docs/groups-queries#groups-queries - """ - - -class Teams(MondayStream): - """ - API Documentation: https://api.developer.monday.com/docs/teams-queries - """ - - def request_params( - self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None - ) -> MutableMapping[str, Any]: - # Stream teams doesn't support pagination - params = {"query": f"query {{ {self.name.lower()} () {{ {self.load_schema()} }} }}"} - return params - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - return {} - - -class Updates(MondayStream): - """ - API Documentation: https://api.developer.monday.com/docs/updates-queries - """ - - -class Users(MondayStream): - """ - API Documentation: https://api.developer.monday.com/docs/users-queries-1 - """ - - def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - # Stream Users doesn't support pagination - return - - -class MondayAuthentication: - """Provides the authentication capabilities for both old and new methods.""" - - def __init__(self, config: Dict): - self.config = config - - def get_token(self): - # the old config supports for backward capability - token = self.config.get("api_token") - if not token: - auth_type = self.config["credentials"]["auth_type"] - if auth_type == "oauth2.0": - token = self.config["credentials"]["access_token"] - if auth_type == "api_token": - token = self.config["credentials"]["api_token"] - return token - - def get_auth(self) -> TokenAuthenticator: - """Return the TokenAuthenticator object with access or api token.""" - return TokenAuthenticator(token=self.get_token()) - - -class SourceMonday(AbstractSource): - def check_connection(self, logger, config) -> Tuple[bool, any]: - url = "https://api.monday.com/v2" - params = {"query": "query { me { is_guest created_at name id}}"} - auth_header = MondayAuthentication(config).get_auth().get_auth_header() - try: - response = requests.post(url, params=params, headers=auth_header) - response.raise_for_status() - return True, None - except requests.exceptions.RequestException as e: - return False, e - - def streams(self, config: Mapping[str, Any]) -> List[Stream]: - auth = MondayAuthentication(config).get_auth() - return [ - Items(authenticator=auth), - Boards(authenticator=auth), - Teams(authenticator=auth), - Updates(authenticator=auth), - Users(authenticator=auth), - ] +# Declarative Source +class SourceMonday(YamlDeclarativeSource): + def __init__(self): + super().__init__(**{"path_to_yaml": "monday.yaml"}) diff --git a/airbyte-integrations/connectors/source-monday/source_monday/source_lc.py b/airbyte-integrations/connectors/source-monday/source_monday/source_lc.py deleted file mode 100644 index 932f5206a4b6..000000000000 --- a/airbyte-integrations/connectors/source-monday/source_monday/source_lc.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource - -""" -This file provides the necessary constructs to interpret a provided declarative YAML configuration file into -source connector. -WARNING: Do not modify this file. -""" - - -# Declarative Source -class SourceMonday(YamlDeclarativeSource): - def __init__(self): - super().__init__(**{"path_to_yaml": "monday.yaml"}) diff --git a/airbyte-integrations/connectors/source-monday/unit_tests/conftest.py b/airbyte-integrations/connectors/source-monday/unit_tests/conftest.py deleted file mode 100644 index c40a4656127d..000000000000 --- a/airbyte-integrations/connectors/source-monday/unit_tests/conftest.py +++ /dev/null @@ -1,13 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -import json - -import pytest - - -@pytest.fixture(scope="session", name="config") -def config_fixture(): - with open("secrets/config.json", "r") as config_file: - return json.load(config_file) diff --git a/airbyte-integrations/connectors/source-monday/unit_tests/test_source.py b/airbyte-integrations/connectors/source-monday/unit_tests/test_source.py deleted file mode 100644 index 3fe784d7345c..000000000000 --- a/airbyte-integrations/connectors/source-monday/unit_tests/test_source.py +++ /dev/null @@ -1,21 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - -from unittest.mock import MagicMock - -from source_monday.source import SourceMonday - - -def test_check_connection(mocker, config): - source = SourceMonday() - logger_mock = MagicMock() - assert source.check_connection(logger_mock, config) == (True, None) - - -def test_stream_count(mocker): - source = SourceMonday() - config_mock = MagicMock() - streams = source.streams(config_mock) - expected_streams_number = 5 - assert len(streams) == expected_streams_number diff --git a/docs/integrations/sources/monday.md b/docs/integrations/sources/monday.md index ebdaf03f4d1c..6fa6f8ea4e1a 100644 --- a/docs/integrations/sources/monday.md +++ b/docs/integrations/sources/monday.md @@ -57,8 +57,9 @@ The Monday connector should not run into Monday API limitations under normal usa | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------| -| 0.2.1 | 2022-12-15 | [20533](https://github.com/airbytehq/airbyte/pull/20533) | Bump CDK version| -| 0.2.0 | 2022-12-13 | [19586](https://github.com/airbytehq/airbyte/pull/19586) | Migrate to low-code | +| 0.2.2 | 2023-01-04 | [20996](https://github.com/airbytehq/airbyte/pull/20996) | Fix json schema loader | +| 0.2.1 | 2022-12-15 | [20533](https://github.com/airbytehq/airbyte/pull/20533) | Bump CDK version | +| 0.2.0 | 2022-12-13 | [19586](https://github.com/airbytehq/airbyte/pull/19586) | Migrate to low-code | | 0.1.4 | 2022-06-06 | [14443](https://github.com/airbytehq/airbyte/pull/14443) | Increase retry_factor for Items stream | | 0.1.3 | 2021-12-23 | [8172](https://github.com/airbytehq/airbyte/pull/8172) | Add oauth2.0 support | | 0.1.2 | 2021-12-07 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Update titles and descriptions |