Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-26714: Fix ratelimit error and add retry for 5xx errors #155

Merged
merged 11 commits into from
Nov 27, 2024
23 changes: 12 additions & 11 deletions tap_zendesk/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


LOGGER = singer.get_logger()
# Default wait time for backoff
# Default wait time for 429 and 5xx error
DEFAULT_WAIT = 60
# Default wait time for backoff for conflict error
DEFAULT_WAIT_FOR_CONFLICT_ERROR = 10
Expand All @@ -35,7 +35,7 @@ class ZendeskForbiddenError(ZendeskError):
class ZendeskNotFoundError(ZendeskError):
pass

class ZendeskConflictError(ZendeskError):
class ZendeskConflictError(ZendeskBackoffError):
pass

class ZendeskUnprocessableEntityError(ZendeskError):
Expand Down Expand Up @@ -217,18 +217,18 @@ async def raise_for_error_for_async(response):
"""
Error handling method which throws custom error. Class for each error defined above which extends `ZendeskError`.
"""
response_json = {}
try:
response_json = await response.json()
except (ContentTypeError, ValueError) as e:
LOGGER.warning("Error decoding response from API. Exception: %s", e, exc_info=True)
Copy link
Contributor Author

@prijendev prijendev Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For every non-200 error, the API does not return a valid JSON response. Logging a warning with a full stack trace for each one makes the output very noisy.

except (ContentTypeError, ValueError):
# Invalid JSON response
response_json = {}

if response.status == 200:
return response_json
elif response.status == 429:
elif response.status == 429 or response.status >= 500:
# Get the 'Retry-After' header value, defaulting to 60 seconds if not present.
retry_after = response.headers.get("Retry-After", 1)
LOGGER.warning("Caught HTTP 429, retrying request in %s seconds", retry_after)
retry_after = int(response.headers.get("Retry-After", "0")) or DEFAULT_WAIT
LOGGER.warning("Caught HTTP %s, retrying request in %s seconds", response.status, retry_after)
# Wait for the specified time before retrying the request.
await async_sleep(int(retry_after))
elif response.status == 409:
Expand All @@ -254,16 +254,17 @@ async def raise_for_error_for_async(response):
),
),
)

DEFAULT_ERROR_OBJECT = ZendeskError if response.status < 500 else ZendeskBackoffError
exc = ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get(
"raise_exception", ZendeskError
"raise_exception", DEFAULT_ERROR_OBJECT
)
LOGGER.error(message)
raise exc(message, response) from None


@backoff.on_exception(
backoff.constant,
(ZendeskRateLimitError, ZendeskConflictError),
ZendeskBackoffError,
max_tries=5,
interval=0
)
Expand Down
31 changes: 20 additions & 11 deletions tap_zendesk/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import datetime
import asyncio
import time
import pytz
from zenpy.lib.exception import APIException
from aiohttp import ClientSession
Expand All @@ -18,7 +19,9 @@

DEFAULT_PAGE_SIZE = 100
REQUEST_TIMEOUT = 300
DEFAULT_BATCH_SIZE = 700
CONCURRENCY_LIMIT = 20
# Reference: https://developer.zendesk.com/api-reference/introduction/rate-limits/#endpoint-rate-limits:~:text=List%20Audits%20for,requests%20per%20minute
AUDITS_REQUEST_PER_MINUTE=450
START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
HEADERS = {
'Content-Type': 'application/json',
Expand Down Expand Up @@ -266,7 +269,6 @@ class Tickets(CursorBasedExportStream):
replication_key = "generated_timestamp"
item_key = "tickets"
endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json"
batch_size = DEFAULT_BATCH_SIZE

def sync(self, state): #pylint: disable=too-many-statements

Expand All @@ -276,9 +278,6 @@ def sync(self, state): #pylint: disable=too-many-statements
# https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints
tickets = self.get_objects(bookmark, side_load='metric_sets')

# Run this method to set batch size to fetch ticket audits and comments records in async way
self.check_access()

audits_stream = TicketAudits(self.client, self.config)
metrics_stream = TicketMetrics(self.client, self.config)
comments_stream = TicketComments(self.client, self.config)
Expand All @@ -295,6 +294,8 @@ def emit_sub_stream_metrics(sub_stream):
LOGGER.info("Syncing ticket_audits per ticket...")

ticket_ids = []
counter = 0
start_time = time.time()
for ticket in tickets:
zendesk_metrics.capture('ticket')

Expand All @@ -306,6 +307,9 @@ def emit_sub_stream_metrics(sub_stream):
# yielding stream name with record in a tuple as it is used for obtaining only the parent records while sync
yield (self.stream, ticket)

# Skip deleted tickets because they don't have audits or comments
if ticket.get('status') == 'deleted':
continue

if metrics_stream.is_selected() and ticket.get('metric_set'):
zendesk_metrics.capture('ticket_metric')
Expand All @@ -314,7 +318,7 @@ def emit_sub_stream_metrics(sub_stream):

# Check if the number of ticket IDs has reached the batch size.
ticket_ids.append(ticket["id"])
if len(ticket_ids) >= self.batch_size:
if len(ticket_ids) >= CONCURRENCY_LIMIT:
# Process audits and comments in batches
records = self.sync_ticket_audits_and_comments(
comments_stream, audits_stream, ticket_ids)
Expand All @@ -327,6 +331,15 @@ def emit_sub_stream_metrics(sub_stream):
ticket_ids = []
# Write state after processing the batch.
singer.write_state(state)
counter+=CONCURRENCY_LIMIT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
counter+=CONCURRENCY_LIMIT
counter += CONCURRENCY_LIMIT


# Check if the number of records processed in a minute has reached the limit.
if counter >= AUDITS_REQUEST_PER_MINUTE:
# Processed max number of records in a minute. Sleep for few seconds.
# Add 2 seconds of buffer time
time.sleep(max(0, 60 - (time.time() - start_time)+2))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though the original logic is correct, the suggested code may offer better readability, clarity, and maintainability.

Suggested change
# Add 2 seconds of buffer time
time.sleep(max(0, 60 - (time.time() - start_time)+2))
# Calculate elapsed time
elapsed_time = time.time() - start_time
# Calculate remaining time until the next minute, plus buffer
remaining_time = max(0, 60 - elapsed_time + 2)
# Sleep for the calculated time
time.sleep(remaining_time)

start_time = time.time()
counter = 0

# Check if there are any remaining ticket IDs after the loop.
if ticket_ids:
Expand Down Expand Up @@ -357,11 +370,7 @@ def check_access(self):
start_time = datetime.datetime.strptime(self.config['start_date'], START_DATE_FORMAT).timestamp()
HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"])

response = http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS)

# Rate limit are varies according to the zendesk account. So, we need to set the batch size dynamically.
# https://developer.zendesk.com/api-reference/introduction/rate-limits/
self.batch_size = int(response.headers.get('x-rate-limit', DEFAULT_BATCH_SIZE))
http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS)


class TicketAudits(Stream):
Expand Down
106 changes: 106 additions & 0 deletions test/unittests/test_async_ticket_audits.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,112 @@ def test_sync_audits_comments_stream__both_not_selected(
# Assertions
self.assertEqual(len(result), 2)

@patch('tap_zendesk.streams.time.sleep')
@patch("tap_zendesk.streams.Tickets.update_bookmark")
@patch("tap_zendesk.streams.Tickets.get_bookmark")
@patch("tap_zendesk.streams.Tickets.get_objects")
@patch("tap_zendesk.streams.Tickets.check_access")
@patch("tap_zendesk.streams.singer.write_state")
@patch("tap_zendesk.streams.zendesk_metrics.capture")
@patch("tap_zendesk.streams.LOGGER.info")
def test_sync_for_deleted_tickets(
    self,
    mock_info,
    mock_capture,
    mock_write_state,
    mock_check_access,
    mock_get_objects,
    mock_get_bookmark,
    mock_update_bookmark,
    mock_sleep
):
    """
    Test that sync skips audit/comment extraction for tickets with
    status 'deleted' (deleted tickets have no audits or comments),
    while still yielding the deleted ticket records themselves.
    """
    # Mock the necessary data
    state = {}
    bookmark = "2023-01-01T00:00:00Z"
    # Tickets 1 and 3 are deleted: they must be yielded as ticket records
    # but excluded from the audit/comment ID batches.
    tickets = [
        {"id": 1, "generated_timestamp": 1672531200, "fields": "duplicate", "status": "deleted"},
        {"id": 2, "generated_timestamp": 1672531300, "fields": "duplicate"},
        {"id": 3, "generated_timestamp": 1672531200, "fields": "duplicate", "status": "deleted"},
        {"id": 4, "generated_timestamp": 1672531300, "fields": "duplicate"}
    ]
    mock_get_bookmark.return_value = bookmark
    mock_get_objects.return_value = tickets
    # Shrink the module-level limits so batching/sleeping logic triggers
    # with a small fixture.
    streams.AUDITS_REQUEST_PER_MINUTE = 4
    streams.CONCURRENCY_LIMIT = 2

    # Create an instance of the Tickets class
    instance = streams.Tickets(None, {})
    # Each element is one batch: (audit records, comment records).
    instance.sync_ticket_audits_and_comments = MagicMock(return_value=[
        (['audit1', 'audit2'], ['comment1', 'comment2']),
        (['audit3'], ['comment3']),
    ])

    # Run the sync method
    result = list(instance.sync(state))

    # Assertions
    # State is written once per processed batch of non-deleted tickets.
    self.assertEqual(mock_write_state.call_count, 2)
    # 4 tickets, 3 audits, 3 comments
    self.assertEqual(len(result), 10)

@patch('tap_zendesk.streams.time.sleep')
@patch("tap_zendesk.streams.Tickets.update_bookmark")
@patch("tap_zendesk.streams.Tickets.get_bookmark")
@patch("tap_zendesk.streams.Tickets.get_objects")
@patch("tap_zendesk.streams.Tickets.check_access")
@patch("tap_zendesk.streams.singer.write_state")
@patch("tap_zendesk.streams.zendesk_metrics.capture")
@patch("tap_zendesk.streams.LOGGER.info")
def test_concurrency_for_audit_stream(
    self,
    mock_info,
    mock_capture,
    mock_write_state,
    mock_check_access,
    mock_get_objects,
    mock_get_bookmark,
    mock_update_bookmark,
    mock_sleep
):
    """
    Test that sync processes audits/comments in batches of
    CONCURRENCY_LIMIT tickets and sleeps (to respect the per-minute
    audit rate limit) each time AUDITS_REQUEST_PER_MINUTE requests
    have been issued within the current minute.
    """
    # Mock the necessary data
    state = {}
    bookmark = "2023-01-01T00:00:00Z"
    # 9 tickets with CONCURRENCY_LIMIT=2 -> 4 full batches + 1 leftover.
    tickets = [
        {"id": 1, "generated_timestamp": 1672531200, "fields": "duplicate"},
        {"id": 2, "generated_timestamp": 1672531300, "fields": "duplicate"},
        {"id": 3, "generated_timestamp": 1672531200, "fields": "duplicate"},
        {"id": 4, "generated_timestamp": 1672531300, "fields": "duplicate"},
        {"id": 5, "generated_timestamp": 1672531300, "fields": "duplicate"},
        {"id": 6, "generated_timestamp": 1672531300, "fields": "duplicate"},
        {"id": 7, "generated_timestamp": 1672531300, "fields": "duplicate"},
        {"id": 8, "generated_timestamp": 1672531300, "fields": "duplicate"},
        {"id": 9, "generated_timestamp": 1672531300, "fields": "duplicate"}
    ]
    mock_get_bookmark.return_value = bookmark
    mock_get_objects.return_value = tickets
    # Shrink the module-level limits so the rate-limit sleep triggers
    # after every two batches.
    streams.AUDITS_REQUEST_PER_MINUTE = 4
    streams.CONCURRENCY_LIMIT = 2

    # Create an instance of the Tickets class
    instance = streams.Tickets(None, {})
    # Each element is one batch: (audit records, comment records).
    instance.sync_ticket_audits_and_comments = MagicMock(return_value=[
        (['audit1', 'audit2'], ['comment1', 'comment2']),
        (['audit3', 'audit4'], ['comment3', 'comment4']),
    ])

    # Run the sync method
    result = list(instance.sync(state))

    # Assertions
    # 4 in-loop batches + 1 leftover batch -> 5 state writes.
    self.assertEqual(mock_write_state.call_count, 5)
    # Counter reaches the per-minute limit after batches 2 and 4 -> 2 sleeps.
    self.assertEqual(mock_sleep.call_count, 2)

@patch("tap_zendesk.streams.zendesk_metrics.capture")
@patch("tap_zendesk.streams.LOGGER.warning")
@patch(
Expand Down
Loading