Skip to content

Commit

Permalink
Write test cases and changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
harshithmullapudi committed Jun 15, 2021
1 parent a71de82 commit 4473a50
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Amazon Seller Partner Source CHANGELOG

| Date | Released Version | Notes |
| :--- | :--- | :--- |
| `2021-06-15` | `0.1.2` | `Fixed: Sync fails with timeout when create report is CANCELLED` |

Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-amazon-seller-partner
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#


from typing import Optional
from typing import Optional, List

from sp_api.api import Orders, Reports
from sp_api.base import Marketplaces
Expand Down Expand Up @@ -66,7 +66,7 @@ def __init__(self, credentials: dict, marketplace: str):
self.credentials = credentials
self.marketplace = self.MARKETPLACES_TO_ID[marketplace]

def get_entities(self):
def get_entities(self) -> List[str]:
return self._ENTITIES

def is_report(self, stream_name: str) -> bool:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import time
from datetime import datetime
import pendulum
from typing import Any, Dict, Generator, MutableMapping, Tuple
from typing import Any, Dict, Generator, MutableMapping, Tuple, List

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, AirbyteStateMessage, AirbyteStream, Type
Expand All @@ -38,7 +38,6 @@


class BaseClient:
MAX_SLEEP_TIME = 512
CONVERSION_WINDOW_DAYS = 14

def __init__(
Expand Down Expand Up @@ -134,7 +133,7 @@ def read_reports(
reportId = response["reportId"]

# Wait for the report status
status, document_id = self._wait_for_report(logger, self._amazon_client, reportId)
status, document_id = BaseClient._wait_for_report(logger, self._amazon_client, reportId)

# Move to next month when the report is CANCELLED
if status == False:
Expand All @@ -156,7 +155,18 @@ def read_reports(

current_date = self._increase_date_by_month(current_date)

def _wait_for_report(self, logger, amazon_client: AmazonClient, reportId: str):
def _get_records(self, data: Dict[str, Any]):
records = data["document"].splitlines()
headers = records[0].split("\t")
records = records[1:]
return self._convert_array_into_dict(headers, records)

def _apply_conversion_window(self, current_date: str) -> str:
return pendulum.parse(current_date).subtract(days=self.CONVERSION_WINDOW_DAYS).to_date_string()

@staticmethod
def _wait_for_report(logger, amazon_client: AmazonClient, reportId: str):
MAX_SLEEP_TIME = 512
current_sleep_time = 4

logger.info(f"Waiting for the report {reportId}")
Expand All @@ -173,32 +183,23 @@ def _wait_for_report(self, logger, amazon_client: AmazonClient, reportId: str):
logger.info(f"Report CANCELLED: {reportId}")
return False, None

if current_sleep_time > self.MAX_SLEEP_TIME:
if current_sleep_time > MAX_SLEEP_TIME:
logger.error("Max wait reached")
raise Exception("Max wait time reached")

logger.info(f"Sleeping for {current_sleep_time}")
time.sleep(current_sleep_time)
current_sleep_time = current_sleep_time * 2

def _get_records(self, data):
records = data["document"].splitlines()
headers = records[0].split("\t")
records = records[1:]
return self._convert_array_into_dict(headers, records)

def _apply_conversion_window(self, current_date: str) -> str:
return pendulum.parse(current_date).subtract(days=self.CONVERSION_WINDOW_DAYS).to_date_string()

@staticmethod
def _convert_array_into_dict(headers, values):
def _convert_array_into_dict(headers: List[Dict[str, Any]], values: List[Dict[str, Any]]):
records = []
for value in values:
records.append(dict(zip(headers, value.split("\t"))))
return records

@staticmethod
def _increase_date_by_month(current_date: str):
def _increase_date_by_month(current_date: str) -> str:
return pendulum.parse(current_date).add(months=1).to_date_string()

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"seller_id": {
"title": "Seller ID",
"type": "string",
"description": "This is used as an identifier if passed. Not used inside API"
"description": "Amazon doesn't return seller_id in the response thus seller_id is added to each row as an identifier. Note: It is not used in querying the data."
},
"marketplace": {
"title": "Marketplace",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

from datetime import datetime
from typing import Mapping
import abc

from airbyte_cdk.logger import AirbyteLogger
from dateutil.relativedelta import relativedelta
from source_amazon_seller_partner.client import BaseClient

Expand All @@ -48,17 +50,49 @@


class MockAmazonClient:
COUNT = 1

def __init__(self, credentials, marketplace):
self.credentials = credentials
self.marketplace = marketplace

def fetch_orders(updated_after, page_count, next_token=None):
return ORDERS_RESPONSE


@abc.abstractmethod
def get_report(self, reportId):
return

class AmazonSuccess(MockAmazonClient):
def get_report(self, reportId):
if self.COUNT == 3:
return { "processingStatus": "DONE", "reportDocumentId": 1 }
else:
self.COUNT = self.COUNT + 1
return { "processingStatus": "IN_PROGRESS" }
class AmazonCancelled(MockAmazonClient):
def get_report(self, reportId):
if self.COUNT == 3:
return { "processingStatus": "CANCELLED" }
else:
self.COUNT = self.COUNT + 1
return { "processingStatus": "IN_PROGRESS" }

def get_base_client(config: Mapping):
return BaseClient(**config)

def test_wait_for_report(mocker):
reportId = "123"

amazon_client = AmazonCancelled(credentials={}, marketplace="USA")
wait_response = BaseClient._wait_for_report(AirbyteLogger(), amazon_client, reportId)

assert wait_response == (False, None)

amazon_client = AmazonSuccess(credentials={}, marketplace="USA")

wait_response = BaseClient._wait_for_report(AirbyteLogger(), amazon_client, reportId)
assert wait_response == (True, 1)

def test_check_connection(mocker):
mocker.patch("source_amazon_seller_partner.client.AmazonClient", return_value=MockAmazonClient)
Expand Down
3 changes: 3 additions & 0 deletions docs/integrations/sources/amazon-seller-partner.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ Information about rate limits you may find [here](https://github.com/amzn/sellin
* AWS USER ACCESS KEY
* AWS USER SECRET KEY
* role_arn
* seller_id

Amazon doesn't return seller_id in the response thus seller_id is added to each row as an identifier. Note: It is not used in querying the data.

### Setup guide

Expand Down

0 comments on commit 4473a50

Please sign in to comment.