diff --git a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile index 430b028ea67d..2736be3875d5 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/Dockerfile +++ b/airbyte-integrations/connectors/source-zendesk-support/Dockerfile @@ -25,5 +25,5 @@ COPY source_zendesk_support ./source_zendesk_support ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.10.1 +LABEL io.airbyte.version=0.10.2 LABEL io.airbyte.name=airbyte/source-zendesk-support diff --git a/airbyte-integrations/connectors/source-zendesk-support/acceptance-test-config.yml b/airbyte-integrations/connectors/source-zendesk-support/acceptance-test-config.yml index 37b75c5aa439..c37900f668d7 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-zendesk-support/acceptance-test-config.yml @@ -21,6 +21,7 @@ acceptance_tests: basic_read: tests: - config_path: "secrets/config.json" + timeout_seconds: 2400 expect_records: path: "integration_tests/expected_records.jsonl" extra_fields: no @@ -49,3 +50,4 @@ acceptance_tests: tests: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" + timeout_seconds: 2400 diff --git a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/expected_records.jsonl b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/expected_records.jsonl index ab1a30b6fd43..ecaa3e1c2f45 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/integration_tests/expected_records.jsonl +++ b/airbyte-integrations/connectors/source-zendesk-support/integration_tests/expected_records.jsonl @@ -29,9 +29,9 @@ {"stream":"ticket_fields","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_fields/360002833096.json","id":360002833096,"type":"description","title":"Description","raw_title":"Description","description":"Please enter the details of your request. A member of our support staff will respond as soon as possible.","raw_description":"Please enter the details of your request. A member of our support staff will respond as soon as possible.","position":2,"active":true,"required":false,"collapsed_for_agents":false,"regexp_for_validation":null,"title_in_portal":"Description","raw_title_in_portal":"Description","visible_in_portal":true,"editable_in_portal":true,"required_in_portal":true,"tag":null,"created_at":"2020-12-11T18:34:05Z","updated_at":"2020-12-11T18:34:05Z","removable":false,"key":null,"agent_description":null},"emitted_at":1687861693520} {"stream":"ticket_fields","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_fields/360002833116.json","id":360002833116,"type":"status","title":"Status","raw_title":"Status","description":"Request status","raw_description":"Request status","position":3,"active":true,"required":false,"collapsed_for_agents":false,"regexp_for_validation":null,"title_in_portal":"Status","raw_title_in_portal":"Status","visible_in_portal":false,"editable_in_portal":false,"required_in_portal":false,"tag":null,"created_at":"2020-12-11T18:34:05Z","updated_at":"2020-12-11T18:34:05Z","removable":false,"key":null,"agent_description":null,"system_field_options":[{"name":"Open","value":"open"},{"name":"Pending","value":"pending"},{"name":"Solved","value":"solved"}],"sub_type_id":0},"emitted_at":1687861693521} {"stream":"ticket_forms","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_forms/360000084116.json","name":"Default Ticket Form","display_name":"Default Ticket Form","id":360000084116,"raw_name":"Default Ticket Form","raw_display_name":"Default Ticket Form","end_user_visible":true,"position":1,"ticket_field_ids":[360002833076,360002833096,360002833116,360002833136,360002833156,360002833176,360002833196],"active":true,"default":true,"created_at":"2020-12-11T18:34:37Z","updated_at":"2020-12-11T18:34:37Z","in_all_brands":true,"restricted_brand_ids":[],"end_user_conditions":[],"agent_conditions":[]},"emitted_at":1687861694440} -{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7283000498191.json","id":7283000498191,"ticket_id":153,"created_at":"2023-06-26T11:31:48Z","updated_at":"2023-06-26T12:13:42Z","group_stations":2,"assignee_stations":2,"reopens":0,"replies":0,"assignee_updated_at":"2023-06-26T11:31:48Z","requester_updated_at":"2023-06-26T11:31:48Z","status_updated_at":"2023-06-26T11:31:48Z","initially_assigned_at":"2023-06-26T11:31:48Z","assigned_at":"2023-06-26T12:13:42Z","solved_at":null,"latest_comment_added_at":"2023-06-26T11:31:48Z","reply_time_in_minutes":{"calendar":null,"business":null},"first_resolution_time_in_minutes":{"calendar":null,"business":null},"full_resolution_time_in_minutes":{"calendar":null,"business":null},"agent_wait_time_in_minutes":{"calendar":null,"business":null},"requester_wait_time_in_minutes":{"calendar":null,"business":null},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:31:48Z"},"emitted_at":1687861695566} -{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7282909551759.json","id":7282909551759,"ticket_id":152,"created_at":"2023-06-26T11:10:33Z","updated_at":"2023-06-26T11:25:43Z","group_stations":1,"assignee_stations":1,"reopens":0,"replies":1,"assignee_updated_at":"2023-06-26T11:25:43Z","requester_updated_at":"2023-06-26T11:10:33Z","status_updated_at":"2023-06-26T11:25:43Z","initially_assigned_at":"2023-06-26T11:10:33Z","assigned_at":"2023-06-26T11:10:33Z","solved_at":"2023-06-26T11:25:43Z","latest_comment_added_at":"2023-06-26T11:21:06Z","reply_time_in_minutes":{"calendar":11,"business":0},"first_resolution_time_in_minutes":{"calendar":15,"business":0},"full_resolution_time_in_minutes":{"calendar":15,"business":0},"agent_wait_time_in_minutes":{"calendar":15,"business":0},"requester_wait_time_in_minutes":{"calendar":0,"business":0},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:25:43Z"},"emitted_at":1687861695567} -{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7282901696015.json","id":7282901696015,"ticket_id":151,"created_at":"2023-06-26T11:09:33Z","updated_at":"2023-06-26T12:03:38Z","group_stations":1,"assignee_stations":1,"reopens":0,"replies":1,"assignee_updated_at":"2023-06-26T12:03:37Z","requester_updated_at":"2023-06-26T11:09:33Z","status_updated_at":"2023-06-26T11:09:33Z","initially_assigned_at":"2023-06-26T11:09:33Z","assigned_at":"2023-06-26T11:09:33Z","solved_at":null,"latest_comment_added_at":"2023-06-26T12:03:37Z","reply_time_in_minutes":{"calendar":54,"business":0},"first_resolution_time_in_minutes":{"calendar":null,"business":null},"full_resolution_time_in_minutes":{"calendar":null,"business":null},"agent_wait_time_in_minutes":{"calendar":null,"business":null},"requester_wait_time_in_minutes":{"calendar":null,"business":null},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:09:33Z"},"emitted_at":1687861695567} +{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7283000498191.json","id":7283000498191,"ticket_id":153,"created_at":"2023-06-26T11:31:48Z","updated_at":"2023-06-26T12:13:42Z","group_stations":2,"assignee_stations":2,"reopens":0,"replies":0,"assignee_updated_at":"2023-06-26T11:31:48Z","requester_updated_at":"2023-06-26T11:31:48Z","status_updated_at":"2023-06-26T11:31:48Z","initially_assigned_at":"2023-06-26T11:31:48Z","assigned_at":"2023-06-26T12:13:42Z","solved_at":null,"latest_comment_added_at":"2023-06-26T11:31:48Z","reply_time_in_minutes":{"calendar":null,"business":null},"first_resolution_time_in_minutes":{"calendar":null,"business":null},"full_resolution_time_in_minutes":{"calendar":null,"business":null},"agent_wait_time_in_minutes":{"calendar":null,"business":null},"requester_wait_time_in_minutes":{"calendar":null,"business":null},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:31:48Z"},"emitted_at":1689884373175} +{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7282909551759.json","id":7282909551759,"ticket_id":152,"created_at":"2023-06-26T11:10:33Z","updated_at":"2023-06-26T11:25:43Z","group_stations":1,"assignee_stations":1,"reopens":0,"replies":1,"assignee_updated_at":"2023-06-26T11:25:43Z","requester_updated_at":"2023-06-26T11:10:33Z","status_updated_at":"2023-07-16T12:01:39Z","initially_assigned_at":"2023-06-26T11:10:33Z","assigned_at":"2023-06-26T11:10:33Z","solved_at":"2023-06-26T11:25:43Z","latest_comment_added_at":"2023-06-26T11:21:06Z","reply_time_in_minutes":{"calendar":11,"business":0},"first_resolution_time_in_minutes":{"calendar":15,"business":0},"full_resolution_time_in_minutes":{"calendar":15,"business":0},"agent_wait_time_in_minutes":{"calendar":15,"business":0},"requester_wait_time_in_minutes":{"calendar":0,"business":0},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:25:43Z"},"emitted_at":1689884373175} +{"stream":"ticket_metrics","data":{"url":"https://d3v-airbyte.zendesk.com/api/v2/ticket_metrics/7282901696015.json","id":7282901696015,"ticket_id":151,"created_at":"2023-06-26T11:09:33Z","updated_at":"2023-06-26T12:03:38Z","group_stations":1,"assignee_stations":1,"reopens":0,"replies":1,"assignee_updated_at":"2023-06-26T12:03:37Z","requester_updated_at":"2023-06-26T11:09:33Z","status_updated_at":"2023-06-26T11:09:33Z","initially_assigned_at":"2023-06-26T11:09:33Z","assigned_at":"2023-06-26T11:09:33Z","solved_at":null,"latest_comment_added_at":"2023-06-26T12:03:37Z","reply_time_in_minutes":{"calendar":54,"business":0},"first_resolution_time_in_minutes":{"calendar":null,"business":null},"full_resolution_time_in_minutes":{"calendar":null,"business":null},"agent_wait_time_in_minutes":{"calendar":null,"business":null},"requester_wait_time_in_minutes":{"calendar":null,"business":null},"on_hold_time_in_minutes":{"calendar":0,"business":0},"custom_status_updated_at":"2023-06-26T11:09:33Z"},"emitted_at":1689884373175} {"stream":"ticket_metric_events","data":{"id":4992797383183,"ticket_id":121,"metric":"agent_work_time","instance_id":0,"type":"measure","time":"2022-06-17T14:49:20Z"},"emitted_at":1687861699258} {"stream":"ticket_metric_events","data":{"id":4992797383311,"ticket_id":121,"metric":"pausable_update_time","instance_id":0,"type":"measure","time":"2022-06-17T14:49:20Z"},"emitted_at":1687861699259} {"stream":"ticket_metric_events","data":{"id":4992797383439,"ticket_id":121,"metric":"reply_time","instance_id":0,"type":"measure","time":"2022-06-17T14:49:20Z"},"emitted_at":1687861699260} @@ -52,4 +52,4 @@ {"stream": "attribute_definitions", "data": {"title": "Form", "subject": "ticket_form_id", "type": "list", "group": "ticket", "nullable": false, "repeatable": false, "operators": [{"value": "is", "title": "Is", "terminal": false}, {"value": "is_not", "title": "Is not", "terminal": false}], "values": [{"value": "360000084116", "title": "Default Ticket Form", "enabled": true}], "condition": "all"}, "emitted_at": 1687777243928} {"stream":"ticket_skips","data":{"id":7290033348623,"ticket_id":121,"user_id":360786799676,"reason":"I have no idea.","created_at":"2023-06-27T08:24:02Z","updated_at":"2023-06-27T08:24:02Z","ticket":{"url":"https://d3v-airbyte.zendesk.com/api/v2/tickets/121.json","id":121,"external_id":null,"via":{"channel":"voice","source":{"rel":"voicemail","from":{"formatted_phone":"+1 (689) 689-8023","phone":"+16896898023","name":"Caller +1 (689) 689-8023"},"to":{"formatted_phone":"+1 (205) 953-1462","phone":"+12059531462","name":"Airbyte","brand_id":360000358316}}},"created_at":"2022-06-17T14:49:20Z","updated_at":"2022-06-17T16:01:42Z","type":null,"subject":"Voicemail from: Caller +1 (689) 689-8023","raw_subject":"Voicemail from: Caller +1 (689) 689-8023","description":"Call from: +1 (689) 689-8023\\nTime of call: June 17, 2022 at 2:48:27 PM","priority":null,"status":"new","recipient":null,"requester_id":4992781783439,"submitter_id":4992781783439,"assignee_id":null,"organization_id":null,"group_id":null,"collaborator_ids":[],"follower_ids":[],"email_cc_ids":[],"forum_topic_id":null,"problem_id":null,"has_incidents":false,"is_public":false,"due_at":null,"tags":[],"custom_fields":[],"satisfaction_rating":{"score":"offered"},"sharing_agreement_ids":[],"custom_status_id":4044356,"fields":[],"followup_ids":[],"ticket_form_id":360000084116,"deleted_ticket_form_id":null,"brand_id":360000358316,"allow_channelback":false,"allow_attachments":true,"from_messaging_channel":false}},"emitted_at":1687861697932} {"stream":"ticket_skips","data":{"id":7290088475023,"ticket_id":125,"user_id":360786799676,"reason":"Another test skip.","created_at":"2023-06-27T08:30:01Z","updated_at":"2023-06-27T08:30:01Z","ticket":{"url":"https://d3v-airbyte.zendesk.com/api/v2/tickets/125.json","id":125,"external_id":null,"via":{"channel":"web","source":{"from":{},"to":{},"rel":null}},"created_at":"2022-07-18T10:16:53Z","updated_at":"2022-07-18T10:36:02Z","type":"question","subject":"Ticket Test 2","raw_subject":"Ticket Test 2","description":"238473846","priority":"urgent","status":"open","recipient":null,"requester_id":360786799676,"submitter_id":360786799676,"assignee_id":361089721035,"organization_id":360033549136,"group_id":5059439464079,"collaborator_ids":[360786799676],"follower_ids":[360786799676],"email_cc_ids":[],"forum_topic_id":null,"problem_id":null,"has_incidents":false,"is_public":false,"due_at":null,"tags":[],"custom_fields":[],"satisfaction_rating":{"score":"unoffered"},"sharing_agreement_ids":[],"custom_status_id":4044376,"fields":[],"followup_ids":[],"ticket_form_id":360000084116,"deleted_ticket_form_id":null,"brand_id":360000358316,"allow_channelback":false,"allow_attachments":true,"from_messaging_channel":false}},"emitted_at":1687861697934} -{"stream":"posts","data":{"id":7253375870607,"title":"Which topics should I add to my community?","details":"
That depends. If you support several products, you might add a topic for each product. If you have one big product, you might add a topic for each major feature area or task. If you have different types of users (for example, end users and API developers), you might add a topic or topics for each type of user.
A General Discussion topic is a place for users to discuss issues that don't quite fit in the other topics. You could monitor this topic for emerging issues that might need their own topics.
\n\nTo create your own topics, see Adding community discussion topics.
","author_id":360786799676,"vote_sum":0,"vote_count":0,"comment_count":0,"follower_count":0,"topic_id":7253351897871,"html_url":"https://d3v-airbyte.zendesk.com/hc/en-us/community/posts/7253375870607-Which-topics-should-I-add-to-my-community-","created_at":"2023-06-22T00:32:21Z","updated_at":"2023-06-22T00:32:21Z","url":"https://d3v-airbyte.zendesk.com/api/v2/help_center/community/posts/7253375870607-Which-topics-should-I-add-to-my-community-.json","featured":false,"pinned":false,"closed":false,"frozen":false,"status":"none","non_author_editor_id":null,"non_author_updated_at":null,"content_tag_ids":[]},"emitted_at":1688581224313} \ No newline at end of file +{"stream":"posts","data":{"id":7253375870607,"title":"Which topics should I add to my community?","details":"That depends. If you support several products, you might add a topic for each product. If you have one big product, you might add a topic for each major feature area or task. If you have different types of users (for example, end users and API developers), you might add a topic or topics for each type of user.
A General Discussion topic is a place for users to discuss issues that don't quite fit in the other topics. You could monitor this topic for emerging issues that might need their own topics.
\n\nTo create your own topics, see Adding community discussion topics.
","author_id":360786799676,"vote_sum":0,"vote_count":0,"comment_count":0,"follower_count":0,"topic_id":7253351897871,"html_url":"https://d3v-airbyte.zendesk.com/hc/en-us/community/posts/7253375870607-Which-topics-should-I-add-to-my-community-","created_at":"2023-06-22T00:32:21Z","updated_at":"2023-06-22T00:32:21Z","url":"https://d3v-airbyte.zendesk.com/api/v2/help_center/community/posts/7253375870607-Which-topics-should-I-add-to-my-community-.json","featured":false,"pinned":false,"closed":false,"frozen":false,"status":"none","non_author_editor_id":null,"non_author_updated_at":null},"emitted_at":1689889045524} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-zendesk-support/metadata.yaml b/airbyte-integrations/connectors/source-zendesk-support/metadata.yaml index e517fa08642d..ea6af49b5621 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/metadata.yaml +++ b/airbyte-integrations/connectors/source-zendesk-support/metadata.yaml @@ -7,7 +7,7 @@ data: connectorType: source maxSecondsBetweenMessages: 10800 definitionId: 79c1aa37-dae3-42ae-b333-d1c105477715 - dockerImageTag: 0.10.1 + dockerImageTag: 0.10.2 dockerRepository: airbyte/source-zendesk-support githubIssueLabel: source-zendesk-support icon: zendesk-support.svg diff --git a/airbyte-integrations/connectors/source-zendesk-support/setup.py b/airbyte-integrations/connectors/source-zendesk-support/setup.py index 132d2f3ed272..e72266072960 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/setup.py +++ b/airbyte-integrations/connectors/source-zendesk-support/setup.py @@ -5,7 +5,7 @@ from setuptools import find_packages, setup -MAIN_REQUIREMENTS = ["airbyte-cdk", "pytz", "requests-futures~=1.0.0", "pendulum~=2.1.2"] +MAIN_REQUIREMENTS = ["airbyte-cdk", "pytz"] TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock~=3.6", "connector-acceptance-test", "requests-mock==1.9.3"] diff --git a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py index 6f8a0f8e722d..dea7e1277057 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py +++ b/airbyte-integrations/connectors/source-zendesk-support/source_zendesk_support/streams.py @@ -3,19 +3,12 @@ # import calendar -import functools import logging import re -import time from abc import ABC -from collections import deque -from concurrent.futures import Future, ProcessPoolExecutor -from datetime import datetime, timedelta -from functools import partial -from math import ceil -from pickle import PickleError, dumps +from datetime import datetime from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union -from urllib.parse import urljoin +from urllib.parse import parse_qsl, urljoin, urlparse import pendulum import pytz @@ -24,14 +17,9 @@ from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.core import package_name_from_class from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream -from airbyte_cdk.sources.streams.http.auth.core import HttpAuthenticator from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy -from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException -from airbyte_cdk.sources.streams.http.rate_limiting import TRANSIENT_EXCEPTIONS from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer -from requests.auth import AuthBase -from requests_futures.sessions import PICKLE_ERROR, FuturesSession from source_zendesk_support.ZendeskSupportAvailabilityStrategy import ZendeskSupportAvailabilityStrategy DATETIME_FORMAT: str = "%Y-%m-%dT%H:%M:%SZ" @@ -40,34 +28,9 @@ logger = logging.getLogger("airbyte") -# For some streams, multiple http requests are running at the same time for performance reasons. -# However, it may result in hitting the rate limit, therefore subsequent requests have to be made after a pause. -# The idea is to sustain a pause once and continue making multiple requests at a time. -# A single `retry_at` variable is introduced here, which prevents us from duplicate sleeping in the main thread -# before each request is made as it used to be in prior versions. -# It acts like a global counter - increased each time a 429 status is met -# only if it is greater than the current value. On the other hand, no request may be made before this moment. -# Because the requests are made in parallel, time.sleep will be called in parallel as well. -# This is possible because it is a point in time, not timedelta. -retry_at: Optional[datetime] = None - - -def sleep_before_executing(sleep_time: float): - def wrapper(function): - @functools.wraps(function) - def inner(*args, **kwargs): - logger.info(f"Sleeping {sleep_time} seconds before next request") - time.sleep(int(sleep_time)) - result = function(*args, **kwargs) - return result, datetime.utcnow() - - return inner - - return wrapper - def to_int(s): - "https://github.com/airbytehq/airbyte/issues/13673" + """https://github.com/airbytehq/airbyte/issues/13673""" if isinstance(s, str): res = re.findall(r"[-+]?\d+", s) if res: @@ -79,41 +42,7 @@ class SourceZendeskException(Exception): """default exception of custom SourceZendesk logic""" -class SourceZendeskSupportFuturesSession(FuturesSession): - """ - Check the docs at https://github.com/ross/requests-futures - Used to async execute a set of requests. - """ - - def send_future(self, request: requests.PreparedRequest, **kwargs) -> Future: - """ - Use instead of default `Session.send()` method. - `Session.send()` should not be overridden as it used by `requests-futures` lib. - """ - - if self.session: - func = self.session.send - else: - sleep_time = 0 - now = datetime.utcnow() - if retry_at and retry_at > now: - sleep_time = (retry_at - datetime.utcnow()).seconds - # avoid calling super to not break pickled method - func = partial(requests.Session.send, self) - func = sleep_before_executing(sleep_time)(func) - - if isinstance(self.executor, ProcessPoolExecutor): - logger.warning("ProcessPoolExecutor is used to perform IO related tasks for unknown reason!") - # verify function can be pickled - try: - dumps(func) - except (TypeError, PickleError): - raise RuntimeError(PICKLE_ERROR) - - return self.executor.submit(func, request, **kwargs) - - -class BaseSourceZendeskSupportStream(HttpStream, ABC): +class BaseZendeskSupportStream(HttpStream, ABC): raise_on_http_errors = True def __init__(self, subdomain: str, start_date: str, ignore_pagination: bool = False, **kwargs): @@ -201,7 +130,7 @@ def should_retry(self, response: requests.Response) -> bool: return super().should_retry(response) -class SourceZendeskSupportStream(BaseSourceZendeskSupportStream): +class SourceZendeskSupportStream(BaseZendeskSupportStream): """Basic Zendesk class""" primary_key = "id" @@ -210,17 +139,9 @@ class SourceZendeskSupportStream(BaseSourceZendeskSupportStream): cursor_field = "updated_at" response_list_name: str = None - future_requests: deque = None transformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - def __init__(self, authenticator: Union[AuthBase, HttpAuthenticator] = None, **kwargs): - super().__init__(**kwargs) - - self._session = SourceZendeskSupportFuturesSession() - self._session.auth = authenticator - self.future_requests = deque() - @property def url_base(self) -> str: return f"https://{self._subdomain}.zendesk.com/api/v2/" @@ -256,52 +177,11 @@ def get_api_records_count(self, stream_slice: Mapping[str, Any] = None, stream_s if start_date: params["start_time"] = self.str2datetime(start_date) - response = self._session.request("get", count_url).result() + response = self._session.request("get", count_url) records_count = response.json().get("count", {}).get("value", 0) return records_count - def generate_future_requests( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ): - records_count = self.get_api_records_count(stream_slice=stream_slice, stream_state=stream_state) - self.logger.info(f"Records count is {records_count}") - page_count = ceil(records_count / self.page_size) - for page_number in range(1, page_count + 1): - params = self.request_params(stream_state=stream_state, stream_slice=stream_slice) - params["page"] = page_number - request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice) - - request = self._create_prepared_request( - path=self.path(stream_state=stream_state, stream_slice=stream_slice), - headers=dict(request_headers, **self.authenticator.get_auth_header()), - params=params, - json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice), - data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice), - ) - - request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice) - self.future_requests.append( - { - "future": self._send_request(request, request_kwargs), - "request": request, - "request_kwargs": request_kwargs, - "retries": 0, - } - ) - self.logger.info(f"Generated {len(self.future_requests)} future requests") - - def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future: - response: Future = self._session.send_future(request, **request_kwargs) - return response - - def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Any]) -> Future: - return self._send(request, request_kwargs) - def request_params( self, stream_state: Mapping[str, Any], @@ -325,65 +205,8 @@ def request_params( return params - def _retry( - self, - request: requests.PreparedRequest, - retries: int, - original_exception: Exception = None, - response: requests.Response = None, - finished_at: Optional[datetime] = None, - **request_kwargs, - ): - if retries == self.max_retries: - if original_exception: - raise original_exception - raise DefaultBackoffException(request=request, response=response) - if response is not None: - sleep_time = self.backoff_time(response) - if finished_at and sleep_time: - current_retry_at = finished_at + timedelta(seconds=sleep_time) - global retry_at - if not retry_at or (retry_at < current_retry_at): - retry_at = current_retry_at - self.logger.info(f"Adding a request to be retried in {sleep_time} seconds") - self.future_requests.append( - { - "future": self._send_request(request, request_kwargs), - "request": request, - "request_kwargs": request_kwargs, - "retries": retries + 1, - } - ) - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: - self.generate_future_requests(sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state) - - while len(self.future_requests) > 0: - self.logger.info("Starting another while loop iteration") - item = self.future_requests.popleft() - request, retries, future, kwargs = item["request"], item["retries"], item["future"], item["request_kwargs"] - try: - response, finished_at = future.result() - except TRANSIENT_EXCEPTIONS as exc: - self.logger.info("Will retry the request because of a transient exception") - self._retry(request=request, retries=retries, original_exception=exc, **kwargs) - continue - if self.should_retry(response): - self.logger.info("Will retry the request for other reason") - self._retry(request=request, retries=retries, response=response, finished_at=finished_at, **kwargs) - continue - self.logger.info("Request successful, will parse the response now") - yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) - - -class SourceZendeskSupportFullRefreshStream(BaseSourceZendeskSupportStream): +class FullRefreshZendeskSupportStream(BaseZendeskSupportStream): """ Endpoints don't provide the updated_at/created_at fields Thus we can't implement an incremental logic for them @@ -413,14 +236,13 @@ def request_params( stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None, ) -> MutableMapping[str, Any]: - params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - params.update({"page[size]": self.page_size}) + params = {"page[size]": self.page_size} if next_page_token: params.update(next_page_token) return params -class SourceZendeskSupportCursorPaginationStream(SourceZendeskSupportFullRefreshStream): +class IncrementalZendeskSupportStream(FullRefreshZendeskSupportStream): """ Endpoints provide a cursor pagination and sorting mechanism """ @@ -442,6 +264,42 @@ def check_stream_state(self, stream_state: Mapping[str, Any] = None): state = stream_state.get(self.cursor_field) or self._start_date if stream_state else self._start_date return calendar.timegm(pendulum.parse(state).utctimetuple()) + +class CursorPaginationZendeskSupportStream(IncrementalZendeskSupportStream): + """Zendesk Support Cursor Pagination, see https://developer.zendesk.com/api-reference/introduction/pagination/#using-cursor-pagination""" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + if self._ignore_pagination: + return None + + meta = response.json().get("meta", {}) + return {"page[after]": meta.get("after_cursor")} if meta.get("has_more") else None + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + params = { + "start_time": self.check_stream_state(stream_state), + "page[size]": self.page_size, + } + if next_page_token: + params.pop("start_time", None) + params.update(next_page_token) + return params + + +class TimeBasedPaginationZendeskSupportStream(IncrementalZendeskSupportStream): + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + if self._ignore_pagination: + return None + start_time = dict(parse_qsl(urlparse(response.json().get(self.next_page_field), "").query)).get("start_time") + if start_time != self.prev_start_time: + self.prev_start_time = start_time + return {self.cursor_field: int(start_time)} + def request_params( self, stream_state: Mapping[str, Any], @@ -457,7 +315,7 @@ def request_params( return params -class SourceZendeskIncrementalExportStream(SourceZendeskSupportCursorPaginationStream): +class SourceZendeskIncrementalExportStream(IncrementalZendeskSupportStream): """Incremental Export from Tickets stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/#incremental-ticket-export-time-based @@ -487,8 +345,10 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, """ Returns next_page_token based on `end_of_stream` parameter inside of response """ - next_page_token = super().next_page_token(response) - return None if response.json().get(END_OF_STREAM_KEY, False) else next_page_token + if self._ignore_pagination: + return None + response_json = response.json() + return None if response_json.get(END_OF_STREAM_KEY, False) else {"cursor": response_json.get("after_cursor")} def request_params( self, @@ -501,6 +361,9 @@ def request_params( params["start_time"] = self.check_start_time_param(params["start_time"]) if self.sideload_param: params["include"] = self.sideload_param + if next_page_token: + params.pop("start_time", None) + params.update(next_page_token) return params def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: @@ -524,6 +387,42 @@ class SourceZendeskSupportTicketEventsExportStream(SourceZendeskIncrementalExpor list_entities_from_event: List[str] = None event_type: str = None + def path( + self, + *, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> str: + return f"incremental/{self.response_list_name}.json" + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + """ + Returns next_page_token based on `end_of_stream` parameter inside of response + """ + response_json = response.json() + return None if response_json.get(END_OF_STREAM_KEY, False) else {"start_time": response_json.get("end_time")} + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + next_page_token = next_page_token or {} + parsed_state = self.check_stream_state(stream_state) + if self.cursor_field: + params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)} + else: + params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())} + # check "start_time" is not in the future + params["start_time"] = self.check_start_time_param(params["start_time"]) + if self.sideload_param: + params["include"] = self.sideload_param + if next_page_token: + params.update(next_page_token) + return params + @property def update_event_from_record(self) -> bool: """Returns True/False based on list_entities_from_event property""" @@ -539,29 +438,14 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield event -class OrganizationMemberships(SourceZendeskSupportCursorPaginationStream): +class OrganizationMemberships(CursorPaginationZendeskSupportStream): """OrganizationMemberships stream: https://developer.zendesk.com/api-reference/ticketing/organizations/organization_memberships/""" - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: - params = { - "start_time": self.check_stream_state(stream_state), - "page[size]": self.page_size, - } - if next_page_token: - params.pop("start_time", None) - params.update(next_page_token) - return params - -class AuditLogs(SourceZendeskSupportCursorPaginationStream): +class AuditLogs(CursorPaginationZendeskSupportStream): """AuditLogs stream: https://developer.zendesk.com/api-reference/ticketing/account-configuration/audit_logs/#list-audit-logs""" - # can request a maximum of 1,00 results + # can request a maximum of 100 results page_size = 100 # audit_logs doesn't have the 'updated_by' field cursor_field = "created_at" @@ -572,12 +456,35 @@ class Users(SourceZendeskIncrementalExportStream): response_list_name: str = "users" + def path(self, **kwargs) -> str: + return "incremental/users/cursor.json" + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + next_page_token = next_page_token or {} + parsed_state = self.check_stream_state(stream_state) + if self.cursor_field: + params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)} + else: + params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())} + # check "start_time" is not in the future + params["start_time"] = self.check_start_time_param(params["start_time"]) + if self.sideload_param: + params["include"] = self.sideload_param + if next_page_token: + params.update(next_page_token) + return params + class Organizations(SourceZendeskSupportStream): """Organizations stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/incremental_exports/""" -class Posts(SourceZendeskSupportCursorPaginationStream): +class Posts(CursorPaginationZendeskSupportStream): """Posts stream: https://developer.zendesk.com/api-reference/help_center/help-center-api/posts/#list-posts""" use_cache = True @@ -594,13 +501,35 @@ class Tickets(SourceZendeskIncrementalExportStream): response_list_name: str = "tickets" transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - @staticmethod - def check_start_time_param(requested_start_time: int, value: int = 1): + def path(self, **kwargs) -> str: + return "incremental/tickets/cursor.json" + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + next_page_token = next_page_token or {} + parsed_state = self.check_stream_state(stream_state) + if self.cursor_field: + params = {"start_time": next_page_token.get(self.cursor_field, parsed_state)} + else: + params = {"start_time": calendar.timegm(pendulum.parse(self._start_date).utctimetuple())} + # check "start_time" is not in the future + params["start_time"] = self.check_start_time_param(params["start_time"]) + if self.sideload_param: + params["include"] = self.sideload_param + if next_page_token: + params.update(next_page_token) + return params + + def check_start_time_param(self, requested_start_time: int, value: int = 1): """ The stream returns 400 Bad Request StartTimeTooRecent when requesting tasks 1 second before now. Figured out during experiments that the most recent time needed for request to be successful is 3 seconds before now. """ - return SourceZendeskIncrementalExportStream.check_start_time_param(requested_start_time, value=3) + return super().check_start_time_param(requested_start_time, value=3) class TicketComments(SourceZendeskSupportTicketEventsExportStream): @@ -624,7 +553,7 @@ class Groups(SourceZendeskSupportStream): """Groups stream: https://developer.zendesk.com/api-reference/ticketing/groups/groups/""" -class GroupMemberships(SourceZendeskSupportCursorPaginationStream): +class GroupMemberships(CursorPaginationZendeskSupportStream): """GroupMemberships stream: https://developer.zendesk.com/api-reference/ticketing/groups/group_memberships/""" def request_params( @@ -635,15 +564,10 @@ def request_params( ) -> MutableMapping[str, Any]: params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) params.update({"sort_by": "asc"}) - start_time = self.str2unixtime((stream_state or {}).get(self.cursor_field)) - params["start_time"] = start_time if start_time else self.str2unixtime(self._start_date) - if next_page_token: - params.pop("start_time", None) - params.update(next_page_token) return params -class SatisfactionRatings(SourceZendeskSupportCursorPaginationStream): +class SatisfactionRatings(CursorPaginationZendeskSupportStream): """ SatisfactionRatings stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/satisfaction_ratings/ """ @@ -656,10 +580,6 @@ def request_params( ) -> MutableMapping[str, Any]: params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) params.update({"sort_by": "asc"}) - start_time = self.str2unixtime((stream_state or {}).get(self.cursor_field)) - params["start_time"] = start_time if start_time else self.str2unixtime(self._start_date) - if next_page_token: - params["page"] = next_page_token return params @@ -667,36 +587,15 @@ class TicketFields(SourceZendeskSupportStream): """TicketFields stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_fields/""" -class TicketForms(SourceZendeskSupportCursorPaginationStream): +class TicketForms(TimeBasedPaginationZendeskSupportStream): """TicketForms stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_forms""" -class TicketMetrics(SourceZendeskSupportCursorPaginationStream): +class TicketMetrics(CursorPaginationZendeskSupportStream): """TicketMetric stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metrics/""" - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: - """ - To make the Cursor Pagination to return `after_cursor` we should follow these instructions: - https://developer.zendesk.com/documentation/api-basics/pagination/paginating-through-lists-using-cursor-pagination/#enabling-cursor-pagination - """ - params = { - "start_time": self.check_stream_state(stream_state), - "page[size]": self.page_size, - } - if next_page_token: - # when cursor pagination is used, we can pass only `after` and `page size` params, - # other params should be omitted. - params.pop("start_time", None) - params.update(next_page_token) - return params - -class TicketSkips(SourceZendeskSupportCursorPaginationStream): +class TicketSkips(CursorPaginationZendeskSupportStream): """TicketSkips stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_skips/""" response_list_name = "skips" @@ -704,23 +603,8 @@ class TicketSkips(SourceZendeskSupportCursorPaginationStream): def path(self, **kwargs): return "skips.json" - def request_params( - self, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> MutableMapping[str, Any]: - params = { - "start_time": self.check_stream_state(stream_state), - "page[size]": self.page_size, - } - if next_page_token: - params.pop("start_time", None) - params.update(next_page_token) - return params - -class TicketMetricEvents(SourceZendeskSupportCursorPaginationStream): +class TicketMetricEvents(CursorPaginationZendeskSupportStream): """ TicketMetricEvents stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_metric_events/ """ @@ -735,7 +619,7 @@ class Macros(SourceZendeskSupportStream): """Macros stream: https://developer.zendesk.com/api-reference/ticketing/business-rules/macros/""" -class TicketAudits(SourceZendeskSupportCursorPaginationStream): +class TicketAudits(IncrementalZendeskSupportStream): """TicketAudits stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_audits/""" # can request a maximum of 1,000 results @@ -768,14 +652,14 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, return {"cursor": response.json().get("before_cursor")} if response_json.get("before_cursor") else None -class Tags(SourceZendeskSupportFullRefreshStream): +class Tags(FullRefreshZendeskSupportStream): """Tags stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/tags/""" # doesn't have the 'id' field primary_key = "name" -class SlaPolicies(SourceZendeskSupportFullRefreshStream): +class SlaPolicies(FullRefreshZendeskSupportStream): """SlaPolicies stream: https://developer.zendesk.com/api-reference/ticketing/business-rules/sla_policies/""" def path(self, *args, **kwargs) -> str: @@ -790,11 +674,11 @@ def request_params( return {} -class Brands(SourceZendeskSupportFullRefreshStream): +class Brands(FullRefreshZendeskSupportStream): """Brands stream: https://developer.zendesk.com/api-reference/ticketing/account-configuration/brands/#list-brands""" -class CustomRoles(SourceZendeskSupportFullRefreshStream): +class CustomRoles(FullRefreshZendeskSupportStream): """CustomRoles stream: https://developer.zendesk.com/api-reference/ticketing/account-configuration/custom_roles/#list-custom-roles""" def request_params( @@ -806,14 +690,14 @@ def request_params( return {} -class Schedules(SourceZendeskSupportFullRefreshStream): +class Schedules(FullRefreshZendeskSupportStream): """Schedules stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/schedules/#list-schedules""" def path(self, *args, **kwargs) -> str: return "business_hours/schedules.json" -class AccountAttributes(SourceZendeskSupportFullRefreshStream): +class AccountAttributes(FullRefreshZendeskSupportStream): """Account attributes stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/skill_based_routing/#list-account-attributes""" response_list_name = "attributes" @@ -830,7 +714,7 @@ def request_params( return {} -class AttributeDefinitions(SourceZendeskSupportFullRefreshStream): +class AttributeDefinitions(FullRefreshZendeskSupportStream): """Attribute definitions stream: https://developer.zendesk.com/api-reference/ticketing/ticket-management/skill_based_routing/#list-routing-attribute-definitions""" primary_key = None @@ -855,7 +739,7 @@ def request_params( return {} -class UserSettingsStream(SourceZendeskSupportFullRefreshStream): +class UserSettingsStream(FullRefreshZendeskSupportStream): """Stream for checking of a request token and permissions""" def path(self, *args, **kwargs) -> str: @@ -885,7 +769,7 @@ def request_params( return {} -class PostComments(SourceZendeskSupportFullRefreshStream, HttpSubStream): +class PostComments(FullRefreshZendeskSupportStream, HttpSubStream): response_list_name = "comments" def __init__(self, **kwargs): @@ -903,7 +787,7 @@ def path( return f"community/posts/{post_id}/comments" -class AbstractVotes(SourceZendeskSupportFullRefreshStream, ABC): +class AbstractVotes(FullRefreshZendeskSupportStream, ABC): response_list_name = "votes" def get_json_schema(self) -> Mapping[str, Any]: diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py deleted file mode 100644 index 90c8e6efc627..000000000000 --- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/test_futures.py +++ /dev/null @@ -1,176 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. -# - -import datetime -import json -from datetime import timedelta -from urllib.parse import urljoin - -import pendulum -import pytest -import requests -import requests_mock -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException -from requests.exceptions import ConnectionError -from source_zendesk_support.source import BasicApiTokenAuthenticator -from source_zendesk_support.streams import Macros, Organizations - -STREAM_ARGS: dict = { - "subdomain": "fake-subdomain", - "start_date": "2021-01-27T00:00:00Z", - "authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"), -} - - -@pytest.fixture() -def time_sleep_mock(mocker): - time_mock = mocker.patch("time.sleep", lambda x: None) - yield time_mock - - -@pytest.mark.parametrize( - "records_count,page_size,expected_futures_deque_len", - [ - (1000, 100, 10), - (1000, 10, 100), - (0, 100, 0), - (1, 100, 1), - (101, 100, 2), - ], -) -def test_proper_number_of_future_requests_generated(records_count, page_size, expected_futures_deque_len, time_sleep_mock): - stream = Macros(**STREAM_ARGS) - stream.page_size = page_size - - with requests_mock.Mocker() as m: - count_url = urljoin(stream.url_base, f"{stream.path()}/count.json") - m.get(count_url, text=json.dumps({"count": {"value": records_count}})) - records_url = urljoin(stream.url_base, stream.path()) - m.get(records_url) - stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field) - assert len(stream.future_requests) == expected_futures_deque_len - - -@pytest.mark.parametrize( - "records_count,page_size,expected_futures_deque_len", - [ - (10, 10, 10), - (10, 100, 10), - (10, 10, 0), - ], -) -def test_parse_future_records(records_count, page_size, expected_futures_deque_len, time_sleep_mock): - stream = Macros(**STREAM_ARGS) - stream.page_size = page_size - expected_records = [ - {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()} - for i in range(records_count) - ] - - with requests_mock.Mocker() as m: - count_url = urljoin(stream.url_base, f"{stream.path()}/count.json") - m.get( - count_url, - text=json.dumps({"count": {"value": records_count}}), - ) - - records_url = urljoin(stream.url_base, stream.path()) - m.get(records_url, text=json.dumps({stream.name: expected_records})) - - stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field) - if not stream.future_requests and not expected_futures_deque_len: - assert len(stream.future_requests) == 0 and not expected_records - else: - response, _ = stream.future_requests[0]["future"].result() - records = list(stream.parse_response(response, stream_state=None, stream_slice=None)) - assert records == expected_records - - -@pytest.mark.parametrize( - "records_count, page_size, expected_futures_deque_len, expected_exception", - [ - (1000, 10, 100, DefaultBackoffException), - (0, 100, 0, DefaultBackoffException), - (150, 100, 2, ConnectionError), - (1, 100, 1, None), - (101, 101, 2, None), - ], -) -def test_read_records(mocker, records_count, page_size, expected_futures_deque_len, expected_exception, time_sleep_mock): - stream = Macros(**STREAM_ARGS) - stream.page_size = page_size - should_retry = bool(expected_exception) - expected_records_count = min(page_size, records_count) if should_retry else records_count - - def record_gen(start=0, end=page_size): - for i in range(start, end): - yield {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()} - - with requests_mock.Mocker() as m: - count_url = urljoin(stream.url_base, f"{stream.path()}/count.json") - m.get(count_url, text=json.dumps({"count": {"value": records_count}})) - - records_url = urljoin(stream.url_base, stream.path()) - responses = [ - { - "status_code": 429 if should_retry else 200, - "headers": {"X-Rate-Limit": "700"}, - "text": "{}" - if should_retry - else json.dumps({"macros": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))}), - } - for page in range(expected_futures_deque_len) - ] - m.get(records_url, responses) - - if expected_exception is ConnectionError: - mocker.patch.object(requests.Session, "send", side_effect=ConnectionError()) - if should_retry and expected_futures_deque_len: - with pytest.raises(expected_exception): - list(stream.read_records(sync_mode=SyncMode.full_refresh)) - else: - assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == list(record_gen(end=expected_records_count)) - - -def test_sleep_time(): - page_size = 100 - x_rate_limit = 10 - records_count = 350 - pages = 4 - - start = datetime.datetime.now() - stream = Organizations(**STREAM_ARGS) - stream.page_size = page_size - - def record_gen(start=0, end=100): - for i in range(start, end): - yield {f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()} - - with requests_mock.Mocker() as m: - count_url = urljoin(stream.url_base, f"{stream.path()}/count.json") - m.get(count_url, text=json.dumps({"count": {"value": records_count}})) - - records_url = urljoin(stream.url_base, stream.path()) - responses = [ - { - "status_code": 429, - "headers": {"X-Rate-Limit": str(x_rate_limit)}, - "text": "{}" - } - for _ in range(pages) - ] + [ - { - "status_code": 200, - "headers": {}, - "text": json.dumps({"organizations": list(record_gen(page * page_size, min(records_count, (page + 1) * page_size)))}) - } - for page in range(pages) - ] - m.get(records_url, responses) - records = list(stream.read_records(sync_mode=SyncMode.full_refresh)) - assert len(records) == records_count - end = datetime.datetime.now() - sleep_time = int(60 / x_rate_limit) - assert sleep_time - 1 <= (end - start).seconds <= sleep_time + 1 diff --git a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py index ef0d6a2f9d08..59d9f1b7289d 100644 --- a/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-zendesk-support/unit_tests/unit_test.py @@ -23,7 +23,7 @@ AccountAttributes, AttributeDefinitions, AuditLogs, - BaseSourceZendeskSupportStream, + BaseZendeskSupportStream, Brands, CustomRoles, GroupMemberships, @@ -37,7 +37,6 @@ Schedules, SlaPolicies, SourceZendeskIncrementalExportStream, - SourceZendeskSupportStream, Tags, TicketAudits, TicketComments, @@ -170,19 +169,19 @@ def time_sleep_mock(mocker): def test_str2datetime(): expected = datetime.strptime(DATETIME_STR, DATETIME_FORMAT) - output = BaseSourceZendeskSupportStream.str2datetime(DATETIME_STR) + output = BaseZendeskSupportStream.str2datetime(DATETIME_STR) assert output == expected def test_datetime2str(): expected = datetime.strftime(DATETIME_FROM_STR.replace(tzinfo=pytz.UTC), DATETIME_FORMAT) - output = BaseSourceZendeskSupportStream.datetime2str(DATETIME_FROM_STR) + output = BaseZendeskSupportStream.datetime2str(DATETIME_FROM_STR) assert output == expected def test_str2unixtime(): expected = calendar.timegm(DATETIME_FROM_STR.utctimetuple()) - output = BaseSourceZendeskSupportStream.str2unixtime(DATETIME_STR) + output = BaseZendeskSupportStream.str2unixtime(DATETIME_STR) assert output == expected @@ -228,14 +227,6 @@ def test_parse_response(requests_mock): assert True if entity in parsed_output else False -def test_retry(mocker): - backoff_time_mock = mocker.Mock() - with mocker.patch.object(SourceZendeskSupportStream, "backoff_time", return_value=backoff_time_mock): - stream = SourceZendeskSupportStream(**STREAM_ARGS) - stream._retry(request=mocker.Mock(), retries=0) - assert not backoff_time_mock.called, "backoff_time should not have been called" - - class TestAllStreams: @pytest.mark.parametrize( "expected_stream_cls", @@ -320,8 +311,8 @@ def test_streams(self, expected_stream_cls): (TicketMetrics, "ticket_metrics"), (TicketSkips, "skips.json"), (TicketMetricEvents, "incremental/ticket_metric_events"), - (Tickets, "incremental/tickets.json"), - (Users, "incremental/users.json"), + (Tickets, "incremental/tickets/cursor.json"), + (Users, "incremental/users/cursor.json"), (Brands, "brands"), (CustomRoles, "custom_roles"), (Schedules, "business_hours/schedules.json"), @@ -477,14 +468,12 @@ def test_next_page_token(self, stream_cls, expected, mocker): "stream_cls, expected", [ (Macros, {"start_time": 1622505600}), - (Posts, {"start_time": 1622505600}), (Organizations, {"start_time": 1622505600}), (Groups, {"start_time": 1622505600}), (TicketFields, {"start_time": 1622505600}), ], ids=[ "Macros", - "Posts", "Organizations", "Groups", "TicketFields", @@ -699,11 +688,11 @@ def test_check_stream_state(self, stream_cls, expected): @pytest.mark.parametrize( "stream_cls, expected", [ - (GroupMemberships, {"sort_by": "asc", "start_time": 1622505600}), + (GroupMemberships, {'page[size]': 100, 'sort_by': 'asc', 'start_time': 1622505600}), (TicketForms, {"start_time": 1622505600}), - (TicketMetricEvents, {"start_time": 1622505600}), + (TicketMetricEvents, {'page[size]': 100, 'start_time': 1622505600}), (TicketAudits, {"sort_by": "created_at", "sort_order": "desc", "limit": 1000}), - (SatisfactionRatings, {"sort_by": "asc", "start_time": 1622505600}), + (SatisfactionRatings, {'page[size]': 100, 'sort_by': 'asc', 'start_time': 1622505600}), (TicketMetrics, {"page[size]": 100, "start_time": 1622505600}), (OrganizationMemberships, {"page[size]": 100, "start_time": 1622505600}), (TicketSkips, {"page[size]": 100, "start_time": 1622505600}), @@ -743,22 +732,6 @@ def test_check_start_time_param(self, stream_cls): result = stream.check_start_time_param(expected) assert result == expected - @pytest.mark.parametrize( - "stream_cls, expected", - [ - (Users, "incremental/users.json"), - (Tickets, "incremental/tickets.json"), - ], - ids=[ - "Users", - "Tickets", - ], - ) - def test_path(self, stream_cls, expected): - stream = stream_cls(**STREAM_ARGS) - result = stream.path() - assert result == expected - @pytest.mark.parametrize( "stream_cls", [ @@ -776,7 +749,7 @@ def test_next_page_token(self, requests_mock, stream_cls): requests_mock.get(STREAM_URL, json={stream_name: {}}) test_response = requests.get(STREAM_URL) output = stream.next_page_token(test_response) - assert output is None + assert output == {'cursor': None} @pytest.mark.parametrize( "stream_cls, expected", @@ -920,7 +893,7 @@ def test_event_type(self, stream_cls, expected): def test_read_tickets_stream(requests_mock): requests_mock.get( - "https://subdomain.zendesk.com/api/v2/incremental/tickets.json", + "https://subdomain.zendesk.com/api/v2/incremental/tickets/cursor.json", json={ "tickets": [ {"custom_fields": []}, @@ -933,7 +906,8 @@ def test_read_tickets_stream(requests_mock): {"id": 360023712840, "value": False}, ] }, - ] + ], + "end_of_stream": True }, ) diff --git a/docs/integrations/sources/zendesk-support.md b/docs/integrations/sources/zendesk-support.md index 148cf151f903..6ae51abfbcfb 100644 --- a/docs/integrations/sources/zendesk-support.md +++ b/docs/integrations/sources/zendesk-support.md @@ -79,6 +79,7 @@ The Zendesk connector ideally should not run into Zendesk API limitations under | Version | Date | Pull Request | Subject | |:---------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `0.10.2` | 2023-07-19 | [28487](https://github.com/airbytehq/airbyte/pull/28487) | Remove extra page from params | | `0.10.1` | 2023-07-10 | [28096](https://github.com/airbytehq/airbyte/pull/28096) | Replace `offset` pagination with `cursor` pagination | | `0.10.0` | 2023-07-06 | [27991](https://github.com/airbytehq/airbyte/pull/27991) | add streams: `PostVotes`, `PostCommentVotes` | | `0.9.0` | 2023-07-05 | [27961](https://github.com/airbytehq/airbyte/pull/27961) | Add stream: `Post Comments` |