diff --git a/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml b/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml index ebdbd463b736..5473d1c9d319 100644 --- a/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml @@ -24,10 +24,6 @@ acceptance_tests: discovery: tests: - config_path: "secrets/config.json" - backward_compatibility_tests_config: - # specified the Type for `customer_journey_summary` field, - # for `customer_journey_summary` stream. - disable_for_version: 2.14.17 basic_read: tests: - config_path: "secrets/config_transactions_with_user_id.json" diff --git a/airbyte-integrations/connectors/source-shopify/metadata.yaml b/airbyte-integrations/connectors/source-shopify/metadata.yaml index 8a60be2e333e..5367d136376e 100644 --- a/airbyte-integrations/connectors/source-shopify/metadata.yaml +++ b/airbyte-integrations/connectors/source-shopify/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: api connectorType: source definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9 - dockerImageTag: 2.5.2 + dockerImageTag: 2.5.3 dockerRepository: airbyte/source-shopify documentationUrl: https://docs.airbyte.com/integrations/sources/shopify erdUrl: https://dbdocs.io/airbyteio/source-shopify?view=relationships diff --git a/airbyte-integrations/connectors/source-shopify/pyproject.toml b/airbyte-integrations/connectors/source-shopify/pyproject.toml index b30832d838d5..433fe880da65 100644 --- a/airbyte-integrations/connectors/source-shopify/pyproject.toml +++ b/airbyte-integrations/connectors/source-shopify/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "2.5.2" +version = "2.5.3" name = "source-shopify" description = "Source CDK implementation for Shopify." 
authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py index 070c7b5edf76..4efe79709517 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py @@ -80,6 +80,17 @@ def prepare(query: str) -> str: @dataclass class ShopifyBulkQuery: config: Mapping[str, Any] + parent_stream_name: Optional[str] = None + parent_stream_cursor: Optional[str] = None + + @property + def has_parent_stream(self) -> bool: + return True if self.parent_stream_name and self.parent_stream_cursor else False + + @property + def parent_cursor_key(self) -> Optional[str]: + if self.has_parent_stream: + return f"{self.parent_stream_name}_{self.parent_stream_cursor}" @property def shop_id(self) -> int: @@ -132,6 +143,38 @@ def query_nodes(self) -> Optional[Union[List[Field], List[str]]]: """ return ["__typename", "id"] + def _inject_parent_cursor_field(self, nodes: List[Field], key: str = "updatedAt", index: int = 2) -> List[Field]: + if self.has_parent_stream: + # inject parent cursor key as alias to the `updatedAt` parent cursor field + nodes.insert(index, Field(name="updatedAt", alias=self.parent_cursor_key)) + + return nodes + + def _add_parent_record_state(self, record: MutableMapping[str, Any], items: List[dict], to_rfc3339: bool = False) -> List[dict]: + """ + Adds the parent stream cursor state to each item in the list. + + Each dictionary in `items` gets a new key named after `self.parent_stream_name`, whose value is a nested dictionary + with the single key `self.parent_stream_cursor`, holding the parent cursor value read from `record[self.parent_cursor_key]`. + + Args: + record (MutableMapping[str, Any]): The parent record that holds the value under `self.parent_cursor_key`. + items (List[dict]): A list of dictionaries to which the parent cursor value will be added. 
+ to_rfc3339 (bool): When True and the parent cursor value is present, it is converted to RFC-3339; otherwise the nested value is set to None. + + Returns: + List[dict]: The same list with the parent state added to each item, or unchanged when the query has no parent stream. + """ + + if self.has_parent_stream: + parent_cursor_value: Optional[str] = record.get(self.parent_cursor_key, None) + parent_state = self.tools._datetime_str_to_rfc3339(parent_cursor_value) if to_rfc3339 and parent_cursor_value else None + + for item in items: + item[self.parent_stream_name] = {self.parent_stream_cursor: parent_state} + + return items + def get(self, filter_field: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None) -> str: # define filter query string, if passed filter_query = f"{filter_field}:>='{start}' AND {filter_field}:<='{end}'" if filter_field else None @@ -285,15 +328,22 @@ def query_nodes(self) -> List[Field]: List of available fields: https://shopify.dev/docs/api/admin-graphql/unstable/objects/Metafield """ + + nodes = super().query_nodes + # define metafield node metafield_node = self.get_edge_node("metafields", self.metafield_fields) if isinstance(self.type.value, list): - return ["__typename", "id", self.get_edge_node(self.type.value[1], ["__typename", "id", metafield_node])] + nodes = [*nodes, self.get_edge_node(self.type.value[1], [*nodes, metafield_node])] elif isinstance(self.type.value, str): - return ["__typename", "id", metafield_node] + nodes = [*nodes, metafield_node] - def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: + nodes = self._inject_parent_cursor_field(nodes) + + return nodes + + def _process_metafield(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: # resolve parent id from `str` to `int` record["owner_id"] = self.tools.resolve_str_id(record.get(BULK_PARENT_KEY)) # add `owner_resource` field @@ -304,7 +354,28 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Iterabl record["createdAt"] = 
self.tools.from_iso8601_to_rfc3339(record, "createdAt") record["updatedAt"] = self.tools.from_iso8601_to_rfc3339(record, "updatedAt") record = self.tools.fields_names_to_snake_case(record) - yield record + return record + + def _process_components(self, entity: List[dict]) -> Iterable[MutableMapping[str, Any]]: + for item in entity: + # resolve the id from string + item["admin_graphql_api_id"] = item.get("id") + item["id"] = self.tools.resolve_str_id(item.get("id")) + yield self._process_metafield(item) + + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: + # get the joined record components collected for the record + record_components = record.get("record_components", {}) + # process record components + if not record_components: + yield self._process_metafield(record) + else: + metafields = record_components.get("Metafield", []) + if len(metafields) > 0: + if self.has_parent_stream: + # add parent state to each metafield + metafields = self._add_parent_record_state(record, metafields, to_rfc3339=True) + yield from self._process_components(metafields) class MetafieldCollection(Metafield): @@ -343,7 +414,9 @@ class MetafieldCustomer(Metafield): customers(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) { edges { node { + __typename id + customers_updated_at: updatedAt metafields { edges { node { @@ -366,6 +439,11 @@ class MetafieldCustomer(Metafield): type = MetafieldType.CUSTOMERS + record_composition = { + "new_record": "Customer", + "record_components": ["Metafield"], + } + class MetafieldLocation(Metafield): """ @@ -464,7 +542,9 @@ class MetafieldProduct(Metafield): products(query: "updated_at:>='2023-02-07T00:00:00+00:00' AND updated_at:<='2023-12-04T00:00:00+00:00'", sortKey: UPDATED_AT) { edges { node { + __typename id + products_updated_at: updatedAt metafields { edges { node { @@ -487,6 +567,11 @@ class MetafieldProduct(Metafield): 
type = MetafieldType.PRODUCTS + record_composition = { + "new_record": "Product", + "record_components": ["Metafield"], + } + class MetafieldProductImage(Metafield): """ @@ -496,6 +581,7 @@ class MetafieldProductImage(Metafield): node { __typename id + products_updated_at: updatedAt media { edges { node { @@ -527,6 +613,13 @@ class MetafieldProductImage(Metafield): } """ + type = MetafieldType.PRODUCT_IMAGES + + record_composition = { + "new_record": "Product", + "record_components": ["Metafield"], + } + @property def query_nodes(self) -> List[Field]: """ @@ -537,19 +630,16 @@ def query_nodes(self) -> List[Field]: More info here: https://shopify.dev/docs/api/release-notes/2024-04#productimage-value-removed """ + # define metafield node metafield_node = self.get_edge_node("metafields", self.metafield_fields) - media_fields: List[Field] = [ - "__typename", - "id", - InlineFragment(type="MediaImage", fields=[metafield_node]), - ] - # define media node + media_fields: List[Field] = ["__typename", "id", InlineFragment(type="MediaImage", fields=[metafield_node])] media_node = self.get_edge_node("media", media_fields) + fields: List[Field] = ["__typename", "id", media_node] - return fields + fields = self._inject_parent_cursor_field(fields) - type = MetafieldType.PRODUCT_IMAGES + return fields class MetafieldProductVariant(Metafield): @@ -2238,6 +2328,7 @@ class ProductImage(ShopifyBulkQuery): node { __typename id + products_updated_at: updatedAt # THE MEDIA NODE IS NEEDED TO PROVIDE THE CURSORS media { edges { @@ -2314,8 +2405,7 @@ class ProductImage(ShopifyBulkQuery): # media property fields media_fields: List[Field] = [Field(name="edges", fields=[Field(name="node", fields=media_fragment)])] - # main query - query_nodes: List[Field] = [ + nodes: List[Field] = [ "__typename", "id", Field(name="media", fields=media_fields), @@ -2330,6 +2420,10 @@ class ProductImage(ShopifyBulkQuery): "record_components": ["MediaImage", "Image"], } + @property + def query_nodes(self) -> 
List[Field]: + return self._inject_parent_cursor_field(self.nodes) + def _process_component(self, entity: List[dict]) -> List[dict]: for item in entity: # remove the `__parentId` from the object @@ -2405,6 +2499,8 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Iterabl # add the product_id to each `Image` record["images"] = self._add_product_id(record.get("images", []), record.get("id")) + # add the product cursor to each `Image` + record["images"] = self._add_parent_record_state(record, record.get("images", []), to_rfc3339=True) record["images"] = self._merge_with_media(record_components) record.pop("record_components") @@ -2413,7 +2509,6 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Iterabl if len(images) > 0: # convert dates from ISO-8601 to RFC-3339 record["images"] = self._convert_datetime_to_rfc3339(images) - yield from self._emit_complete_records(images) diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/tools.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/tools.py index 7a23aa1a2d02..dfa3fafdd0c6 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/tools.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/tools.py @@ -63,7 +63,11 @@ def shop_name_from_url(url: str) -> str: return url @staticmethod - def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[str, Any]: + def _datetime_str_to_rfc3339(value: str) -> str: + return pdm.parse(value).to_rfc3339_string() + + @staticmethod + def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Optional[str]: """ Converts date-time as follows: Input: "2023-01-01T15:00:00Z" @@ -73,7 +77,7 @@ def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[st # some fields that expected to be resolved as ids, might not be populated for the 
particular `RECORD`, # we should return `None` to make the field `null` in the output as the result of the transformation. target_value = record.get(field) - return pdm.parse(target_value).to_rfc3339_string() if target_value else record.get(field) + return BulkTools._datetime_str_to_rfc3339(target_value) if target_value else record.get(field) def fields_names_to_snake_case(self, dict_input: Optional[Mapping[str, Any]] = None) -> Optional[MutableMapping[str, Any]]: # transforming record field names from camel to snake case, leaving the `__parent_id` relation in place diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py index 586c03806a72..53ecd349db23 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py @@ -644,7 +644,7 @@ def __init__(self, config: Dict) -> None: self.job_manager: ShopifyBulkManager = ShopifyBulkManager( http_client=self.bulk_http_client, base_url=f"{self.url_base}{self.path()}", - query=self.bulk_query(config), + query=self.bulk_query(config, self.parent_stream_name, self.parent_stream_cursor), job_termination_threshold=float(config.get("job_termination_threshold", 3600)), # overide the default job slice size, if provided (it's auto-adjusted, later on) job_size=config.get("bulk_window_in_days", 30.0), @@ -670,6 +670,20 @@ def parent_stream(self) -> Union[ShopifyStream, IncrementalShopifyStream]: """ return self.parent_stream_class(self.config) if self.parent_stream_class else None + @property + def parent_stream_name(self) -> Optional[str]: + """ + Returns the parent stream name, if the substream has a `parent_stream_class` dependency. 
+ """ + return self.parent_stream.name if self.parent_stream_class else None + + @property + def parent_stream_cursor(self) -> Optional[str]: + """ + Returns the parent stream cursor, if the substream has a `parent_stream_class` dependency. + """ + return self.parent_stream.cursor_field if self.parent_stream_class else None + @property @abstractmethod def bulk_query(self) -> ShopifyBulkQuery: @@ -716,21 +730,37 @@ def get_updated_state( """ updated_state = super().get_updated_state(current_stream_state, latest_record) if self.parent_stream_class: + parent_state = latest_record.get(self.parent_stream.name, {}) + parent_state_value = ( + parent_state.get(self.parent_stream.cursor_field) if parent_state else latest_record.get(self.parent_stream.cursor_field) + ) # add parent_stream_state to `updated_state` - updated_state[self.parent_stream.name] = {self.parent_stream.cursor_field: latest_record.get(self.parent_stream.cursor_field)} + updated_state[self.parent_stream.name] = {self.parent_stream.cursor_field: parent_state_value} return updated_state - def get_stream_state_value(self, stream_state: Optional[Mapping[str, Any]]) -> str: - if self.parent_stream_class: - # get parent stream state from the stream_state object. - parent_state = stream_state.get(self.parent_stream.name, {}) - if parent_state: - return parent_state.get(self.parent_stream.cursor_field, self.default_state_comparison_value) - else: - # get the stream state, if no `parent_stream_class` was assigned. + def _get_stream_cursor_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[str]: + if stream_state: return stream_state.get(self.cursor_field, self.default_state_comparison_value) + else: + return self.config.get("start_date") + + def get_stream_state_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[str]: + if stream_state: + if self.parent_stream_class: + # get parent stream state from the stream_state object. 
+ parent_state = stream_state.get(self.parent_stream.name, {}) + if parent_state: + return parent_state.get(self.parent_stream.cursor_field, self.default_state_comparison_value) + else: + # use the streams cursor value, if no parent state available + return self._get_stream_cursor_value(stream_state) + else: + # get the stream state, if no `parent_stream_class` was assigned. + return self._get_stream_cursor_value(stream_state) + else: + return self.config.get("start_date") - def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Union[str, int]]: + def get_state_value(self, stream_state: Optional[Mapping[str, Any]] = None) -> Optional[Union[str, int]]: if stream_state: return self.get_stream_state_value(stream_state) else: diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py index a5708d32594d..2751a3ab9756 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py @@ -76,6 +76,7 @@ class Customers(IncrementalShopifyStream): class MetafieldCustomers(IncrementalShopifyGraphQlBulkStream): + parent_stream_class = Customers bulk_query: MetafieldCustomer = MetafieldCustomer @@ -170,14 +171,17 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class MetafieldProducts(IncrementalShopifyGraphQlBulkStream): + parent_stream_class = Products bulk_query: MetafieldProduct = MetafieldProduct class ProductImages(IncrementalShopifyGraphQlBulkStream): + parent_stream_class = Products bulk_query: ProductImage = ProductImage class MetafieldProductImages(IncrementalShopifyGraphQlBulkStream): + parent_stream_class = Products bulk_query: MetafieldProductImage = MetafieldProductImage diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py 
b/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py index 78842454a757..afb2b504847e 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py @@ -947,6 +947,9 @@ def product_images_response_expected_result(): "admin_graphql_api_id": "gid://shopify/ProductImage/111", "product_id": 123, "shop_url": "test_shop", + "products": { + "updated_at": None, + }, }, { "created_at": "2021-06-23T01:09:47+00:00", @@ -958,6 +961,9 @@ def product_images_response_expected_result(): "width": 2200, "admin_graphql_api_id": "gid://shopify/ProductImage/222", "product_id": 456, + "products": { + "updated_at": None, + }, "shop_url": "test_shop", }, ] diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_query.py b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_query.py index 480dffa5f11f..ecaead988d68 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_query.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_query.py @@ -12,6 +12,7 @@ ShopifyBulkQuery, ShopifyBulkTemplates, ) +from source_shopify.streams.streams import Customers, Products def test_query_status() -> None: @@ -123,13 +124,14 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start, @pytest.mark.parametrize( - "query_class, filter_field, start, end, expected", + "query_class, filter_field, start, end, parent_stream_class, expected", [ ( MetafieldCustomer, "updated_at", "2023-01-01", "2023-01-02", + Customers, Operation( type="", queries=[ @@ -139,7 +141,7 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start, Argument(name="query", value=f"\"updated_at:>='2023-01-01' AND updated_at:<='2023-01-02'\""), Argument(name="sortKey", value="UPDATED_AT"), ], - fields=[Field(name='edges', fields=[Field(name='node', 
fields=['__typename', 'id', Field(name="metafields", fields=[Field(name="edges", fields=[Field(name="node", fields=["__typename", "id", "namespace", "value", "key", "description", "createdAt", "updatedAt", "type"])])])])])] + fields=[Field(name='edges', fields=[Field(name='node', fields=['__typename', 'id', Field(name="updatedAt", alias="customers_updated_at"), Field(name="metafields", fields=[Field(name="edges", fields=[Field(name="node", fields=["__typename", "id", "namespace", "value", "key", "description", "createdAt", "updatedAt", "type"])])])])])] ) ] ), @@ -149,6 +151,7 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start, "updated_at", "2023-01-01", "2023-01-02", + Products, Operation( type="", queries=[ @@ -167,6 +170,10 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start, fields=[ "__typename", "id", + Field( + name="updatedAt", + alias="products_updated_at", + ), Field( name="media", fields=[ @@ -227,6 +234,7 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start, "updated_at", "2023-01-01", "2023-01-02", + None, Operation( type="", queries=[ @@ -298,6 +306,11 @@ def test_base_build_query(basic_config, query_name, fields, filter_field, start, "InventoryLevel query", ] ) -def test_bulk_query(basic_config, query_class, filter_field, start, end, expected) -> None: - stream = query_class(basic_config) - assert stream.get(filter_field, start, end) == expected.render() \ No newline at end of file +def test_bulk_query(auth_config, query_class, filter_field, start, end, parent_stream_class, expected) -> None: + if parent_stream_class: + parent_stream = parent_stream_class(auth_config) + stream_query = query_class(auth_config, parent_stream.name, parent_stream.cursor_field) + else: + stream_query = query_class(auth_config) + + assert stream_query.get(filter_field, start, end) == expected.render() \ No newline at end of file diff --git a/docs/integrations/sources/shopify.md 
b/docs/integrations/sources/shopify.md index b656ee7e3de0..77a220b7cd04 100644 --- a/docs/integrations/sources/shopify.md +++ b/docs/integrations/sources/shopify.md @@ -27,6 +27,7 @@ For existing **Airbyte Cloud** customers, if you are currently using the **API P ::: + ### For Airbyte Cloud: 1. [Log into your Airbyte Cloud](https://cloud.airbyte.com/workspaces) account. @@ -246,6 +247,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.5.3 | 2024-09-27 | [46095](https://github.com/airbytehq/airbyte/pull/46095) | Fixed duplicates for `Product Images`, `Metafield Product Images` and `Metafield Products` streams for Incremental syncs | | 2.5.2 | 2024-09-17 | [45633](https://github.com/airbytehq/airbyte/pull/45633) | Adds `read_inventory` as a required scope for `product_variants` stream | | 2.5.1 | 2024-09-14 | [45255](https://github.com/airbytehq/airbyte/pull/45255) | Update dependencies | | 2.5.0 | 2024-09-06 | [45190](https://github.com/airbytehq/airbyte/pull/45190) | Migrate to CDK v5 |