Skip to content

Commit

Permalink
🐛 Source Netsuite: fix early adopter issues (#19798)
Browse files Browse the repository at this point in the history
  • Loading branch information
bazarnov authored Jan 4, 2023
1 parent f5b793d commit 5f55f25
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,7 @@
- name: Netsuite
sourceDefinitionId: 4f2f093d-ce44-4121-8118-9d13b7bfccd0
dockerRepository: airbyte/source-netsuite
dockerImageTag: 0.1.1
dockerImageTag: 0.1.2
documentationUrl: https://docs.airbyte.com/integrations/sources/netsuite
sourceType: api
releaseStage: alpha
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9205,7 +9205,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-netsuite:0.1.1"
- dockerImage: "airbyte/source-netsuite:0.1.2"
spec:
documentationUrl: "https://docsurl.com"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-netsuite/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ COPY source_netsuite ./source_netsuite
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-netsuite
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ tests:
- config_path: "sample_files/invalid_config.json"
status: "failed"
discovery:
# Discovery stage is dynamic, so timeout is increased
- config_path: "secrets/config.json"
# Discovery stage is dynamic, so timeout is increased
timeout_seconds: 1200
basic_read:
- config_path: "secrets/config.json"
Expand All @@ -33,4 +33,5 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
timeout_seconds: 3600
timeout_seconds: 7200
threshold_days: 30
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,24 @@
"streams": [
{
"stream": {
"name": "customrecord01",
"name": "customer",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
"supported_sync_modes": ["full_refresh", "incremental"]
},
"source_defined_cursor": true,
"default_cursor_field": ["lastModifiedDate"],
"source_defined_primary_key": [["id"]],
"sync_mode": "full_refresh",
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "customer",
"name": "customrecord01",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
"supported_sync_modes": ["full_refresh"]
},
"source_defined_cursor": true,
"default_cursor_field": ["lastModifiedDate"],
"source_defined_primary_key": [["id"]],
"sync_mode": "incremental",
"sync_mode": "full_refresh",
"destination_sync_mode": "append"
},
{
Expand Down Expand Up @@ -94,18 +94,6 @@
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "task",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"]
},
"source_defined_cursor": true,
"default_cursor_field": ["lastModifiedDate"],
"source_defined_primary_key": [["id"]],
"sync_mode": "incremental",
"destination_sync_mode": "append"
},
{
"stream": {
"name": "salesorder",
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-netsuite/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"requests-oauthlib~=1.3",
"airbyte-cdk",
"requests-oauthlib",
]

TEST_REQUIREMENTS = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@
INCREMENTAL_CURSOR: str = "lastModifiedDate"
CUSTOM_INCREMENTAL_CURSOR: str = "lastmodified"

# NETSUITE ERROR CODES BY THEIR HTTP TWINS
NETSUITE_ERRORS_MAPPING: dict = {
400: {
"USER_ERROR": "reading an Admin record allowed for Admin only",
"NONEXISTENT_FIELD": "cursor_field declared in schema but doesn't exist in object",
"INVALID_PARAMETER": "cannot read or find the object. Skipping",
},
}

NETSUITE_INPUT_DATE_FORMATS: list[str] = ["%m/%d/%Y", "%Y-%m-%d"]
NETSUITE_OUTPUT_DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ"
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


# NETSUITE ERROR CODES BY THEIR HTTP TWINS
NETSUITE_ERRORS_MAPPING: dict = {
400: {
"USER_ERROR": "reading an Admin record allowed for Admin only",
"NONEXISTENT_FIELD": "cursor_field declared in schema but doesn't exist in object",
"INVALID_PARAMETER": "cannot read or find the object. Skipping",
},
403: {
"INSUFFICIENT_PERMISSION": "not enough permissions to access the object",
},
}


# NETSUITE API ERRORS EXCEPTIONS
class DateFormatExeption(Exception):
"""API CANNOT HANDLE REQUEST USING GIVEN DATETIME FORMAT"""
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import logging
from collections import Counter
from json import JSONDecodeError
from typing import Any, List, Mapping, Tuple, Union

import requests
Expand All @@ -16,6 +17,9 @@


class SourceNetsuite(AbstractSource):

logger: logging.Logger = logging.getLogger("airbyte")

def auth(self, config: Mapping[str, Any]) -> OAuth1:
return OAuth1(
client_key=config["consumer_key"],
Expand Down Expand Up @@ -50,7 +54,7 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any
# check connectivity to all provided `object_types`
for object in object_types:
try:
response = session.get(url=base_url + RECORD_PATH + object, params={"limit": 1})
response = session.get(url=base_url + RECORD_PATH + object.lower(), params={"limit": 1})
response.raise_for_status()
return True, None
except requests.exceptions.HTTPError as e:
Expand All @@ -67,11 +71,29 @@ def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any
return False, e

def get_schemas(self, object_names: Union[List[str], str], session: requests.Session, metadata_url: str) -> Mapping[str, Any]:
# fetch schemas
if isinstance(object_names, list):
return {object_name: session.get(metadata_url + object_name, headers=SCHEMA_HEADERS).json() for object_name in object_names}
elif isinstance(object_names, str):
return {object_names: session.get(metadata_url + object_names, headers=SCHEMA_HEADERS).json()}
"""
Handles multivariance of object_names type input and fetches the schema for each object type provided.
"""
try:
if isinstance(object_names, list):
schemas = {}
for object_name in object_names:
schemas.update(**self.fetch_schema(object_name, session, metadata_url))
return schemas
elif isinstance(object_names, str):
return self.fetch_schema(object_names, session, metadata_url)
else:
raise NotImplementedError(
f"Object Types has unknown structure, should be either `dict` or `str`, actual input: {object_names}"
)
except JSONDecodeError as e:
self.logger.error(f"Unexpected output while fetching the object schema. Full error: {e.__repr__()}")

def fetch_schema(self, object_name: str, session: requests.Session, metadata_url: str) -> Mapping[str, Any]:
"""
Calls the API for specific object type and returns schema as a dict.
"""
return {object_name.lower(): session.get(metadata_url + object_name, headers=SCHEMA_HEADERS).json()}

def generate_stream(
self,
Expand All @@ -83,35 +105,40 @@ def generate_stream(
base_url: str,
start_datetime: str,
window_in_days: int,
max_retry: int = 3,
) -> Union[NetsuiteStream, IncrementalNetsuiteStream, CustomIncrementalNetsuiteStream]:

logger: logging.Logger = (logging.Logger,)

input_args = {
"auth": auth,
"object_name": object_name,
"base_url": base_url,
"start_datetime": start_datetime,
"window_in_days": window_in_days,
}
try:
schema = schemas[object_name]
schema_props = schema["properties"]
if schema_props:
if INCREMENTAL_CURSOR in schema_props.keys():
return IncrementalNetsuiteStream(**input_args)
elif CUSTOM_INCREMENTAL_CURSOR in schema_props.keys():
return CustomIncrementalNetsuiteStream(**input_args)
else:
# all other streams are full_refresh
return NetsuiteStream(**input_args)
except KeyError:
logger.warn(f"Object `{object_name}` schema has missing `properties` key. Retry...")
# sometimes object metadata returns data with missing `properties` key,
# we should try to fetch metadata again for that object
schemas = self.get_schemas(object_name, session, metadata_url)
input_args.update(**{"session": session, "metadata_url": metadata_url, "schemas": schemas})
return self.generate_stream(**input_args)

schema = schemas[object_name]
schema_props = schema.get("properties")
if schema_props:
if INCREMENTAL_CURSOR in schema_props.keys():
return IncrementalNetsuiteStream(**input_args)
elif CUSTOM_INCREMENTAL_CURSOR in schema_props.keys():
return CustomIncrementalNetsuiteStream(**input_args)
else:
# all other streams are full_refresh
return NetsuiteStream(**input_args)
else:
retry_attempt = 1
while retry_attempt <= max_retry:
self.logger.warn(f"Object `{object_name}` schema has missing `properties` key. Retry attempt: {retry_attempt}/{max_retry}")
# sometimes object metadata returns data with missing `properties` key,
# we should try to fetch metadata again for that object
schemas = self.get_schemas(object_name, session, metadata_url)
if schemas[object_name].get("properties"):
input_args.update(**{"session": session, "metadata_url": metadata_url, "schemas": schemas})
return self.generate_stream(**input_args)
retry_attempt += 1
self.logger.warn(f"Object `{object_name}` schema is not available. Skipping this stream.")
return None

def streams(self, config: Mapping[str, Any]) -> List[Stream]:
auth = self.auth(config)
Expand All @@ -121,15 +148,15 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
object_names = config.get("object_types")

# retrieve all record types if `object_types` config field is not specified
if not config.get("object_types"):
if not object_names:
objects_metadata = session.get(metadata_url).json().get("items")
object_names = [object["name"] for object in objects_metadata]

input_args = {"session": session, "metadata_url": metadata_url}
schemas = self.get_schemas(object_names, **input_args)
input_args.update(
**{
"auth": self.auth(config),
"auth": auth,
"base_url": base_url,
"start_datetime": config["start_datetime"],
"window_in_days": config["window_in_days"],
Expand All @@ -139,6 +166,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
# build streams
streams: list = []
for name in object_names:
streams.append(self.generate_stream(object_name=name, **input_args))

stream = self.generate_stream(object_name=name.lower(), **input_args)
if stream:
streams.append(stream)
return streams
Loading

0 comments on commit 5f55f25

Please sign in to comment.