Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Source Zendesk: sync rate improvement #9456

Merged
Merged
Changes from 12 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@

from setuptools import find_packages, setup

MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1.36", "pytz"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1.36", "pytz", "requests-futures~=1.0.0", "pendulum~=2.1.2"]

TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests-mock==1.9.3"]

Original file line number Diff line number Diff line change
@@ -144,7 +144,10 @@
"type": ["null", "integer"]
},
"value": {
"type": ["null", "string"]
"type": ["null", "string", "array"],
"items": {
"type": ["null", "string"]
}
},
"author_id": {
"type": ["null", "integer"]
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
from source_zendesk_support.streams import SourceZendeskException

from .streams import (
Brands,
@@ -20,7 +21,6 @@
SatisfactionRatings,
Schedules,
SlaPolicies,
SourceZendeskException,
Tags,
TicketAudits,
TicketComments,
@@ -68,7 +68,7 @@ def check_connection(self, logger, config) -> Tuple[bool, any]:
auth = self.get_authenticator(config)
settings = None
try:
settings = UserSettingsStream(config["subdomain"], authenticator=auth).get_settings()
settings = UserSettingsStream(config["subdomain"], authenticator=auth, start_date=None).get_settings()
except requests.exceptions.RequestException as e:
return False, e

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
import json
from datetime import timedelta
from urllib.parse import urljoin

import pendulum
import pytest
import requests_mock
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http.exceptions import DefaultBackoffException
from source_zendesk_support.source import BasicApiTokenAuthenticator
from source_zendesk_support.streams import Tickets


@pytest.fixture(scope="module")
def stream_args():
return {
"subdomain": "fake-subdomain",
"start_date": "2021-01-27T00:00:00Z",
"authenticator": BasicApiTokenAuthenticator("test@airbyte.io", "api_token"),
}


@pytest.mark.parametrize(
"records_count,page_size,expected_futures_deque_len",
[
(1000, 100, 10),
(1000, 10, 100),
(0, 100, 0),
(1, 100, 1),
(101, 100, 2),
],
)
def test_proper_number_of_future_requests_generated(stream_args, records_count, page_size, expected_futures_deque_len):
stream = Tickets(**stream_args)
stream.page_size = page_size

with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(count_url, text=json.dumps({"count": {"value": records_count}}))

records_url = urljoin(stream.url_base, stream.path())
m.get(records_url)

stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field)

assert len(stream.future_requests) == expected_futures_deque_len


@pytest.mark.parametrize(
"records_count,page_size,expected_futures_deque_len",
[
(1000, 100, 10),
(1000, 10, 100),
(0, 100, 0),
(1, 100, 1),
(101, 100, 2),
],
)
def test_parse_future_records(stream_args, records_count, page_size, expected_futures_deque_len):
stream = Tickets(**stream_args)
stream.page_size = page_size
expected_records = [
{f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
for i in range(records_count)
]

with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(
count_url,
text=json.dumps({"count": {"value": records_count}}),
)

records_url = urljoin(stream.url_base, stream.path())
m.get(records_url, text=json.dumps({stream.name: expected_records}))

stream.generate_future_requests(sync_mode=SyncMode.full_refresh, cursor_field=stream.cursor_field)
if not stream.future_requests and not expected_futures_deque_len:
assert len(stream.future_requests) == 0 and not expected_records
else:
response = stream.future_requests[0]["future"].result()
records = list(stream.parse_response(response, stream_state=None, stream_slice=None))
assert records == expected_records


@pytest.mark.parametrize(
"records_count,page_size,expected_futures_deque_len,should_retry",
[
(1000, 100, 10, True),
(1000, 10, 100, True),
(0, 100, 0, True),
(1, 100, 1, False),
(101, 100, 2, False),
],
)
def test_read_records(stream_args, records_count, page_size, expected_futures_deque_len, should_retry):
stream = Tickets(**stream_args)
stream.page_size = page_size
expected_records = [
{f"key{i}": f"val{i}", stream.cursor_field: (pendulum.parse("2020-01-01") + timedelta(days=i)).isoformat()}
for i in range(page_size)
]

with requests_mock.Mocker() as m:
count_url = urljoin(stream.url_base, f"{stream.path()}/count.json")
m.get(count_url, text=json.dumps({"count": {"value": records_count}}))

records_url = urljoin(stream.url_base, stream.path())

m.get(records_url, status_code=429 if should_retry else 200, headers={"X-Rate-Limit": "700"})

if should_retry and expected_futures_deque_len:
with pytest.raises(DefaultBackoffException):
list(stream.read_records(sync_mode=SyncMode.full_refresh))
else:
assert list(stream.read_records(sync_mode=SyncMode.full_refresh)) == expected_records