diff --git a/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml b/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml index 5473d1c9d319..d9e2dda2a64f 100644 --- a/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-shopify/acceptance-test-config.yml @@ -4,6 +4,11 @@ acceptance_tests: spec: tests: - spec_path: "source_shopify/spec.json" + backward_compatibility_tests_config: + # This is the intentional change. + # Added new fields: `job_checkpoint_interval`, `job_product_variants_include_pres_prices` + # to provide the ability to override this value by the User. + disable_for_version: 2.4.14 connection: tests: - config_path: "secrets/config.json" diff --git a/airbyte-integrations/connectors/source-shopify/metadata.yaml b/airbyte-integrations/connectors/source-shopify/metadata.yaml index 70b8565fe837..8c12e5835ff6 100644 --- a/airbyte-integrations/connectors/source-shopify/metadata.yaml +++ b/airbyte-integrations/connectors/source-shopify/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: api connectorType: source definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9 - dockerImageTag: 2.4.15 + dockerImageTag: 2.4.16 dockerRepository: airbyte/source-shopify documentationUrl: https://docs.airbyte.com/integrations/sources/shopify githubIssueLabel: source-shopify diff --git a/airbyte-integrations/connectors/source-shopify/poetry.lock b/airbyte-integrations/connectors/source-shopify/poetry.lock index 55cd6bd9d05a..7d41629ddd2b 100644 --- a/airbyte-integrations/connectors/source-shopify/poetry.lock +++ b/airbyte-integrations/connectors/source-shopify/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "airbyte-cdk" diff --git a/airbyte-integrations/connectors/source-shopify/pyproject.toml b/airbyte-integrations/connectors/source-shopify/pyproject.toml index d8189f715b1d..c6107064ff48 100644 --- a/airbyte-integrations/connectors/source-shopify/pyproject.toml +++ b/airbyte-integrations/connectors/source-shopify/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "2.4.15" +version = "2.4.16" name = "source-shopify" description = "Source CDK implementation for Shopify." 
authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py index 3dcc00d14e52..d666d86a40d6 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py @@ -52,6 +52,11 @@ class BulkJobCreationFailedConcurrentError(BaseBulkException): failure_type: FailureType = FailureType.transient_error + class BulkJobRedirectToOtherShopError(BaseBulkException): + """Raised when the response contains another shop name""" + + failure_type: FailureType = FailureType.transient_error + class BulkJobConcurrentError(BaseBulkException): """Raised when failing the job after hitting too many BulkJobCreationFailedConcurrentError.""" diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py index cf117b97d73e..e1e620bd4959 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py @@ -15,9 +15,9 @@ from source_shopify.utils import ApiTypeEnum from source_shopify.utils import ShopifyRateLimiter as limiter -from ...http_request import ShopifyErrorHandler from .exceptions import AirbyteTracedException, ShopifyBulkExceptions from .query import ShopifyBulkQuery, ShopifyBulkTemplates +from .record import ShopifyBulkRecord from .retry import bulk_retry_on_exception from .status import ShopifyBulkJobStatus from .tools import END_OF_FILE, BulkTools @@ -25,12 +25,12 @@ @dataclass class ShopifyBulkManager: - session: requests.Session + http_client: HttpClient base_url: str - stream_name: str query: ShopifyBulkQuery job_termination_threshold: float job_size: float + job_checkpoint_interval: int # default logger logger: Final[logging.Logger] = logging.getLogger("airbyte") @@ -66,8 +66,11 @@ class ShopifyBulkManager: # 0.1 ~= P2H, default value, lower boundary for slice size _job_size_min: Final[float] = 0.1 - # P365D, upper boundary for slice size - _job_size_max: Final[float] = 365.0 + + # last running job object count + _job_last_rec_count: int = field(init=False, default=0) + # the flag to adjust the next slice from the checkpointed cursor vaue + _job_adjust_slice_from_checkpoint: bool = field(init=False, default=False) # expand slice factor _job_size_expand_factor: int = field(init=False, default=2) @@ -79,14 +82,19 @@ class ShopifyBulkManager: # 2 sec is set as default value to cover the case with the empty-fast-completed jobs _job_last_elapsed_time: float = field(init=False, default=2.0) - def __post_init__(self): - self._http_client = HttpClient(self.stream_name, self.logger, ShopifyErrorHandler(), session=self.session) + def __post_init__(self) -> None: self._job_size = self.job_size + # The upper boundary for slice size is limited by the value from the config, default value is `P30D` + self._job_size_max = self.job_size # Each job ideally should be executed within the specified time (in sec), # to maximize the performance for multi-connection syncs and control the bulk job size within +- 1 hours (3600 sec), # Ideally the source will balance on it's own rate, based on the time taken to return the data for the slice. 
# This behaviour could be overidden by providing the `BULK Job termination threshold` option in the `config`. self._job_max_elapsed_time = self.job_termination_threshold + # how many records should be collected before we use the checkpoining + self._job_checkpoint_interval = self.job_checkpoint_interval + # define Record Producer instance + self.record_producer: ShopifyBulkRecord = ShopifyBulkRecord(self.query) @property def _tools(self) -> BulkTools: @@ -143,6 +151,21 @@ def _is_long_running_job(self) -> bool: self._job_should_revert_slice = False return False + @property + def _supports_checkpointing(self) -> bool: + """ + The flag to determine whether or not the BULK Stream supports the `BULK checkpointing`. + """ + return self.query.supports_checkpointing + + @property + def _job_should_checkpoint(self) -> bool: + return self._supports_checkpointing and self._job_last_rec_count >= self._job_checkpoint_interval + + @property + def _job_any_lines_collected(self) -> bool: + return self._job_last_rec_count > 0 + def _expand_job_size(self) -> None: self._job_size += self._job_size_adjusted_expand_factor @@ -176,6 +199,16 @@ def __reset_state(self) -> None: self._job_self_canceled = False # set the running job message counter to default self._log_job_msg_count = 0 + # set the running job object count to default + self._job_last_rec_count = 0 + + def _set_checkpointing(self) -> None: + # set the flag to adjust the next slice from the checkpointed cursor value + self._job_adjust_slice_from_checkpoint = True + + def _reset_checkpointing(self) -> None: + # reseting the checkpoint flag, if bulk job has completed normally + self._job_adjust_slice_from_checkpoint = False def _job_completed(self) -> bool: return self._job_state == ShopifyBulkJobStatus.COMPLETED.value @@ -184,7 +217,7 @@ def _job_canceled(self) -> bool: return self._job_state == ShopifyBulkJobStatus.CANCELED.value def _job_cancel(self) -> None: - _, canceled_response = self._http_client.send_request( + _, canceled_response = self.http_client.send_request( http_method="POST", url=self.base_url, data=ShopifyBulkTemplates.cancel(self._job_id), @@ -206,11 +239,14 @@ def _log_job_state_with_count(self) -> None: self._log_job_msg_count += 1 else: message = f"Elapsed time: {self._job_elapsed_time_in_state} sec" + if self._job_last_rec_count > 0: + count_message = f". Rows collected: {self._job_last_rec_count}" + message = message + count_message self._log_state(message) self._log_job_msg_count = 0 def _log_state(self, message: Optional[str] = None) -> None: - pattern = f"Stream: `{self.stream_name}`, the BULK Job: `{self._job_id}` is {self._job_state}" + pattern = f"Stream: `{self.http_client.name}`, the BULK Job: `{self._job_id}` is {self._job_state}" if message: self.logger.info(f"{pattern}. 
{message}.") else: @@ -218,11 +254,12 @@ def _log_state(self, message: Optional[str] = None) -> None: def _job_get_result(self, response: Optional[requests.Response] = None) -> Optional[str]: parsed_response = response.json().get("data", {}).get("node", {}) if response else None - job_result_url = parsed_response.get("url") if parsed_response and not self._job_self_canceled else None + # get `complete` or `partial` result from collected Bulk Job results + job_result_url = parsed_response.get("url", parsed_response.get("partialDataUrl")) if parsed_response else None if job_result_url: # save to local file using chunks to avoid OOM filename = self._tools.filename_from_url(job_result_url) - _, response = self._http_client.send_request(http_method="GET", url=job_result_url, request_kwargs={"stream": True}) + _, response = self.http_client.send_request(http_method="GET", url=job_result_url, request_kwargs={"stream": True}) response.raise_for_status() with open(filename, "wb") as file: for chunk in response.iter_content(chunk_size=self._retrieve_chunk_size): @@ -231,11 +268,24 @@ def _job_get_result(self, response: Optional[requests.Response] = None) -> Optio file.write(END_OF_FILE.encode()) return filename + def _job_get_checkpointed_result(self, response: Optional[requests.Response]) -> None: + if self._job_any_lines_collected or self._job_should_checkpoint: + # set the flag to adjust the next slice from the checkpointed cursor value + self._set_checkpointing() + # fetch the collected records from CANCELED Job on checkpointing + self._job_result_filename = self._job_get_result(response) + def _job_update_state(self, response: Optional[requests.Response] = None) -> None: if response: self._job_state = response.json().get("data", {}).get("node", {}).get("status") - if self._job_state in [ShopifyBulkJobStatus.RUNNING.value, ShopifyBulkJobStatus.CANCELING.value]: + self._job_last_rec_count = int(response.json().get("data", {}).get("node", {}).get("objectCount", 0)) + + if self._job_state == ShopifyBulkJobStatus.RUNNING.value: self._log_job_state_with_count() + elif self._job_state in [ShopifyBulkJobStatus.CANCELED.value, ShopifyBulkJobStatus.CANCELING.value]: + # do not emit `CANCELED / CANCELING` Bulk Job status, while checkpointing + if not self._job_should_checkpoint: + self._log_job_state_with_count() else: self._log_state() @@ -245,19 +295,30 @@ def _on_created_job(self, **kwargs) -> None: def _on_canceled_job(self, response: requests.Response) -> Optional[AirbyteTracedException]: if not self._job_self_canceled: raise ShopifyBulkExceptions.BulkJobCanceled( - f"The BULK Job: `{self._job_id}` exited with {self._job_state}, details: {response.text}", + f"The BULK Job: `{self._job_id}` exited with {self._job_state}, details: {response.text}" ) + else: + self._job_get_checkpointed_result(response) def _on_canceling_job(self, **kwargs) -> None: sleep(self._job_check_interval) + def _cancel_on_long_running_job(self) -> None: + self.logger.info( + f"Stream: `{self.http_client.name}` the BULK Job: {self._job_id} runs longer than expected ({self._job_max_elapsed_time} sec). Retry with the reduced `Slice Size` after self-cancelation." 
+ ) + self._job_cancel() + + def _cancel_on_checkpointing(self) -> None: + self.logger.info(f"Stream: `{self.http_client.name}`, checkpointing after >= `{self._job_checkpoint_interval}` rows collected.") + # set the flag to adjust the next slice from the checkpointed cursor value + self._job_cancel() + def _on_running_job(self, **kwargs) -> None: if self._is_long_running_job: - self.logger.info( - f"Stream: `{self.stream_name}` the BULK Job: {self._job_id} runs longer than expected ({self._job_max_elapsed_time} sec). Retry with the reduced `Slice Size` after self-cancelation." - ) - # cancel the long-running bulk job - self._job_cancel() + self._cancel_on_long_running_job() + elif self._job_should_checkpoint: + self._cancel_on_checkpointing() else: sleep(self._job_check_interval) @@ -265,9 +326,14 @@ def _on_completed_job(self, response: Optional[requests.Response] = None) -> Non self._job_result_filename = self._job_get_result(response) def _on_failed_job(self, response: requests.Response) -> AirbyteTracedException: - raise ShopifyBulkExceptions.BulkJobFailed( - f"The BULK Job: `{self._job_id}` exited with {self._job_state}, details: {response.text}", - ) + if not self._job_any_lines_collected: + raise ShopifyBulkExceptions.BulkJobFailed( + f"The BULK Job: `{self._job_id}` exited with {self._job_state}, details: {response.text}", + ) + else: + # when the Bulk Job fails, usually there is a `partialDataUrl` available, + # we leverage the checkpointing in this case + self._job_get_checkpointed_result(response) def _on_timeout_job(self, **kwargs) -> AirbyteTracedException: raise ShopifyBulkExceptions.BulkJobTimout( @@ -283,14 +349,19 @@ def _on_job_with_errors(self, errors: List[Mapping[str, Any]]) -> AirbyteTracedE raise ShopifyBulkExceptions.BulkJobError(f"Could not validate the status of the BULK Job `{self._job_id}`. Errors: {errors}.") def _on_non_handable_job_error(self, errors: List[Mapping[str, Any]]) -> AirbyteTracedException: - raise ShopifyBulkExceptions.BulkJobNonHandableError(f"The Stream: `{self.stream_name}`, Non-handable error occured: {errors}") + raise ShopifyBulkExceptions.BulkJobNonHandableError(f"The Stream: `{self.http_client.name}`, Non-handable error occured: {errors}") + + def _get_server_errors(self, response: requests.Response) -> List[Optional[Mapping[str, Any]]]: + server_errors = response.json().get("errors", []) + return [server_errors] if isinstance(server_errors, str) else server_errors + + def _get_user_errors(self, response: requests.Response) -> List[Optional[Mapping[str, Any]]]: + user_errors = response.json().get("data", {}).get("bulkOperationRunQuery", {}).get("userErrors", []) + return [user_errors] if isinstance(user_errors, str) else user_errors - def _collect_bulk_errors(self, response: requests.Response) -> List[Optional[dict]]: + def _collect_bulk_errors(self, response: requests.Response) -> List[Optional[Mapping[str, Any]]]: try: - server_errors = response.json().get("errors", []) - user_errors = response.json().get("data", {}).get("bulkOperationRunQuery", {}).get("userErrors", []) - errors = server_errors + user_errors - return errors + return self._get_server_errors(response) + self._get_user_errors(response) except (Exception, JSONDecodeError) as e: raise ShopifyBulkExceptions.BulkJobBadResponse( f"Couldn't check the `response` for `errors`, status: {response.status_code}, response: `{response.text}`. Trace: {repr(e)}." 
@@ -303,7 +374,7 @@ def _job_healthcheck(self, response: requests.Response) -> Optional[Exception]: self._on_job_with_errors(errors) def _job_track_running(self) -> None: - _, response = self._http_client.send_request( + _, response = self.http_client.send_request( http_method="POST", url=self.base_url, data=ShopifyBulkTemplates.status(self._job_id), @@ -311,7 +382,6 @@ def _job_track_running(self) -> None: request_kwargs={}, ) self._job_healthcheck(response) - self._job_update_state(response) self._job_state_to_fn_map.get(self._job_state)(response=response) @@ -339,6 +409,26 @@ def _has_running_concurrent_job(self, errors: Optional[Iterable[Mapping[str, Any def _has_reached_max_concurrency(self) -> bool: return self._concurrent_attempt == self._concurrent_max_retry + def _should_switch_shop_name(self, response: requests.Response) -> bool: + """ + Sometimes the API returns the redirected response that points to the same Store but with different Name: + >> case: + -- The user inputs the `shop name` as "A": + while attempting to place the BULK Job + -- The response contains the redirected results to the `shop name` as "B", like: + response.url == "https://B.myshopify.com" + + This redirection is related to: + 1) `aliased` or `hidden` store names from being exposed + 2) `migrated` store data to the `new store`, but referenced within the old one stil + + reference issue: https://github.com/airbytehq/oncall/issues/5866 + """ + if self.base_url != response.url: + self.base_url = response.url + return True + return False + @bulk_retry_on_exception(logger) def _job_check_state(self) -> None: while not self._job_completed(): @@ -354,7 +444,7 @@ def create_job(self, stream_slice: Mapping[str, str], filter_field: str) -> None else: query = self.query.get() - _, response = self._http_client.send_request( + _, response = self.http_client.send_request( http_method="POST", url=self.base_url, json={"query": ShopifyBulkTemplates.prepare(query)}, @@ -365,7 +455,10 @@ def create_job(self, stream_slice: Mapping[str, str], filter_field: str) -> None if self._has_running_concurrent_job(errors): # when the concurrent job takes place, another job could not be created # we typically need to wait and retry, but no longer than 10 min. (see retry in `bulk_retry_on_exception`) - raise ShopifyBulkExceptions.BulkJobCreationFailedConcurrentError(f"Failed to create job for stream {self.stream_name}") + raise ShopifyBulkExceptions.BulkJobCreationFailedConcurrentError(f"Failed to create job for stream {self.http_client.name}") + elif self._should_switch_shop_name(response): + # assign new shop name, since the one that specified in `config` was redirected to the different one. 
+ raise ShopifyBulkExceptions.BulkJobRedirectToOtherShopError(f"Switching the `store` name, redirected to: {response.url}") else: # There were no concurrent error for this job so even if there were other errors, we can reset this self._concurrent_attempt = 0 @@ -384,7 +477,7 @@ def _job_process_created(self, response: requests.Response) -> None: self._job_id = bulk_response.get("id") self._job_created_at = bulk_response.get("createdAt") self._job_state = ShopifyBulkJobStatus.CREATED.value - self.logger.info(f"Stream: `{self.stream_name}`, the BULK Job: `{self._job_id}` is {ShopifyBulkJobStatus.CREATED.value}") + self.logger.info(f"Stream: `{self.http_client.name}`, the BULK Job: `{self._job_id}` is {ShopifyBulkJobStatus.CREATED.value}") def job_size_normalize(self, start: datetime, end: datetime) -> datetime: # adjust slice size when it's bigger than the loop point when it should end, @@ -396,15 +489,43 @@ def get_adjusted_job_start(self, slice_start: datetime) -> datetime: step = self._job_size if self._job_size else self._job_size_min return slice_start.add(days=step) - def get_adjusted_job_end(self, slice_start: datetime, slice_end: datetime) -> datetime: + def _adjust_slice_end(self, slice_end: datetime, checkpointed_cursor: Optional[str] = None) -> datetime: + """ + Choose between the existing `slice_end` value or `checkpointed_cursor` value, if provided. + """ + return pdm.parse(checkpointed_cursor) if checkpointed_cursor else slice_end + + def get_adjusted_job_end(self, slice_start: datetime, slice_end: datetime, checkpointed_cursor: Optional[str] = None) -> datetime: + if self._job_adjust_slice_from_checkpoint: + # set the checkpointing to default, before the next slice is emitted, to avoid inf.loop + self._reset_checkpointing() + return self._adjust_slice_end(slice_end, checkpointed_cursor) + if self._is_long_running_job: self._job_size_reduce_next() return slice_start + + return slice_end + + def _emit_final_job_message(self, job_current_elapsed_time: int) -> None: + final_message = f"Stream: `{self.http_client.name}`, the BULK Job: `{self._job_id}` time elapsed: {job_current_elapsed_time} sec." + + if self._job_any_lines_collected: + lines_collected_message = f" Rows collected: {self._job_last_rec_count} --> records: `{self.record_producer.record_composed}`." + final_message = final_message + lines_collected_message + + # emit final Bulk job status message + self.logger.info(f"{final_message}") + + def _process_bulk_results(self) -> Iterable[Mapping[str, Any]]: + if self._job_result_filename: + # produce records from saved bulk job result + yield from self.record_producer.read_file(self._job_result_filename) else: - return slice_end + yield from [] @limiter.balance_rate_limit(api_type=ApiTypeEnum.graphql.value) - def job_check_for_completion(self) -> Optional[str]: + def job_get_results(self) -> Optional[Iterable[Mapping[str, Any]]]: """ This method checks the status for the `CREATED` Shopify BULK Job, using it's `ID`. The time spent for the Job execution is tracked to understand the effort. 
@@ -414,7 +535,7 @@ def job_check_for_completion(self) -> Optional[str]: try: # track created job until it's COMPLETED self._job_check_state() - return self._job_result_filename + yield from self._process_bulk_results() except ( ShopifyBulkExceptions.BulkJobFailed, ShopifyBulkExceptions.BulkJobTimout, @@ -426,7 +547,8 @@ def job_check_for_completion(self) -> Optional[str]: raise bulk_job_error finally: job_current_elapsed_time = round((time() - job_started), 3) - self.logger.info(f"Stream: `{self.stream_name}`, the BULK Job: `{self._job_id}` time elapsed: {job_current_elapsed_time} sec.") + # emit the final Bulk Job log message + self._emit_final_job_message(job_current_elapsed_time) # check whether or not we should expand or reduce the size of the slice self.__adjust_job_size(job_current_elapsed_time) # reset the state for COMPLETED job diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py index 777e716a29e4..f31e4260b26e 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/query.py @@ -79,7 +79,11 @@ def prepare(query: str) -> str: @dataclass class ShopifyBulkQuery: - shop_id: int + config: Mapping[str, Any] + + @property + def shop_id(self) -> int: + return self.config.get("shop_id") @property def tools(self) -> BulkTools: @@ -112,6 +116,14 @@ def sort_key(self) -> Optional[str]: """ return None + @property + def supports_checkpointing(self) -> bool: + """ + The presence of `sort_key = "UPDATED_AT"` for a query instance, usually means, + the server-side BULK Job results are fetched and ordered correctly, suitable for checkpointing. 
+ """ + return self.sort_key == "UPDATED_AT" + @property def query_nodes(self) -> Optional[Union[List[Field], List[str]]]: """ @@ -2382,8 +2394,7 @@ class ProductVariant(ShopifyBulkQuery): """ { productVariants( - query: "updated_at:>='2019-04-13T00:00:00+00:00' AND updated_at:<='2024-04-30T12:16:17.273363+00:00'" - sortKey: UPDATED_AT + query: "updatedAt:>='2019-04-13T00:00:00+00:00' AND updatedAt:<='2024-04-30T12:16:17.273363+00:00'" ) { edges { node { @@ -2457,64 +2468,76 @@ class ProductVariant(ShopifyBulkQuery): """ query_name = "productVariants" - sort_key = "ID" - prices_fields: List[str] = ["amount", "currencyCode"] - presentment_prices_fields: List[Field] = [ - Field( - name="edges", - fields=[ - Field( - name="node", - fields=["__typename", Field(name="price", fields=prices_fields), Field(name="compareAtPrice", fields=prices_fields)], - ) - ], - ) - ] + @property + def _should_include_presentment_prices(self) -> bool: + return self.config.get("job_product_variants_include_pres_prices", True) - option_value_fields: List[Field] = [ - "id", - "name", - Field(name="hasVariants", alias="has_variants"), - Field(name="swatch", fields=["color", Field(name="image", fields=["id"])]), - ] - option_fields: List[Field] = [ - "name", - "value", - Field(name="optionValue", alias="option_value", fields=option_value_fields), - ] + @property + def query_nodes(self) -> Optional[Union[List[Field], List[str]]]: - # main query - query_nodes: List[Field] = [ - "__typename", - "id", - "title", - "price", - "sku", - "position", - "inventoryPolicy", - "compareAtPrice", - "inventoryManagement", - "createdAt", - "updatedAt", - "taxable", - "barcode", - "weight", - "weightUnit", - "inventoryQuantity", - "requiresShipping", - "availableForSale", - "displayName", - "taxCode", - Field(name="selectedOptions", alias="options", fields=option_fields), - Field(name="weight", alias="grams"), - Field(name="image", fields=[Field(name="id", alias="image_id")]), - Field(name="inventoryQuantity", alias="old_inventory_quantity"), - Field(name="product", fields=[Field(name="id", alias="product_id")]), - Field(name="fulfillmentService", fields=[Field(name="handle", alias="fulfillment_service")]), - Field(name="inventoryItem", fields=[Field(name="id", alias="inventory_item_id")]), - Field(name="presentmentPrices", fields=presentment_prices_fields), - ] + prices_fields: List[str] = ["amount", "currencyCode"] + presentment_prices_fields: List[Field] = [ + Field( + name="edges", + fields=[ + Field( + name="node", + fields=[ + "__typename", + Field(name="price", fields=prices_fields), + Field(name="compareAtPrice", fields=prices_fields), + ], + ) + ], + ) + ] + option_value_fields: List[Field] = [ + "id", + "name", + Field(name="hasVariants", alias="has_variants"), + Field(name="swatch", fields=["color", Field(name="image", fields=["id"])]), + ] + option_fields: List[Field] = [ + "name", + "value", + Field(name="optionValue", alias="option_value", fields=option_value_fields), + ] + presentment_prices = ( + [Field(name="presentmentPrices", fields=presentment_prices_fields)] if self._should_include_presentment_prices else [] + ) + + query_nodes: List[Field] = [ + "__typename", + "id", + "title", + "price", + "sku", + "position", + "inventoryPolicy", + "compareAtPrice", + "inventoryManagement", + "createdAt", + "updatedAt", + "taxable", + "barcode", + "weight", + "weightUnit", + "inventoryQuantity", + "requiresShipping", + "availableForSale", + "displayName", + "taxCode", + Field(name="selectedOptions", alias="options", 
fields=option_fields), + Field(name="weight", alias="grams"), + Field(name="image", fields=[Field(name="id", alias="image_id")]), + Field(name="inventoryQuantity", alias="old_inventory_quantity"), + Field(name="product", fields=[Field(name="id", alias="product_id")]), + Field(name="fulfillmentService", fields=[Field(name="handle", alias="fulfillment_service")]), + Field(name="inventoryItem", fields=[Field(name="id", alias="inventory_item_id")]), + ] + presentment_prices + + return query_nodes record_composition = { "new_record": "ProductVariant", diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/record.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/record.py index 27641669d942..acbfd472942b 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/record.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/record.py @@ -29,6 +29,8 @@ def __post_init__(self) -> None: self.composition: Optional[Mapping[str, Any]] = self.query.record_composition self.record_process_components: Optional[Callable[[MutableMapping], MutableMapping]] = self.query.record_process_components self.components: List[str] = self.composition.get("record_components", []) if self.composition else [] + # how many records composed + self.record_composed: int = 0 @property def tools(self) -> BulkTools: @@ -127,8 +129,12 @@ def produce_records(self, filename: str) -> Iterable[MutableMapping[str, Any]]: """ with open(filename, "r") as jsonl_file: + # reset the counter + self.record_composed = 0 + for record in self.process_line(jsonl_file): yield self.tools.fields_names_to_snake_case(record) + self.record_composed += 1 def read_file(self, filename: str, remove_file: Optional[bool] = True) -> Iterable[Mapping[str, Any]]: try: diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/retry.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/retry.py index 140d77e91ad5..ec0c242a0b8b 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/retry.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/retry.py @@ -24,25 +24,20 @@ def bulk_retry_on_exception(logger: logging.Logger, more_exceptions: Optional[Tu def decorator(func: Callable) -> Callable: @wraps(func) def wrapper(self, *args, **kwargs) -> Any: - # mandatory class attributes - max_retries = self._job_max_retries - stream_name = self.stream_name - backoff_time = self._job_backoff_time - current_retries = 0 while True: try: return func(self, *args, **kwargs) except BULK_RETRY_ERRORS or more_exceptions as ex: current_retries += 1 - if current_retries > max_retries: + if current_retries > self._job_max_retries: logger.error("Exceeded retry limit. Giving up.") raise else: logger.warning( - f"Stream `{stream_name}`: {ex}. Retrying {current_retries}/{max_retries} after {backoff_time} seconds." + f"Stream `{self.http_client.name}`: {ex}. Retrying {current_retries}/{self._job_max_retries} after {self._job_backoff_time} seconds." ) - sleep(backoff_time) + sleep(self._job_backoff_time) except ShopifyBulkExceptions.BulkJobCreationFailedConcurrentError: if self._concurrent_attempt == self._concurrent_max_retry: message = f"The BULK Job couldn't be created at this time, since another job is running." 
@@ -51,9 +46,13 @@ def wrapper(self, *args, **kwargs) -> Any: self._concurrent_attempt += 1 logger.warning( - f"Stream: `{self.stream_name}`, the BULK concurrency limit has reached. Waiting {self._concurrent_interval} sec before retry, attempt: {self._concurrent_attempt}.", + f"Stream: `{self.http_client.name}`, the BULK concurrency limit has reached. Waiting {self._concurrent_interval} sec before retry, attempt: {self._concurrent_attempt}.", ) sleep(self._concurrent_interval) + except ShopifyBulkExceptions.BulkJobRedirectToOtherShopError: + logger.warning( + f"Stream: `{self.http_client.name}`, the `shop name` differs from the provided in `input configuration`. Switching to the `{self._tools.shop_name_from_url(self.base_url)}`.", + ) return wrapper diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/tools.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/tools.py index bd5d26099edb..7a23aa1a2d02 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/tools.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/tools.py @@ -52,6 +52,16 @@ def filename_from_url(job_result_url: str) -> str: f"Could not extract the `filename` from `result_url` provided, details: {job_result_url}", ) + @staticmethod + def shop_name_from_url(url: str) -> str: + match = re.search(r"https://(.*?)(\.myshopify)", url) + if match: + return match.group(1) + else: + # safety net, if there is an error parsing url, + # on no match is found + return url + @staticmethod def from_iso8601_to_rfc3339(record: Mapping[str, Any], field: str) -> Mapping[str, Any]: """ diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/spec.json b/airbyte-integrations/connectors/source-shopify/source_shopify/spec.json index 9f5b1ca82a85..af70d2ce01fe 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/spec.json +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/spec.json @@ -98,12 +98,27 @@ "description": "Defines which API type (REST/BULK) to use to fetch `Transactions` data. If you are a `Shopify Plus` user, leave the default value to speed up the fetch.", "default": false }, + "job_product_variants_include_pres_prices": { + "type": "boolean", + "title": "Add `Presentment prices` to Product Variants", + "description": "If enabled, the `Product Variants` stream attempts to include `Presentment prices` field (may affect the performance).", + "default": true + }, "job_termination_threshold": { "type": "integer", "title": "BULK Job termination threshold", "description": "The max time in seconds, after which the single BULK Job should be `CANCELED` and retried. 
The bigger the value the longer the BULK Job is allowed to run.", - "default": 3600, - "minimum": 1 + "default": 7200, + "minimum": 3600, + "maximum": 21600 + }, + "job_checkpoint_interval": { + "type": "integer", + "title": "BULK Job checkpoint (rows collected)", + "description": "The threshold, after which the single BULK Job should be checkpointed.", + "default": 100000, + "minimum": 15000, + "maximum": 200000 } } }, diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py index cfffc4f34892..6c87d856dea5 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/base_streams.py @@ -13,14 +13,14 @@ import pendulum as pdm import requests from airbyte_cdk.sources.streams.core import StreamData -from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http import HttpClient, HttpStream from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler, HttpStatusErrorHandler from airbyte_cdk.sources.streams.http.error_handlers.default_error_mapping import DEFAULT_ERROR_MAPPING from airbyte_protocol.models import SyncMode from requests.exceptions import RequestException +from source_shopify.http_request import ShopifyErrorHandler from source_shopify.shopify_graphql.bulk.job import ShopifyBulkManager -from source_shopify.shopify_graphql.bulk.query import ShopifyBulkQuery, ShopifyBulkTemplates -from source_shopify.shopify_graphql.bulk.record import ShopifyBulkRecord +from source_shopify.shopify_graphql.bulk.query import ShopifyBulkQuery from source_shopify.transform import DataTypeEnforcer from source_shopify.utils import EagerlyCachedStreamState as stream_state_cache from source_shopify.utils import ShopifyNonRetryableErrors @@ -177,9 +177,14 @@ def request_params( class IncrementalShopifyStream(ShopifyStream, ABC): # Setting the check point interval to the limit of the records output state_checkpoint_interval = 250 - # guarantee for the NestedSubstreams to emit the STATE - # when we have the abnormal STATE distance between Parent and Substream - filter_by_state_checkpoint = False + + @property + def filter_by_state_checkpoint(self) -> bool: + """ + This filtering flag stands to guarantee for the NestedSubstreams to emit the STATE correctly, + when we have the abnormal STATE distance between Parent and Substream + """ + return False # Setting the default cursor field for all streams cursor_field = "updated_at" @@ -215,15 +220,15 @@ def track_checkpoint_cursor(self, record_value: Union[str, int]) -> None: if self.filter_by_state_checkpoint: # set checkpoint cursor if not self._checkpoint_cursor: - self._checkpoint_cursor = self.config.get("start_date") + self._checkpoint_cursor = self.default_state_comparison_value # track checkpoint cursor - if record_value >= self._checkpoint_cursor: + if str(record_value) >= str(self._checkpoint_cursor): self._checkpoint_cursor = record_value def should_checkpoint(self, index: int) -> bool: return self.filter_by_state_checkpoint and index >= self.state_checkpoint_interval - # Parse the `stream_slice` with respect to `stream_state` for `Incremental refresh` + # Parse the `records` with respect to the `stream_state` for the `Incremental refresh` # cases where we slice the stream, the endpoints for those classes don't accept any other filtering, # but they provide us with the 
updated_at field in most cases, so we used that as incremental filtering during the order slicing. def filter_records_newer_than_state( @@ -632,24 +637,31 @@ class IncrementalShopifyGraphQlBulkStream(IncrementalShopifyStream): def __init__(self, config: Dict) -> None: super().__init__(config) - # init BULK Query instance, pass `shop_id` from config - self.query = self.bulk_query(shop_id=config.get("shop_id")) # define BULK Manager instance self.job_manager: ShopifyBulkManager = ShopifyBulkManager( - session=self._http_client._session, + http_client=self.bulk_http_client, base_url=f"{self.url_base}{self.path()}", - stream_name=self.name, - query=self.query, + query=self.bulk_query(config), job_termination_threshold=float(config.get("job_termination_threshold", 3600)), # overide the default job slice size, if provided (it's auto-adjusted, later on) - job_size=config.get("bulk_window_in_days", 0.0), + job_size=config.get("bulk_window_in_days", 30.0), + # provide the job checkpoint interval value, default value is 200k lines collected + job_checkpoint_interval=config.get("job_checkpoint_interval", 200_000), ) - # define Record Producer instance - self.record_producer: ShopifyBulkRecord = ShopifyBulkRecord(self.query) + @property + def filter_by_state_checkpoint(self) -> bool: + return self.job_manager._supports_checkpointing + + @property + def bulk_http_client(self) -> HttpClient: + """ + Returns the instance of the `HttpClient`, with the stream info. + """ + return HttpClient(self.name, self.logger, ShopifyErrorHandler(), session=self._http_client._session) @cached_property - def parent_stream(self) -> object: + def parent_stream(self) -> Union[ShopifyStream, IncrementalShopifyStream]: """ Returns the instance of parent stream, if the substream has a `parent_stream_class` dependency. """ @@ -724,7 +736,18 @@ def get_state_value(self, stream_state: Mapping[str, Any] = None) -> Optional[Un def emit_slice_message(self, slice_start: datetime, slice_end: datetime) -> None: slice_size_message = f"Slice size: `P{round(self.job_manager._job_size, 1)}D`" - self.logger.info(f"Stream: `{self.name}` requesting BULK Job for period: {slice_start} -- {slice_end}. {slice_size_message}") + slice_message = f"Stream: `{self.name}` requesting BULK Job for period: {slice_start} -- {slice_end}. {slice_size_message}." + + if self.job_manager._supports_checkpointing: + checkpointing_message = f" The BULK checkpoint after `{self.job_manager.job_checkpoint_interval}` lines." + else: + checkpointing_message = f" The BULK checkpointing is not supported." 
+ + self.logger.info(slice_message + checkpointing_message) + + def emit_checkpoint_message(self) -> None: + if self.job_manager._job_adjust_slice_from_checkpoint: + self.logger.info(f"Stream {self.name}, continue from checkpoint: `{self._checkpoint_cursor}`.") @stream_state_cache.cache_stream_state def stream_slices(self, stream_state: Optional[Mapping[str, Any]] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: @@ -738,11 +761,28 @@ def stream_slices(self, stream_state: Optional[Mapping[str, Any]] = None, **kwar self.emit_slice_message(start, slice_end) yield {"start": start.to_rfc3339_string(), "end": slice_end.to_rfc3339_string()} # increment the end of the slice or reduce the next slice - start = self.job_manager.get_adjusted_job_end(start, slice_end) + start = self.job_manager.get_adjusted_job_end(start, slice_end, self._checkpoint_cursor) else: # for the streams that don't support filtering yield {} + def sort_output_asc(self, non_sorted_records: Iterable[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]: + """ + Apply sorting for collected records, to guarantee the `ASC` output. + This handles the STATE and CHECKPOINTING correctly, for the `incremental` streams. + """ + if non_sorted_records: + if not self.cursor_field: + yield from non_sorted_records + else: + yield from sorted( + non_sorted_records, + key=lambda x: x.get(self.cursor_field) if x.get(self.cursor_field) else self.default_state_comparison_value, + ) + else: + # always return an empty iterable, if no records + return [] + def read_records( self, sync_mode: SyncMode, @@ -752,13 +792,12 @@ def read_records( ) -> Iterable[StreamData]: self.job_manager.create_job(stream_slice, self.filter_field) stream_state = stream_state_cache.cached_state.get(self.name, {self.cursor_field: self.default_state_comparison_value}) - - filename = self.job_manager.job_check_for_completion() - # the `filename` could be `None`, meaning there are no data available for the slice period. 
- if filename: - # add `shop_url` field to each record produced - records = self.add_shop_url_field( - # produce records from saved bulk job result - self.record_producer.read_file(filename) - ) - yield from self.filter_records_newer_than_state(stream_state, records) + # add `shop_url` field to each record produced + records = self.add_shop_url_field( + # produce records from saved bulk job result + self.job_manager.job_get_results() + ) + # emit records in ASC order + yield from self.filter_records_newer_than_state(stream_state, self.sort_output_asc(records)) + # add log message about the checkpoint value + self.emit_checkpoint_message() diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py index 5de98f1798ea..a5708d32594d 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/streams/streams.py @@ -116,7 +116,6 @@ class MetafieldDraftOrders(IncrementalShopifyGraphQlBulkStream): class Products(IncrementalShopifyGraphQlBulkStream): bulk_query: Product = Product - # pin the api version class ProductsGraphQl(IncrementalShopifyStream): @@ -274,7 +273,6 @@ class OrderRefunds(IncrementalShopifyNestedStream): class OrderRisks(IncrementalShopifyGraphQlBulkStream): bulk_query: OrderRisk = OrderRisk - # the updated stream works only with >= `2024-04` shopify api version class Transactions(IncrementalShopifySubstream): diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py b/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py index 0eea96d11371..c82bed05d3f3 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py @@ -38,7 +38,11 @@ def logger(): @pytest.fixture def basic_config(): - return {"shop": "test_shop", "credentials": {"auth_method": "api_password", "api_password": "api_password"}} + return { + "shop": "test_shop", + "credentials": {"auth_method": "api_password", "api_password": "api_password"}, + "shop_id": 0, + } @pytest.fixture @@ -48,6 +52,7 @@ def auth_config(): "start_date": "2023-01-01", "credentials": {"auth_method": "api_password", "api_password": "api_password"}, "authenticator": None, + } @@ -408,6 +413,118 @@ def bulk_job_running_response(): } }, } + + +@pytest.fixture +def bulk_job_running_with_object_count_and_url_response(): + return { + "data": { + "node": { + "id": "gid://shopify/BulkOperation/4047052112061", + "status": "RUNNING", + "errorCode": None, + "objectCount": "15", + "fileSize": None, + "url": 'https://some_url?response-content-disposition=attachment;+filename="bulk-123456789.jsonl";+filename*=UTF-8', + "partialDataUrl": None, + } + }, + "extensions": { + "cost": { + "requestedQueryCost": 1, + "actualQueryCost": 1, + "throttleStatus": { + "maximumAvailable": 1000.0, + "currentlyAvailable": 999, + "restoreRate": 50.0, + }, + } + }, + } + + +@pytest.fixture +def bulk_job_canceled_with_object_count_and_url_response(): + return { + "data": { + "node": { + "id": "gid://shopify/BulkOperation/4047052112061", + "status": "CANCELED", + "errorCode": None, + "objectCount": "15", + "fileSize": None, + "url": 'https://some_url?response-content-disposition=attachment;+filename="bulk-123456789.jsonl";+filename*=UTF-8', + "partialDataUrl": None, + } + }, + "extensions": { + "cost": { + "requestedQueryCost": 
1, + "actualQueryCost": 1, + "throttleStatus": { + "maximumAvailable": 1000.0, + "currentlyAvailable": 999, + "restoreRate": 50.0, + }, + } + }, + } + + +@pytest.fixture +def bulk_job_running_with_object_count_no_url_response(): + return { + "data": { + "node": { + "id": "gid://shopify/BulkOperation/4047052112061", + "status": "RUNNING", + "errorCode": None, + "objectCount": "4", + "fileSize": None, + "url": None, + "partialDataUrl": None, + } + }, + "extensions": { + "cost": { + "requestedQueryCost": 1, + "actualQueryCost": 1, + "throttleStatus": { + "maximumAvailable": 1000.0, + "currentlyAvailable": 999, + "restoreRate": 50.0, + }, + } + }, + } + + +@pytest.fixture +def bulk_job_canceled_with_object_count_no_url_response(): + return { + "data": { + "node": { + "id": "gid://shopify/BulkOperation/4047052112061", + "status": "CANCELED", + "errorCode": None, + "objectCount": "4", + "fileSize": None, + "url": None, + "partialDataUrl": None, + } + }, + "extensions": { + "cost": { + "requestedQueryCost": 1, + "actualQueryCost": 1, + "throttleStatus": { + "maximumAvailable": 1000.0, + "currentlyAvailable": 999, + "restoreRate": 50.0, + }, + } + }, + } @pytest.fixture @@ -820,36 +937,7 @@ def product_images_response_expected_result(): @pytest.fixture def product_variants_response_expected_result(): return [ - { - "id": 40091751448765, - "title": "Metal", - "price": 64.0, - "sku": "", - "position": 1, - "inventory_policy": "DENY", - "compare_at_price": None, - "inventory_management": "SHOPIFY", - "created_at": "2021-06-23T06:04:41+00:00", - "updated_at": "2023-10-27T16:56:50+00:00", - "taxable": True, - "barcode": None, - "weight": 0.0, - "weight_unit": "GRAMS", - "inventory_quantity": 6, - "requires_shipping": False, - "available_for_sale": True, - "display_name": "Waterproof iPhone Speaker - Metal", - "tax_code": "", - "grams": 0, - "old_inventory_quantity": 6, - "fulfillment_service": "manual", - "admin_graphql_api_id": "gid://shopify/ProductVariant/40091751448765", - "presentment_prices": [{"price": {"amount": 64.0, "currency_code": "USD"}, "compare_at_price": {"amount": None}}], - "product_id": 6796825198781, - "inventory_item_id": 42186366255293, - "image_id": None, - "shop_url": "test_shop", - }, + # sorted records in ASC, check the `updated_at` field { "id": 41561955827901, "title": "Test Variant 1", @@ -880,6 +968,36 @@ def product_variants_response_expected_result(): "image_id": None, "shop_url": "test_shop", }, + { + "id": 40091751448765, + "title": "Metal", + "price": 64.0, + "sku": "", + "position": 1, + "inventory_policy": "DENY", + "compare_at_price": None, + "inventory_management": "SHOPIFY", + "created_at": "2021-06-23T06:04:41+00:00", + "updated_at": "2023-10-27T16:56:50+00:00", + "taxable": True, + "barcode": None, + "weight": 0.0, + "weight_unit": "GRAMS", + "inventory_quantity": 6, + "requires_shipping": False, + "available_for_sale": True, + "display_name": "Waterproof iPhone Speaker - Metal", + "tax_code": "", + "grams": 0, + "old_inventory_quantity": 6, + "fulfillment_service": "manual", + "admin_graphql_api_id": "gid://shopify/ProductVariant/40091751448765", + "presentment_prices": [{"price": {"amount": 64.0, "currency_code": "USD"}, "compare_at_price": {"amount": None}}], + "product_id": 6796825198781, + "inventory_item_id": 42186366255293, + "image_id": None, + "shop_url": "test_shop", + }, ] diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py 
index dd8985f4ac9e..647f187300f9 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py @@ -3,6 +3,8 @@ # +from os import remove + import pytest import requests from airbyte_protocol.models import SyncMode @@ -27,6 +29,52 @@ _ANY_FILTER_FIELD = "any_filter_field" +def test_job_manager_default_values(auth_config) -> None: + stream = Products(auth_config) + + # 10Mb chunk size to save the file + assert stream.job_manager._retrieve_chunk_size == 10485760 # 1024 * 1024 * 10 + assert stream.job_manager._job_max_retries == 6 + assert stream.job_manager._job_backoff_time == 5 + # running job logger constrain, every 100-ish message will be printed + assert stream.job_manager._log_job_msg_frequency == 100 + assert stream.job_manager._log_job_msg_count == 0 + # attempt counter + assert stream.job_manager._concurrent_attempt == 0 + # sleep time per creation attempt + assert stream.job_manager._concurrent_interval == 30 + # max attempts for job creation + assert stream.job_manager._concurrent_max_retry == 120 + # currents: _job_id, _job_state, _job_created_at, _job_self_canceled + assert not stream.job_manager._job_id + # this string is based on ShopifyBulkJobStatus + assert not stream.job_manager._job_state + # completed and saved Bulk Job result filename + assert not stream.job_manager._job_result_filename + # date-time when the Bulk Job was created on the server + assert not stream.job_manager._job_created_at + # indicated whether or not we manually force-cancel the current job + assert not stream.job_manager._job_self_canceled + # time between job status checks + assert stream.job_manager. _job_check_interval == 3 + # 0.1 ~= P2H, default value, lower boundary for slice size + assert stream.job_manager._job_size_min == 0.1 + # last running job object count + assert stream.job_manager._job_last_rec_count == 0 + # how many records should be collected before we use the checkpoining + assert stream.job_manager._job_checkpoint_interval == 200000 + # the flag to adjust the next slice from the checkpointed cursor vaue + assert not stream.job_manager._job_adjust_slice_from_checkpoint + # expand slice factor + assert stream.job_manager._job_size_expand_factor == 2 + # reduce slice factor + assert stream.job_manager._job_size_reduce_factor == 2 + # whether or not the slicer should revert the previous start value + assert not stream.job_manager._job_should_revert_slice + # 2 sec is set as default value to cover the case with the empty-fast-completed jobs + assert stream.job_manager._job_last_elapsed_time == 2.0 + + def test_get_errors_from_response_invalid_response(auth_config) -> None: expected = "Couldn't check the `response` for `errors`" stream = MetafieldOrders(auth_config) @@ -141,14 +189,15 @@ def test_job_check_for_completion(mocker, request, requests_mock, job_response, job_result_url = test_job_status_response.json().get("data", {}).get("node", {}).get("url") if error_type: with pytest.raises(error_type) as error: - stream.job_manager.job_check_for_completion() + list(stream.job_manager.job_get_results()) assert expected in repr(error.value) else: if job_result_url: # mocking the nested request call to retrieve the data from result URL requests_mock.get(job_result_url, json=request.getfixturevalue(job_response)) - result = stream.job_manager.job_check_for_completion() - assert expected == result + 
mocker.patch("source_shopify.shopify_graphql.bulk.record.ShopifyBulkRecord.read_file", return_value=[]) + stream.job_manager._job_check_state() + assert expected == stream.job_manager._job_result_filename @pytest.mark.parametrize( @@ -237,13 +286,64 @@ def test_job_check_with_running_scenario(request, requests_mock, job_response, a assert stream.job_manager._job_state == expected +@pytest.mark.parametrize( + "running_job_response, canceled_job_response, expected", + [ + ( + "bulk_job_running_with_object_count_and_url_response", + "bulk_job_canceled_with_object_count_and_url_response", + "bulk-123456789.jsonl", + ), + ( + "bulk_job_running_with_object_count_no_url_response", + "bulk_job_canceled_with_object_count_no_url_response", + None, + ), + ], + ids=[ + "self-canceled with url", + "self-canceled with no url", + ], +) +def test_job_running_with_canceled_scenario(mocker, request, requests_mock, running_job_response, canceled_job_response, auth_config, expected) -> None: + stream = MetafieldOrders(auth_config) + # modify the sleep time for the test + stream.job_manager._job_check_interval = 0 + # get job_id from FIXTURE + job_id = request.getfixturevalue(running_job_response).get("data", {}).get("node", {}).get("id") + # mocking the response for STATUS CHECKS + requests_mock.post( + stream.job_manager.base_url, + [ + {"json": request.getfixturevalue(running_job_response)}, + {"json": request.getfixturevalue(canceled_job_response)}, + ], + ) + job_result_url = request.getfixturevalue(canceled_job_response).get("data", {}).get("node", {}).get("url") + # test the state of the job isn't assigned + assert stream.job_manager._job_state == None + + stream.job_manager._job_id = job_id + stream.job_manager._job_checkpoint_interval = 5 + # faking self-canceled job + stream.job_manager._job_self_canceled = True + # mocking the nested request call to retrieve the data from result URL + requests_mock.get(job_result_url, json=request.getfixturevalue(canceled_job_response)) + mocker.patch("source_shopify.shopify_graphql.bulk.record.ShopifyBulkRecord.read_file", return_value=[]) + stream.job_manager._job_check_state() + assert stream.job_manager._job_result_filename == expected + # clean up + if expected: + remove(expected) + + def test_job_read_file_invalid_filename(mocker, auth_config) -> None: stream = MetafieldOrders(auth_config) expected = "An error occured while producing records from BULK Job result" # patching the method to get the filename mocker.patch("source_shopify.shopify_graphql.bulk.record.ShopifyBulkRecord.produce_records", side_effect=Exception) with pytest.raises(ShopifyBulkExceptions.BulkRecordProduceError) as error: - list(stream.record_producer.read_file("test.jsonl")) + list(stream.job_manager.record_producer.read_file("test.jsonl")) assert expected in repr(error.value) diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_query.py b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_query.py index 87781b8e5538..9f5baaf0ef94 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_query.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_query.py @@ -99,7 +99,7 @@ def test_bulk_query_cancel() -> None: ], ids=["simple query with filter and sort"] ) -def test_base_build_query(query_name, fields, filter_field, start, end, expected) -> None: +def test_base_build_query(basic_config, query_name, fields, filter_field, start, end, expected) -> None: """ Expected 
result rendered: ''' @@ -116,8 +116,7 @@ def test_base_build_query(query_name, fields, filter_field, start, end, expected ''' """ - - builder = ShopifyBulkQuery(shop_id=0) + builder = ShopifyBulkQuery(basic_config) filter_query = f"{filter_field}:>'{start}' AND {filter_field}:<='{end}'" built_query = builder.build(query_name, fields, filter_query) assert expected.render() == built_query.render() @@ -240,6 +239,6 @@ def test_base_build_query(query_name, fields, filter_field, start, end, expected "InventoryLevel query", ] ) -def test_bulk_query(query_class, filter_field, start, end, expected) -> None: - stream = query_class(shop_id=0) +def test_bulk_query(basic_config, query_class, filter_field, start, end, expected) -> None: + stream = query_class(basic_config) assert stream.get(filter_field, start, end) == expected.render() \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_record.py b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_record.py index 0f18c1965949..dff60ea605d5 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_record.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_record.py @@ -16,8 +16,8 @@ ({"id": 123}, {"id": 123}), ], ) -def test_record_resolve_id(record, expected) -> None: - bulk_query = ShopifyBulkQuery(shop_id=0) +def test_record_resolve_id(basic_config, record, expected) -> None: + bulk_query = ShopifyBulkQuery(basic_config) assert ShopifyBulkRecord(bulk_query).record_resolve_id(record) == expected @@ -29,8 +29,8 @@ def test_record_resolve_id(record, expected) -> None: ({}, "Other", False), ], ) -def test_check_type(record, types, expected) -> None: - query = ShopifyBulkQuery(shop_id=0) +def test_check_type(basic_config, record, types, expected) -> None: + query = ShopifyBulkQuery(basic_config) assert ShopifyBulkRecord(query).check_type(record, types) == expected @@ -61,8 +61,8 @@ def test_check_type(record, types, expected) -> None: ) ], ) -def test_record_resolver(record, expected) -> None: - query = ShopifyBulkQuery(shop_id=0) +def test_record_resolver(basic_config, record, expected) -> None: + query = ShopifyBulkQuery(basic_config) record_instance = ShopifyBulkRecord(query) assert record_instance.record_resolve_id(record) == expected @@ -76,8 +76,8 @@ def test_record_resolver(record, expected) -> None: ), ], ) -def test_record_new(record, expected) -> None: - query = ShopifyBulkQuery(shop_id=0) +def test_record_new(basic_config, record, expected) -> None: + query = ShopifyBulkQuery(basic_config) record_instance = ShopifyBulkRecord(query) record_instance.record_new(record) assert record_instance.buffer == [expected] @@ -110,8 +110,8 @@ def test_record_new(record, expected) -> None: ], ids=["add_component"], ) -def test_record_new_component(records_from_jsonl, record_components, expected) -> None: - query = ShopifyBulkQuery(shop_id=0) +def test_record_new_component(basic_config, records_from_jsonl, record_components, expected) -> None: + query = ShopifyBulkQuery(basic_config) record_instance = ShopifyBulkRecord(query) record_instance.components = record_components.get("record_components") # register new record first @@ -161,8 +161,8 @@ def test_record_new_component(records_from_jsonl, record_components, expected) - ), ], ) -def test_buffer_flush(buffered_record, expected) -> None: - query = ShopifyBulkQuery(shop_id=0) +def test_buffer_flush(basic_config, buffered_record, expected) -> None: + query = 
ShopifyBulkQuery(basic_config) record_instance = ShopifyBulkRecord(query) # populate the buffer with record record_instance.buffer.append(buffered_record) @@ -196,8 +196,8 @@ def test_buffer_flush(buffered_record, expected) -> None: ], ids=["test_compose"], ) -def test_record_compose(records_from_jsonl, record_composition, expected) -> None: - query = ShopifyBulkQuery(shop_id=0) +def test_record_compose(basic_config, records_from_jsonl, record_composition, expected) -> None: + query = ShopifyBulkQuery(basic_config) # query.record_composition = record_composition record_instance = ShopifyBulkRecord(query) record_instance.composition = record_composition diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-shopify/unit_tests/unit_test.py index 157fe1d2be05..ca9e306e6698 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/unit_test.py @@ -100,11 +100,9 @@ def test_privileges_validation(requests_mock, fetch_transactions_user_id, basic_ "Internal Server Error for slice (500)", ], ) -def test_unavailable_stream(requests_mock, basic_config, stream, slice: Optional[Mapping[str, Any]], status_code: int, +def test_unavailable_stream(requests_mock, auth_config, stream, slice: Optional[Mapping[str, Any]], status_code: int, json_response: Mapping[str, Any]): - config = basic_config - config["authenticator"] = None - stream = stream(config) + stream = stream(auth_config) url = stream.url_base + stream.path(stream_slice=slice) requests_mock.get(url=url, json=json_response, status_code=status_code) response = requests.get(url) @@ -112,10 +110,8 @@ def test_unavailable_stream(requests_mock, basic_config, stream, slice: Optional assert stream.get_error_handler().interpret_response(response) == expected_error_resolution -def test_filter_records_newer_than_state(basic_config): - config = basic_config - config["authenticator"] = None - stream = DiscountCodes(config) +def test_filter_records_newer_than_state(auth_config): + stream = DiscountCodes(auth_config) records_slice = [ # present cursor older than state - record should be omitted {"id": 1, "updated_at": "2022-01-01T01:01:01-07:00"}, diff --git a/docs/integrations/sources/shopify.md b/docs/integrations/sources/shopify.md index 9c615f96d41c..e40c5e2ab7b5 100644 --- a/docs/integrations/sources/shopify.md +++ b/docs/integrations/sources/shopify.md @@ -212,6 +212,7 @@ For all `Shopify GraphQL BULK` api requests these limitations are applied: https | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 2.4.16 | 2024-07-21 | [42095](https://github.com/airbytehq/airbyte/pull/42095) | Added the `Checkpointing` for the `BULK` streams, fixed the `store` redirection | | 2.4.15 | 2024-07-27 | [42806](https://github.com/airbytehq/airbyte/pull/42806) | Update dependencies | | 2.4.14 | 2024-07-20 | [42150](https://github.com/airbytehq/airbyte/pull/42150) | Update dependencies | | 2.4.13 | 2024-07-13 | [41809](https://github.com/airbytehq/airbyte/pull/41809) 
| Update dependencies |
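
For reference, a minimal sketch (not part of the diff) of a user config exercising the options this PR adds to `spec.json`; the shop name, password, and start date below are placeholders, and the values shown are the defaults and ranges declared in the spec hunk above.

```python
# Minimal sketch, not part of this PR: an example source config using the new spec.json options.
# Shop name, api_password and start_date are placeholder values.
example_config = {
    "shop": "airbyte-integration-test",  # placeholder shop name
    "credentials": {"auth_method": "api_password", "api_password": "<redacted>"},
    "start_date": "2024-01-01",
    # New in this PR: include or skip `presentmentPrices` in the Product Variants BULK query.
    "job_product_variants_include_pres_prices": True,
    # Updated bounds in this PR: seconds before a long-running BULK job is self-canceled
    # and retried with a smaller slice (spec default 7200, allowed range 3600-21600).
    "job_termination_threshold": 7200,
    # New in this PR: rows collected before the BULK job is checkpointed
    # (spec default 100000, allowed range 15000-200000).
    "job_checkpoint_interval": 100_000,
}

# Per the base_streams.py hunk, these values are forwarded to ShopifyBulkManager as
# `job_termination_threshold` and `job_checkpoint_interval`, while
# `job_product_variants_include_pres_prices` is read by the ProductVariant query via config.get().
```

One thing worth noting from the diff itself: when `job_checkpoint_interval` is absent from the config, the code-level fallback in `base_streams.py` is `200_000`, whereas the spec default is `100000`.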