diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
index 701ebdf4a1a1..cdba173d1276 100644
--- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml
@@ -1029,7 +1029,7 @@
 - name: Netsuite
   sourceDefinitionId: 4f2f093d-ce44-4121-8118-9d13b7bfccd0
   dockerRepository: airbyte/source-netsuite
-  dockerImageTag: 0.1.1
+  dockerImageTag: 0.1.2
   documentationUrl: https://docs.airbyte.com/integrations/sources/netsuite
   sourceType: api
   releaseStage: alpha
diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
index 7296f4db22a7..e4cd6f1d13da 100644
--- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml
@@ -9205,7 +9205,7 @@
   supportsNormalization: false
   supportsDBT: false
   supported_destination_sync_modes: []
-- dockerImage: "airbyte/source-netsuite:0.1.1"
+- dockerImage: "airbyte/source-netsuite:0.1.2"
   spec:
     documentationUrl: "https://docsurl.com"
     connectionSpecification:
diff --git a/airbyte-integrations/connectors/source-netsuite/Dockerfile b/airbyte-integrations/connectors/source-netsuite/Dockerfile
index 322770ae5b55..6633bd9c7308 100644
--- a/airbyte-integrations/connectors/source-netsuite/Dockerfile
+++ b/airbyte-integrations/connectors/source-netsuite/Dockerfile
@@ -35,5 +35,5 @@ COPY source_netsuite ./source_netsuite
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.1
+LABEL io.airbyte.version=0.1.2
 LABEL io.airbyte.name=airbyte/source-netsuite
diff --git a/airbyte-integrations/connectors/source-netsuite/acceptance-test-config.yml b/airbyte-integrations/connectors/source-netsuite/acceptance-test-config.yml
index 43f0d73ffe32..b2099398732a 100644
--- a/airbyte-integrations/connectors/source-netsuite/acceptance-test-config.yml
+++ b/airbyte-integrations/connectors/source-netsuite/acceptance-test-config.yml
@@ -10,8 +10,8 @@ tests:
     - config_path: "sample_files/invalid_config.json"
       status: "failed"
   discovery:
-    # Discovery stage is dynamic, so timeout iscreased
     - config_path: "secrets/config.json"
+      # Discovery stage is dynamic, so the timeout is increased
       timeout_seconds: 1200
   basic_read:
     - config_path: "secrets/config.json"
@@ -33,4 +33,5 @@ tests:
     - config_path: "secrets/config.json"
      configured_catalog_path: "integration_tests/configured_catalog.json"
      future_state_path: "integration_tests/abnormal_state.json"
-      timeout_seconds: 3600
+      timeout_seconds: 7200
+      threshold_days: 30
diff --git a/airbyte-integrations/connectors/source-netsuite/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-netsuite/integration_tests/configured_catalog.json
index 62499ac8e539..65e822853ef7 100644
--- a/airbyte-integrations/connectors/source-netsuite/integration_tests/configured_catalog.json
+++ b/airbyte-integrations/connectors/source-netsuite/integration_tests/configured_catalog.json
@@ -2,24 +2,24 @@
   "streams": [
     {
       "stream": {
-        "name": "customrecord01",
+        "name": "customer",
         "json_schema": {},
-        "supported_sync_modes": ["full_refresh"]
+        "supported_sync_modes": ["full_refresh", "incremental"]
       },
+      "source_defined_cursor": true,
+      "default_cursor_field": ["lastModifiedDate"],
       "source_defined_primary_key": [["id"]],
-      "sync_mode": "full_refresh",
+      "sync_mode": "incremental",
       "destination_sync_mode": "append"
     },
     {
       "stream": {
-        "name": "customer",
+        "name": "customrecord01",
         "json_schema": {},
-        "supported_sync_modes": ["full_refresh", "incremental"]
+        "supported_sync_modes": ["full_refresh"]
       },
-      "source_defined_cursor": true,
-      "default_cursor_field": ["lastModifiedDate"],
       "source_defined_primary_key": [["id"]],
-      "sync_mode": "incremental",
+      "sync_mode": "full_refresh",
       "destination_sync_mode": "append"
     },
     {
@@ -94,18 +94,6 @@
       "sync_mode": "incremental",
       "destination_sync_mode": "append"
     },
-    {
-      "stream": {
-        "name": "task",
-        "json_schema": {},
-        "supported_sync_modes": ["full_refresh", "incremental"]
-      },
-      "source_defined_cursor": true,
-      "default_cursor_field": ["lastModifiedDate"],
-      "source_defined_primary_key": [["id"]],
-      "sync_mode": "incremental",
-      "destination_sync_mode": "append"
-    },
     {
       "stream": {
         "name": "salesorder",
"full_refresh", + "sync_mode": "incremental", "destination_sync_mode": "append" }, { "stream": { - "name": "customer", + "name": "customrecord01", "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"] + "supported_sync_modes": ["full_refresh"] }, - "source_defined_cursor": true, - "default_cursor_field": ["lastModifiedDate"], "source_defined_primary_key": [["id"]], - "sync_mode": "incremental", + "sync_mode": "full_refresh", "destination_sync_mode": "append" }, { @@ -94,18 +94,6 @@ "sync_mode": "incremental", "destination_sync_mode": "append" }, - { - "stream": { - "name": "task", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"] - }, - "source_defined_cursor": true, - "default_cursor_field": ["lastModifiedDate"], - "source_defined_primary_key": [["id"]], - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, { "stream": { "name": "salesorder", diff --git a/airbyte-integrations/connectors/source-netsuite/setup.py b/airbyte-integrations/connectors/source-netsuite/setup.py index 572013714c36..85d053fd38c0 100644 --- a/airbyte-integrations/connectors/source-netsuite/setup.py +++ b/airbyte-integrations/connectors/source-netsuite/setup.py @@ -6,8 +6,8 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1", - "requests-oauthlib~=1.3", + "airbyte-cdk", + "requests-oauthlib", ] TEST_REQUIREMENTS = [ diff --git a/airbyte-integrations/connectors/source-netsuite/source_netsuite/constraints.py b/airbyte-integrations/connectors/source-netsuite/source_netsuite/constraints.py index 59a72028f833..664cad8e98d4 100644 --- a/airbyte-integrations/connectors/source-netsuite/source_netsuite/constraints.py +++ b/airbyte-integrations/connectors/source-netsuite/source_netsuite/constraints.py @@ -38,11 +38,6 @@ INCREMENTAL_CURSOR: str = "lastModifiedDate" CUSTOM_INCREMENTAL_CURSOR: str = "lastmodified" -# NETSUITE ERROR CODES BY THEIR HTTP TWINS -NETSUITE_ERRORS_MAPPING: dict = { - 400: { - "USER_ERROR": "reading an Admin record allowed for Admin only", - "NONEXISTENT_FIELD": "cursor_field declared in schema but doesn't exist in object", - "INVALID_PARAMETER": "cannot read or find the object. Skipping", - }, -} + +NETSUITE_INPUT_DATE_FORMATS: list[str] = ["%m/%d/%Y", "%Y-%m-%d"] +NETSUITE_OUTPUT_DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ" diff --git a/airbyte-integrations/connectors/source-netsuite/source_netsuite/errors.py b/airbyte-integrations/connectors/source-netsuite/source_netsuite/errors.py new file mode 100644 index 000000000000..13b2500b3493 --- /dev/null +++ b/airbyte-integrations/connectors/source-netsuite/source_netsuite/errors.py @@ -0,0 +1,21 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + + +# NETSUITE ERROR CODES BY THEIR HTTP TWINS +NETSUITE_ERRORS_MAPPING: dict = { + 400: { + "USER_ERROR": "reading an Admin record allowed for Admin only", + "NONEXISTENT_FIELD": "cursor_field declared in schema but doesn't exist in object", + "INVALID_PARAMETER": "cannot read or find the object. 
Skipping", + }, + 403: { + "INSUFFICIENT_PERMISSION": "not enough permissions to access the object", + }, +} + + +# NETSUITE API ERRORS EXCEPTIONS +class DateFormatExeption(Exception): + """API CANNOT HANDLE REQUEST USING GIVEN DATETIME FORMAT""" diff --git a/airbyte-integrations/connectors/source-netsuite/source_netsuite/source.py b/airbyte-integrations/connectors/source-netsuite/source_netsuite/source.py index ee97bf344a7d..cb4cff8ee66f 100644 --- a/airbyte-integrations/connectors/source-netsuite/source_netsuite/source.py +++ b/airbyte-integrations/connectors/source-netsuite/source_netsuite/source.py @@ -5,6 +5,7 @@ import logging from collections import Counter +from json import JSONDecodeError from typing import Any, List, Mapping, Tuple, Union import requests @@ -16,6 +17,9 @@ class SourceNetsuite(AbstractSource): + + logger: logging.Logger = logging.getLogger("airbyte") + def auth(self, config: Mapping[str, Any]) -> OAuth1: return OAuth1( client_key=config["consumer_key"], @@ -50,7 +54,7 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any # check connectivity to all provided `object_types` for object in object_types: try: - response = session.get(url=base_url + RECORD_PATH + object, params={"limit": 1}) + response = session.get(url=base_url + RECORD_PATH + object.lower(), params={"limit": 1}) response.raise_for_status() return True, None except requests.exceptions.HTTPError as e: @@ -67,11 +71,29 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any return False, e def get_schemas(self, object_names: Union[List[str], str], session: requests.Session, metadata_url: str) -> Mapping[str, Any]: - # fetch schemas - if isinstance(object_names, list): - return {object_name: session.get(metadata_url + object_name, headers=SCHEMA_HEADERS).json() for object_name in object_names} - elif isinstance(object_names, str): - return {object_names: session.get(metadata_url + object_names, headers=SCHEMA_HEADERS).json()} + """ + Handles multivariance of object_names type input and fetches the schema for each object type provided. + """ + try: + if isinstance(object_names, list): + schemas = {} + for object_name in object_names: + schemas.update(**self.fetch_schema(object_name, session, metadata_url)) + return schemas + elif isinstance(object_names, str): + return self.fetch_schema(object_names, session, metadata_url) + else: + raise NotImplementedError( + f"Object Types has unknown structure, should be either `dict` or `str`, actual input: {object_names}" + ) + except JSONDecodeError as e: + self.logger.error(f"Unexpected output while fetching the object schema. Full error: {e.__repr__()}") + + def fetch_schema(self, object_name: str, session: requests.Session, metadata_url: str) -> Mapping[str, Any]: + """ + Calls the API for specific object type and returns schema as a dict. 
+ """ + return {object_name.lower(): session.get(metadata_url + object_name, headers=SCHEMA_HEADERS).json()} def generate_stream( self, @@ -83,10 +105,9 @@ def generate_stream( base_url: str, start_datetime: str, window_in_days: int, + max_retry: int = 3, ) -> Union[NetsuiteStream, IncrementalNetsuiteStream, CustomIncrementalNetsuiteStream]: - logger: logging.Logger = (logging.Logger,) - input_args = { "auth": auth, "object_name": object_name, @@ -94,24 +115,30 @@ def generate_stream( "start_datetime": start_datetime, "window_in_days": window_in_days, } - try: - schema = schemas[object_name] - schema_props = schema["properties"] - if schema_props: - if INCREMENTAL_CURSOR in schema_props.keys(): - return IncrementalNetsuiteStream(**input_args) - elif CUSTOM_INCREMENTAL_CURSOR in schema_props.keys(): - return CustomIncrementalNetsuiteStream(**input_args) - else: - # all other streams are full_refresh - return NetsuiteStream(**input_args) - except KeyError: - logger.warn(f"Object `{object_name}` schema has missing `properties` key. Retry...") - # somethimes object metadata returns data with missing `properties` key, - # we should try to fetch metadata again to that object - schemas = self.get_schemas(object_name, session, metadata_url) - input_args.update(**{"session": session, "metadata_url": metadata_url, "schemas": schemas}) - return self.generate_stream(**input_args) + + schema = schemas[object_name] + schema_props = schema.get("properties") + if schema_props: + if INCREMENTAL_CURSOR in schema_props.keys(): + return IncrementalNetsuiteStream(**input_args) + elif CUSTOM_INCREMENTAL_CURSOR in schema_props.keys(): + return CustomIncrementalNetsuiteStream(**input_args) + else: + # all other streams are full_refresh + return NetsuiteStream(**input_args) + else: + retry_attempt = 1 + while retry_attempt <= max_retry: + self.logger.warn(f"Object `{object_name}` schema has missing `properties` key. Retry attempt: {retry_attempt}/{max_retry}") + # somethimes object metadata returns data with missing `properties` key, + # we should try to fetch metadata again to that object + schemas = self.get_schemas(object_name, session, metadata_url) + if schemas[object_name].get("properties"): + input_args.update(**{"session": session, "metadata_url": metadata_url, "schemas": schemas}) + return self.generate_stream(**input_args) + retry_attempt += 1 + self.logger.warn(f"Object `{object_name}` schema is not available. 
Skipping this stream.") + return None def streams(self, config: Mapping[str, Any]) -> List[Stream]: auth = self.auth(config) @@ -121,7 +148,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: object_names = config.get("object_types") # retrieve all record types if `object_types` config field is not specified - if not config.get("object_types"): + if not object_names: objects_metadata = session.get(metadata_url).json().get("items") object_names = [object["name"] for object in objects_metadata] @@ -129,7 +156,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: schemas = self.get_schemas(object_names, **input_args) input_args.update( **{ - "auth": self.auth(config), + "auth": auth, "base_url": base_url, "start_datetime": config["start_datetime"], "window_in_days": config["window_in_days"], @@ -139,6 +166,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # build streams streams: list = [] for name in object_names: - streams.append(self.generate_stream(object_name=name, **input_args)) - + stream = self.generate_stream(object_name=name.lower(), **input_args) + if stream: + streams.append(stream) return streams diff --git a/airbyte-integrations/connectors/source-netsuite/source_netsuite/streams.py b/airbyte-integrations/connectors/source-netsuite/source_netsuite/streams.py index 1fa385f79f98..525a88aac97b 100644 --- a/airbyte-integrations/connectors/source-netsuite/source_netsuite/streams.py +++ b/airbyte-integrations/connectors/source-netsuite/source_netsuite/streams.py @@ -5,6 +5,7 @@ from abc import ABC from datetime import date, datetime, timedelta +from json import JSONDecodeError from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union import requests @@ -14,13 +15,15 @@ CUSTOM_INCREMENTAL_CURSOR, INCREMENTAL_CURSOR, META_PATH, - NETSUITE_ERRORS_MAPPING, + NETSUITE_INPUT_DATE_FORMATS, + NETSUITE_OUTPUT_DATETIME_FORMAT, RECORD_PATH, REFERAL_SCHEMA, REFERAL_SCHEMA_URL, SCHEMA_HEADERS, USLESS_SCHEMA_ELEMENTS, ) +from source_netsuite.errors import NETSUITE_ERRORS_MAPPING, DateFormatExeption class NetsuiteStream(HttpStream, ABC): @@ -40,9 +43,15 @@ def __init__( super().__init__(authenticator=auth) primary_key = "id" + + # instance input date format format selector + index_datetime_format = 0 + raise_on_http_errors = True - output_datetime_format = "%Y-%m-%dT%H:%M:%SZ" - input_datetime_format = "%m/%d/%Y" + + @property + def default_datetime_format(self) -> str: + return NETSUITE_INPUT_DATE_FORMATS[self.index_datetime_format] @property def name(self) -> str: @@ -62,6 +71,13 @@ def ref_schema(self) -> Mapping[str, str]: return schema def get_schema(self, ref: str) -> Union[Mapping[str, Any], str]: + def get_json_response(response: requests.Response) -> dict: + try: + return response.json() + except JSONDecodeError as e: + self.logger.error(f"Cannot get schema for {self.name}, actual response: {e.response.text}") + raise + # try to retrieve the schema from the cache schema = self.schemas.get(ref) if not schema: @@ -72,8 +88,11 @@ def get_schema(self, ref: str) -> Union[Mapping[str, Any], str]: if resp.status_code == 404: schema = {"title": ref, "type": "string"} else: + # check for 200 status resp.raise_for_status - schema = resp.json() + # handle response + schema = get_json_response(resp) + self.schemas[ref] = schema return schema @@ -112,8 +131,8 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, def format_date(self, last_modified_date: str) -> str: # the date format returned is differnet than what 
@@ -112,8 +131,8 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
 
     def format_date(self, last_modified_date: str) -> str:
         # the date format returned is differnet than what we need to use in the query
-        lmd_datetime = datetime.strptime(last_modified_date, self.output_datetime_format)
-        return lmd_datetime.strftime(self.input_datetime_format)
+        lmd_datetime = datetime.strptime(last_modified_date, NETSUITE_OUTPUT_DATETIME_FORMAT)
+        return lmd_datetime.strftime(self.default_datetime_format)
 
     def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
         params = {}
@@ -155,16 +174,35 @@ def should_retry(self, response: requests.Response) -> bool:
             error_code = message[0].get("o:errorCode")
             detail_message = message[0].get("detail")
             known_error = NETSUITE_ERRORS_MAPPING.get(response.status_code)
+
             if error_code in known_error.keys():
                 setattr(self, "raise_on_http_errors", False)
-                self.logger.warn(
-                    f"Stream `{self.name}`: {known_error.get(error_code)}, full error message: {detail_message}",
-                )
-                pass
+
+                # handle data-format error
+                if "INVALID_PARAMETER" in error_code and "failed with date format" in detail_message:
+                    self.logger.warn(f"Stream `{self.name}`: cannot read using date format `{self.default_datetime_format}`")
+                    self.index_datetime_format += 1
+                    if self.index_datetime_format < len(NETSUITE_INPUT_DATE_FORMATS):
+                        self.logger.warn(f"Stream `{self.name}`: retry using next date format `{self.default_datetime_format}`")
+                        raise DateFormatException
+                    else:
+                        self.logger.error(f"DATE FORMAT exception. Cannot read using known formats {NETSUITE_INPUT_DATE_FORMATS}")
+
+                # handle other known errors
+                self.logger.error(f"Stream `{self.name}`: {error_code} error occurred, full error message: {detail_message}")
+                return False
             else:
                 return super().should_retry(response)
         return super().should_retry(response)
 
+    def read_records(
+        self, stream_slice: Mapping[str, Any] = None, stream_state: Mapping[str, Any] = None, **kwargs
+    ) -> Iterable[Mapping[str, Any]]:
+        try:
+            yield from super().read_records(stream_slice=stream_slice, stream_state=stream_state, **kwargs)
+        except DateFormatException:
+            """continue trying other formats, until the list is exhausted"""
+
 
 class IncrementalNetsuiteStream(NetsuiteStream):
     @property
@@ -179,11 +217,23 @@ def filter_records_newer_than_state(
         """Parse the records with respect to `stream_state` for `incremental` sync."""
         if stream_state:
             for record in records:
-                if record.get(self.cursor_field, "") >= stream_state.get(self.cursor_field):
+                if record.get(self.cursor_field, self.start_datetime) >= stream_state.get(self.cursor_field):
                     yield record
         else:
             yield from records
 
+    def get_state_value(self, stream_state: Mapping[str, Any] = None) -> str:
+        """
+        Sometimes the object has no `cursor_field` value assigned, so an empty string `""` is emitted as the state value;
+        this causes the datetime lib to fail while parsing the time component,
+        so to avoid such errors we fall back to the default start_datetime from the input config.
+        """
+        state = stream_state.get(self.cursor_field) if stream_state else self.start_datetime
+        if not state:
+            self.logger.info(f"Stream state for `{self.name}` was not emitted, falling back to default value: {self.start_datetime}")
+            return self.start_datetime
+        return state
+
     def parse_response(
         self,
         response: requests.Response,
+ """ + state = stream_state.get(self.cursor_field) if stream_state else self.start_datetime + if not state: + self.logger.info(f"Stream state for `{self.name}` was not emmited, falling back to default value: {self.start_datetime}") + return self.start_datetime + return state + def parse_response( self, response: requests.Response, @@ -200,8 +250,8 @@ def get_updated_state( current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any], ) -> Mapping[str, Any]: - latest_cursor = latest_record.get(self.cursor_field, "") - current_cursor = current_stream_state.get(self.cursor_field, "") + latest_cursor = latest_record.get(self.cursor_field, self.start_datetime) + current_cursor = current_stream_state.get(self.cursor_field, self.start_datetime) return {self.cursor_field: max(latest_cursor, current_cursor)} def request_params( @@ -214,30 +264,23 @@ def request_params( ) return params - def stream_slices( - self, - stream_state: Mapping[str, Any] = None, - **kwargs: Optional[Mapping[str, Any]], - ) -> Iterable[Optional[Mapping[str, Any]]]: + def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: # Netsuite cannot order records returned by the API, so we need stream slices # to maintain state properly https://docs.airbyte.com/connector-development/cdk-python/incremental-stream/#streamstream_slices + slices = [] - start_str = stream_state.get(self.cursor_field) if stream_state else self.start_datetime - start = datetime.strptime(start_str, self.output_datetime_format).date() + state = self.get_state_value(stream_state) + start = datetime.strptime(state, NETSUITE_OUTPUT_DATETIME_FORMAT).date() # handle abnormal state values if start > date.today(): return slices else: while start <= date.today(): next_day = start + timedelta(days=self.window_in_days) - slices.append( - { - "start": start.strftime(self.input_datetime_format), - "end": next_day.strftime(self.input_datetime_format), - } - ) + slice_start = start.strftime(self.default_datetime_format) + slice_end = next_day.strftime(self.default_datetime_format) + yield {"start": slice_start, "end": slice_end} start = next_day - return slices class CustomIncrementalNetsuiteStream(IncrementalNetsuiteStream):