πŸ› Source Shopify: fixed store redirection, add bulk checkpointing #42095

Merged: 46 commits, Jul 29, 2024

Commits
All commits are by bazarnov.

ef41bf9  test (Jul 11, 2024)
edd3ee8  Merge remote-tracking branch 'origin/master' (Jul 11, 2024)
cb299d0  Merge remote-tracking branch 'origin/master' (Jul 13, 2024)
adcae54  Merge branch 'master' of https://github.com/airbytehq/airbyte (Jul 16, 2024)
5057187  Merge remote-tracking branch 'origin/master' (Jul 16, 2024)
7203d14  Merge remote-tracking branch 'origin/master' (Jul 16, 2024)
2a8143d  Merge remote-tracking branch 'origin/master' (Jul 17, 2024)
f71eb18  Merge remote-tracking branch 'origin/master' (Jul 17, 2024)
706d507  added checkpointing to BULK (Jul 17, 2024)
aa9b31e  added url_base switch to redirect (Jul 18, 2024)
598a1a1  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 18, 2024)
dfb71c7  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 18, 2024)
f03bf11  updated and formated (Jul 18, 2024)
d809c17  updated changelog (Jul 18, 2024)
958cdbe  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 19, 2024)
67e23f8  updated after review (Jul 19, 2024)
d2a2812  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 19, 2024)
6268a18  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 22, 2024)
aff67d6  added more BULK related info to the log messages, changed the min val… (Jul 22, 2024)
af063d1  added default ASC sorting for BUlk streams (Jul 22, 2024)
f0864d4  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 22, 2024)
edfe43f  formatted (Jul 22, 2024)
260bdcc  reverted .dockerignore (Jul 22, 2024)
f5fbbc7  reverted non-functional changes (Jul 22, 2024)
e04faed  fixed unit_tests up to the changes (Jul 22, 2024)
e3a1c41  formatted (Jul 22, 2024)
b8493f6  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 22, 2024)
908a119  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 22, 2024)
0b6c107  updated (Jul 23, 2024)
87b923f  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 23, 2024)
e720941  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 23, 2024)
14eb8c8  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 23, 2024)
1883317  fixed inf.loop (Jul 23, 2024)
7162a62  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 23, 2024)
a506fd3  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 24, 2024)
b499cb0  minor corrections (Jul 24, 2024)
123792f  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 24, 2024)
521ad5a  updated (Jul 25, 2024)
3bb28dc  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 25, 2024)
8d51e42  updated CAT config (Jul 25, 2024)
25cb146  updated switch url logic (Jul 25, 2024)
85b7b21  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 26, 2024)
6e39a8f  fixed default behaviour for pres.prices (Jul 26, 2024)
756b892  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 29, 2024)
db50cfe  bumped version (Jul 29, 2024)
366fa1b  Merge remote-tracking branch 'origin/master' into baz/source/shopify/… (Jul 29, 2024)
@@ -4,6 +4,11 @@ acceptance_tests:
spec:
tests:
- spec_path: "source_shopify/spec.json"
backward_compatibility_tests_config:
# This is the intentional change.
# Added new fields: `job_checkpoint_interval`, `job_product_variants_include_pres_prices`
# to provide the ability to override this value by the User.
disable_for_version: 2.4.14
connection:
tests:
- config_path: "secrets/config.json"
@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
-  dockerImageTag: 2.4.15
+  dockerImageTag: 2.4.16
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
githubIssueLabel: source-shopify
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-shopify/poetry.lock

Some generated files are not rendered by default.

@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
-version = "2.4.15"
+version = "2.4.16"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -52,6 +52,11 @@ class BulkJobCreationFailedConcurrentError(BaseBulkException):

failure_type: FailureType = FailureType.transient_error

class BulkJobRedirectToOtherShopError(BaseBulkException):
"""Raised when the response contains another shop name"""

failure_type: FailureType = FailureType.transient_error

class BulkJobConcurrentError(BaseBulkException):
"""Raised when failing the job after hitting too many BulkJobCreationFailedConcurrentError."""

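A minimal sketch of how the new exception class slots into the hierarchy (here `failure_type` is a plain string standing in for the CDK's `FailureType` enum, and `handle` is a hypothetical helper, not part of the PR):

```python
class BaseBulkException(Exception):
    """Base for BULK job errors; subclasses declare a failure type."""
    failure_type = "system_error"


class BulkJobRedirectToOtherShopError(BaseBulkException):
    """Raised when the response contains another shop name."""
    failure_type = "transient_error"


def handle(exc: BaseBulkException) -> str:
    # transient errors are retried; anything else bubbles up
    return "retry" if exc.failure_type == "transient_error" else "fail"


print(handle(BulkJobRedirectToOtherShopError("shop mismatch")))  # retry
```

Marking the error transient is what lets the retry decorator (further down in this PR) catch it and switch the `url_base` instead of failing the sync.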

Large diffs are not rendered by default.

@@ -79,7 +79,11 @@ def prepare(query: str) -> str:

 @dataclass
 class ShopifyBulkQuery:
-    shop_id: int
+    config: Mapping[str, Any]
+
+    @property
+    def shop_id(self) -> int:
+        return self.config.get("shop_id")

     @property
     def tools(self) -> BulkTools:
@@ -112,6 +116,14 @@ def sort_key(self) -> Optional[str]:
"""
return None

@property
def supports_checkpointing(self) -> bool:
"""
The presence of `sort_key = "UPDATED_AT"` for a query instance, usually means,
the server-side BULK Job results are fetched and ordered correctly, suitable for checkpointing.
"""
return self.sort_key == "UPDATED_AT"

@property
def query_nodes(self) -> Optional[Union[List[Field], List[str]]]:
"""
@@ -2382,8 +2394,7 @@ class ProductVariant(ShopifyBulkQuery):
     """
     {
         productVariants(
-            query: "updated_at:>='2019-04-13T00:00:00+00:00' AND updated_at:<='2024-04-30T12:16:17.273363+00:00'"
-            sortKey: UPDATED_AT
+            query: "updatedAt:>='2019-04-13T00:00:00+00:00' AND updatedAt:<='2024-04-30T12:16:17.273363+00:00'"
         ) {
             edges {
                 node {
@@ -2457,64 +2468,76 @@ class ProductVariant(ShopifyBulkQuery):
"""

query_name = "productVariants"
sort_key = "ID"

prices_fields: List[str] = ["amount", "currencyCode"]
presentment_prices_fields: List[Field] = [
Field(
name="edges",
fields=[
Field(
name="node",
fields=["__typename", Field(name="price", fields=prices_fields), Field(name="compareAtPrice", fields=prices_fields)],
)
],
)
]
@property
def _should_include_presentment_prices(self) -> bool:
return self.config.get("job_product_variants_include_pres_prices", True)

option_value_fields: List[Field] = [
"id",
"name",
Field(name="hasVariants", alias="has_variants"),
Field(name="swatch", fields=["color", Field(name="image", fields=["id"])]),
]
option_fields: List[Field] = [
"name",
"value",
Field(name="optionValue", alias="option_value", fields=option_value_fields),
]
@property
def query_nodes(self) -> Optional[Union[List[Field], List[str]]]:

# main query
query_nodes: List[Field] = [
"__typename",
"id",
"title",
"price",
"sku",
"position",
"inventoryPolicy",
"compareAtPrice",
"inventoryManagement",
"createdAt",
"updatedAt",
"taxable",
"barcode",
"weight",
"weightUnit",
"inventoryQuantity",
"requiresShipping",
"availableForSale",
"displayName",
"taxCode",
Field(name="selectedOptions", alias="options", fields=option_fields),
Field(name="weight", alias="grams"),
Field(name="image", fields=[Field(name="id", alias="image_id")]),
Field(name="inventoryQuantity", alias="old_inventory_quantity"),
Field(name="product", fields=[Field(name="id", alias="product_id")]),
Field(name="fulfillmentService", fields=[Field(name="handle", alias="fulfillment_service")]),
Field(name="inventoryItem", fields=[Field(name="id", alias="inventory_item_id")]),
Field(name="presentmentPrices", fields=presentment_prices_fields),
]
prices_fields: List[str] = ["amount", "currencyCode"]
presentment_prices_fields: List[Field] = [
Field(
name="edges",
fields=[
Field(
name="node",
fields=[
"__typename",
Field(name="price", fields=prices_fields),
Field(name="compareAtPrice", fields=prices_fields),
],
)
],
)
]
option_value_fields: List[Field] = [
"id",
"name",
Field(name="hasVariants", alias="has_variants"),
Field(name="swatch", fields=["color", Field(name="image", fields=["id"])]),
]
option_fields: List[Field] = [
"name",
"value",
Field(name="optionValue", alias="option_value", fields=option_value_fields),
]
presentment_prices = (
[Field(name="presentmentPrices", fields=presentment_prices_fields)] if self._should_include_presentment_prices else []
)

query_nodes: List[Field] = [
"__typename",
"id",
"title",
"price",
"sku",
"position",
"inventoryPolicy",
"compareAtPrice",
"inventoryManagement",
"createdAt",
"updatedAt",
"taxable",
"barcode",
"weight",
"weightUnit",
"inventoryQuantity",
"requiresShipping",
"availableForSale",
"displayName",
"taxCode",
Field(name="selectedOptions", alias="options", fields=option_fields),
Field(name="weight", alias="grams"),
Field(name="image", fields=[Field(name="id", alias="image_id")]),
Field(name="inventoryQuantity", alias="old_inventory_quantity"),
Field(name="product", fields=[Field(name="id", alias="product_id")]),
Field(name="fulfillmentService", fields=[Field(name="handle", alias="fulfillment_service")]),
Field(name="inventoryItem", fields=[Field(name="id", alias="inventory_item_id")]),
] + presentment_prices

return query_nodes

record_composition = {
"new_record": "ProductVariant",
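The config-driven `query_nodes` above can be sketched in isolation (a minimal sketch with an abbreviated field list; only the `job_product_variants_include_pres_prices` key and its default come from the PR):

```python
from typing import Any, List, Mapping


class ProductVariantQuery:
    """Simplified query builder: the `presentmentPrices` node is only
    requested when the user-facing config flag allows it."""

    def __init__(self, config: Mapping[str, Any]):
        self.config = config

    @property
    def _should_include_presentment_prices(self) -> bool:
        # defaults to True, matching the spec's default
        return self.config.get("job_product_variants_include_pres_prices", True)

    @property
    def query_nodes(self) -> List[str]:
        # abbreviated node list; the real query has many more fields
        nodes = ["id", "title", "price", "updatedAt"]
        if self._should_include_presentment_prices:
            nodes.append("presentmentPrices")
        return nodes


print(ProductVariantQuery({}).query_nodes)
# ['id', 'title', 'price', 'updatedAt', 'presentmentPrices']
```

Moving the node list from class attributes into a property is what makes it possible to vary the generated GraphQL per connection, at the cost of rebuilding the list on each access.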
@@ -29,6 +29,8 @@ def __post_init__(self) -> None:
self.composition: Optional[Mapping[str, Any]] = self.query.record_composition
self.record_process_components: Optional[Callable[[MutableMapping], MutableMapping]] = self.query.record_process_components
self.components: List[str] = self.composition.get("record_components", []) if self.composition else []
# how many records composed
self.record_composed: int = 0

@property
def tools(self) -> BulkTools:
@@ -127,8 +129,12 @@ def produce_records(self, filename: str) -> Iterable[MutableMapping[str, Any]]:
"""

with open(filename, "r") as jsonl_file:
# reset the counter
self.record_composed = 0

for record in self.process_line(jsonl_file):
yield self.tools.fields_names_to_snake_case(record)
self.record_composed += 1

def read_file(self, filename: str, remove_file: Optional[bool] = True) -> Iterable[Mapping[str, Any]]:
try:
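The counter logic above can be sketched as follows (class and method names are simplified; only the reset-then-count-while-yielding pattern mirrors the PR):

```python
import io
import json


class RecordProducer:
    """Streams records out of a JSONL file while tracking how many
    were composed, so the caller can decide when to checkpoint."""

    def __init__(self):
        self.record_composed = 0

    def produce_records(self, jsonl_file):
        # reset the counter per file, as the PR does
        self.record_composed = 0
        for line in jsonl_file:
            record = json.loads(line)
            yield record
            self.record_composed += 1


producer = RecordProducer()
data = io.StringIO('{"id": 1}\n{"id": 2}\n')
records = list(producer.produce_records(data))
print(producer.record_composed)  # 2
```

Incrementing after `yield` means the counter only reflects records the consumer actually pulled, which is the safe value to checkpoint against.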
@@ -24,25 +24,20 @@ def bulk_retry_on_exception(logger: logging.Logger, more_exceptions: Optional[Tu
     def decorator(func: Callable) -> Callable:
         @wraps(func)
         def wrapper(self, *args, **kwargs) -> Any:
-            # mandatory class attributes
-            max_retries = self._job_max_retries
-            stream_name = self.stream_name
-            backoff_time = self._job_backoff_time
-
             current_retries = 0
             while True:
                 try:
                     return func(self, *args, **kwargs)
                 except BULK_RETRY_ERRORS or more_exceptions as ex:
                     current_retries += 1
-                    if current_retries > max_retries:
+                    if current_retries > self._job_max_retries:
                         logger.error("Exceeded retry limit. Giving up.")
                         raise
                     else:
                         logger.warning(
-                            f"Stream `{stream_name}`: {ex}. Retrying {current_retries}/{max_retries} after {backoff_time} seconds."
+                            f"Stream `{self.http_client.name}`: {ex}. Retrying {current_retries}/{self._job_max_retries} after {self._job_backoff_time} seconds."
                         )
-                    sleep(backoff_time)
+                    sleep(self._job_backoff_time)
                 except ShopifyBulkExceptions.BulkJobCreationFailedConcurrentError:
                     if self._concurrent_attempt == self._concurrent_max_retry:
                         message = f"The BULK Job couldn't be created at this time, since another job is running."
@@ -51,9 +46,13 @@ def wrapper(self, *args, **kwargs) -> Any:

self._concurrent_attempt += 1
logger.warning(
-                    f"Stream: `{self.stream_name}`, the BULK concurrency limit has reached. Waiting {self._concurrent_interval} sec before retry, attempt: {self._concurrent_attempt}.",
+                    f"Stream: `{self.http_client.name}`, the BULK concurrency limit has reached. Waiting {self._concurrent_interval} sec before retry, attempt: {self._concurrent_attempt}.",
)
sleep(self._concurrent_interval)
except ShopifyBulkExceptions.BulkJobRedirectToOtherShopError:
logger.warning(
f"Stream: `{self.http_client.name}`, the `shop name` differs from the provided in `input configuration`. Switching to the `{self._tools.shop_name_from_url(self.base_url)}`.",
)

return wrapper

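The retry pattern above, reduced to its essentials (the `Job` class, zero backoff, and `ConnectionError` as the retryable type are illustrative; the real decorator also handles the concurrency and redirect cases):

```python
import logging
from functools import wraps
from time import sleep


def bulk_retry_on_exception(logger, retryable=(ConnectionError,)):
    """Retry the wrapped method on transient errors, reading the retry
    limit and backoff off the instance, as the PR now does inline."""
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            current_retries = 0
            while True:
                try:
                    return func(self, *args, **kwargs)
                except retryable as ex:
                    current_retries += 1
                    if current_retries > self._job_max_retries:
                        logger.error("Exceeded retry limit. Giving up.")
                        raise
                    logger.warning("%s. Retrying %s/%s", ex, current_retries, self._job_max_retries)
                    sleep(self._job_backoff_time)
        return wrapper
    return decorator


class Job:
    _job_max_retries = 3
    _job_backoff_time = 0  # no real backoff in this sketch

    def __init__(self):
        self.calls = 0

    @bulk_retry_on_exception(logging.getLogger("bulk"))
    def run(self):
        self.calls += 1
        if self.calls < 3:
            raise ConnectionError("transient")
        return "ok"


job = Job()
print(job.run())  # ok
```

Reading `self._job_max_retries` and `self._job_backoff_time` at each use (rather than caching them up front, as the removed lines did) keeps the wrapper correct even if the instance's limits change between retries.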
@@ -52,6 +52,16 @@ def filename_from_url(job_result_url: str) -> str:
f"Could not extract the `filename` from `result_url` provided, details: {job_result_url}",
)

@staticmethod
def shop_name_from_url(url: str) -> str:
match = re.search(r"https://(.*?)(\.myshopify)", url)
if match:
return match.group(1)
else:
# safety net: if there is an error parsing the url,
# or no match is found
return url

@staticmethod
def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[str, Any]:
"""
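The extraction logic can be exercised standalone (the regex is copied from the diff; the fallback returns the input unchanged):

```python
import re


def shop_name_from_url(url: str) -> str:
    # pull the shop name out of a `https://<shop>.myshopify.com/...` URL
    match = re.search(r"https://(.*?)(\.myshopify)", url)
    if match:
        return match.group(1)
    # safety net: no match found, return the input as-is
    return url


print(shop_name_from_url("https://airbyte-test.myshopify.com/admin"))  # airbyte-test
```

The non-greedy `(.*?)` stops at the first `.myshopify`, so subdomain-like shop names survive intact; anything that does not look like a Shopify URL falls through unchanged, which is what the redirect handler logs.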
@@ -98,12 +98,27 @@
"description": "Defines which API type (REST/BULK) to use to fetch `Transactions` data. If you are a `Shopify Plus` user, leave the default value to speed up the fetch.",
"default": false
},
"job_product_variants_include_pres_prices": {
"type": "boolean",
"title": "Add `Presentment prices` to Product Variants",
"description": "If enabled, the `Product Variants` stream attempts to include `Presentment prices` field (may affect the performance).",
"default": true
},
"job_termination_threshold": {
"type": "integer",
"title": "BULK Job termination threshold",
"description": "The max time in seconds, after which the single BULK Job should be `CANCELED` and retried. The bigger the value the longer the BULK Job is allowed to run.",
-      "default": 3600,
-      "minimum": 1
+      "default": 7200,
+      "minimum": 3600,

Review comment (jblakeman, Dec 2, 2024):

Curious, is there a specific reason the minimum needs to be 3600, or greater than 1? Adding this requirement means large jobs take significantly longer to arrive at a slice small enough to run without failure.

EDIT: My guess is that < 3600 is thought to be obviated by the introduction of job_checkpoint_interval, though given the variation across shop windows in rows collected per second, if a job of 15000 (min) rows always fails with a new flavor of retryable error after 3600 seconds and would otherwise succeed with some smaller threshold, we have added a failure requirement here without a workaround until a fix is in place.

Author reply (bazarnov):

@jblakeman Thank you for raising this; we selected the values based on what we saw on our side and are happy to adjust them now that we have your feedback. I'll add this to the next source-shopify update.

"maximum": 21600
},
"job_checkpoint_interval": {
"type": "integer",
"title": "BULK Job checkpoint (rows collected)",
"description": "The threshold, after which the single BULK Job should be checkpointed.",
"default": 100000,
"minimum": 15000,
"maximum": 200000
}
}
},
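Put together, a user configuration exercising the new spec fields might look like the following (a sketch; the `shop` value is illustrative and required fields such as credentials are omitted):

```json
{
  "shop": "airbyte-test",
  "job_product_variants_include_pres_prices": false,
  "job_termination_threshold": 7200,
  "job_checkpoint_interval": 100000
}
```

With these values, a BULK job is canceled and retried after two hours, and the connector emits a state checkpoint roughly every 100,000 collected rows.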