From 80cf24997ba383a2c0bfb36181e290458df16435 Mon Sep 17 00:00:00 2001 From: Baz Date: Tue, 14 May 2024 12:50:31 +0300 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Source=20Shopify:=20migrate=20`p?= =?UTF-8?q?roducts`,=20`product=20images`=20and=20`product=20variants`=20t?= =?UTF-8?q?o=20`GraphQL=20BULK`=20(#37767)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../source-shopify/acceptance-test-config.yml | 5 +- .../integration_tests/abnormal_state.json | 15 +- .../integration_tests/expected_records.jsonl | 5 +- ...ed_records_transactions_with_user_id.jsonl | 5 +- .../integration_tests/state.json | 35 +- .../connectors/source-shopify/metadata.yaml | 10 +- .../connectors/source-shopify/poetry.lock | 31 +- .../connectors/source-shopify/pyproject.toml | 2 +- .../schemas/product_variants.json | 13 +- .../shopify_graphql/bulk/__init__.py | 0 .../shopify_graphql/bulk/exceptions.py | 6 +- .../shopify_graphql/bulk/job.py | 86 +-- .../shopify_graphql/bulk/query.py | 498 +++++++++++++++++- .../shopify_graphql/bulk/retry.py | 2 +- .../source_shopify/streams/base_streams.py | 4 +- .../source_shopify/streams/streams.py | 23 +- .../source-shopify/unit_tests/__init__.py | 0 .../source-shopify/unit_tests/conftest.py | 182 +++++++ .../unit_tests/graphql_bulk/__init__.py | 0 .../unit_tests/graphql_bulk/test_job.py | 135 ++--- .../unit_tests/test_deleted_events_stream.py | 42 +- .../source-shopify/unit_tests/test_source.py | 39 +- .../sources/shopify-migrations.md | 22 + docs/integrations/sources/shopify.md | 1 + 24 files changed, 923 insertions(+), 238 deletions(-) create mode 100644 airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/__init__.py create mode 100644 airbyte-integrations/connectors/source-shopify/unit_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/__init__.py diff --git 
a/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml b/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml index 1c8a3d2f58d5..fa685bce30ea 100644 --- a/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml @@ -25,8 +25,9 @@ acceptance_tests: tests: - config_path: "secrets/config.json" backward_compatibility_tests_config: - # The cursor field for `fulfillments` stream has changed from `id` to `updated_at` - disable_for_version: "1.1.8" + # the `product_variants` stream schema has changed, mainly: + # see this PR: https://github.com/airbytehq/airbyte/pull/37767 + disable_for_version: "2.0.8" basic_read: tests: - config_path: "secrets/config_transactions_with_user_id.json" diff --git a/airbyte-integrations/connectors/source-shopify/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-shopify/integration_tests/abnormal_state.json index 00af9172d88d..82c9d349ade2 100644 --- a/airbyte-integrations/connectors/source-shopify/integration_tests/abnormal_state.json +++ b/airbyte-integrations/connectors/source-shopify/integration_tests/abnormal_state.json @@ -420,13 +420,7 @@ "type": "STREAM", "stream": { "stream_state": { - "id": 99999999999999, - "products": { - "updated_at": "2027-07-11T13:07:45-07:00", - "deleted": { - "deleted_at": "2027-07-11T13:07:45-07:00" - } - } + "updated_at": "2027-07-11T13:07:45-07:00" }, "stream_descriptor": { "name": "product_images" @@ -454,13 +448,6 @@ "type": "STREAM", "stream": { "stream_state": { - "id": 99999999999999, - "products": { - "updated_at": "2027-07-11T13:07:45-07:00", - "deleted": { - "deleted_at": "2027-07-11T13:07:45-07:00" - } - }, "updated_at": "2027-07-11T13:07:45-07:00" }, "stream_descriptor": { diff --git a/airbyte-integrations/connectors/source-shopify/integration_tests/expected_records.jsonl 
b/airbyte-integrations/connectors/source-shopify/integration_tests/expected_records.jsonl index d43ad7bf0369..d4baa9840c1f 100644 --- a/airbyte-integrations/connectors/source-shopify/integration_tests/expected_records.jsonl +++ b/airbyte-integrations/connectors/source-shopify/integration_tests/expected_records.jsonl @@ -70,9 +70,8 @@ {"stream": "price_rules", "data": {"id": 1112171741373, "value_type": "fixed_amount", "value": "-10.0", "customer_selection": "all", "target_type": "line_item", "target_selection": "all", "allocation_method": "across", "allocation_limit": null, "once_per_customer": false, "usage_limit": null, "starts_at": "2017-01-19T09:59:10-08:00", "ends_at": null, "created_at": "2022-10-14T10:19:39-07:00", "updated_at": "2023-04-14T05:24:53-07:00", "entitled_product_ids": [], "entitled_variant_ids": [], "entitled_collection_ids": [], "entitled_country_ids": [], "prerequisite_product_ids": [], "prerequisite_variant_ids": [], "prerequisite_collection_ids": [], "customer_segment_prerequisite_ids": [], "prerequisite_customer_ids": [], "prerequisite_subtotal_range": null, "prerequisite_quantity_range": null, "prerequisite_shipping_price_range": null, "prerequisite_to_entitlement_quantity_ratio": {"prerequisite_quantity": null, "entitled_quantity": null}, "prerequisite_to_entitlement_purchase": {"prerequisite_amount": null}, "title": "New Title 2023", "admin_graphql_api_id": "gid://shopify/PriceRule/1112171741373", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953899511} {"stream": "price_rules", "data": {"id": 945000284349, "value_type": "percentage", "value": "-3.0", "customer_selection": "all", "target_type": "line_item", "target_selection": "all", "allocation_method": "across", "allocation_limit": null, "once_per_customer": true, "usage_limit": 10, "starts_at": "2021-07-07T07:22:04-07:00", "ends_at": null, "created_at": "2021-07-07T07:23:11-07:00", "updated_at": "2023-04-24T05:52:22-07:00", "entitled_product_ids": [], 
"entitled_variant_ids": [], "entitled_collection_ids": [], "entitled_country_ids": [], "prerequisite_product_ids": [], "prerequisite_variant_ids": [], "prerequisite_collection_ids": [], "customer_segment_prerequisite_ids": [], "prerequisite_customer_ids": [], "prerequisite_subtotal_range": null, "prerequisite_quantity_range": null, "prerequisite_shipping_price_range": null, "prerequisite_to_entitlement_quantity_ratio": {"prerequisite_quantity": null, "entitled_quantity": null}, "prerequisite_to_entitlement_purchase": {"prerequisite_amount": null}, "title": "1V8Z165KSH5T", "admin_graphql_api_id": "gid://shopify/PriceRule/945000284349", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953899512} {"stream": "price_rules", "data": {"id": 945205379261, "value_type": "percentage", "value": "-100.0", "customer_selection": "all", "target_type": "shipping_line", "target_selection": "all", "allocation_method": "each", "allocation_limit": null, "once_per_customer": false, "usage_limit": null, "starts_at": "2021-07-08T05:40:13-07:00", "ends_at": "2024-01-01T23:59:59-08:00", "created_at": "2021-07-08T05:40:37-07:00", "updated_at": "2023-12-07T03:40:44-08:00", "entitled_product_ids": [], "entitled_variant_ids": [], "entitled_collection_ids": [], "entitled_country_ids": [], "prerequisite_product_ids": [], "prerequisite_variant_ids": [], "prerequisite_collection_ids": [], "customer_segment_prerequisite_ids": [], "prerequisite_customer_ids": [], "prerequisite_subtotal_range": {"greater_than_or_equal_to": "1.0"}, "prerequisite_quantity_range": null, "prerequisite_shipping_price_range": null, "prerequisite_to_entitlement_quantity_ratio": {"prerequisite_quantity": null, "entitled_quantity": null}, "prerequisite_to_entitlement_purchase": {"prerequisite_amount": null}, "title": "HZAVNV2487WC", "admin_graphql_api_id": "gid://shopify/PriceRule/945205379261", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953899513} -{"stream": "product_images", "data": {"id": 
29301295481021, "alt": null, "position": 1, "product_id": 6796218138813, "created_at": "2021-06-22T18:09:28-07:00", "updated_at": "2021-06-22T18:09:28-07:00", "admin_graphql_api_id": "gid://shopify/ProductImage/29301295481021", "width": 4393, "height": 2929, "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/tin-of-beard-balm.jpg?v=1624410568", "variant_ids": [], "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953901149} -{"stream": "product_images", "data": {"id": 29301295513789, "alt": null, "position": 1, "product_id": 6796218269885, "created_at": "2021-06-22T18:09:29-07:00", "updated_at": "2021-06-22T18:09:29-07:00", "admin_graphql_api_id": "gid://shopify/ProductImage/29301295513789", "width": 3840, "height": 2560, "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/pair-of-all-black-sneakers.jpg?v=1624410569", "variant_ids": [], "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953901154} -{"stream": "product_images", "data": {"id": 29301295546557, "alt": null, "position": 1, "product_id": 6796218302653, "created_at": "2021-06-22T18:09:29-07:00", "updated_at": "2021-06-22T18:09:29-07:00", "admin_graphql_api_id": "gid://shopify/ProductImage/29301295546557", "width": 3960, "height": 2640, "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/red-silver-fishing-lure.jpg?v=1624410569", "variant_ids": [], "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953901155} +{"stream": "product_images", "data": {"created_at": "2023-04-14T10:34:46+00:00", "updated_at": "2023-04-14T11:05:13+00:00", "id": 33290489659581, "height": 64, "alt": "Test", "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/Airbytelogo64x64.png?v=1681468487", "width": 64, "admin_graphql_api_id": "gid://shopify/ProductImage/33290489659581", "product_id": 6796229574845, "shop_url": "airbyte-integration-test"}, "emitted_at": 1714673982582} +{"stream": "product_images", "data": {"created_at": "2021-06-23T01:09:47+00:00", 
"updated_at": "2023-04-24T17:27:15+00:00", "id": 29301297316029, "height": 1467, "alt": "updated_mon_24.04.2023", "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/4-ounce-soy-candle.jpg?v=1624410587", "width": 2200, "admin_graphql_api_id": "gid://shopify/ProductImage/29301297316029", "product_id": 6796220989629, "shop_url": "airbyte-integration-test"}, "emitted_at": 1714673982587} {"stream": "products", "data": {"id": 6796217909437, "title": "Red And Navy Tee Sleeve", "body_html": "Zoom in on the sleeve of a red t-shirt with navy blue trim along the sleeve. Looks like a great tennis outfit.", "vendor": "Little Group", "product_type": "Movies", "created_at": "2021-06-22T18:09:27-07:00", "handle": "red-and-navy-tee-sleeve", "updated_at": "2023-04-20T04:12:25-07:00", "published_at": "2021-06-22T18:09:27-07:00", "template_suffix": null, "published_scope": "web", "tags": "developer-tools-generator", "status": "active", "admin_graphql_api_id": "gid://shopify/Product/6796217909437", "variants": [{"id": 40090579992765, "product_id": 6796217909437, "title": "Plastic", "price": 23.0, "sku": "", "position": 1, "inventory_policy": "deny", "compare_at_price": null, "fulfillment_service": "manual", "inventory_management": "shopify", "option1": "Plastic", "option2": null, "option3": null, "created_at": "2021-06-22T18:09:27-07:00", "updated_at": "2023-10-27T09:55:54-07:00", "taxable": true, "barcode": null, "grams": 39, "weight": 39.0, "weight_unit": "g", "inventory_item_id": 42185194700989, "inventory_quantity": 3, "old_inventory_quantity": 3, "requires_shipping": true, "admin_graphql_api_id": "gid://shopify/ProductVariant/40090579992765", "image_id": null}], "options": [{"id": 8720175235261, "product_id": 6796217909437, "name": "Title", "position": 1, "values": ["Plastic"]}], "images": [], "image": null, "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953903012} {"stream": "products", "data": {"id": 6796217942205, "title": "Grey T-Shirt", "body_html": 
"A grey t-shirt on a hanger. Simple. Classic. Grey.", "vendor": "Lang - Bogisich", "product_type": "Home", "created_at": "2021-06-22T18:09:27-07:00", "handle": "grey-t-shirt", "updated_at": "2023-04-20T04:12:25-07:00", "published_at": "2021-06-22T18:09:27-07:00", "template_suffix": null, "published_scope": "web", "tags": "developer-tools-generator", "status": "active", "admin_graphql_api_id": "gid://shopify/Product/6796217942205", "variants": [{"id": 40090580025533, "product_id": 6796217942205, "title": "Granite", "price": 70.0, "sku": "", "position": 1, "inventory_policy": "deny", "compare_at_price": null, "fulfillment_service": "manual", "inventory_management": "shopify", "option1": "Granite", "option2": null, "option3": null, "created_at": "2021-06-22T18:09:27-07:00", "updated_at": "2023-10-27T09:55:54-07:00", "taxable": true, "barcode": null, "grams": 0, "weight": 0.0, "weight_unit": "g", "inventory_item_id": 42185194733757, "inventory_quantity": 38, "old_inventory_quantity": 38, "requires_shipping": false, "admin_graphql_api_id": "gid://shopify/ProductVariant/40090580025533", "image_id": null}], "options": [{"id": 8720175268029, "product_id": 6796217942205, "name": "Title", "position": 1, "values": ["Granite"]}], "images": [], "image": null, "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953903015} {"stream": "products", "data": {"id": 6796217974973, "title": "Pool Floaty Icecream", "body_html": "Inflatable pink ice cream pool toy.", "vendor": "Fritsch - Ferry", "product_type": "Grocery", "created_at": "2021-06-22T18:09:27-07:00", "handle": "pool-floaty-icecream", "updated_at": "2023-04-20T04:12:25-07:00", "published_at": "2021-06-22T18:09:27-07:00", "template_suffix": null, "published_scope": "web", "tags": "developer-tools-generator", "status": "active", "admin_graphql_api_id": "gid://shopify/Product/6796217974973", "variants": [{"id": 40090580091069, "product_id": 6796217974973, "title": "magenta", "price": 57.0, "sku": "", "position": 1, 
"inventory_policy": "deny", "compare_at_price": null, "fulfillment_service": "manual", "inventory_management": "shopify", "option1": "magenta", "option2": null, "option3": null, "created_at": "2021-06-22T18:09:27-07:00", "updated_at": "2023-10-27T09:55:54-07:00", "taxable": true, "barcode": null, "grams": 499, "weight": 499.0, "weight_unit": "g", "inventory_item_id": 42185194766525, "inventory_quantity": 1, "old_inventory_quantity": 1, "requires_shipping": true, "admin_graphql_api_id": "gid://shopify/ProductVariant/40090580091069", "image_id": null}], "options": [{"id": 8720175300797, "product_id": 6796217974973, "name": "Title", "position": 1, "values": ["magenta"]}], "images": [], "image": null, "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953903015} diff --git a/airbyte-integrations/connectors/source-shopify/integration_tests/expected_records_transactions_with_user_id.jsonl b/airbyte-integrations/connectors/source-shopify/integration_tests/expected_records_transactions_with_user_id.jsonl index 95af9f2580ce..5b9e041c9bd3 100644 --- a/airbyte-integrations/connectors/source-shopify/integration_tests/expected_records_transactions_with_user_id.jsonl +++ b/airbyte-integrations/connectors/source-shopify/integration_tests/expected_records_transactions_with_user_id.jsonl @@ -70,9 +70,8 @@ {"stream": "price_rules", "data": {"id": 1112171741373, "value_type": "fixed_amount", "value": "-10.0", "customer_selection": "all", "target_type": "line_item", "target_selection": "all", "allocation_method": "across", "allocation_limit": null, "once_per_customer": false, "usage_limit": null, "starts_at": "2017-01-19T09:59:10-08:00", "ends_at": null, "created_at": "2022-10-14T10:19:39-07:00", "updated_at": "2023-04-14T05:24:53-07:00", "entitled_product_ids": [], "entitled_variant_ids": [], "entitled_collection_ids": [], "entitled_country_ids": [], "prerequisite_product_ids": [], "prerequisite_variant_ids": [], "prerequisite_collection_ids": [], 
"customer_segment_prerequisite_ids": [], "prerequisite_customer_ids": [], "prerequisite_subtotal_range": null, "prerequisite_quantity_range": null, "prerequisite_shipping_price_range": null, "prerequisite_to_entitlement_quantity_ratio": {"prerequisite_quantity": null, "entitled_quantity": null}, "prerequisite_to_entitlement_purchase": {"prerequisite_amount": null}, "title": "New Title 2023", "admin_graphql_api_id": "gid://shopify/PriceRule/1112171741373", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953671890} {"stream": "price_rules", "data": {"id": 945000284349, "value_type": "percentage", "value": "-3.0", "customer_selection": "all", "target_type": "line_item", "target_selection": "all", "allocation_method": "across", "allocation_limit": null, "once_per_customer": true, "usage_limit": 10, "starts_at": "2021-07-07T07:22:04-07:00", "ends_at": null, "created_at": "2021-07-07T07:23:11-07:00", "updated_at": "2023-04-24T05:52:22-07:00", "entitled_product_ids": [], "entitled_variant_ids": [], "entitled_collection_ids": [], "entitled_country_ids": [], "prerequisite_product_ids": [], "prerequisite_variant_ids": [], "prerequisite_collection_ids": [], "customer_segment_prerequisite_ids": [], "prerequisite_customer_ids": [], "prerequisite_subtotal_range": null, "prerequisite_quantity_range": null, "prerequisite_shipping_price_range": null, "prerequisite_to_entitlement_quantity_ratio": {"prerequisite_quantity": null, "entitled_quantity": null}, "prerequisite_to_entitlement_purchase": {"prerequisite_amount": null}, "title": "1V8Z165KSH5T", "admin_graphql_api_id": "gid://shopify/PriceRule/945000284349", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953671890} {"stream": "price_rules", "data": {"id": 945205379261, "value_type": "percentage", "value": "-100.0", "customer_selection": "all", "target_type": "shipping_line", "target_selection": "all", "allocation_method": "each", "allocation_limit": null, "once_per_customer": false, "usage_limit": null, 
"starts_at": "2021-07-08T05:40:13-07:00", "ends_at": "2024-01-01T23:59:59-08:00", "created_at": "2021-07-08T05:40:37-07:00", "updated_at": "2023-12-07T03:40:44-08:00", "entitled_product_ids": [], "entitled_variant_ids": [], "entitled_collection_ids": [], "entitled_country_ids": [], "prerequisite_product_ids": [], "prerequisite_variant_ids": [], "prerequisite_collection_ids": [], "customer_segment_prerequisite_ids": [], "prerequisite_customer_ids": [], "prerequisite_subtotal_range": {"greater_than_or_equal_to": "1.0"}, "prerequisite_quantity_range": null, "prerequisite_shipping_price_range": null, "prerequisite_to_entitlement_quantity_ratio": {"prerequisite_quantity": null, "entitled_quantity": null}, "prerequisite_to_entitlement_purchase": {"prerequisite_amount": null}, "title": "HZAVNV2487WC", "admin_graphql_api_id": "gid://shopify/PriceRule/945205379261", "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953671891} -{"stream": "product_images", "data": {"id": 29301295481021, "alt": null, "position": 1, "product_id": 6796218138813, "created_at": "2021-06-22T18:09:28-07:00", "updated_at": "2021-06-22T18:09:28-07:00", "admin_graphql_api_id": "gid://shopify/ProductImage/29301295481021", "width": 4393, "height": 2929, "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/tin-of-beard-balm.jpg?v=1624410568", "variant_ids": [], "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953673537} -{"stream": "product_images", "data": {"id": 29301295513789, "alt": null, "position": 1, "product_id": 6796218269885, "created_at": "2021-06-22T18:09:29-07:00", "updated_at": "2021-06-22T18:09:29-07:00", "admin_graphql_api_id": "gid://shopify/ProductImage/29301295513789", "width": 3840, "height": 2560, "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/pair-of-all-black-sneakers.jpg?v=1624410569", "variant_ids": [], "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953673539} -{"stream": "product_images", "data": {"id": 
29301295546557, "alt": null, "position": 1, "product_id": 6796218302653, "created_at": "2021-06-22T18:09:29-07:00", "updated_at": "2021-06-22T18:09:29-07:00", "admin_graphql_api_id": "gid://shopify/ProductImage/29301295546557", "width": 3960, "height": 2640, "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/red-silver-fishing-lure.jpg?v=1624410569", "variant_ids": [], "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953673539} +{"stream": "product_images", "data": {"created_at": "2023-04-14T10:34:46+00:00", "updated_at": "2023-04-14T11:05:13+00:00", "id": 33290489659581, "height": 64, "alt": "Test", "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/Airbytelogo64x64.png?v=1681468487", "width": 64, "admin_graphql_api_id": "gid://shopify/ProductImage/33290489659581", "product_id": 6796229574845, "shop_url": "airbyte-integration-test"}, "emitted_at": 1714673982582} +{"stream": "product_images", "data": {"created_at": "2021-06-23T01:09:47+00:00", "updated_at": "2023-04-24T17:27:15+00:00", "id": 29301297316029, "height": 1467, "alt": "updated_mon_24.04.2023", "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/4-ounce-soy-candle.jpg?v=1624410587", "width": 2200, "admin_graphql_api_id": "gid://shopify/ProductImage/29301297316029", "product_id": 6796220989629, "shop_url": "airbyte-integration-test"}, "emitted_at": 1714673982587} {"stream": "products", "data": {"id": 6796217909437, "title": "Red And Navy Tee Sleeve", "body_html": "Zoom in on the sleeve of a red t-shirt with navy blue trim along the sleeve. 
Looks like a great tennis outfit.", "vendor": "Little Group", "product_type": "Movies", "created_at": "2021-06-22T18:09:27-07:00", "handle": "red-and-navy-tee-sleeve", "updated_at": "2023-04-20T04:12:25-07:00", "published_at": "2021-06-22T18:09:27-07:00", "template_suffix": null, "published_scope": "web", "tags": "developer-tools-generator", "status": "active", "admin_graphql_api_id": "gid://shopify/Product/6796217909437", "variants": [{"id": 40090579992765, "product_id": 6796217909437, "title": "Plastic", "price": 23.0, "sku": "", "position": 1, "inventory_policy": "deny", "compare_at_price": null, "fulfillment_service": "manual", "inventory_management": "shopify", "option1": "Plastic", "option2": null, "option3": null, "created_at": "2021-06-22T18:09:27-07:00", "updated_at": "2023-10-27T09:55:54-07:00", "taxable": true, "barcode": null, "grams": 39, "weight": 39.0, "weight_unit": "g", "inventory_item_id": 42185194700989, "inventory_quantity": 3, "old_inventory_quantity": 3, "requires_shipping": true, "admin_graphql_api_id": "gid://shopify/ProductVariant/40090579992765", "image_id": null}], "options": [{"id": 8720175235261, "product_id": 6796217909437, "name": "Title", "position": 1, "values": ["Plastic"]}], "images": [], "image": null, "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953675398} {"stream": "products", "data": {"id": 6796217942205, "title": "Grey T-Shirt", "body_html": "A grey t-shirt on a hanger. Simple. Classic. 
Grey.", "vendor": "Lang - Bogisich", "product_type": "Home", "created_at": "2021-06-22T18:09:27-07:00", "handle": "grey-t-shirt", "updated_at": "2023-04-20T04:12:25-07:00", "published_at": "2021-06-22T18:09:27-07:00", "template_suffix": null, "published_scope": "web", "tags": "developer-tools-generator", "status": "active", "admin_graphql_api_id": "gid://shopify/Product/6796217942205", "variants": [{"id": 40090580025533, "product_id": 6796217942205, "title": "Granite", "price": 70.0, "sku": "", "position": 1, "inventory_policy": "deny", "compare_at_price": null, "fulfillment_service": "manual", "inventory_management": "shopify", "option1": "Granite", "option2": null, "option3": null, "created_at": "2021-06-22T18:09:27-07:00", "updated_at": "2023-10-27T09:55:54-07:00", "taxable": true, "barcode": null, "grams": 0, "weight": 0.0, "weight_unit": "g", "inventory_item_id": 42185194733757, "inventory_quantity": 38, "old_inventory_quantity": 38, "requires_shipping": false, "admin_graphql_api_id": "gid://shopify/ProductVariant/40090580025533", "image_id": null}], "options": [{"id": 8720175268029, "product_id": 6796217942205, "name": "Title", "position": 1, "values": ["Granite"]}], "images": [], "image": null, "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953675399} {"stream": "products", "data": {"id": 6796217974973, "title": "Pool Floaty Icecream", "body_html": "Inflatable pink ice cream pool toy.", "vendor": "Fritsch - Ferry", "product_type": "Grocery", "created_at": "2021-06-22T18:09:27-07:00", "handle": "pool-floaty-icecream", "updated_at": "2023-04-20T04:12:25-07:00", "published_at": "2021-06-22T18:09:27-07:00", "template_suffix": null, "published_scope": "web", "tags": "developer-tools-generator", "status": "active", "admin_graphql_api_id": "gid://shopify/Product/6796217974973", "variants": [{"id": 40090580091069, "product_id": 6796217974973, "title": "magenta", "price": 57.0, "sku": "", "position": 1, "inventory_policy": "deny", "compare_at_price": 
null, "fulfillment_service": "manual", "inventory_management": "shopify", "option1": "magenta", "option2": null, "option3": null, "created_at": "2021-06-22T18:09:27-07:00", "updated_at": "2023-10-27T09:55:54-07:00", "taxable": true, "barcode": null, "grams": 499, "weight": 499.0, "weight_unit": "g", "inventory_item_id": 42185194766525, "inventory_quantity": 1, "old_inventory_quantity": 1, "requires_shipping": true, "admin_graphql_api_id": "gid://shopify/ProductVariant/40090580091069", "image_id": null}], "options": [{"id": 8720175300797, "product_id": 6796217974973, "name": "Title", "position": 1, "values": ["magenta"]}], "images": [], "image": null, "shop_url": "airbyte-integration-test"}, "emitted_at": 1708953675399} diff --git a/airbyte-integrations/connectors/source-shopify/integration_tests/state.json b/airbyte-integrations/connectors/source-shopify/integration_tests/state.json index c29ae675c6c5..3142eac4e37f 100644 --- a/airbyte-integrations/connectors/source-shopify/integration_tests/state.json +++ b/airbyte-integrations/connectors/source-shopify/integration_tests/state.json @@ -144,9 +144,6 @@ } }, "metafield_customers": { - "customers": { - "updated_at": "2023-04-24T06:53:48-07:00" - }, "updated_at": "2023-04-13T04:50:10-07:00" }, "metafield_orders": { @@ -165,48 +162,18 @@ "updated_at": "2023-04-24T07:18:06-07:00" }, "metafield_products": { - "products": { - "updated_at": "2023-04-20T04:12:59-07:00", - "deleted": { - "deleted_at": "" - } - }, "updated_at": "2023-04-14T04:04:46-07:00" }, "product_images": { - "products": { - "updated_at": "2023-04-24T11:05:13-07:00", - "deleted": { - "deleted_at": "2023-09-05T13:32:22-07:00" - } - }, "updated_at": "2023-04-24T10:27:15-07:00" }, "metafield_product_images": { - "products": { - "updated_at": "", - "deleted": { - "deleted_at": "2023-09-05T13:32:22-07:00" - } - }, "updated_at": "2023-04-24T10:32:19-07:00" }, "product_variants": { - "id": 42778150305981, - "products": { - "updated_at": "", - "deleted": { - 
"deleted_at": "2023-09-05T13:32:22-07:00" - } - } + "updated_at": "2023-12-11T10:37:41+00:00" }, "metafield_product_variants": { - "products": { - "updated_at": "", - "deleted": { - "deleted_at": "2023-09-05T13:32:22-07:00" - } - }, "updated_at": "2023-04-14T03:29:27-07:00" }, "collections": { diff --git a/airbyte-integrations/connectors/source-shopify/metadata.yaml b/airbyte-integrations/connectors/source-shopify/metadata.yaml index 13e1c3f24296..caf37588b64a 100644 --- a/airbyte-integrations/connectors/source-shopify/metadata.yaml +++ b/airbyte-integrations/connectors/source-shopify/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: api connectorType: source definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9 - dockerImageTag: 2.0.8 + dockerImageTag: 2.1.0 dockerRepository: airbyte/source-shopify documentationUrl: https://docs.airbyte.com/integrations/sources/shopify githubIssueLabel: source-shopify @@ -64,6 +64,14 @@ data: "product_variants", "transactions", ] + 2.1.0: + message: + "This upgrade changes the `Products`, `Product Images` and `Product Variants` streams to use `Shopify GraphQL BULK`. + More details here: https://github.com/airbytehq/airbyte/pull/37767." 
+ upgradeDeadline: "2024-06-10" + scopedImpact: + - scopeType: stream + impactedScopes: ["product_variants"] suggestedStreams: streams: - customers diff --git a/airbyte-integrations/connectors/source-shopify/poetry.lock b/airbyte-integrations/connectors/source-shopify/poetry.lock index b33eac57dfb5..422b97e2a0d8 100644 --- a/airbyte-integrations/connectors/source-shopify/poetry.lock +++ b/airbyte-integrations/connectors/source-shopify/poetry.lock @@ -278,13 +278,13 @@ files = [ [[package]] name = "exceptiongroup" -version = "1.2.0" +version = "1.2.1" description = "Backport of PEP 654 (exception groups)" optional = false python-versions = ">=3.7" files = [ - {file = "exceptiongroup-1.2.0-py3-none-any.whl", hash = "sha256:4bfd3996ac73b41e9b9628b04e079f193850720ea5945fc96a08633c66912f14"}, - {file = "exceptiongroup-1.2.0.tar.gz", hash = "sha256:91f5c769735f051a4290d52edd0858999b57e5876e9f85937691bd4c9fa3ed68"}, + {file = "exceptiongroup-1.2.1-py3-none-any.whl", hash = "sha256:5258b9ed329c5bbdd31a309f53cbfb0b155341807f6ff7606a1e801a891b29ad"}, + {file = "exceptiongroup-1.2.1.tar.gz", hash = "sha256:a4785e48b045528f5bfe627b6ad554ff32def154f42372786903b7abcfe1aa16"}, ] [package.extras] @@ -532,28 +532,29 @@ pytzdata = ">=2020.1" [[package]] name = "platformdirs" -version = "4.2.0" -description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." +version = "4.2.1" +description = "A small Python package for determining appropriate platform-specific dirs, e.g. a `user data dir`." 
optional = false python-versions = ">=3.8" files = [ - {file = "platformdirs-4.2.0-py3-none-any.whl", hash = "sha256:0614df2a2f37e1a662acbd8e2b25b92ccf8632929bc6d43467e17fe89c75e068"}, - {file = "platformdirs-4.2.0.tar.gz", hash = "sha256:ef0cc731df711022c174543cb70a9b5bd22e5a9337c8624ef2c2ceb8ddad8768"}, + {file = "platformdirs-4.2.1-py3-none-any.whl", hash = "sha256:17d5a1161b3fd67b390023cb2d3b026bbd40abde6fdb052dfbd3a29c3ba22ee1"}, + {file = "platformdirs-4.2.1.tar.gz", hash = "sha256:031cd18d4ec63ec53e82dceaac0417d218a6863f7745dfcc9efe7793b7039bdf"}, ] [package.extras] docs = ["furo (>=2023.9.10)", "proselint (>=0.13)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.25.2)"] test = ["appdirs (==1.4.4)", "covdefaults (>=2.3)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)"] +type = ["mypy (>=1.8)"] [[package]] name = "pluggy" -version = "1.4.0" +version = "1.5.0" description = "plugin and hook calling mechanisms for python" optional = false python-versions = ">=3.8" files = [ - {file = "pluggy-1.4.0-py3-none-any.whl", hash = "sha256:7db9f7b503d67d1c5b95f59773ebb58a8c1c288129a88665838012cfb07b8981"}, - {file = "pluggy-1.4.0.tar.gz", hash = "sha256:8c85c2876142a764e5b7548e7d9a0e0ddb46f5185161049a79b7e974454223be"}, + {file = "pluggy-1.5.0-py3-none-any.whl", hash = "sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"}, + {file = "pluggy-1.5.0.tar.gz", hash = "sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"}, ] [package.extras] @@ -670,13 +671,13 @@ files = [ [[package]] name = "pytest" -version = "8.1.1" +version = "8.2.0" description = "pytest: simple powerful testing with Python" optional = false python-versions = ">=3.8" files = [ - {file = "pytest-8.1.1-py3-none-any.whl", hash = "sha256:2a8386cfc11fa9d2c50ee7b2a57e7d898ef90470a7a34c4b949ff59662bb78b7"}, - {file = "pytest-8.1.1.tar.gz", hash = "sha256:ac978141a75948948817d360297b7aae0fcb9d6ff6bc9ec6d514b85d5a65c044"}, + {file = 
"pytest-8.2.0-py3-none-any.whl", hash = "sha256:1733f0620f6cda4095bbf0d9ff8022486e91892245bb9e7d5542c018f612f233"}, + {file = "pytest-8.2.0.tar.gz", hash = "sha256:d507d4482197eac0ba2bae2e9babf0672eb333017bcedaa5fb1a3d42c1174b3f"}, ] [package.dependencies] @@ -684,11 +685,11 @@ colorama = {version = "*", markers = "sys_platform == \"win32\""} exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} iniconfig = "*" packaging = "*" -pluggy = ">=1.4,<2.0" +pluggy = ">=1.5,<2.0" tomli = {version = ">=1", markers = "python_version < \"3.11\""} [package.extras] -testing = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] [[package]] name = "pytest-mock" diff --git a/airbyte-integrations/connectors/source-shopify/pyproject.toml b/airbyte-integrations/connectors/source-shopify/pyproject.toml index 6f5ca760852d..bf22a128ceac 100644 --- a/airbyte-integrations/connectors/source-shopify/pyproject.toml +++ b/airbyte-integrations/connectors/source-shopify/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "2.0.8" +version = "2.1.0" name = "source-shopify" description = "Source CDK implementation for Shopify." 
authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/schemas/product_variants.json b/airbyte-integrations/connectors/source-shopify/source_shopify/schemas/product_variants.json index 56d609b88678..c9d658d3a4ed 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/schemas/product_variants.json +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/schemas/product_variants.json @@ -121,8 +121,17 @@ } }, "compare_at_price": { - "description": "The original price of the variant in a different currency before any discount", - "type": ["null", "number"] + "type": ["null", "object"], + "properties": { + "amount": { + "description": "The amount of the price", + "type": ["null", "number"] + }, + "currency_code": { + "description": "The currency code of the price", + "type": ["null", "string"] + } + } } } } diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/__init__.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py index 59aaec8641c8..1177d0fbdcf1 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py @@ -19,8 +19,10 @@ def __init__(self, message: str, **kwargs) -> None: class BulkJobError(BaseBulkException): """Raised when there are BULK Job Errors in response""" - class BulkJobUnknownError(BaseBulkException): - """Raised when BULK Job has FAILED with Unknown status""" + class BulkJobNonHandableError(BaseBulkException): + """Raised when there are non-actionable BULK Job Errors in 
response""" + + failure_type: FailureType = FailureType.system_error class BulkJobBadResponse(BaseBulkException): """Raised when the requests.Response object could not be parsed""" diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py index fdbccbca9735..e6cf79b7d9d5 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py @@ -67,7 +67,7 @@ class ShopifyBulkManager: # P365D, upper boundary for slice size _job_size_max: Final[float] = 365.0 # dynamically adjusted slice interval - _job_size: float = field(init=False, default=0.0) + job_size: float = field(init=False, default=0.0) # expand slice factor _job_size_expand_factor: int = field(init=False, default=2) # reduce slice factor @@ -138,10 +138,10 @@ def _is_long_running_job(self) -> bool: return False def _expand_job_size(self) -> None: - self._job_size += self._job_size_adjusted_expand_factor + self.job_size += self._job_size_adjusted_expand_factor def _reduce_job_size(self) -> None: - self._job_size /= self._job_size_adjusted_reduce_factor + self.job_size /= self._job_size_adjusted_reduce_factor def _save_latest_request(self, response: requests.Response) -> None: self._request = response.request @@ -162,7 +162,7 @@ def __adjust_job_size(self, job_current_elapsed_time: float) -> None: # set the last job time self._job_last_elapsed_time = job_current_elapsed_time # check the job size slice interval are acceptable - self._job_size = max(self._job_size_min, min(self._job_size, self._job_size_max)) + self.job_size = max(self._job_size_min, min(self.job_size, self._job_size_max)) def __reset_state(self) -> None: # reset the job state to default @@ -282,19 +282,47 @@ def _on_access_denied_job(self, **kwagrs) -> AirbyteTracedException: ) def 
_on_job_with_errors(self, errors: List[Mapping[str, Any]]) -> AirbyteTracedException: - raise ShopifyBulkExceptions.BulkJobUnknownError( - f"Could not validate the status of the BULK Job `{self._job_id}`. Errors: {errors}." - ) + raise ShopifyBulkExceptions.BulkJobError(f"Could not validate the status of the BULK Job `{self._job_id}`. Errors: {errors}.") - def _job_check_for_errors(self, response: requests.Response) -> Optional[Iterable[Mapping[str, Any]]]: - try: + def _on_non_handable_job_error(self, errors: List[Mapping[str, Any]]) -> AirbyteTracedException: + raise ShopifyBulkExceptions.BulkJobNonHandableError(f"The Stream: `{self.stream_name}`, Non-handable error occured: {errors}") - return response.json().get("errors") or response.json().get("data", {}).get("bulkOperationRunQuery", {}).get("userErrors", []) + def _collect_bulk_errors(self, response: requests.Response) -> List[Optional[dict]]: + try: + server_errors = response.json().get("errors", []) + user_errors = response.json().get("data", {}).get("bulkOperationRunQuery", {}).get("userErrors", []) + errors = server_errors + user_errors + return errors except (Exception, JSONDecodeError) as e: raise ShopifyBulkExceptions.BulkJobBadResponse( f"Couldn't check the `response` for `errors`, status: {response.status_code}, response: `{response.text}`. Trace: {repr(e)}." ) + def _job_healthcheck(self, response: requests.Response) -> Optional[Exception]: + try: + # save the latest request to retry + self._save_latest_request(response) + + # get the errors, if occured + errors = self._collect_bulk_errors(response) + + # when the concurrent job takes place, + # another job could not be created + # we typically need to wait and retry, but no longer than 10 min. 
+ if self._has_running_concurrent_job(errors): + return self._job_retry_on_concurrency() + + # when the job was already created and the error appears in the middle + if self._job_state and errors: + self._on_job_with_errors(errors) + + # when the job was not created because of some errors + if not self._job_state and errors: + self._on_non_handable_job_error(errors) + + except (ShopifyBulkExceptions.BulkJobBadResponse, ShopifyBulkExceptions.BulkJobError) as e: + raise e + def _job_send_state_request(self) -> requests.Response: with self.session as job_state_request: status_args = self._job_get_request_args(ShopifyBulkTemplates.status) @@ -303,11 +331,7 @@ def _job_send_state_request(self) -> requests.Response: def _job_track_running(self) -> None: job_state_response = self._job_send_state_request() - errors = self._job_check_for_errors(job_state_response) - if errors: - # the exception raised when there are job-related errors, and the Job cannot be run futher. - self._on_job_with_errors(errors) - + self._job_healthcheck(job_state_response) self._job_update_state(job_state_response) self._job_state_to_fn_map.get(self._job_state)(response=job_state_response) @@ -348,7 +372,7 @@ def _job_retry_concurrent(self) -> Optional[requests.Response]: ) sleep(self._concurrent_interval) retried_response = self._job_retry_request() - return self._job_healthcheck(retried_response) + return self.job_process_created(retried_response) def _job_retry_on_concurrency(self) -> Optional[requests.Response]: if self._has_reached_max_concurrency(): @@ -360,17 +384,6 @@ def _job_retry_on_concurrency(self) -> Optional[requests.Response]: else: return self._job_retry_concurrent() - def _job_healthcheck(self, response: requests.Response) -> Optional[requests.Response]: - # save the latest request to retry - self._save_latest_request(response) - # check for query errors - errors = self._job_check_for_errors(response) - # when the concurrent job takes place, we typically need to wait and retry, 
but no longer than 10 min. - if self._has_running_concurrent_job(errors): - return self._job_retry_on_concurrency() - - return response if not errors else None - @bulk_retry_on_exception(logger) def _job_check_state(self) -> Optional[str]: while not self._job_completed(): @@ -381,12 +394,13 @@ def _job_check_state(self) -> Optional[str]: # external method to be used within other components + @bulk_retry_on_exception(logger) def job_process_created(self, response: requests.Response) -> None: """ The Bulk Job with CREATED status, should be processed, before we move forward with Job Status Checks. """ - response = self._job_healthcheck(response) - bulk_response = response.json().get("data", {}).get("bulkOperationRunQuery", {}).get("bulkOperation", {}) + self._job_healthcheck(response) + bulk_response = response.json().get("data", {}).get("bulkOperationRunQuery", {}).get("bulkOperation", {}) if response else None if bulk_response and bulk_response.get("status") == ShopifyBulkJobStatus.CREATED.value: self._job_id = bulk_response.get("id") self._job_created_at = bulk_response.get("createdAt") @@ -396,10 +410,10 @@ def job_size_normalize(self, start: datetime, end: datetime) -> datetime: # adjust slice size when it's bigger than the loop point when it should end, # to preserve correct job size adjustments when this is the only job we need to run, based on STATE provided requested_slice_size = (end - start).total_days() - self._job_size = requested_slice_size if requested_slice_size < self._job_size else self._job_size + self.job_size = requested_slice_size if requested_slice_size < self.job_size else self.job_size def get_adjusted_job_start(self, slice_start: datetime) -> datetime: - step = self._job_size if self._job_size else self._job_size_min + step = self.job_size if self.job_size else self._job_size_min return slice_start.add(days=step) def get_adjusted_job_end(self, slice_start: datetime, slice_end: datetime) -> datetime: @@ -415,19 +429,19 @@ def 
job_check_for_completion(self) -> Optional[str]: This method checks the status for the `CREATED` Shopify BULK Job, using it's `ID`. The time spent for the Job execution is tracked to understand the effort. """ - # track created job until it's COMPLETED + job_started = time() try: + # track created job until it's COMPLETED self._job_check_state() return self._job_result_filename except ( - ShopifyBulkExceptions.BulkJobCanceled, ShopifyBulkExceptions.BulkJobFailed, ShopifyBulkExceptions.BulkJobTimout, ShopifyBulkExceptions.BulkJobAccessDenied, - # this one is retryable, but stil needs to be raised, - # if the max attempts value is reached. - ShopifyBulkExceptions.BulkJobUnknownError, + # when the job is canceled by non-source actions, + # we should raise the system_error + ShopifyBulkExceptions.BulkJobCanceled, ) as bulk_job_error: raise bulk_job_error finally: diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py index 0b01e4e787d8..6b0543ca358a 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py @@ -179,7 +179,7 @@ def resolve(self, query: Query) -> str: # return the constructed query operation return Operation(type="", queries=[query]).render() - def record_process_components(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: """ Defines how to process collected components, default `as is`. 
""" @@ -281,7 +281,7 @@ def query_nodes(self) -> List[Field]: elif isinstance(self.type.value, str): return ["__typename", "id", metafield_node] - def record_process_components(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: # resolve parent id from `str` to `int` record["owner_id"] = self.tools.resolve_str_id(record.get(BULK_PARENT_KEY)) # add `owner_resource` field @@ -670,7 +670,7 @@ class DiscountCode(ShopifyBulkQuery): "record_components": ["DiscountRedeemCode"], } - def record_process_components(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def record_process_components(self, record: MutableMapping[str, Any]) -> Optional[Iterable[MutableMapping[str, Any]]]: """ Defines how to process collected components. """ @@ -751,7 +751,7 @@ class Collection(ShopifyBulkQuery): "record_components": ["CollectionPublication"], } - def record_process_components(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: """ Defines how to process collected components. 
""" @@ -834,7 +834,9 @@ class CustomerAddresses(ShopifyBulkQuery): "new_record": "Customer", } - def set_default_address(self, record: MutableMapping[str, Any], address_record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def set_default_address( + self, record: MutableMapping[str, Any], address_record: MutableMapping[str, Any] + ) -> Iterable[MutableMapping[str, Any]]: default_address = record.get("defaultAddress", {}) # the default_address could be literal `None`, additional check is required if default_address: @@ -924,7 +926,7 @@ class InventoryItem(ShopifyBulkQuery): "new_record": "InventoryItem", } - def record_process_components(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: """ Defines how to process collected components. """ @@ -1006,7 +1008,7 @@ def query(self, filter_query: Optional[str] = None) -> Query: additional_query_args=self.locations_query_args, ) - def record_process_components(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: """ Defines how to process collected components. """ @@ -1310,7 +1312,7 @@ def process_merchant_request(self, record: MutableMapping[str, Any]) -> MutableM record = self.tools.fields_names_to_snake_case(record) return record - def record_process_components(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: """ Defines how to process collected components. 
""" @@ -1502,3 +1504,483 @@ def record_process_components(self, record: MutableMapping[str, Any]) -> Optiona transaction["order_id"] = record.get("id") transaction["currency"] = record.get("currency") yield self.process_transaction(transaction) + + +class Product(ShopifyBulkQuery): + """ + { + products(query: "updated_at:>='2020-01-20T00:00:00+00:00' AND updated_at:<'2024-04-25T00:00:00+00:00'", sortKey:UPDATED_AT) { + edges { + node { + __typename + id + publishedAt + createdAt + status + vendor + updatedAt + bodyHtml + productType + tags + options { + __typename + id + values + position + } + handle + images { + edges { + node { + __typename + id + } + } + + } + templateSuffix + title + variants { + edges { + node { + __typename + id + } + } + } + } + } + } + } + """ + + query_name = "products" + sort_key = "UPDATED_AT" + # images property fields + images_fields: List[Field] = [Field(name="edges", fields=[Field(name="node", fields=["__typename", "id"])])] + # variants property fields, we re-use the same field names as for the `images` property + variants_fields: List[Field] = images_fields + # main query + query_nodes: List[Field] = [ + "__typename", + "id", + "publishedAt", + "createdAt", + "status", + "vendor", + "updatedAt", + "bodyHtml", + "productType", + "tags", + "handle", + "templateSuffix", + "title", + Field(name="options", fields=["id", "name", "values", "position"]), + Field(name="images", fields=images_fields), + Field(name="variants", fields=variants_fields), + ] + + record_composition = { + "new_record": "Product", + # each product could have `Image` and `ProductVariant` associated with the product + "record_components": ["Image", "ProductVariant"], + } + + def _process_component(self, entity: List[dict]) -> List[dict]: + for item in entity: + # remove the `__parentId` from the object + if BULK_PARENT_KEY in item: + item.pop(BULK_PARENT_KEY) + # resolve the id from string + item["id"] = self.tools.resolve_str_id(item.get("id")) + return entity + + 
def _process_options(self, options: List[dict], product_id: Optional[int] = None) -> List[dict]: + for option in options: + # add product_id to each option + option["product_id"] = product_id if product_id else None + return options + + def _unnest_tags(self, record: MutableMapping[str, Any]) -> str: + # we keep supporting 1 tag only, as it was for the REST stream, + # to avoid breaking change. + tags = record.get("tags", []) + return tags[0] if len(tags) > 0 else None + + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: + """ + Defines how to process collected components. + """ + # get the joined record components collected for the record + record_components = record.get("record_components", {}) + + # process record components + if record_components: + record["images"] = self._process_component(record_components.get("Image", [])) + record["variants"] = self._process_component(record_components.get("ProductVariant", [])) + record["options"] = self._process_component(record.get("options", [])) + # add the product_id to the `options` + product_id = record.get("id") + record["options"] = self._process_options(record.get("options", []), product_id) + record.pop("record_components") + # unnest the `tags` (the list of 1) + record["tags"] = self._unnest_tags(record) + # convert dates from ISO-8601 to RFC-3339 + record["published_at"] = self.tools.from_iso8601_to_rfc3339(record, "publishedAt") + record["updatedAt"] = self.tools.from_iso8601_to_rfc3339(record, "updatedAt") + record["createdAt"] = self.tools.from_iso8601_to_rfc3339(record, "createdAt") + + yield record + + +class ProductImage(ShopifyBulkQuery): + """ + { + products( + query: "updated_at:>='2019-04-13T00:00:00+00:00' AND updated_at:<='2024-04-30T12:16:17.273363+00:00'" + sortKey: UPDATED_AT + ) { + edges { + node { + __typename + id + # THE MEDIA NODE IS NEEDED TO PROVIDE THE CURSORS + media { + edges { + node { + ... 
on MediaImage { + __typename + createdAt + updatedAt + image { + url + } + } + } + } + } + # THIS IS THE MAIN NODE WE WANT TO GET + images { + edges { + node { + __typename + id + height + alt: altText + src + url + width + } + } + } + } + } + } + } + """ + + query_name = "products" + sort_key = "UPDATED_AT" + + # images property fields + images_fields: List[Field] = [ + Field( + name="edges", + fields=[ + Field( + name="node", + fields=[ + "__typename", + "id", + "height", + Field(name="altText", alias="alt"), + "src", + "url", + "width", + ], + ) + ], + ) + ] + + # media fragment, contains the info about when the Image was created or updated. + media_fragment: List[InlineFragment] = [ + InlineFragment( + type="MediaImage", + fields=[ + "__typename", + "createdAt", + "updatedAt", + # fetch the `url` as the key for the later join + Field(name="image", fields=["url"]), + ], + ), + ] + + # media property fields + media_fields: List[Field] = [Field(name="edges", fields=[Field(name="node", fields=media_fragment)])] + + # main query + query_nodes: List[Field] = [ + "__typename", + "id", + Field(name="media", fields=media_fields), + Field(name="images", fields=images_fields), + ] + + record_composition = { + "new_record": "Product", + # each product could have `MediaImage` associated with the product, + # each product could have `Image` assiciated with the product and the related `MediaImage`, + # there could be multiple `MediaImage` and `Image` assigned to the product. 
+ "record_components": ["MediaImage", "Image"], + } + + def _process_component(self, entity: List[dict]) -> List[dict]: + for item in entity: + # remove the `__parentId` from the object + if BULK_PARENT_KEY in item: + item.pop(BULK_PARENT_KEY) + # resolve the id from string + item["admin_graphql_api_id"] = item.get("id") + item["id"] = self.tools.resolve_str_id(item.get("id")) + return entity + + def _add_product_id(self, options: List[dict], product_id: Optional[int] = None) -> List[dict]: + for option in options: + # add product_id to each option + option["product_id"] = product_id if product_id else None + return options + + def _merge_with_media(self, record_components: List[dict]) -> Optional[Iterable[MutableMapping[str, Any]]]: + media = record_components.get("MediaImage", []) + images = record_components.get("Image", []) + + # Create a dictionary to map the 'url' key in images + url_map = {item["url"]: item for item in images} + + # Merge images with data from media when 'image.url' matches 'url' + for item in media: + # remove the `__parentId` from Media + if BULK_PARENT_KEY in item: + item.pop(BULK_PARENT_KEY) + + image_url = item.get("image", {}).get("url") + if image_url in url_map: + # Merge images into media + item.update(url_map.get(image_url)) + # remove lefovers + item.pop("image", None) + item.pop("url", None) + # make the `alt` None, if it's an empty str, since server sends the "" instead of Null + alt = item.get("alt") + item["alt"] = None if not alt else alt + + # return merged list of images + return media + + def _convert_datetime_to_rfc3339(self, images: List[dict]) -> MutableMapping[str, Any]: + for image in images: + image["createdAt"] = self.tools.from_iso8601_to_rfc3339(image, "createdAt") + image["updatedAt"] = self.tools.from_iso8601_to_rfc3339(image, "updatedAt") + return images + + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: + """ + Defines how to process collected 
components. + """ + # get the joined record components collected for the record + record_components = record.get("record_components", {}) + + # process record components + if record_components: + record["images"] = self._process_component(record_components.get("Image", [])) + # add the product_id to each `Image` + record["images"] = self._add_product_id(record.get("images", []), record.get("id")) + record["images"] = self._merge_with_media(record_components) + record.pop("record_components") + # produce images records + if len(record.get("images", [])) > 0: + # convert dates from ISO-8601 to RFC-3339 + record["images"] = self._convert_datetime_to_rfc3339(record.get("images", [])) + yield from record.get("images", []) + + +class ProductVariant(ShopifyBulkQuery): + """ + { + productVariants( + query: "updated_at:>='2019-04-13T00:00:00+00:00' AND updated_at:<='2024-04-30T12:16:17.273363+00:00'" + sortKey: UPDATED_AT + ) { + edges { + node { + __typename + id + product { + product_id: id + } + title + price + sku + position + inventoryPolicy + compareAtPrice + fulfillmentService { + fulfillment_service: handle + } + inventoryManagement + createdAt + updatedAt + taxable + barcode + grams: weight + weight + weightUnit + inventoryItem { + inventory_item_id: id + } + inventoryQuantity + old_inventory_quantity: inventoryQuantity + presentmentPrices { + edges { + node { + __typename + price { + amount + currencyCode + } + compareAtPrice { + amount + currencyCode + } + } + } + } + requiresShipping + image { + image_id: id + } + } + } + } + } + """ + + query_name = "productVariants" + sort_key = "ID" + + prices_fields: List[str] = ["amount", "currencyCode"] + presentment_prices_fields: List[Field] = [ + Field( + name="edges", + fields=[ + Field( + name="node", + fields=["__typename", Field(name="price", fields=prices_fields), Field(name="compareAtPrice", fields=prices_fields)], + ) + ], + ) + ] + + # main query + query_nodes: List[Field] = [ + "__typename", + "id", + "title", 
+ "price", + "sku", + "position", + "inventoryPolicy", + "compareAtPrice", + "inventoryManagement", + "createdAt", + "updatedAt", + "taxable", + "barcode", + "weight", + "weightUnit", + "inventoryQuantity", + "requiresShipping", + Field(name="weight", alias="grams"), + Field(name="image", fields=[Field(name="id", alias="image_id")]), + Field(name="inventoryQuantity", alias="old_inventory_quantity"), + Field(name="product", fields=[Field(name="id", alias="product_id")]), + Field(name="fulfillmentService", fields=[Field(name="handle", alias="fulfillment_service")]), + Field(name="inventoryItem", fields=[Field(name="id", alias="inventory_item_id")]), + Field(name="presentmentPrices", fields=presentment_prices_fields), + ] + + record_composition = { + "new_record": "ProductVariant", + # each `ProductVariant` could have `ProductVariantPricePair` associated with the product variant. + "record_components": ["ProductVariantPricePair"], + } + + def _process_presentment_prices(self, entity: List[dict]) -> List[dict]: + for item in entity: + # remove the `__parentId` from the object + if BULK_PARENT_KEY in item: + item.pop(BULK_PARENT_KEY) + + # these objects could be literally `Null/None` from the response, + # this is treated like a real value, so we need to assigne the correct values instead + price: Optional[Mapping[str, Any]] = item.get("price", {}) + if not price: + price = {} + # get the amount values + price_amount = price.get("amount") if price else None + # make the nested object's values up to the schema, (cast the `str` > `float`) + item["price"]["amount"] = float(price_amount) if price_amount else None + # convert field names to snake case + item["price"] = self.tools.fields_names_to_snake_case(item.get("price")) + + compare_at_price: Optional[Mapping[str, Any]] = item.get("compareAtPrice", {}) + if not compare_at_price: + compare_at_price = {} + # assign the correct value, if there is no object from response + item["compareAtPrice"] = compare_at_price + 
compare_at_price_amount = compare_at_price.get("amount") if compare_at_price else None + item["compareAtPrice"]["amount"] = float(compare_at_price_amount) if compare_at_price_amount else None + item["compare_at_price"] = self.tools.fields_names_to_snake_case(item["compareAtPrice"]) + # remove leftovers + item.pop("compareAtPrice", None) + + return entity + + def _unnest_and_resolve_id(self, record: MutableMapping[str, Any], from_property: str, id_field: str) -> int: + entity = record.get(from_property, {}) + return self.tools.resolve_str_id(entity.get(id_field)) if entity else None + + def record_process_components(self, record: MutableMapping[str, Any]) -> Iterable[MutableMapping[str, Any]]: + """ + Defines how to process collected components. + """ + + # get the joined record components collected for the record + record_components = record.get("record_components", {}) + # process record components + if record_components: + record["presentment_prices"] = self._process_presentment_prices(record_components.get("ProductVariantPricePair", [])) + record.pop("record_components") + + # unnest mandatory fields from their placeholders + record["product_id"] = self._unnest_and_resolve_id(record, "product", "product_id") + record["inventory_item_id"] = self._unnest_and_resolve_id(record, "inventoryItem", "inventory_item_id") + record["image_id"] = self._unnest_and_resolve_id(record, "image", "image_id") + # unnest `fulfillment_service` from `fulfillmentService` + record["fulfillment_service"] = record.get("fulfillmentService", {}).get("fulfillment_service") + # cast the `price` to number, could be literally `None` + price = record.get("price") + record["price"] = float(price) if price else None + # cast the `grams` to integer + record["grams"] = int(record.get("grams", 0)) + # convert date-time cursors + record["createdAt"] = self.tools.from_iso8601_to_rfc3339(record, "createdAt") + record["updatedAt"] = self.tools.from_iso8601_to_rfc3339(record, "updatedAt") + # clean up 
the leftovers + record.pop("image", None) + record.pop("product", None) + record.pop("inventoryItem", None) + + yield record diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/retry.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/retry.py index d3550a0826ff..c081db0e917c 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/retry.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/retry.py @@ -10,7 +10,7 @@ BULK_RETRY_ERRORS: Final[Tuple] = ( ShopifyBulkExceptions.BulkJobBadResponse, - ShopifyBulkExceptions.BulkJobUnknownError, + ShopifyBulkExceptions.BulkJobError, ) diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py index f33545e3c449..d5b85a991369 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py @@ -748,7 +748,7 @@ def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Un return self.config.get("start_date") def emit_slice_message(self, slice_start: datetime, slice_end: datetime) -> None: - slice_size_message = f"Slice size: `P{round(self.job_manager._job_size, 1)}D`" + slice_size_message = f"Slice size: `P{round(self.job_manager.job_size, 1)}D`" self.logger.info(f"Stream: `{self.name}` requesting BULK Job for period: {slice_start} -- {slice_end}. 
{slice_size_message}") @stream_state_cache.cache_stream_state @@ -772,7 +772,7 @@ def process_bulk_results( self, response: requests.Response, stream_state: Optional[Mapping[str, Any]] = None, - ) -> Iterable[Mapping[str, Any]]: + ) -> Optional[Iterable[Mapping[str, Any]]]: # process the CREATED Job prior to other actions self.job_manager.job_process_created(response) # get results fetched from COMPLETED BULK Job diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py index 17643e9f774c..c5854cb3a7c6 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py @@ -24,6 +24,9 @@ MetafieldProduct, MetafieldProductImage, MetafieldProductVariant, + Product, + ProductImage, + ProductVariant, Transaction, ) from source_shopify.shopify_graphql.graphql import get_query_products @@ -108,10 +111,8 @@ class MetafieldDraftOrders(IncrementalShopifyGraphQlBulkStream): bulk_query: MetafieldDraftOrder = MetafieldDraftOrder -class Products(IncrementalShopifyStreamWithDeletedEvents): - use_cache = True - data_field = "products" - deleted_events_api_name = "Product" +class Products(IncrementalShopifyGraphQlBulkStream): + bulk_query: Product = Product class ProductsGraphQl(IncrementalShopifyStream): @@ -167,22 +168,16 @@ class MetafieldProducts(IncrementalShopifyGraphQlBulkStream): bulk_query: MetafieldProduct = MetafieldProduct -class ProductImages(IncrementalShopifyNestedStream): - parent_stream_class = Products - nested_entity = "images" - # add `product_id` to each nested subrecord - mutation_map = {"product_id": "id"} +class ProductImages(IncrementalShopifyGraphQlBulkStream): + bulk_query: ProductImage = ProductImage class MetafieldProductImages(IncrementalShopifyGraphQlBulkStream): bulk_query: MetafieldProductImage = MetafieldProductImage 
-class ProductVariants(IncrementalShopifyNestedStream): - parent_stream_class = Products - nested_entity = "variants" - # add `product_id` to each nested subrecord - mutation_map = {"product_id": "id"} +class ProductVariants(IncrementalShopifyGraphQlBulkStream): + bulk_query: ProductVariant = ProductVariant class MetafieldProductVariants(IncrementalShopifyGraphQlBulkStream): diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/__init__.py b/airbyte-integrations/connectors/source-shopify/unit_tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py b/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py index b235659f65d9..127c9d86e19f 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py @@ -510,6 +510,32 @@ def filfillment_order_jsonl_content_example(): {"__typename":"FulfillmentOrderMerchantRequest","id":"gid:\/\/shopify\/FulfillmentOrderMerchantRequest\/333","message":null,"kind":"FULFILLMENT_REQUEST","requestOptions":{"notify_customer":true},"__parentId":"gid:\/\/shopify\/FulfillmentOrder\/2"}\n""" +@pytest.fixture +def products_jsonl_content_example(): + return """{"__typename":"Product","id":"gid:\/\/shopify\/Product\/123","publishedAt":"2021-06-23T01:09:29Z","createdAt":"2021-06-23T01:09:29Z","status":"ACTIVE","vendor":"Blanda, O'Kon and Bartell","updatedAt":"2023-04-20T11:12:26Z","bodyHtml":"Gold and silver glitter iPhone 7 cases with geometric line patterns, stacked","productType":"Music","tags":["developer-tools-generator"],"handle":"gold-silver-iphone-7-case","templateSuffix":null,"title":"Gold Silver iPhone 7 Case","options":[{"id":"gid:\/\/shopify\/ProductOption\/444","name":"Title","values":["Plastic","indigo"],"position":1}]} 
+{"__typename":"Image","id":"gid:\/\/shopify\/ProductImage\/111","__parentId":"gid:\/\/shopify\/Product\/123"} +{"__typename":"ProductVariant","id":"gid:\/\/shopify\/ProductVariant\/111","__parentId":"gid:\/\/shopify\/Product\/123"} +{"__typename":"ProductVariant","id":"gid:\/\/shopify\/ProductVariant\/222","__parentId":"gid:\/\/shopify\/Product\/123"}\n""" + + +@pytest.fixture +def product_images_jsonl_content_example(): + return """{"__typename":"Product","id":"gid:\/\/shopify\/Product\/123"} +{"__typename":"MediaImage","createdAt":"2023-01-06T18:29:17Z","updatedAt":"2023-01-06T18:29:19Z","image":{"url":"https:\/\/cdn.shopify.com\/s\/files\/1\/0580\/3317\/6765\/products\/white-t-shirt.jpg?v=1673029759"},"__parentId":"gid:\/\/shopify\/Product\/123"} +{"__typename":"Image","id":"gid:\/\/shopify\/ProductImage\/111","height":280,"alt":"","src":"https:\/\/cdn.shopify.com\/s\/files\/1\/0580\/3317\/6765\/products\/white-t-shirt.jpg?v=1673029759","url":"https:\/\/cdn.shopify.com\/s\/files\/1\/0580\/3317\/6765\/products\/white-t-shirt.jpg?v=1673029759","width":265,"__parentId":"gid:\/\/shopify\/Product\/123"} +{"__typename":"Product","id":"gid:\/\/shopify\/Product\/456"} +{"__typename":"MediaImage","createdAt":"2021-06-23T01:09:47Z","updatedAt":"2023-04-24T17:27:15Z","image":{"url":"https:\/\/cdn.shopify.com\/s\/files\/1\/0580\/3317\/6765\/products\/4-ounce-soy-candle.jpg?v=1624410587"},"__parentId":"gid:\/\/shopify\/Product\/456"} +{"__typename":"Image","id":"gid:\/\/shopify\/ProductImage\/222","height":1467,"alt":"updated_mon_24.04.2023","src":"https:\/\/cdn.shopify.com\/s\/files\/1\/0580\/3317\/6765\/products\/4-ounce-soy-candle.jpg?v=1624410587","url":"https:\/\/cdn.shopify.com\/s\/files\/1\/0580\/3317\/6765\/products\/4-ounce-soy-candle.jpg?v=1624410587","width":2200,"__parentId":"gid:\/\/shopify\/Product\/456"}\n""" + + +@pytest.fixture +def product_variants_jsonl_content_example(): + return 
"""{"__typename":"ProductVariant","id":"gid:\/\/shopify\/ProductVariant\/123","title":"Test 234","price":"59.00","sku":"","position":3,"inventoryPolicy":"DENY","compareAtPrice":null,"inventoryManagement":"SHOPIFY","createdAt":"2023-04-14T10:29:27Z","updatedAt":"2023-10-27T16:56:39Z","taxable":true,"barcode":"","weight":0.0,"weightUnit":"GRAMS","inventoryQuantity":0,"requiresShipping":false,"grams":0.0,"image":null,"old_inventory_quantity":0,"product":{"product_id":"gid:\/\/shopify\/Product\/111"},"fulfillmentService":{"fulfillment_service":"manual"},"inventoryItem":{"inventory_item_id":"gid:\/\/shopify\/InventoryItem\/222"}} +{"__typename":"ProductVariantPricePair","price":{"amount":"59.0","currencyCode":"USD"},"compareAtPrice":null,"__parentId":"gid:\/\/shopify\/ProductVariant\/123"} +{"__typename":"ProductVariant","id":"gid:\/\/shopify\/ProductVariant\/456","title":"Test Variant","price":"113.00","sku":"123","position":4,"inventoryPolicy":"CONTINUE","compareAtPrice":"1.00","inventoryManagement":"SHOPIFY","createdAt":"2023-12-11T10:37:41Z","updatedAt":"2023-12-11T10:37:41Z","taxable":true,"barcode":"123","weight":127.0,"weightUnit":"GRAMS","inventoryQuantity":1,"requiresShipping":true,"grams":127.0,"image":{"image_id":"gid:\/\/shopify\/ProductImage\/123456"},"old_inventory_quantity":1,"product":{"product_id":"gid:\/\/shopify\/Product\/222"},"fulfillmentService":{"fulfillment_service":"manual"},"inventoryItem":{"inventory_item_id":"gid:\/\/shopify\/InventoryItem\/333"}} +{"__typename":"ProductVariantPricePair","price":{"amount":"113.0","currencyCode":"USD"},"compareAtPrice":{"amount":"1.0","currencyCode":"USD"},"__parentId":"gid:\/\/shopify\/ProductVariant\/456"}\n""" + + @pytest.fixture def inventory_items_jsonl_content_example(): return 
"""{"__typename":"InventoryItem","id":"gid:\/\/shopify\/InventoryItem\/44871665713341","unitCost":null,"countryCodeOfOrigin":null,"harmonizedSystemCode":null,"provinceCodeOfOrigin":null,"updatedAt":"2023-04-14T10:29:27Z","createdAt":"2023-04-14T10:29:27Z","sku":"","tracked":true,"requiresShipping":false} @@ -661,6 +687,162 @@ def fulfillment_orders_response_expected_result(): } +@pytest.fixture +def products_response_expected_result(): + return { + "id": 123, + "published_at": "2021-06-23T01:09:29+00:00", + "created_at": "2021-06-23T01:09:29+00:00", + "status": "ACTIVE", + "vendor": "Blanda, O'Kon and Bartell", + "updated_at": "2023-04-20T11:12:26+00:00", + "body_html": "Gold and silver glitter iPhone 7 cases with geometric line patterns, stacked", + "product_type": "Music", + "tags": "developer-tools-generator", + "handle": "gold-silver-iphone-7-case", + "template_suffix": None, + "title": "Gold Silver iPhone 7 Case", + "options": [ + { + "id": 444, + "name": "Title", + "values": [ + "Plastic", + "indigo" + ], + "position": 1, + "product_id": 123 + } + ], + "admin_graphql_api_id": "gid://shopify/Product/123", + "images": [ + { + "id": 111 + } + ], + "variants": [ + { + "id": 111 + }, + { + "id": 222 + } + ], + "shop_url": "test_shop" + } + + +@pytest.fixture +def product_images_response_expected_result(): + return [ + { + "created_at": "2023-01-06T18:29:17+00:00", + "updated_at": "2023-01-06T18:29:19+00:00", + "id": 111, + "height": 280, + "alt": None, + "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/white-t-shirt.jpg?v=1673029759", + "width": 265, + "admin_graphql_api_id": "gid://shopify/ProductImage/111", + "product_id": 123, + "shop_url": "test_shop" + }, + { + "created_at": "2021-06-23T01:09:47+00:00", + "updated_at": "2023-04-24T17:27:15+00:00", + "id": 222, + "height": 1467, + "alt": "updated_mon_24.04.2023", + "src": "https://cdn.shopify.com/s/files/1/0580/3317/6765/products/4-ounce-soy-candle.jpg?v=1624410587", + "width": 2200, + 
"admin_graphql_api_id": "gid://shopify/ProductImage/222", + "product_id": 456, + "shop_url": "test_shop" + } + ] + + +@pytest.fixture +def product_variants_response_expected_result(): + return [ + { + "id": 123, + "title": "Test 234", + "price": 59.00, + "sku": "", + "position": 3, + "inventory_policy": "DENY", + "compare_at_price": None, + "inventory_management": "SHOPIFY", + "created_at": "2023-04-14T10:29:27+00:00", + "updated_at": "2023-10-27T16:56:39+00:00", + "taxable": True, + "barcode": "", + "weight": 0.0, + "weight_unit": "GRAMS", + "inventory_quantity": 0, + "requires_shipping": False, + "grams": 0, + "image_id": None, + "old_inventory_quantity": 0, + "fulfillment_service": "manual", + "admin_graphql_api_id": "gid://shopify/ProductVariant/123", + "presentment_prices": [ + { + "price": { + "amount": 59.0, + "currency_code": "USD" + }, + "compare_at_price": { + "amount": None + } + } + ], + "product_id": 111, + "inventory_item_id": 222, + "shop_url": "test_shop" + }, + { + "id": 456, + "title": "Test Variant", + "price": 113.00, + "sku": "123", + "position": 4, + "inventory_policy": "CONTINUE", + "compare_at_price": "1.00", + "inventory_management": "SHOPIFY", + "created_at": "2023-12-11T10:37:41+00:00", + "updated_at": "2023-12-11T10:37:41+00:00", + "taxable": True, + "barcode": "123", + "weight": 127.0, + "weight_unit": "GRAMS", + "inventory_quantity": 1, + "requires_shipping": True, + "grams": 127, + "image_id": 123456, + "old_inventory_quantity": 1, + "fulfillment_service": "manual", + "admin_graphql_api_id": "gid://shopify/ProductVariant/456", + "presentment_prices": [ + { + "price": { + "amount": 113.0, + "currency_code": "USD" + }, + "compare_at_price": { + "amount": 1.0, + "currency_code": "USD" + } + } + ], + "product_id": 222, + "inventory_item_id": 333, + "shop_url": "test_shop" + } + ] + + @pytest.fixture def inventory_items_response_expected_result(): return [ diff --git 
a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/__init__.py b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py index 69e2838fc16e..015242ac065f 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py @@ -15,26 +15,13 @@ InventoryItems, InventoryLevels, MetafieldOrders, + ProductImages, + Products, + ProductVariants, TransactionsGraphql, ) -@pytest.mark.parametrize( - "bulk_job_response, expected_len", - [ - ("bulk_error", 1), - ("bulk_unknown_error", 1), - ("bulk_no_errors", 0), - ], -) -def test_check_for_errors(request, requests_mock, bulk_job_response, expected_len, auth_config) -> None: - stream = MetafieldOrders(auth_config) - requests_mock.get(stream.job_manager.base_url, json=request.getfixturevalue(bulk_job_response)) - test_response = requests.get(stream.job_manager.base_url) - test_errors = stream.job_manager._job_check_for_errors(test_response) - assert len(test_errors) == expected_len - - def test_get_errors_from_response_invalid_response(auth_config) -> None: expected = "Couldn't check the `response` for `errors`" stream = MetafieldOrders(auth_config) @@ -42,46 +29,29 @@ def test_get_errors_from_response_invalid_response(auth_config) -> None: response.status_code = 404 response.url = "https://example.com/invalid" with pytest.raises(ShopifyBulkExceptions.BulkJobBadResponse) as error: - stream.job_manager._job_check_for_errors(response) + stream.job_manager._job_healthcheck(response) assert expected in repr(error.value) -@pytest.mark.parametrize( - "bulk_job_response, expected", - [ - ("bulk_error_with_concurrent_job", True), - 
("bulk_successful_response", False), - ("bulk_error", False), - ], -) -def test_has_running_concurrent_job(request, requests_mock, bulk_job_response, auth_config, expected) -> None: +def test_retry_on_concurrent_job(request, requests_mock, auth_config) -> None: stream = MetafieldOrders(auth_config) - requests_mock.get(stream.job_manager.base_url, json=request.getfixturevalue(bulk_job_response)) - test_response = requests.get(stream.job_manager.base_url) - test_errors = stream.job_manager._job_check_for_errors(test_response) - assert stream.job_manager._has_running_concurrent_job(test_errors) == expected - - -@pytest.mark.parametrize( - "bulk_job_response, expected", - [ - ("bulk_successful_response", "gid://shopify/BulkOperation/4046733967549"), - ("bulk_successful_response_with_no_id", None), - ], -) -def test_job_process_created(request, requests_mock, bulk_job_response, auth_config, expected) -> None: - stream = MetafieldOrders(auth_config) - requests_mock.get(stream.job_manager.base_url, json=request.getfixturevalue(bulk_job_response)) + stream.job_manager._concurrent_interval = 0 + # mocking responses + requests_mock.get( + stream.job_manager.base_url, + [ + # concurrent request is running (3 - retries) + {"json": request.getfixturevalue("bulk_error_with_concurrent_job")}, + {"json": request.getfixturevalue("bulk_error_with_concurrent_job")}, + {"json": request.getfixturevalue("bulk_error_with_concurrent_job")}, + # concurrent request has finished + {"json": request.getfixturevalue("bulk_successful_response")}, + ]) + test_response = requests.get(stream.job_manager.base_url) - # process the job with id (typically CREATED one) - stream.job_manager.job_process_created(test_response) - assert stream.job_manager._job_id == expected - - -def test_job_state_completed(auth_config) -> None: - stream = MetafieldOrders(auth_config) - stream.job_manager._job_state = ShopifyBulkJobStatus.COMPLETED.value - assert stream.job_manager._job_completed() == True + 
stream.job_manager._job_healthcheck(test_response) + # call count should be 4 (3 retries, 1 - succeeded) + assert requests_mock.call_count == 4 @pytest.mark.parametrize( @@ -122,6 +92,28 @@ def test_job_retry_on_concurrency(request, requests_mock, bulk_job_response, con assert requests_mock.call_count == 2 +@pytest.mark.parametrize( + "bulk_job_response, expected", + [ + ("bulk_successful_response", "gid://shopify/BulkOperation/4046733967549"), + ("bulk_successful_response_with_no_id", None), + ], +) +def test_job_process_created(request, requests_mock, bulk_job_response, auth_config, expected) -> None: + stream = MetafieldOrders(auth_config) + requests_mock.get(stream.job_manager.base_url, json=request.getfixturevalue(bulk_job_response)) + test_response = requests.get(stream.job_manager.base_url) + # process the job with id (typically CREATED one) + stream.job_manager.job_process_created(test_response) + assert stream.job_manager._job_id == expected + + +def test_job_state_completed(auth_config) -> None: + stream = MetafieldOrders(auth_config) + stream.job_manager._job_state = ShopifyBulkJobStatus.COMPLETED.value + assert stream.job_manager._job_completed() == True + + @pytest.mark.parametrize( "job_response, error_type, expected", [ @@ -160,17 +152,30 @@ def test_job_check_for_completion(mocker, request, requests_mock, job_response, @pytest.mark.parametrize( - "job_response, error_type, max_retry, expected_msg, call_count_expected", + "job_response, job_state, error_type, max_retry, expected_msg, call_count_expected", [ + # No retry - dead end + ( + "bulk_successful_response_with_errors", + False, + ShopifyBulkExceptions.BulkJobNonHandableError, + 2, + "Non-handable error occured", + 1, + ), + # Should be retried ( - "bulk_successful_response_with_errors", - ShopifyBulkExceptions.BulkJobUnknownError, + "bulk_successful_response_with_errors", + True, + ShopifyBulkExceptions.BulkJobError, 2, "Could not validate the status of the BULK Job", 3, ), + # Should be 
retried ( None, + False, ShopifyBulkExceptions.BulkJobBadResponse, 1, "Couldn't check the `response` for `errors`", @@ -178,17 +183,22 @@ def test_job_check_for_completion(mocker, request, requests_mock, job_response, ), ], ids=[ - "BulkJobUnknownError", + "BulkJobNonHandableError", + "BulkJobError", "BulkJobBadResponse", ], ) -def test_retry_on_job_exception(mocker, request, requests_mock, job_response, auth_config, error_type, max_retry, call_count_expected, expected_msg) -> None: +def test_retry_on_job_exception(mocker, request, requests_mock, job_response, auth_config, job_state, error_type, max_retry, call_count_expected, expected_msg) -> None: stream = MetafieldOrders(auth_config) stream.job_manager._job_backoff_time = 0 stream.job_manager._job_max_retries = max_retry # patching the method to get the right ID checks if job_response: stream.job_manager._job_id = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("id") + + if job_state: + # setting job_state to simulate the error-in-the-middle + stream.job_manager._job_state = request.getfixturevalue(job_response).get("data", {}).get("node", {}).get("status") # mocking the response for STATUS CHECKS json_mock_response = request.getfixturevalue(job_response) if job_response else None @@ -237,7 +247,6 @@ def test_job_check_with_running_scenario(request, requests_mock, job_response, a assert stream.job_manager._job_state == expected - def test_job_read_file_invalid_filename(mocker, auth_config) -> None: stream = MetafieldOrders(auth_config) expected = "An error occured while producing records from BULK Job result" @@ -260,6 +269,9 @@ def test_job_read_file_invalid_filename(mocker, auth_config) -> None: (TransactionsGraphql, "transactions_jsonl_content_example", "transactions_response_expected_result"), (InventoryItems, "inventory_items_jsonl_content_example", "inventory_items_response_expected_result"), (InventoryLevels, "inventory_levels_jsonl_content_example", 
"inventory_levels_response_expected_result"), + (Products, "products_jsonl_content_example", "products_response_expected_result"), + (ProductImages, "product_images_jsonl_content_example", "product_images_response_expected_result"), + (ProductVariants, "product_variants_jsonl_content_example", "product_variants_response_expected_result"), ], ids=[ "CustomerAddress", @@ -270,6 +282,9 @@ def test_job_read_file_invalid_filename(mocker, auth_config) -> None: "TransactionsGraphql", "InventoryItems", "InventoryLevels", + "Products", + "ProductImages", + "ProductVariants", ], ) def test_bulk_stream_parse_response( @@ -327,7 +342,7 @@ def test_stream_slices( auth_config["start_date"] = "2020-01-01" stream = stream(auth_config) - stream.job_manager._job_size = 1000 + stream.job_manager.job_size = 1000 test_result = list(stream.stream_slices(stream_state=stream_state)) test_query_from_slice = test_result[0].get("query") assert expected in test_query_from_slice @@ -366,11 +381,11 @@ def test_expand_stream_slices_job_size( # for the sake of simplicity we fake some parts to simulate the `current_job_time_elapsed` # fake current slice interval value - stream.job_manager._job_size = previous_slice_size + stream.job_manager.job_size = previous_slice_size # fake `last job elapsed time` if last_job_elapsed_time: stream.job_manager._job_last_elapsed_time = last_job_elapsed_time # parsing result from completed job list(stream.parse_response(test_bulk_response)) # check the next slice - assert stream.job_manager._job_size == adjusted_slice_size + assert stream.job_manager.job_size == adjusted_slice_size diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/test_deleted_events_stream.py b/airbyte-integrations/connectors/source-shopify/unit_tests/test_deleted_events_stream.py index 126d28b7e66d..8046d2aad928 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/test_deleted_events_stream.py +++ 
b/airbyte-integrations/connectors/source-shopify/unit_tests/test_deleted_events_stream.py @@ -6,7 +6,7 @@ import pytest from source_shopify.auth import ShopifyAuthenticator from source_shopify.streams.base_streams import ShopifyDeletedEventsStream -from source_shopify.streams.streams import Products +from source_shopify.streams.streams import CustomCollections @pytest.fixture @@ -19,7 +19,7 @@ def config(basic_config): @pytest.mark.parametrize( "stream,expected_main_path,expected_events_path", [ - (Products, "products.json", "events.json"), + (CustomCollections, "custom_collections.json", "events.json"), ], ) def test_path(stream, expected_main_path, expected_events_path, config) -> None: @@ -33,7 +33,7 @@ def test_path(stream, expected_main_path, expected_events_path, config) -> None: @pytest.mark.parametrize( "stream,expected_events_schema", [ - (Products, {}), + (CustomCollections, {}), ], ) def test_get_json_schema(stream, expected_events_schema, config) -> None: @@ -46,7 +46,7 @@ def test_get_json_schema(stream, expected_events_schema, config) -> None: @pytest.mark.parametrize( "stream,expected_data_field,expected_pk,expected_cursor_field", [ - (Products, "events", "id", "deleted_at"), + (CustomCollections, "events", "id", "deleted_at"), ], ) def test_has_correct_instance_vars(stream, expected_data_field, expected_pk, expected_cursor_field, config) -> None: @@ -59,7 +59,7 @@ def test_has_correct_instance_vars(stream, expected_data_field, expected_pk, exp @pytest.mark.parametrize( "stream,expected", [ - (Products, None), + (CustomCollections, None), ], ) def test_has_no_availability_strategy(stream, expected, config) -> None: @@ -72,13 +72,13 @@ def test_has_no_availability_strategy(stream, expected, config) -> None: "stream,deleted_records_json,expected", [ ( - Products, + CustomCollections, [ { "id": 123, "subject_id": 234, "created_at": "2023-09-05T14:02:00-07:00", - "subject_type": "Product", + "subject_type": "Collection", "verb": "destroy", "arguments": 
[], "message": "Test Message", @@ -92,7 +92,7 @@ def test_has_no_availability_strategy(stream, expected, config) -> None: "id": 123, "subject_id": 234, "created_at": "2023-09-05T14:02:00-07:00", - "subject_type": "Product", + "subject_type": "Collection", "verb": "destroy", "arguments": [], "message": "Test Message", @@ -116,13 +116,13 @@ def test_read_deleted_records(stream, requests_mock, deleted_records_json, expec "stream,input,expected", [ ( - Products, + CustomCollections, [ { "id": 123, "subject_id": 234, "created_at": "2023-09-05T14:02:00-07:00", - "subject_type": "Product", + "subject_type": "Collection", "verb": "destroy", "arguments": [], "message": "Test Message", @@ -155,23 +155,23 @@ def test_produce_deleted_records_from_events(stream, input, expected, config) -> [ # params with NO STATE ( - Products, + CustomCollections, {}, None, {"limit": 250, "order": "updated_at asc", "updated_at_min": "2020-11-01"}, - {"filter": "Product", "verb": "destroy"}, + {"filter": "Collection", "verb": "destroy"}, ), # params with STATE ( - Products, + CustomCollections, {"updated_at": "2028-01-01", "deleted": {"deleted_at": "2029-01-01"}}, None, {"limit": 250, "order": "updated_at asc", "updated_at_min": "2028-01-01"}, - {"created_at_min": "2029-01-01", "filter": "Product", "verb": "destroy"}, + {"created_at_min": "2029-01-01", "filter": "Collection", "verb": "destroy"}, ), # params with NO STATE but with NEXT_PAGE_TOKEN ( - Products, + CustomCollections, {}, {"page_info": "next_page_token"}, {"limit": 250, "page_info": "next_page_token"}, @@ -188,7 +188,7 @@ def test_request_params(config, stream, stream_state, next_page_token, expected_ @pytest.mark.parametrize( "stream,expected", [ - (Products, ShopifyDeletedEventsStream), + (CustomCollections, ShopifyDeletedEventsStream), ], ) def test_deleted_events_instance(stream, config, expected) -> None: @@ -199,7 +199,7 @@ def test_deleted_events_instance(stream, config, expected) -> None: @pytest.mark.parametrize( 
"stream,expected", [ - (Products, ""), + (CustomCollections, ""), ], ) def test_default_deleted_state_comparison_value(stream, config, expected) -> None: @@ -212,28 +212,28 @@ def test_default_deleted_state_comparison_value(stream, config, expected) -> Non [ # NO INITIAL STATE ( - Products, + CustomCollections, {"id": 1, "updated_at": "2021-01-01"}, {}, {"updated_at": "2021-01-01", "deleted": {"deleted_at": ""}}, ), # with INITIAL STATE ( - Products, + CustomCollections, {"id": 1, "updated_at": "2022-01-01"}, {"updated_at": "2021-01-01", "deleted": {"deleted_at": ""}}, {"updated_at": "2022-01-01", "deleted": {"deleted_at": ""}}, ), # with NO Last Record value and NO current state value ( - Products, + CustomCollections, {}, {}, {"updated_at": "", "deleted": {"deleted_at": ""}}, ), # with NO Last Record value but with Current state value ( - Products, + CustomCollections, {}, {"updated_at": "2030-01-01", "deleted": {"deleted_at": ""}}, {"updated_at": "2030-01-01", "deleted": {"deleted_at": ""}}, diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/test_source.py b/airbyte-integrations/connectors/source-shopify/unit_tests/test_source.py index 277c969517de..e042f57b60e4 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/test_source.py @@ -8,7 +8,6 @@ import pytest from airbyte_cdk.utils import AirbyteTracedException -from conftest import records_per_slice from source_shopify.auth import ShopifyAuthenticator from source_shopify.source import ConnectionCheckTest, SourceShopify from source_shopify.streams.streams import ( @@ -50,6 +49,8 @@ TransactionsGraphql, ) +from .conftest import records_per_slice + @pytest.fixture def config(basic_config) -> dict: @@ -73,18 +74,18 @@ def config(basic_config) -> dict: (MetafieldProductVariants, None, "graphql.json"), (MetafieldLocations, None, "graphql.json"), (MetafieldCollections, None, "graphql.json"), - # + 
(Products, None, "graphql.json"), + (ProductImages, None, "graphql.json"), + (ProductVariants, None, "graphql.json"), + # Nested Substreams + (OrderRefunds, None, ""), + # regular streams (MetafieldSmartCollections, {"id": 123}, "smart_collections/123/metafields.json"), (MetafieldPages, {"id": 123}, "pages/123/metafields.json"), (MetafieldShops, None, "metafields.json"), - # Nested Substreams - (ProductImages, None, ""), - (ProductVariants, None, ""), - # (Customers, None, "customers.json"), (Orders, None, "orders.json"), (DraftOrders, None, "draft_orders.json"), - (Products, None, "products.json"), (AbandonedCheckouts, None, "checkouts.json"), (Collects, None, "collects.json"), (TenderTransactions, None, "tender_transactions.json"), @@ -131,13 +132,13 @@ def test_path_with_stream_slice_param(stream, stream_slice, expected_path, confi "stream, parent_records, state_checkpoint_interval", [ ( - ProductImages, + OrderRefunds, [ - {"id": 1, "images": [{"updated_at": "2021-01-01T00:00:00+00:00"}]}, - {"id": 2, "images": [{"updated_at": "2021-02-01T00:00:00+00:00"}]}, - {"id": 3, "images": [{"updated_at": "2021-03-01T00:00:00+00:00"}]}, - {"id": 4, "images": [{"updated_at": "2021-04-01T00:00:00+00:00"}]}, - {"id": 5, "images": [{"updated_at": "2021-05-01T00:00:00+00:00"}]}, + {"id": 1, "refunds": [{"created_at": "2021-01-01T00:00:00+00:00"}]}, + {"id": 2, "refunds": [{"created_at": "2021-02-01T00:00:00+00:00"}]}, + {"id": 3, "refunds": [{"created_at": "2021-03-01T00:00:00+00:00"}]}, + {"id": 4, "refunds": [{"created_at": "2021-04-01T00:00:00+00:00"}]}, + {"id": 5, "refunds": [{"created_at": "2021-05-01T00:00:00+00:00"}]}, ], 2, ), @@ -211,17 +212,17 @@ def test_request_params(config, stream, expected) -> None: "last_record, current_state, expected", [ # no init state - ({"created_at": "2022-10-10T06:21:53-07:00"}, {}, {"created_at": "2022-10-10T06:21:53-07:00", "orders": None}), + ({"created_at": "2022-10-10T06:21:53-07:00"}, {}, {"created_at": 
"2022-10-10T06:21:53-07:00", "orders": {"updated_at": "", "deleted": {"deleted_at": ""}}}), # state is empty str - ({"created_at": "2022-10-10T06:21:53-07:00"}, {"created_at": ""}, {"created_at": "2022-10-10T06:21:53-07:00", "orders": None}), + ({"created_at": "2022-10-10T06:21:53-07:00"}, {"created_at": ""}, {"created_at": "2022-10-10T06:21:53-07:00", "orders": {"updated_at": "", "deleted": {"deleted_at": ""}}}), # state is None - ({"created_at": "2022-10-10T06:21:53-07:00"}, {"created_at": None}, {"created_at": "2022-10-10T06:21:53-07:00", "orders": None}), + ({"created_at": "2022-10-10T06:21:53-07:00"}, {"created_at": None}, {"created_at": "2022-10-10T06:21:53-07:00", "orders": {"updated_at": "", "deleted": {"deleted_at": ""}}}), # last rec cursor is None - ({"created_at": None}, {"created_at": None}, {"created_at": "", "orders": None}), + ({"created_at": None}, {"created_at": None}, {"created_at": "", "orders": {"updated_at": "", "deleted": {"deleted_at": ""}}}), # last rec cursor is empty str - ({"created_at": ""}, {"created_at": "null"}, {"created_at": "null", "orders": None}), + ({"created_at": ""}, {"created_at": "null"}, {"created_at": "null", "orders": {"updated_at": "", "deleted": {"deleted_at": ""}}}), # no values at all - ({}, {}, {"created_at": "", "orders": None}), + ({}, {}, {"created_at": "", "orders": {"updated_at": "", "deleted": {"deleted_at": ""}}}), ], ids=[ "no init state", diff --git a/docs/integrations/sources/shopify-migrations.md b/docs/integrations/sources/shopify-migrations.md index 699d4be8fb4c..731ad1e295c8 100644 --- a/docs/integrations/sources/shopify-migrations.md +++ b/docs/integrations/sources/shopify-migrations.md @@ -1,5 +1,27 @@ # Shopify Migration Guide +## Upgrading to 2.1.0 +This version implements `Shopify GraphQL BULK Operations` to speed up the following streams: + - `Products` + - `Product Images` + - `Product Variants` + +* In the `Products` stream, the `published_scope` property is no longer available. 
+* In the `Products` stream, the `images` property now contains only the `id` of the image. Refer to the `Product Images` stream instead. +* In the `Products` stream, the `variants` property now contains only the `id` of the variant. Refer to the `Product Variants` stream instead. +* In the `Products` stream, the `position` property is no longer available. +* The `Product Variants` stream now has the cursor field `updated_at` instead of `id`. +* In the `Product Variants` stream, the date-time fields, such as `created_at` and `updated_at`, now use `UTC` format without a timezone component. +* In the `Product Variants` stream, the `presentment_prices.compare_at_price` property has changed from a `number` to an `object of strings`. This field was not populated in the `REST API` stream version, but it is correctly covered in the GraphQL stream version. +* The `Product Variants` stream's `inventory_policy` and `inventory_management` properties now contain `uppercase string` values, instead of `lowercase`. +* In the `Product Images` stream, the date-time fields, such as `created_at` and `updated_at`, now use `UTC` format without a timezone component. +* In the `Product Images` stream, the `variant_ids` and `position` properties are no longer available. Refer to the `Product Variants` stream instead. +* Retrieving the `deleted` records for the `Products`, `Product Images` and `Product Variants` streams is no longer available due to `GraphQL` limitations. + +### Action items required for 2.1.0 +- `Refresh Schema` + `Reset` are required for the affected streams (`Products`, `Product Images` and `Product Variants`) after upgrading from a previous version. 
+ + ## Upgrading to 2.0.0 This version implements `Shopify GraphQL BULK Operations` to speed up the following streams: diff --git a/docs/integrations/sources/shopify.md b/docs/integrations/sources/shopify.md index 92dfca573f47..2163933c7bbb 100644 --- a/docs/integrations/sources/shopify.md +++ b/docs/integrations/sources/shopify.md @@ -208,6 +208,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https | Version | Date | Pull Request | Subject | | :------ | :--------- | :------------------------------------------------------- | :---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 2.1.0 | 2024-05-02 | [37767](https://github.com/airbytehq/airbyte/pull/37767) | Migrated `Products`, `Product Images` and `Product Variants` to `GraphQL BULK` | | 2.0.8 | 2024-05-02 | [37589](https://github.com/airbytehq/airbyte/pull/37589) | Added retry for known HTTP Errors for BULK streams | | 2.0.7 | 2024-04-24 | [36660](https://github.com/airbytehq/airbyte/pull/36660) | Schema descriptions | | 2.0.6 | 2024-04-22 | [37468](https://github.com/airbytehq/airbyte/pull/37468) | Fixed one time retry for `Internal Server Error` for BULK streams |