🐛 Source Shopify: fix duplicates for Product Images, `Metafield Product Images` and `Metafield Products` streams for Incremental syncs (#46095)
bazarnov authored Oct 1, 2024
1 parent 4b3533a commit f517c3b
Showing 10 changed files with 189 additions and 39 deletions.
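At a high level, the fix threads the parent stream's `updated_at` cursor through the bulk GraphQL query as an aliased field (e.g. `products_updated_at: updatedAt`) and stamps that value onto every child record, so the incremental state of the `Product Images`, `Metafield Product Images` and `Metafield Products` substreams advances with the parent and already-synced children are not emitted again. A minimal, self-contained sketch of the record-side idea (the function name, defaults and sample records below are illustrative, not the connector's API):

from typing import Any, Dict, List

def stamp_parent_state(
    parent_record: Dict[str, Any],
    children: List[Dict[str, Any]],
    parent_name: str = "products",
    parent_cursor: str = "updated_at",
) -> List[Dict[str, Any]]:
    # the aliased field, e.g. "products_updated_at", carries the parent's cursor value
    parent_value = parent_record.get(f"{parent_name}_{parent_cursor}")
    for child in children:
        child[parent_name] = {parent_cursor: parent_value}
    return children

product = {"id": 1, "products_updated_at": "2024-09-01T10:00:00+00:00"}
images = [{"id": 10}, {"id": 11}]
print(stamp_parent_state(product, images))
# [{'id': 10, 'products': {'updated_at': '2024-09-01T10:00:00+00:00'}},
#  {'id': 11, 'products': {'updated_at': '2024-09-01T10:00:00+00:00'}}]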
@@ -24,10 +24,6 @@ acceptance_tests:
discovery:
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
# specified the Type for `customer_journey_summary` field,
# for `customer_journey_summary` stream.
disable_for_version: 2.14.17
basic_read:
tests:
- config_path: "secrets/config_transactions_with_user_id.json"
@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 2.5.2
dockerImageTag: 2.5.3
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
erdUrl: https://dbdocs.io/airbyteio/source-shopify?view=relationships
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "2.5.2"
version = "2.5.3"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <contact@airbyte.io>",]
@@ -80,6 +80,17 @@ def prepare(query: str) -> str:
@dataclass
class ShopifyBulkQuery:
config: Mapping[str, Any]
parent_stream_name: Optional[str] = None
parent_stream_cursor: Optional[str] = None

@property
def has_parent_stream(self) -> bool:
return bool(self.parent_stream_name and self.parent_stream_cursor)

@property
def parent_cursor_key(self) -> Optional[str]:
if self.has_parent_stream:
return f"{self.parent_stream_name}_{self.parent_stream_cursor}"

@property
def shop_id(self) -> int:
@@ -132,6 +143,38 @@ def query_nodes(self) -> Optional[Union[List[Field], List[str]]]:
"""
return ["__typename", "id"]

def _inject_parent_cursor_field(self, nodes: List[Field], key: str = "updatedAt", index: int = 2) -> List[Field]:
if self.has_parent_stream:
# inject parent cursor key as alias to the `updatedAt` parent cursor field
nodes.insert(index, Field(name="updatedAt", alias=self.parent_cursor_key))

return nodes

def _add_parent_record_state(self, record: MutableMapping[str, Any], items: List[dict], to_rfc3339: bool = False) -> List[dict]:
"""
Adds the parent stream's cursor state to each item in the list.
This method reads the parent cursor value from `record` (stored under `self.parent_cursor_key`),
optionally converts it to RFC 3339, and attaches it to every item as
`{self.parent_stream_name: {self.parent_stream_cursor: <value>}}`.
Args:
record (MutableMapping[str, Any]): The parent record carrying the aliased parent cursor value.
items (List[dict]): A list of dictionaries to which the parent state will be added.
to_rfc3339 (bool): Whether to convert the parent cursor value to an RFC 3339 datetime string.
Returns:
List[dict]: The modified list of dictionaries with the added parent state.
"""

if self.has_parent_stream:
parent_cursor_value: Optional[str] = record.get(self.parent_cursor_key, None)
parent_state = self.tools._datetime_str_to_rfc3339(parent_cursor_value) if to_rfc3339 and parent_cursor_value else None

for item in items:
item[self.parent_stream_name] = {self.parent_stream_cursor: parent_state}

return items

def get(self, filter_field: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None) -> str:
# define filter query string, if passed
filter_query = f"{filter_field}:>='{start}' AND {filter_field}:<='{end}'" if filter_field else None
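# Illustrative sketch of the query-side injection above (plain strings instead of the
# connector's `Field` objects): with parent_stream_name="products" and
# parent_stream_cursor="updated_at", `parent_cursor_key` becomes "products_updated_at"
# and is added as a GraphQL alias for the parent's `updatedAt` field.
def inject_parent_cursor(nodes: list, parent_cursor_key: str, index: int = 2) -> list:
    nodes = list(nodes)
    nodes.insert(index, f"{parent_cursor_key}: updatedAt")
    return nodes

print(inject_parent_cursor(["__typename", "id"], "products_updated_at"))
# ['__typename', 'id', 'products_updated_at: updatedAt']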
@@ -285,15 +328,22 @@ def query_nodes(self) -> List[Field]:
List of available fields:
https://shopify.dev/docs/api/admin-graphql/unstable/objects/Metafield
"""

nodes = super().query_nodes

# define metafield node
metafield_node = self.get_edge_node("metafields", self.metafield_fields)

if isinstance(self.type.value, list):
return ["__typename", "id", self.get_edge_node(self.type.value[1], ["__typename", "id", metafield_node])]
nodes = [*nodes, self.get_edge_node(self.type.value[1], [*nodes, metafield_node])]
elif isinstance(self.type.value, str):
return ["__typename", "id", metafield_node]
nodes = [*nodes, metafield_node]

def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]:
nodes = self._inject_parent_cursor_field(nodes)

return nodes

def _process_metafield(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
# resolve parent id from `str` to `int`
record["owner_id"] = self.tools.resolve_str_id(record.get(BULK_PARENT_KEY))
# add `owner_resource` field
@@ -304,7 +354,28 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]:
record["createdAt"] = self.tools.from_iso8601_to_rfc3339(record, "createdAt")
record["updatedAt"] = self.tools.from_iso8601_to_rfc3339(record, "updatedAt")
record = self.tools.fields_names_to_snake_case(record)
yield record
return record

def _process_components(self, entity: List[dict]) -> Iterable[MutableMapping[str, Any]]:
for item in entity:
# resolve the id from string
item["admin_graphql_api_id"] = item.get("id")
item["id"] = self.tools.resolve_str_id(item.get("id"))
yield self._process_metafield(item)

def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]:
# get the joined record components collected for the record
record_components = record.get("record_components", {})
# process record components
if not record_components:
yield self._process_metafield(record)
else:
metafields = record_components.get("Metafield", [])
if len(metafields) > 0:
if self.has_parent_stream:
# add parent state to each metafield
metafields = self._add_parent_record_state(record, metafields, to_rfc3339=True)
yield from self._process_components(metafields)
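# Illustrative sketch of the id handling in `_process_components` above: Shopify GraphQL
# ids are GID strings; the original string is kept as `admin_graphql_api_id` and the
# numeric tail becomes the integer `id`. (`resolve_str_id` is the connector's helper;
# this standalone version is an assumption, kept only for the example.)
def _resolve_str_id_sketch(gid: str) -> int:
    return int(gid.split("/")[-1])

metafield = {"id": "gid://shopify/Metafield/123456789"}
metafield["admin_graphql_api_id"] = metafield["id"]
metafield["id"] = _resolve_str_id_sketch(metafield["admin_graphql_api_id"])
print(metafield)
# {'id': 123456789, 'admin_graphql_api_id': 'gid://shopify/Metafield/123456789'}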


class MetafieldCollection(Metafield):
@@ -343,7 +414,9 @@ class MetafieldCustomer(Metafield):
customers(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) {
edges {
node {
__typename
id
customer_updated_at: updatedAt
metafields {
edges {
node {
@@ -366,6 +439,11 @@ class MetafieldCustomer(Metafield):

type = MetafieldType.CUSTOMERS

record_composition = {
"new_record": "Customer",
"record_components": ["Metafield"],
}
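# Rough, simplified sketch of what `record_composition` drives (the real parsing lives in
# the bulk record producer): a line whose __typename matches "new_record" starts a parent
# record, and subsequent lines whose __typename is listed in "record_components" are
# collected under it, linked by Shopify's `__parentId` field.
from typing import Any, Dict, Iterable, Iterator, List

def compose(lines: Iterable[Dict[str, Any]], new_record: str, components: List[str]) -> Iterator[Dict[str, Any]]:
    current = None
    for line in lines:
        typename = line.get("__typename")
        if typename == new_record:
            if current:
                yield current
            current = {**line, "record_components": {name: [] for name in components}}
        elif current and typename in components:
            current["record_components"][typename].append(line)
    if current:
        yield current

jsonl = [
    {"__typename": "Customer", "id": "gid://shopify/Customer/1", "customer_updated_at": "2024-09-01T10:00:00+00:00"},
    {"__typename": "Metafield", "id": "gid://shopify/Metafield/11", "__parentId": "gid://shopify/Customer/1"},
]
print(list(compose(jsonl, "Customer", ["Metafield"])))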


class MetafieldLocation(Metafield):
"""
@@ -464,7 +542,9 @@ class MetafieldProduct(Metafield):
products(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) {
edges {
node {
__typename
id
product_updated_at: updatedAt
metafields {
edges {
node {
@@ -487,6 +567,11 @@ class MetafieldProduct(Metafield):

type = MetafieldType.PRODUCTS

record_composition = {
"new_record": "Product",
"record_components": ["Metafield"],
}


class MetafieldProductImage(Metafield):
"""
@@ -496,6 +581,7 @@ class MetafieldProductImage(Metafield):
node {
__typename
id
product_updated_at: updatedAt
media {
edges {
node {
@@ -527,6 +613,13 @@ class MetafieldProductImage(Metafield):
}
"""

type = MetafieldType.PRODUCT_IMAGES

record_composition = {
"new_record": "Product",
"record_components": ["Metafield"],
}

@property
def query_nodes(self) -> List[Field]:
"""
@@ -537,19 +630,16 @@ def query_nodes(self) -> List[Field]:
More info here:
https://shopify.dev/docs/api/release-notes/2024-04#productimage-value-removed
"""

# define metafield node
metafield_node = self.get_edge_node("metafields", self.metafield_fields)
media_fields: List[Field] = [
"__typename",
"id",
InlineFragment(type="MediaImage", fields=[metafield_node]),
]
# define media node
media_fields: List[Field] = ["__typename", "id", InlineFragment(type="MediaImage", fields=[metafield_node])]
media_node = self.get_edge_node("media", media_fields)

fields: List[Field] = ["__typename", "id", media_node]
return fields
fields = self._inject_parent_cursor_field(fields)

type = MetafieldType.PRODUCT_IMAGES
return fields


class MetafieldProductVariant(Metafield):
@@ -2238,6 +2328,7 @@ class ProductImage(ShopifyBulkQuery):
node {
__typename
id
products_updated_at: updatedAt
# THE MEDIA NODE IS NEEDED TO PROVIDE THE CURSORS
media {
edges {
@@ -2314,8 +2405,7 @@ class ProductImage(ShopifyBulkQuery):
# media property fields
media_fields: List[Field] = [Field(name="edges", fields=[Field(name="node", fields=media_fragment)])]

# main query
query_nodes: List[Field] = [
nodes: List[Field] = [
"__typename",
"id",
Field(name="media", fields=media_fields),
@@ -2330,6 +2420,10 @@ class ProductImage(ShopifyBulkQuery):
"record_components": ["MediaImage", "Image"],
}

@property
def query_nodes(self) -> List[Field]:
return self._inject_parent_cursor_field(self.nodes)

def _process_component(self, entity: List[dict]) -> List[dict]:
for item in entity:
# remove the `__parentId` from the object
@@ -2405,6 +2499,8 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]:

# add the product_id to each `Image`
record["images"] = self._add_product_id(record.get("images", []), record.get("id"))
# add the product cursor to each `Image`
record["images"] = self._add_parent_record_state(record, record.get("images", []), to_rfc3339=True)
record["images"] = self._merge_with_media(record_components)
record.pop("record_components")

@@ -2413,7 +2509,6 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]:
if len(images) > 0:
# convert dates from ISO-8601 to RFC-3339
record["images"] = self._convert_datetime_to_rfc3339(images)

yield from self._emit_complete_records(images)


@@ -63,7 +63,11 @@ def shop_name_from_url(url: str) -> str:
return url

@staticmethod
def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[str, Any]:
def _datetime_str_to_rfc3339(value: str) -> str:
return pdm.parse(value).to_rfc3339_string()

@staticmethod
def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Optional[str]:
"""
Converts date-time as follows:
Input: "2023-01-01T15:00:00Z"
@@ -73,7 +77,7 @@ def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[str, Any]:
# some fields that expected to be resolved as ids, might not be populated for the particular `RECORD`,
# we should return `None` to make the field `null` in the output as the result of the transformation.
target_value = record.get(field)
return pdm.parse(target_value).to_rfc3339_string() if target_value else record.get(field)
return BulkTools._datetime_str_to_rfc3339(target_value) if target_value else record.get(field)

def fields_names_to_snake_case(self, dict_input: Optional[Mapping[str, Any]] = None) -> Optional[MutableMapping[str, Any]]:
# transforming record field names from camel to snake case, leaving the `__parent_id` relation in place
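# Quick usage check for the conversion helpers above, assuming `pdm` is the `pendulum`
# package (as this module's existing import suggests):
import pendulum as pdm

value = "2023-01-01T15:00:00Z"
print(pdm.parse(value).to_rfc3339_string())
# expected: an RFC 3339 string with an explicit offset, e.g. "2023-01-01T15:00:00+00:00"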
@@ -644,7 +644,7 @@ def __init__(self, config: Dict) -> None:
self.job_manager: ShopifyBulkManager = ShopifyBulkManager(
http_client=self.bulk_http_client,
base_url=f"{self.url_base}{self.path()}",
query=self.bulk_query(config),
query=self.bulk_query(config, self.parent_stream_name, self.parent_stream_cursor),
job_termination_threshold=float(config.get("job_termination_threshold", 3600)),
# override the default job slice size, if provided (it's auto-adjusted, later on)
job_size=config.get("bulk_window_in_days", 30.0),
@@ -670,6 +670,20 @@ def parent_stream(self) -> Union[ShopifyStream, IncrementalShopifyStream]:
"""
return self.parent_stream_class(self.config) if self.parent_stream_class else None

@property
def parent_stream_name(self) -> Optional[str]:
"""
Returns the parent stream name, if the substream has a `parent_stream_class` dependency.
"""
return self.parent_stream.name if self.parent_stream_class else None

@property
def parent_stream_cursor(self) -> Optional[str]:
"""
Returns the parent stream cursor, if the substream has a `parent_stream_class` dependency.
"""
return self.parent_stream.cursor_field if self.parent_stream_class else None

@property
@abstractmethod
def bulk_query(self) -> ShopifyBulkQuery:
@@ -716,21 +730,37 @@ def get_updated_state(
"""
updated_state = super().get_updated_state(current_stream_state, latest_record)
if self.parent_stream_class:
parent_state = latest_record.get(self.parent_stream.name, {})
parent_state_value = (
parent_state.get(self.parent_stream.cursor_field) if parent_state else latest_record.get(self.parent_stream.cursor_field)
)
# add parent_stream_state to `updated_state`
updated_state[self.parent_stream.name] = {self.parent_stream.cursor_field: latest_record.get(self.parent_stream.cursor_field)}
updated_state[self.parent_stream.name] = {self.parent_stream.cursor_field: parent_state_value}
return updated_state

def get_stream_state_value(self, stream_state: Optional[Mapping[str, Any]]) -> str:
if self.parent_stream_class:
# get parent stream state from the stream_state object.
parent_state = stream_state.get(self.parent_stream.name, {})
if parent_state:
return parent_state.get(self.parent_stream.cursor_field, self.default_state_comparison_value)
else:
# get the stream state, if no `parent_stream_class` was assigned.
def _get_stream_cursor_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[str]:
if stream_state:
return stream_state.get(self.cursor_field, self.default_state_comparison_value)
else:
return self.config.get("start_date")

def get_stream_state_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[str]:
if stream_state:
if self.parent_stream_class:
# get parent stream state from the stream_state object.
parent_state = stream_state.get(self.parent_stream.name, {})
if parent_state:
return parent_state.get(self.parent_stream.cursor_field, self.default_state_comparison_value)
else:
# use the streams cursor value, if no parent state available
return self._get_stream_cursor_value(stream_state)
else:
# get the stream state, if no `parent_stream_class` was assigned.
return self._get_stream_cursor_value(stream_state)
else:
return self.config.get("start_date")

def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Union[str, int]]:
def get_state_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[Union[str, int]]:
if stream_state:
return self.get_stream_state_value(stream_state)
else:
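# Illustrative shape of the substream state after this change, and a simplified reading of
# `get_stream_state_value` above: the parent cursor is tracked next to the substream's own
# cursor, the parent entry is preferred when present, and the configured start date is the
# final fallback (names and values below are examples only).
def state_value(state: dict, parent_name: str = "products", cursor: str = "updated_at",
                start_date: str = "2020-01-01T00:00:00+00:00") -> str:
    if not state:
        return start_date
    parent_state = state.get(parent_name, {})
    if parent_state:
        return parent_state.get(cursor, start_date)
    return state.get(cursor, start_date)

stream_state = {
    "updated_at": "2024-09-30T12:00:00+00:00",                 # substream's own cursor
    "products": {"updated_at": "2024-09-30T11:58:00+00:00"},   # parent stream state
}
print(state_value(stream_state))  # -> 2024-09-30T11:58:00+00:00
print(state_value({}))            # -> 2020-01-01T00:00:00+00:00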