Skip to content

Commit

Permalink
Revert "TDL-24687: Enhance tap performance (#150)"
Browse files Browse the repository at this point in the history
This reverts commit f75a006.
prijendev committed Nov 8, 2024
1 parent f75a006 commit 1b0881c
Showing 12 changed files with 663 additions and 775 deletions.
5 changes: 3 additions & 2 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -39,15 +39,16 @@ jobs:
when: always
command: |
source /usr/local/share/virtualenvs/tap-zendesk/bin/activate
pip install nose2 parameterized nose2[coverage_plugin]>=0.6.5 aioresponses
nose2 --with-coverage --coverage=tap_zendesk -v -s test/unittests
pip install nose2 parameterized nose2[coverage_plugin]>=0.6.5
nose2 --with-coverage -v -s test/unittests
- store_test_results:
path: test_output/report.xml
- store_artifacts:
path: htmlcov

integration_tests:
executor: docker-executor
parallelism: 10
steps:
- checkout
- attach_workspace:
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
# Changelog

## 2.5.0
* Performance improvement on ticket records sync [#150](https://github.com/singer-io/tap-zendesk/pull/150)

## 2.4.0
* Upgrades to run on python 3.11.7 [#146](https://github.com/singer-io/tap-zendesk/pull/146)

3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-zendesk',
version='2.5.0',
version='2.4.0',
description='Singer.io tap for extracting data from the Zendesk API',
author='Stitch',
url='https://singer.io',
@@ -14,7 +14,6 @@
'zenpy==2.0.24',
'backoff==2.2.1',
'requests==2.31.0',
'aiohttp==3.10.10'
],
extras_require={
'dev': [
130 changes: 4 additions & 126 deletions tap_zendesk/http.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
from time import sleep
from asyncio import sleep as async_sleep
import backoff
import requests
import singer
from requests.exceptions import Timeout, HTTPError, ChunkedEncodingError, ConnectionError
from aiohttp import ContentTypeError
from urllib3.exceptions import ProtocolError



LOGGER = singer.get_logger()
DEFAULT_WAIT = 60 # Default wait time for backoff
DEFAULT_WAIT_FOR_CONFLICT_ERROR = 10 # Default wait time for backoff for conflict error


class ZendeskError(Exception):
def __init__(self, message=None, response=None):
@@ -210,126 +208,7 @@ def get_offset_based(url, access_token, request_timeout, page_size, **kwargs):
yield response_json
next_url = response_json.get('next_page')


async def raise_for_error_for_async(response):
    """
    Return the decoded JSON body of a successful response, otherwise raise.

    The response body is decoded as JSON on a best-effort basis; decoding
    failures are logged and treated as an empty body. A 200 response returns
    the decoded body. For 429 and 409 responses the coroutine first sleeps
    (so that a retrying caller is throttled) and then raises like any other
    error status.

    :param response: response object exposing ``status``, ``headers`` and an
        awaitable ``json()`` (used with aiohttp responses by ``call_api_async``).
    :return: the decoded JSON body when ``response.status`` is 200.
    :raises ZendeskError: or the more specific class registered for the status
        code in ``ERROR_CODE_EXCEPTION_MAPPING``, for every non-200 status.
    """
    response_json = {}
    try:
        response_json = await response.json()
    except ContentTypeError as e:
        LOGGER.warning("Error decoding response from API: %s", str(e))
    except ValueError as e:
        LOGGER.warning("Invalid response from API: %s", str(e))

    if response.status == 200:
        return response_json

    # The error-message lookups below use `.get`, so a body that decoded to a
    # list, string or null must not mask the HTTP error with an AttributeError.
    if not isinstance(response_json, dict):
        response_json = {}

    if response.status == 429:
        # Honour the 'Retry-After' header, waiting 1 second if it is absent.
        default_retry_after = 1
        retry_after = response.headers.get("Retry-After", default_retry_after)
        try:
            retry_after = int(retry_after)
        except (TypeError, ValueError):
            # 'Retry-After' may legally be an HTTP-date (or be malformed);
            # fall back to the default rather than aborting the retry path.
            retry_after = default_retry_after
        LOGGER.warning(
            "Caught HTTP 429, retrying request in %s seconds", retry_after)
        # Wait for the specified time before the caller retries the request.
        await async_sleep(retry_after)
    elif response.status == 409:
        # Conflict: wait a fixed interval before the caller retries.
        LOGGER.warning(
            "Caught HTTP 409, retrying request in %s seconds",
            DEFAULT_WAIT_FOR_CONFLICT_ERROR,
        )
        await async_sleep(DEFAULT_WAIT_FOR_CONFLICT_ERROR)

    # Prepare the error message: prefer the body's "error", then its
    # "message", then the message registered for this status code.
    if response_json.get("error"):
        message = "HTTP-error-code: {}, Error: {}".format(
            response.status, response_json.get("error")
        )
    else:
        message = "HTTP-error-code: {}, Error: {}".format(
            response.status,
            response_json.get(
                "message",
                ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get(
                    "message", "Unknown Error"
                ),
            ),
        )
    exc = ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get(
        "raise_exception", ZendeskError
    )
    LOGGER.error(message)
    # `from None` drops any implicit exception context from the traceback.
    raise exc(message, response) from None


# Outer retry: rate-limit / conflict errors. `interval=0` because
# `raise_for_error_for_async` already sleeps before raising on 429 and 409
# (presumably the statuses mapped to these two exception classes).
@backoff.on_exception(
    backoff.constant,
    (ZendeskRateLimitError, ZendeskConflictError),
    max_tries=5,
    interval=0
)
# Inner retry: transient transport failures, with exponential backoff.
@backoff.on_exception(
    backoff.expo,
    (
        ConnectionError,
        ConnectionResetError,
        Timeout,
        # aiohttp reports request timeouts as asyncio.TimeoutError, which is
        # the builtin TimeoutError on Python 3.11+; the requests `Timeout`
        # above is never raised by an aiohttp session.
        TimeoutError,
        ChunkedEncodingError,
        ProtocolError,
    ),
    max_tries=5,
    factor=2,
)
async def call_api_async(session, url, request_timeout, params, headers):
    """
    Perform an asynchronous GET request and return the decoded JSON body.

    :param session: session object whose ``get(...)`` is an async context
        manager (an aiohttp client session in this module's usage).
    :param url: URL to request.
    :param request_timeout: timeout passed through to ``session.get``.
    :param params: query parameters, or ``None``.
    :param headers: request headers.
    :return: whatever ``raise_for_error_for_async`` returns for the response.
    :raises ZendeskError: (or a subclass) for non-200 responses once the
        retries above are exhausted.
    """
    async with session.get(
        url, params=params, headers=headers, timeout=request_timeout
    ) as response:
        # Decode/validate while the response is still open.
        response_json = await raise_for_error_for_async(response)

    return response_json


async def paginate_ticket_audits(session, url, access_token, request_timeout, page_size, **kwargs):
    """
    Walk every page of a ticket-audits endpoint and return one merged response.

    The first page's response dict is returned, with the ``audits`` lists of
    all subsequent pages appended to its own ``audits`` list. Pages are
    followed through each response's ``next_page`` value until it is falsy.

    :param session: session object handed to ``call_api_async``.
    :param url: URL of the first page.
    :param access_token: token sent as a ``Bearer`` Authorization header.
    :param request_timeout: timeout handed to ``call_api_async``.
    :param page_size: value sent as ``per_page`` on the first request.
    :param kwargs: optional ``headers`` / ``params`` mappings that are merged
        over (and may override) the defaults built here.
    :return: the first page's response dict with all audits aggregated.
    """
    request_headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
        'Authorization': 'Bearer {}'.format(access_token),
    }
    request_headers.update(kwargs.get('headers', {}))

    query = {'per_page': page_size}
    query.update(kwargs.get('params', {}))

    # First page: sent with the query parameters built above.
    aggregated = await call_api_async(
        session, url, request_timeout, params=query, headers=request_headers
    )
    following_url = aggregated.get('next_page')

    # Later pages: requested via the `next_page` URL with no extra params.
    while following_url:
        page = await call_api_async(
            session, following_url, request_timeout, params=None, headers=request_headers
        )
        aggregated["audits"].extend(page["audits"])
        following_url = page.get('next_page')

    return aggregated

def get_incremental_export(url, access_token, request_timeout, start_time, side_load):
def get_incremental_export(url, access_token, request_timeout, start_time):
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
@@ -340,7 +219,6 @@ def get_incremental_export(url, access_token, request_timeout, start_time, side_

if not isinstance(start_time, int):
params = {'start_time': start_time.timestamp()}
params['include'] = side_load

response = call_api(url, request_timeout, params=params, headers=headers)
response_json = response.json()
@@ -352,7 +230,7 @@ def get_incremental_export(url, access_token, request_timeout, start_time, side_
while not end_of_stream:
cursor = response_json['after_cursor']

params = {'cursor': cursor, "include": side_load}
params = {'cursor': cursor}
# Replaced below line of code with call_api method
# response = requests.get(url, params=params, headers=headers)
# response.raise_for_status()
Loading

0 comments on commit 1b0881c

Please sign in to comment.