Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hot fix for unpacked value error #153

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,14 @@ jobs:
command: |
source /usr/local/share/virtualenvs/tap-zendesk/bin/activate
pip install nose2 parameterized nose2[coverage_plugin]>=0.6.5
nose2 --with-coverage -v -s test/unittests
nose2 --with-coverage --coverage=tap_zendesk -v -s test/unittests
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov

integration_tests:
executor: docker-executor
parallelism: 10
steps:
- checkout
- attach_workspace:
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 2.6.0
* Performance improvement on ticket records sync [#153](https://github.com/singer-io/tap-zendesk/pull/153)

## 2.4.0
* Upgrades to run on python 3.11.7 [#146](https://github.com/singer-io/tap-zendesk/pull/146)

Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-zendesk',
version='2.4.0',
version='2.6.0',
description='Singer.io tap for extracting data from the Zendesk API',
author='Stitch',
url='https://singer.io',
Expand All @@ -14,6 +14,7 @@
'zenpy==2.0.24',
'backoff==2.2.1',
'requests==2.31.0',
'aiohttp==3.10.10'
],
extras_require={
'dev': [
Expand Down
128 changes: 124 additions & 4 deletions tap_zendesk/http.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from time import sleep
from asyncio import sleep as async_sleep
import backoff
import requests
import singer
from requests.exceptions import Timeout, HTTPError, ChunkedEncodingError, ConnectionError
from aiohttp import ContentTypeError
from urllib3.exceptions import ProtocolError



LOGGER = singer.get_logger()

# Default wait time for backoff
DEFAULT_WAIT = 60
# Default wait time for backoff for conflict error
DEFAULT_WAIT_FOR_CONFLICT_ERROR = 10

class ZendeskError(Exception):
def __init__(self, message=None, response=None):
Expand Down Expand Up @@ -208,7 +212,122 @@ def get_offset_based(url, access_token, request_timeout, page_size, **kwargs):
yield response_json
next_url = response_json.get('next_page')

def get_incremental_export(url, access_token, request_timeout, start_time):

async def raise_for_error_for_async(response):
"""
Error handling method which throws custom error. Class for each error defined above which extends `ZendeskError`.
"""
response_json = {}
try:
response_json = await response.json()
except (ContentTypeError, ValueError) as e:
LOGGER.warning("Error decoding response from API. Exception: %s", e, exc_info=True)

if response.status == 200:
return response_json
elif response.status == 429:
# Get the 'Retry-After' header value, defaulting to 60 seconds if not present.
retry_after = response.headers.get("Retry-After", 1)
LOGGER.warning("Caught HTTP 429, retrying request in %s seconds", retry_after)
# Wait for the specified time before retrying the request.
await async_sleep(int(retry_after))
elif response.status == 409:
LOGGER.warning(
"Caught HTTP 409, retrying request in %s seconds",
DEFAULT_WAIT_FOR_CONFLICT_ERROR,
)
# Wait for the specified time before retrying the request.
await async_sleep(DEFAULT_WAIT_FOR_CONFLICT_ERROR)

# Prepare the error message and raise the appropriate exception.
if response_json.get("error"):
message = "HTTP-error-code: {}, Error: {}".format(
response.status, response_json.get("error")
)
else:
message = "HTTP-error-code: {}, Error: {}".format(
response.status,
response_json.get(
"message",
ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get(
"message", "Unknown Error"
),
),
)
exc = ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get(
"raise_exception", ZendeskError
)
LOGGER.error(message)
raise exc(message, response) from None


@backoff.on_exception(
backoff.constant,
(ZendeskRateLimitError, ZendeskConflictError),
max_tries=5,
interval=0
)
@backoff.on_exception(
backoff.expo,
(
ConnectionError,
ConnectionResetError,
Timeout,
ChunkedEncodingError,
ProtocolError,
),
max_tries=5,
factor=2,
)
async def call_api_async(session, url, request_timeout, params, headers):
"""
Perform an asynchronous GET request
"""
async with session.get(
url, params=params, headers=headers, timeout=request_timeout
) as response:
response_json = await raise_for_error_for_async(response)

return response_json


async def paginate_ticket_audits(session, url, access_token, request_timeout, page_size, **kwargs):
"""
Paginate through the ticket audits API endpoint and return the aggregated results
"""
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': 'Bearer {}'.format(access_token),
**kwargs.get('headers', {})
}

params = {
'per_page': page_size,
**kwargs.get('params', {})
}

# Make the initial asynchronous API call
final_response = await call_api_async(session, url, request_timeout, params=params, headers=headers)

next_url = final_response.get('next_page')

# Fetch next pages of results.
while next_url:

# An asynchronous API call to fetch the next page of results.
response = await call_api_async(session, next_url, request_timeout, params=None, headers=headers)

# Extend the final response with the audits from the current page.
final_response["audits"].extend(response["audits"])

# Get the URL for the next page
next_url = response.get('next_page')

# Return the final aggregated response
return final_response

def get_incremental_export(url, access_token, request_timeout, start_time, side_load):
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
Expand All @@ -219,6 +338,7 @@ def get_incremental_export(url, access_token, request_timeout, start_time):

if not isinstance(start_time, int):
params = {'start_time': start_time.timestamp()}
params['include'] = side_load

response = call_api(url, request_timeout, params=params, headers=headers)
response_json = response.json()
Expand All @@ -230,7 +350,7 @@ def get_incremental_export(url, access_token, request_timeout, start_time):
while not end_of_stream:
cursor = response_json['after_cursor']

params = {'cursor': cursor}
params = {'cursor': cursor, "include": side_load}
# Replaced below line of code with call_api method
# response = requests.get(url, params=params, headers=headers)
# response.raise_for_status()
Expand Down
Loading