Commit 51e2718

🐛 Source Shopify: fix the connector's performance for `incremental refresh` (#5945)

* Added the ability to run incremental syncs within slicing by leveraging the `cached_state`

Co-authored-by: Oleksandr Bazarnov <oleksandr.bazarnov@globallogic.com>
bazarnov authored Sep 13, 2021
1 parent 03e1e08 commit 51e2718
Showing 9 changed files with 193 additions and 54 deletions.
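In short: as each incremental stream commits state via `get_updated_state`, its pre-sync state is eagerly cached in a shared in-memory map, so sub-streams can replay their parent stream with an incremental filter instead of a full refresh. A minimal, self-contained sketch of the pattern, simplified from the `EagerlyCachedStreamState` helper introduced below (the `StateCache` name and the sample values are illustrative only):

```python
from typing import Any, Dict


class StateCache:
    """Shared map of stream name -> the state each stream had when the sync began (sketch)."""

    cached_state: Dict[str, Dict[str, Any]] = {}

    @classmethod
    def remember(cls, name: str, cursor: str, state: Dict[str, Any], record: Dict[str, Any]) -> None:
        # Keep the minimum (i.e. pre-sync) cursor value, so slices created
        # later in the sync still replay from the state the sync started with.
        candidate = min(state.get(cursor, ""), record.get(cursor, ""))
        known = cls.cached_state.get(name, {}).get(cursor)
        cls.cached_state[name] = {cursor: min(known, candidate) if known else candidate}


# The parent stream feeds the cache as it updates its state ...
StateCache.remember("orders", "updated_at", {"updated_at": "2021-09-01"}, {"updated_at": "2021-09-09"})

# ... so a sub-stream (order refunds, risks, transactions) can request only
# the orders updated since that state, instead of re-reading every order:
print(StateCache.cached_state.get("orders"))  # {'updated_at': '2021-09-01'}
```
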
@@ -2,6 +2,6 @@
   "sourceDefinitionId": "9da77001-af33-4bcd-be46-6252bf9342b9",
   "name": "Shopify",
   "dockerRepository": "airbyte/source-shopify",
-  "dockerImageTag": "0.1.15",
+  "dockerImageTag": "0.1.16",
   "documentationUrl": "https://docs.airbyte.io/integrations/sources/shopify"
 }
@@ -160,7 +160,7 @@
 - sourceDefinitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
   name: Shopify
   dockerRepository: airbyte/source-shopify
-  dockerImageTag: 0.1.15
+  dockerImageTag: 0.1.16
   documentationUrl: https://docs.airbyte.io/integrations/sources/shopify
 - sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
   name: Redshift

2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-shopify/Dockerfile
@@ -28,5 +28,5 @@ COPY source_shopify ./source_shopify
 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
 ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
 
-LABEL io.airbyte.version=0.1.15
+LABEL io.airbyte.version=0.1.16
 LABEL io.airbyte.name=airbyte/source-shopify
3 changes: 2 additions & 1 deletion airbyte-integrations/connectors/source-shopify/README.md
@@ -109,7 +109,8 @@ Customize `acceptance-test-config.yml` file to configure tests. See [Source Acce
 If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py.
 To run your integration tests with acceptance tests, from the connector root, run
 ```
-python3 -m pytest integration_tests -p integration_tests.acceptance
+docker build . --no-cache -t airbyte/source-shopify:dev \
+&& python -m pytest -p source_acceptance_test.plugin
 ```
 To run your integration tests with docker
 
@@ -1,15 +1,15 @@
 {
   "customers": {
-    "updated_at": "2021-07-19T06:41:49-07:00"
+    "updated_at": "2021-09-09T02:57:47-07:00"
   },
   "orders": {
-    "updated_at": "2021-07-19T06:52:05-07:00"
+    "updated_at": "2021-09-09T02:57:43-07:00"
   },
   "draft_orders": {
     "updated_at": "2021-07-07T08:18:58-07:00"
   },
   "products": {
-    "updated_at": "2021-07-19T06:56:05-07:00"
+    "updated_at": "2021-08-18T02:40:22-07:00"
   },
   "abandoned_checkouts": {
     "updated_at": "2021-07-08T05:41:47-07:00"
@@ -18,19 +18,19 @@
     "updated_at": "2021-07-08T03:38:45-07:00"
   },
   "collects": {
-    "id": 29523654213790
+    "id": 29427031703741
   },
   "custom_collections": {
-    "updated_at": "2021-07-19T07:01:36-07:00"
+    "updated_at": "2021-08-18T02:39:34-07:00"
   },
   "order_refunds": {
-    "created_at": "2021-07-19T06:41:46-07:00"
+    "created_at": "2021-09-09T02:57:43-07:00"
   },
   "order_risks": {
     "id": 6161307599037
   },
   "transactions": {
-    "created_at": "2021-07-19T06:41:45-07:00"
+    "created_at": "2021-09-09T02:57:43-07:00"
   },
   "pages": {
     "updated_at": "2021-07-08T05:24:10-07:00"

@@ -24,17 +24,17 @@
 
 
 from abc import ABC, abstractmethod
-from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Tuple
 from urllib.parse import parse_qsl, urlparse
 
 import requests
 from airbyte_cdk import AirbyteLogger
-from airbyte_cdk.models import SyncMode
 from airbyte_cdk.sources import AbstractSource
 from airbyte_cdk.sources.streams import Stream
 from airbyte_cdk.sources.streams.http import HttpStream
 from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
 
+from .utils import EagerlyCachedStreamState as stream_state_cache
 from .utils import ShopifyRateLimiter as limiter

@@ -49,14 +49,13 @@ class ShopifyStream(HttpStream, ABC):
     order_field = "updated_at"
     filter_field = "updated_at_min"
 
-    def __init__(self, shop: str, start_date: str, **kwargs):
-        super().__init__(**kwargs)
-        self.start_date = start_date
-        self.shop = shop
+    def __init__(self, config: Dict):
+        super().__init__(authenticator=config["authenticator"])
+        self.config = config
 
     @property
     def url_base(self) -> str:
-        return f"https://{self.shop}.myshopify.com/admin/api/{self.api_version}/"
+        return f"https://{self.config['shop']}.myshopify.com/admin/api/{self.api_version}/"
 
     @staticmethod
     def next_page_token(response: requests.Response) -> Optional[Mapping[str, Any]]:
@@ -72,7 +71,7 @@ def request_params(self, next_page_token: Mapping[str, Any] = None, **kwargs) ->
             params.update(**next_page_token)
         else:
             params["order"] = f"{self.order_field} asc"
-            params[self.filter_field] = self.start_date
+            params[self.filter_field] = self.config["start_date"]
         return params
 
     @limiter.balance_rate_limit()
@@ -89,16 +88,16 @@ def data_field(self) -> str:
 
 # Basic incremental stream
 class IncrementalShopifyStream(ShopifyStream, ABC):
-    # Getting page size as 'limit' from parrent class
+
+    # Setting the check point interval to the limit of the records output
     @property
-    def limit(self):
+    def state_checkpoint_interval(self) -> int:
         return super().limit
 
-    # Setting the check point interval to the limit of the records output
-    state_checkpoint_interval = limit
+    # Setting the default cursor field for all streams
     cursor_field = "updated_at"
 
+    @stream_state_cache.cache_stream_state
     def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
         return {self.cursor_field: max(latest_record.get(self.cursor_field, ""), current_stream_state.get(self.cursor_field, ""))}
@@ -118,29 +117,31 @@ def filter_records_newer_than_state(self, stream_state: Mapping[str, Any] = None
         # Getting records >= state
         if stream_state:
             for record in records_slice:
-                if record[self.cursor_field] >= stream_state.get(self.cursor_field):
+                if record.get(self.cursor_field) >= stream_state.get(self.cursor_field):
                     yield record
         else:
             yield from records_slice
 
 
-class Customers(IncrementalShopifyStream):
-    data_field = "customers"
-
-    def path(self, **kwargs) -> str:
-        return f"{self.data_field}.json"
-
-
 class OrderSubstream(IncrementalShopifyStream):
     def read_records(
         self, stream_state: Mapping[str, Any] = None, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs
     ) -> Iterable[Mapping[str, Any]]:
-        orders_stream = Orders(authenticator=self.authenticator, shop=self.shop, start_date=self.start_date)
-        for data in orders_stream.read_records(sync_mode=SyncMode.full_refresh):
+        # get the last saved orders stream state
+        orders_stream = Orders(self.config)
+        orders_stream_state = stream_state_cache.cached_state.get(orders_stream.name)
+        for data in orders_stream.read_records(stream_state=orders_stream_state, **kwargs):
             slice = super().read_records(stream_slice={"order_id": data["id"]}, **kwargs)
             yield from self.filter_records_newer_than_state(stream_state=stream_state, records_slice=slice)
 
 
+class Customers(IncrementalShopifyStream):
+    data_field = "customers"
+
+    def path(self, **kwargs) -> str:
+        return f"{self.data_field}.json"
+
+
 class Orders(IncrementalShopifyStream):
     data_field = "orders"
@@ -302,11 +303,12 @@ def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
     def read_records(
         self, stream_state: Mapping[str, Any] = None, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs
     ) -> Iterable[Mapping[str, Any]]:
-        stream_state = stream_state or {}
-        price_rules_stream = PriceRules(authenticator=self.authenticator, shop=self.shop, start_date=self.start_date)
-        for data in price_rules_stream.read_records(sync_mode=SyncMode.full_refresh):
-            discount_slice = super().read_records(stream_slice={"price_rule_id": data["id"]}, **kwargs)
-            yield from self.filter_records_newer_than_state(stream_state=stream_state, records_slice=discount_slice)
+        # get the last saved price_rules stream state
+        price_rules_stream = PriceRules(self.config)
+        price_rules_stream_state = stream_state_cache.cached_state.get(price_rules_stream.name)
+        for data in price_rules_stream.read_records(stream_state=price_rules_stream_state, **kwargs):
+            slice = super().read_records(stream_slice={"price_rule_id": data["id"]}, **kwargs)
+            yield from self.filter_records_newer_than_state(stream_state=stream_state, records_slice=slice)
 
 
 class ShopifyAuthenticator(TokenAuthenticator):
@@ -345,21 +347,21 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         Defining streams to run.
         """
 
-        auth = ShopifyAuthenticator(token=config["api_password"])
-        args = {"authenticator": auth, "shop": config["shop"], "start_date": config["start_date"]}
+        config["authenticator"] = ShopifyAuthenticator(token=config["api_password"])
 
         return [
-            Customers(**args),
-            Orders(**args),
-            DraftOrders(**args),
-            Products(**args),
-            AbandonedCheckouts(**args),
-            Metafields(**args),
-            CustomCollections(**args),
-            Collects(**args),
-            OrderRefunds(**args),
-            OrderRisks(**args),
-            Transactions(**args),
-            Pages(**args),
-            PriceRules(**args),
-            DiscountCodes(**args),
+            Customers(config),
+            Orders(config),
+            DraftOrders(config),
+            Products(config),
+            AbandonedCheckouts(config),
+            Metafields(config),
+            CustomCollections(config),
+            Collects(config),
+            OrderRefunds(config),
+            OrderRisks(config),
+            Transactions(config),
+            Pages(config),
+            PriceRules(config),
+            DiscountCodes(config),
         ]
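Because every stream now takes the whole `config` mapping, with the authenticator injected under the `"authenticator"` key, a sub-stream can rebuild its parent from `self.config` alone, as `OrderSubstream` and `DiscountCodes` do above. A hedged usage sketch; the shop name and password are placeholders:

```python
from source_shopify.source import DiscountCodes, Orders, ShopifyAuthenticator

config = {
    "shop": "my-test-shop",  # placeholder shop name
    "start_date": "2021-01-01",
    "api_password": "<api-password>",  # placeholder credential
}
config["authenticator"] = ShopifyAuthenticator(token=config["api_password"])

# One shared dict wires up every stream; no more threading shop/start_date kwargs.
orders = Orders(config)
discount_codes = DiscountCodes(config)  # re-creates PriceRules(self.config) while reading
```
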
@@ -25,6 +25,7 @@
 
 from functools import wraps
 from time import sleep
+from typing import Dict
 
 import requests
 
@@ -108,3 +109,53 @@ def wrapper_balance_rate_limit(*args, **kwargs):
         return wrapper_balance_rate_limit
 
     return decorator
+
+
+class EagerlyCachedStreamState:
+
+    """
+    This is the placeholder for the tmp stream state for each incremental stream.
+    It's empty once the sync has started, and is updated while the sync operation takes place.
+    It holds the temporary stream state values before they are updated, to provide the opportunity to reuse this state.
+    """
+
+    cached_state: Dict = {}
+
+    @staticmethod
+    def stream_state_to_tmp(*args, state_object: Dict = cached_state) -> Dict:
+        """
+        Method to save the current stream state for future re-use within slicing.
+        The method requires having the temporary `state_object` as a placeholder.
+        Because of the specifics of Shopify's entity relations, we have the opportunity to fetch updates
+        for a particular stream using the `Incremental Refresh`, inside slicing.
+        For example:
+        if `order refund` records were updated, then `orders` is updated as well.
+        if a `transaction` was added to the order, then `orders` is updated as well.
+        etc.
+        """
+        # Map the input *args; the sequence should always be kept aligned with the input function.
+        # Change the mapping if needed.
+        stream: object = args[0]  # the self instance of the stream
+        current_stream_state: Dict = args[1]
+        latest_record: Dict = args[2]
+        # get the current tmp_state_value
+        tmp_stream_state_value = state_object.get(stream.name, {}).get(stream.cursor_field, "")
+        # Compare the `current_stream_state` with `latest_record` to get the initial state value
+        if current_stream_state:
+            state_object[stream.name] = {
+                stream.cursor_field: min(current_stream_state.get(stream.cursor_field, ""), latest_record.get(stream.cursor_field, ""))
+            }
+        # Check if we have the saved state and keep the minimum value
+        if tmp_stream_state_value:
+            state_object[stream.name] = {
+                stream.cursor_field: min(current_stream_state.get(stream.cursor_field, ""), tmp_stream_state_value)
+            }
+        return state_object
+
+    def cache_stream_state(func):
+        @wraps(func)
+        def decorator(*args):
+            EagerlyCachedStreamState.stream_state_to_tmp(*args)
+            return func(*args)
+
+        return decorator
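Putting the helper to work: a hedged sketch in which only `EagerlyCachedStreamState` and its methods come from the diff above, while the `OrdersLike` stand-in class is hypothetical. Decorating `get_updated_state` snapshots the stream's pre-sync state into `cached_state`, which a sub-stream can later read:

```python
from source_shopify.utils import EagerlyCachedStreamState as stream_state_cache


class OrdersLike:
    """Hypothetical stand-in for an incremental stream shaped like Orders."""

    name = "orders"
    cursor_field = "updated_at"

    @stream_state_cache.cache_stream_state
    def get_updated_state(self, current_stream_state, latest_record):
        return {
            self.cursor_field: max(
                latest_record.get(self.cursor_field, ""),
                current_stream_state.get(self.cursor_field, ""),
            )
        }


stream = OrdersLike()
# The decorator caches the state *before* it is advanced by the new record:
stream.get_updated_state(
    {"updated_at": "2021-09-01T00:00:00-07:00"},  # state the sync started with
    {"updated_at": "2021-09-09T02:57:43-07:00"},  # latest record read
)

# A sub-stream can now replay Orders incrementally from the pre-sync state:
print(stream_state_cache.cached_state.get("orders"))
# {'updated_at': '2021-09-01T00:00:00-07:00'}
```
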
@@ -0,0 +1,84 @@
+#
+# MIT License
+#
+# Copyright (c) 2020 Airbyte
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+
+
+import pytest
+from source_shopify.source import Orders
+from source_shopify.utils import EagerlyCachedStreamState as stream_state_cache
+
+# Define the Stream class for the test
+STREAM = Orders(config={"authenticator": "token"})
+
+
+@pytest.mark.parametrize(
+    "stream, cur_stream_state, latest_record, state_object, expected_output",
+    [
+        # When Full-Refresh: state_object is empty.
+        (STREAM, {STREAM.cursor_field: ""}, {STREAM.cursor_field: ""}, {}, {STREAM.name: {STREAM.cursor_field: ""}}),
+        (STREAM, {STREAM.cursor_field: ""}, {STREAM.cursor_field: "2021-01-01T01-01-01"}, {}, {STREAM.name: {STREAM.cursor_field: ""}}),
+    ],
+    ids=["Sync Started", "Sync in progress"],
+)
+def test_full_refresh(stream, cur_stream_state, latest_record, state_object, expected_output):
+    """
+    When Sync = Full-Refresh: we don't have any state yet, so we need to keep the state_object at the min value, thus empty.
+    """
+    # create the fixture for *args based on the input
+    args = [stream, cur_stream_state, latest_record]
+    # use the external tmp_state_object for this test
+    actual = stream_state_cache.stream_state_to_tmp(*args, state_object=state_object)
+    assert actual == expected_output
+
+
+@pytest.mark.parametrize(
+    "stream, cur_stream_state, latest_record, state_object, expected_output",
+    [
+        # When starting the incremental refresh, assuming we have the state of STREAM.
+        (
+            STREAM,
+            {STREAM.cursor_field: "2021-01-01T01-01-01"},
+            {STREAM.cursor_field: "2021-01-05T02-02-02"},
+            {},
+            {STREAM.name: {STREAM.cursor_field: "2021-01-01T01-01-01"}},
+        ),
+        # While doing the incremental refresh, we keep the original state, even if the state is updated during the sync.
+        (
+            STREAM,
+            {STREAM.cursor_field: "2021-01-05T02-02-02"},
+            {STREAM.cursor_field: "2021-01-10T10-10-10"},
+            {},
+            {STREAM.name: {STREAM.cursor_field: "2021-01-05T02-02-02"}},
+        ),
+    ],
+    ids=["Sync Started", "Sync in progress"],
+)
+def test_incremental_sync(stream, cur_stream_state, latest_record, state_object, expected_output):
+    """
+    When Sync = Incremental Refresh: we already have the saved state from the Full-Refresh sync;
+    it is passed as input to the Incremental Sync, so we need to back it up and reuse it.
+    """
+    # create the fixture for *args based on the input
+    args = [stream, cur_stream_state, latest_record]
+    actual = stream_state_cache.stream_state_to_tmp(*args, state_object=state_object)
+    assert actual == expected_output
1 change: 1 addition & 0 deletions docs/integrations/sources/shopify.md
@@ -67,6 +67,7 @@ With given error message the sync operation is still goes on, but will require m
 
 | Version | Date | Pull Request | Subject |
 | :------ | :-------- | :----- | :------ |
+| 0.1.16 | 2021-09-09 | [5945](https://github.com/airbytehq/airbyte/pull/5945) | Fixed the connector's performance for `Incremental refresh` |
 | 0.1.15 | 2021-09-02 | [5853](https://github.com/airbytehq/airbyte/pull/5853) | Fixed `amount` type in `order_refund` schema |
 | 0.1.14 | 2021-09-02 | [5801](https://github.com/airbytehq/airbyte/pull/5801) | Fixed `line_items/discount allocations` & `duties` parts of `orders` schema |
 | 0.1.13 | 2021-08-17 | [5470](https://github.com/airbytehq/airbyte/pull/5470) | Fixed rate limits throttling |
