From 8e9fddfc0d0ee5c2f84c0a19927ae37f0eef41c0 Mon Sep 17 00:00:00 2001 From: prijendev Date: Wed, 20 Nov 2024 16:46:07 +0530 Subject: [PATCH 01/11] Fix 502 and 429 error --- tap_zendesk/http.py | 20 ++-- tap_zendesk/streams.py | 9 +- test/unittests/test_http.py | 223 ++++++++++++++++++++++++++++++++---- 3 files changed, 214 insertions(+), 38 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 079a93a..f8c7bfc 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -9,7 +9,7 @@ LOGGER = singer.get_logger() -# Default wait time for backoff +# Default wait time for 429 and 5xx error DEFAULT_WAIT = 60 # Default wait time for backoff for conflict error DEFAULT_WAIT_FOR_CONFLICT_ERROR = 10 @@ -35,7 +35,7 @@ class ZendeskForbiddenError(ZendeskError): class ZendeskNotFoundError(ZendeskError): pass -class ZendeskConflictError(ZendeskError): +class ZendeskConflictError(ZendeskBackoffError): pass class ZendeskUnprocessableEntityError(ZendeskError): @@ -217,18 +217,18 @@ async def raise_for_error_for_async(response): """ Error handling method which throws custom error. Class for each error defined above which extends `ZendeskError`. """ - response_json = {} try: response_json = await response.json() except (ContentTypeError, ValueError) as e: - LOGGER.warning("Error decoding response from API. Exception: %s", e, exc_info=True) + # Invalid JSON response + response_json = {} if response.status == 200: return response_json - elif response.status == 429: + elif response.status == 429 or response.status >= 500: # Get the 'Retry-After' header value, defaulting to 60 seconds if not present. - retry_after = response.headers.get("Retry-After", 1) - LOGGER.warning("Caught HTTP 429, retrying request in %s seconds", retry_after) + retry_after = int(response.headers.get("Retry-After", "0")) or DEFAULT_WAIT + LOGGER.warning("Caught HTTP %s, retrying request in %s seconds", response.status, retry_after) # Wait for the specified time before retrying the request. await async_sleep(int(retry_after)) elif response.status == 409: @@ -254,8 +254,10 @@ async def raise_for_error_for_async(response): ), ), ) + + DEFAULT_ERROR_OBJECT = ZendeskError if response.status < 500 else ZendeskBackoffError exc = ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get( - "raise_exception", ZendeskError + "raise_exception", DEFAULT_ERROR_OBJECT ) LOGGER.error(message) raise exc(message, response) from None @@ -263,7 +265,7 @@ async def raise_for_error_for_async(response): @backoff.on_exception( backoff.constant, - (ZendeskRateLimitError, ZendeskConflictError), + ZendeskBackoffError, max_tries=5, interval=0 ) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 993d921..02463a5 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -18,7 +18,7 @@ DEFAULT_PAGE_SIZE = 100 REQUEST_TIMEOUT = 300 -DEFAULT_BATCH_SIZE = 700 +DEFAULT_BATCH_SIZE = 20 START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" HEADERS = { 'Content-Type': 'application/json', @@ -276,9 +276,6 @@ def sync(self, state): #pylint: disable=too-many-statements # https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints tickets = self.get_objects(bookmark, side_load='metric_sets') - # Run this method to set batch size to fetch ticket audits and comments records in async way - self.check_access() - audits_stream = TicketAudits(self.client, self.config) metrics_stream = TicketMetrics(self.client, self.config) comments_stream = TicketComments(self.client, self.config) @@ -359,10 +356,6 @@ def check_access(self): response = http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS) - # Rate limit are varies according to the zendesk account. So, we need to set the batch size dynamically. - # https://developer.zendesk.com/api-reference/introduction/rate-limits/ - self.batch_size = int(response.headers.get('x-rate-limit', DEFAULT_BATCH_SIZE)) - class TicketAudits(Stream): name = "ticket_audits" diff --git a/test/unittests/test_http.py b/test/unittests/test_http.py index 45ea291..68f4eea 100644 --- a/test/unittests/test_http.py +++ b/test/unittests/test_http.py @@ -552,14 +552,14 @@ async def run_test(): asyncio.run(run_test()) - @patch("asyncio.sleep", return_value=None) + @patch("tap_zendesk.http.async_sleep") @patch("aiohttp.ClientSession.get") def test_call_api_async_rate_limit(self, mocked, mock_sleep): """ - Test that call_api_async retries the request when the response status is 429 (Too Many Requests). + Test that call_api_async retries the request when the response status is 429 (Too Many Requests) and Retry-After header is present with value 10. """ url = "https://api.example.com/resource" - retry_after = "1" + retry_after = "10" response_data = {"key": "value"} mock_error_response = AsyncMock() mock_error_response.status = 429 @@ -575,6 +575,67 @@ def test_call_api_async_rate_limit(self, mocked, mock_sleep): mock_response, ] + async def run_test(): + async with ClientSession() as session: + result = await http.call_api_async(session, url, 10, {}, {}) + self.assertEqual(result, response_data) + + asyncio.run(run_test()) + mock_sleep.assert_called_with(10) + + @patch("tap_zendesk.http.async_sleep") + @patch("aiohttp.ClientSession.get") + def test_call_api_async_rate_limit_zero_retry_after(self, mocked, mock_sleep): + """ + Test that call_api_async retries the request when the response status is 429 (Too Many Requests) and Retry-After header is present with value 0 + """ + url = "https://api.example.com/resource" + retry_after = "0" + response_data = {"key": "value"} + mock_error_response = AsyncMock() + mock_error_response.status = 429 + mock_error_response.headers = {"Retry-After": retry_after} + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json.return_value = response_data + mocked.return_value.__aenter__.side_effect = [ + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + mock_response, + ] + + async def run_test(): + async with ClientSession() as session: + result = await http.call_api_async(session, url, 10, {}, {}) + self.assertEqual(result, response_data) + + asyncio.run(run_test()) + mock_sleep.assert_called_with(60) + + @patch("tap_zendesk.http.async_sleep") + @patch("aiohttp.ClientSession.get") + def test_call_api_async_rate_limit_retry_after_missing_header(self, mocked, mock_sleep): + """ + Test that call_api_async retries the request when the response status is 429 (Too Many Requests) and Retry-After header is missing. + """ + url = "https://api.example.com/resource" + response_data = {"key": "value"} + mock_error_response = AsyncMock() + mock_error_response.status = 429 + mock_error_response.headers = {} + mock_response = AsyncMock() + mock_response.status = 200 + mock_response.json.return_value = response_data + mocked.return_value.__aenter__.side_effect = [ + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + mock_response, + ] + async def run_test(): async with ClientSession() as session: result = await http.call_api_async(session, url, 10, {}, {}) @@ -582,8 +643,9 @@ async def run_test(): self.assertEqual(mock_sleep.call_count, 4) asyncio.run(run_test()) + mock_sleep.assert_called_with(60) - @patch("asyncio.sleep", return_value=None) + @patch("tap_zendesk.http.async_sleep") @patch("aiohttp.ClientSession.get") def test_call_api_async_rate_limit_exception_after_5_retries( self, mocked, mock_sleep @@ -593,14 +655,10 @@ def test_call_api_async_rate_limit_exception_after_5_retries( """ url = "https://api.example.com/resource" retry_after = "1" - response_data = {"key": "value"} mock_error_response = AsyncMock() mock_error_response.status = 429 mock_error_response.headers = {"Retry-After": retry_after} mock_error_response.json.return_value = {} - mock_response = AsyncMock() - mock_response.status = 200 - mock_response.json.return_value = response_data mocked.return_value.__aenter__.side_effect = [ mock_error_response, mock_error_response, @@ -609,11 +667,12 @@ def test_call_api_async_rate_limit_exception_after_5_retries( mock_error_response, ] + async def run_test(): async with ClientSession() as session: - with self.assertRaises(http.ZendeskError) as context: + with self.assertRaises(http.ZendeskRateLimitError) as context: await http.call_api_async(session, url, 10, {}, {}) - self.assertEqual(mock_sleep.call_count, 4) + self.assertEqual(mock_sleep.call_count, 5) self.assertEqual( "HTTP-error-code: 429, Error: The API rate limit for your organisation/application pairing has been exceeded.", str(context.exception), @@ -621,7 +680,7 @@ async def run_test(): asyncio.run(run_test()) - @patch("asyncio.sleep", return_value=None) + @patch("tap_zendesk.http.async_sleep") @patch("aiohttp.ClientSession.get") def test_call_api_async_conflict(self, mocked, mock_sleep): """ @@ -651,25 +710,147 @@ async def run_test(): asyncio.run(run_test()) - @patch("asyncio.sleep", return_value=None) + @patch("tap_zendesk.http.async_sleep") @patch("aiohttp.ClientSession.get") - def test_call_api_async_other_error(self, mocked, mock_sleep): + def test_call_api_async_conflict_after_5_retries(self, mocked, mock_sleep): """ - Test that call_api_async raises an exception for other HTTP errors (e.g., 500). + Test that call_api_async retries the request when the response status is 409 (Conflict) with backoff. """ url = "https://api.example.com/resource" - error_message = "Some error" + response_data = {"key": "value"} + mock_error_response = AsyncMock() + mock_error_response.status = 409 + mock_error_response.json.return_value = {} + mocked.return_value.__aenter__.side_effect = [ + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + ] + + async def run_test(): + async with ClientSession() as session: + with self.assertRaises(http.ZendeskConflictError) as context: + await http.call_api_async(session, url, 10, {}, {}) + self.assertEqual(mock_sleep.call_count, 5) + self.assertEqual( + "HTTP-error-code: 409, Error: The API request cannot be completed because the requested operation would conflict with an existing item.", + str(context.exception), + ) + + asyncio.run(run_test()) + + @patch("tap_zendesk.http.async_sleep") + @patch("aiohttp.ClientSession.get") + def test_call_api_async_500_error_backoff(self, mocked, mock_sleep): + """ + Test that call_api_async raises an exception for 500 (Internal Server Error) after 5 retries. + """ + url = "https://api.example.com/resource" + error_message = "Internal Server Error" response_data = {"error": error_message} - mock_response = AsyncMock() - mock_response.status = 500 - mock_response.json.return_value = response_data - mocked.return_value.__aenter__.return_value = mock_response + mock_error_response = AsyncMock() + mock_error_response.status = 500 + mock_error_response.headers = {} + mock_error_response.json.return_value = response_data + mocked.return_value.__aenter__.side_effect = [ + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + ] + + async def run_test(): + async with ClientSession() as session: + with self.assertRaises(http.ZendeskInternalServerError) as context: + await http.call_api_async(session, url, 10, {}, {}) + self.assertEqual('HTTP-error-code: 500, Error: Internal Server Error', str(context.exception)) + self.assertEqual(mock_sleep.call_count, 5) + + asyncio.run(run_test()) + + @patch("tap_zendesk.http.async_sleep") + @patch("aiohttp.ClientSession.get") + def test_call_api_async_502_error_backoff(self, mocked, mock_sleep): + """ + Test that call_api_async raises an exception for 502 (Bad Gateway Error) after 5 retries. + """ + url = "https://api.example.com/resource" + error_message = "Bad Gateway Error" + response_data = {"error": error_message} + mock_error_response = AsyncMock() + mock_error_response.status = 502 + mock_error_response.headers = {} + mock_error_response.json.return_value = response_data + mocked.return_value.__aenter__.side_effect = [ + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + ] + + async def run_test(): + async with ClientSession() as session: + with self.assertRaises(http.ZendeskBadGatewayError) as context: + await http.call_api_async(session, url, 10, {}, {}) + self.assertEqual('HTTP-error-code: 502, Error: Bad Gateway Error', str(context.exception)) + self.assertEqual(mock_sleep.call_count, 5) + + asyncio.run(run_test()) + + @patch("tap_zendesk.http.async_sleep") + @patch("aiohttp.ClientSession.get") + def test_call_api_async_524_error_backoff(self, mocked, mock_sleep): + """ + Test that call_api_async raises an exception for 524 (Unknown Error) after 5 retries. + """ + url = "https://api.example.com/resource" + error_message = "Unknown Error" + response_data = {"error": error_message} + mock_error_response = AsyncMock() + mock_error_response.status = 524 + mock_error_response.headers = {} + mock_error_response.json.return_value = response_data + mocked.return_value.__aenter__.side_effect = [ + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + mock_error_response, + ] + + async def run_test(): + async with ClientSession() as session: + with self.assertRaises(http.ZendeskBackoffError) as context: + await http.call_api_async(session, url, 10, {}, {}) + self.assertEqual('HTTP-error-code: 524, Error: Unknown Error', str(context.exception)) + self.assertEqual(mock_sleep.call_count, 5) + + asyncio.run(run_test()) + + @patch("tap_zendesk.http.async_sleep") + @patch("aiohttp.ClientSession.get") + def test_call_api_async_400(self, mocked, mock_sleep): + """ + Test that call_api_async raises an exception for 401 (Bad Request) responses without retrying. + """ + url = "https://api.example.com/resource" + error_message = "Bad Request" + response_data = {"error": error_message} + mock_error_response = AsyncMock() + mock_error_response.status = 400 + mock_error_response.headers = {} + mock_error_response.json.return_value = response_data + mocked.return_value.__aenter__.return_value = mock_error_response async def run_test(): async with ClientSession() as session: - with self.assertRaises(http.ZendeskError) as context: + with self.assertRaises(http.ZendeskBadRequestError) as context: await http.call_api_async(session, url, 10, {}, {}) - self.assertIn(error_message, str(context.exception)) + self.assertEqual('HTTP-error-code: 400, Error: Bad Request', str(context.exception)) self.assertEqual(mock_sleep.call_count, 0) asyncio.run(run_test()) From c311aff3e14e64bc94408599a98fe7f720ad7353 Mon Sep 17 00:00:00 2001 From: prijendev Date: Wed, 20 Nov 2024 16:54:56 +0530 Subject: [PATCH 02/11] Fix pylint issue --- tap_zendesk/http.py | 2 +- tap_zendesk/streams.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index f8c7bfc..4cde321 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -219,7 +219,7 @@ async def raise_for_error_for_async(response): """ try: response_json = await response.json() - except (ContentTypeError, ValueError) as e: + except (ContentTypeError, ValueError): # Invalid JSON response response_json = {} diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 02463a5..c290a39 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -354,7 +354,7 @@ def check_access(self): start_time = datetime.datetime.strptime(self.config['start_date'], START_DATE_FORMAT).timestamp() HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"]) - response = http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS) + http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS) class TicketAudits(Stream): From f9e77c601534931cd7a32819702faa57c3ce399a Mon Sep 17 00:00:00 2001 From: prijendev Date: Wed, 20 Nov 2024 17:01:46 +0530 Subject: [PATCH 03/11] Replace batch size with concurrency_limit --- tap_zendesk/streams.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index c290a39..668d0dc 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -18,7 +18,7 @@ DEFAULT_PAGE_SIZE = 100 REQUEST_TIMEOUT = 300 -DEFAULT_BATCH_SIZE = 20 +CONCURRENCY_LIMIT = 20 START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" HEADERS = { 'Content-Type': 'application/json', @@ -266,7 +266,7 @@ class Tickets(CursorBasedExportStream): replication_key = "generated_timestamp" item_key = "tickets" endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json" - batch_size = DEFAULT_BATCH_SIZE + concurrency_limit = CONCURRENCY_LIMIT def sync(self, state): #pylint: disable=too-many-statements @@ -311,7 +311,7 @@ def emit_sub_stream_metrics(sub_stream): # Check if the number of ticket IDs has reached the batch size. ticket_ids.append(ticket["id"]) - if len(ticket_ids) >= self.batch_size: + if len(ticket_ids) >= self.concurrency_limit: # Process audits and comments in batches records = self.sync_ticket_audits_and_comments( comments_stream, audits_stream, ticket_ids) From 7dfda05d114a500a8c556eb3897f74de6c46a14a Mon Sep 17 00:00:00 2001 From: prijendev Date: Thu, 21 Nov 2024 15:09:07 +0530 Subject: [PATCH 04/11] Fix rate limit issue --- tap_zendesk/streams.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 668d0dc..0e060e0 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -2,6 +2,7 @@ import json import datetime import asyncio +import time import pytz from zenpy.lib.exception import APIException from aiohttp import ClientSession @@ -19,6 +20,8 @@ DEFAULT_PAGE_SIZE = 100 REQUEST_TIMEOUT = 300 CONCURRENCY_LIMIT = 20 +# Reference: https://developer.zendesk.com/api-reference/introduction/rate-limits/#endpoint-rate-limits:~:text=List%20Audits%20for,requests%20per%20minute +AUDITS_REQUEST_PER_MINUTE=500 START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" HEADERS = { 'Content-Type': 'application/json', @@ -266,7 +269,6 @@ class Tickets(CursorBasedExportStream): replication_key = "generated_timestamp" item_key = "tickets" endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json" - concurrency_limit = CONCURRENCY_LIMIT def sync(self, state): #pylint: disable=too-many-statements @@ -292,6 +294,8 @@ def emit_sub_stream_metrics(sub_stream): LOGGER.info("Syncing ticket_audits per ticket...") ticket_ids = [] + counter = 0 + start_time = time.time() for ticket in tickets: zendesk_metrics.capture('ticket') @@ -311,7 +315,7 @@ def emit_sub_stream_metrics(sub_stream): # Check if the number of ticket IDs has reached the batch size. ticket_ids.append(ticket["id"]) - if len(ticket_ids) >= self.concurrency_limit: + if len(ticket_ids) >= CONCURRENCY_LIMIT: # Process audits and comments in batches records = self.sync_ticket_audits_and_comments( comments_stream, audits_stream, ticket_ids) @@ -324,6 +328,15 @@ def emit_sub_stream_metrics(sub_stream): ticket_ids = [] # Write state after processing the batch. singer.write_state(state) + counter+=CONCURRENCY_LIMIT + + # Check if the number of records processed in a minute has reached the limit. + if counter >= AUDITS_REQUEST_PER_MINUTE-CONCURRENCY_LIMIT: + # Processed max number of records in a minute. Sleep for few seconds. + # Add 2 seconds of buffer time + time.sleep(max(0, 60 - (time.time() - start_time)+2)) + start_time = time.time() + counter = 0 # Check if there are any remaining ticket IDs after the loop. if ticket_ids: From e2052347d8e99d796d29c5b121b2ef3bd1666e44 Mon Sep 17 00:00:00 2001 From: prijendev Date: Fri, 22 Nov 2024 14:19:39 +0530 Subject: [PATCH 05/11] Remove 404 log --- tap_zendesk/http.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 4cde321..8fa7462 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -259,7 +259,6 @@ async def raise_for_error_for_async(response): exc = ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get( "raise_exception", DEFAULT_ERROR_OBJECT ) - LOGGER.error(message) raise exc(message, response) from None From a1d299ef1538243b75f8cbd5e5741fdcd6f0fa01 Mon Sep 17 00:00:00 2001 From: prijendev Date: Mon, 25 Nov 2024 09:24:06 +0530 Subject: [PATCH 06/11] Increase buffer wait time --- tap_zendesk/streams.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 0e060e0..3d2d9f5 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -21,7 +21,7 @@ REQUEST_TIMEOUT = 300 CONCURRENCY_LIMIT = 20 # Reference: https://developer.zendesk.com/api-reference/introduction/rate-limits/#endpoint-rate-limits:~:text=List%20Audits%20for,requests%20per%20minute -AUDITS_REQUEST_PER_MINUTE=500 +AUDITS_REQUEST_PER_MINUTE=450 START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" HEADERS = { 'Content-Type': 'application/json', @@ -331,10 +331,10 @@ def emit_sub_stream_metrics(sub_stream): counter+=CONCURRENCY_LIMIT # Check if the number of records processed in a minute has reached the limit. - if counter >= AUDITS_REQUEST_PER_MINUTE-CONCURRENCY_LIMIT: + if counter >= AUDITS_REQUEST_PER_MINUTE: # Processed max number of records in a minute. Sleep for few seconds. # Add 2 seconds of buffer time - time.sleep(max(0, 60 - (time.time() - start_time)+2)) + time.sleep(max(0, 60 - (time.time() - start_time)+5)) start_time = time.time() counter = 0 From 116497c14b532e433d29e1574c04a0c7c306a148 Mon Sep 17 00:00:00 2001 From: prijendev Date: Mon, 25 Nov 2024 12:01:39 +0530 Subject: [PATCH 07/11] Skip deleted tickets --- tap_zendesk/streams.py | 6 ++- test/unittests/test_async_ticket_audits.py | 55 ++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 3d2d9f5..5cf7d19 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -313,6 +313,10 @@ def emit_sub_stream_metrics(sub_stream): metrics_stream.count+=1 yield (metrics_stream.stream, ticket["metric_set"]) + # Skip deleted tickets because they don't have audits or comments + if ticket.get('status') == 'deleted': + continue + # Check if the number of ticket IDs has reached the batch size. ticket_ids.append(ticket["id"]) if len(ticket_ids) >= CONCURRENCY_LIMIT: @@ -334,7 +338,7 @@ def emit_sub_stream_metrics(sub_stream): if counter >= AUDITS_REQUEST_PER_MINUTE: # Processed max number of records in a minute. Sleep for few seconds. # Add 2 seconds of buffer time - time.sleep(max(0, 60 - (time.time() - start_time)+5)) + time.sleep(max(0, 60 - (time.time() - start_time)+2)) start_time = time.time() counter = 0 diff --git a/test/unittests/test_async_ticket_audits.py b/test/unittests/test_async_ticket_audits.py index 9905fdd..5123f54 100644 --- a/test/unittests/test_async_ticket_audits.py +++ b/test/unittests/test_async_ticket_audits.py @@ -187,6 +187,61 @@ def test_sync_audits_comments_stream__both_not_selected( # Assertions self.assertEqual(len(result), 2) + @patch('tap_zendesk.streams.time.sleep') + @patch("tap_zendesk.streams.Tickets.update_bookmark") + @patch("tap_zendesk.streams.Tickets.get_bookmark") + @patch("tap_zendesk.streams.Tickets.get_objects") + @patch("tap_zendesk.streams.Tickets.check_access") + @patch("tap_zendesk.streams.singer.write_state") + @patch("tap_zendesk.streams.zendesk_metrics.capture") + @patch("tap_zendesk.streams.LOGGER.info") + def test_concurrency_for_audit_stream( + self, + mock_info, + mock_capture, + mock_write_state, + mock_check_access, + mock_get_objects, + mock_get_bookmark, + mock_update_bookmark, + mock_sleep + ): + """ + Test that sync does not extract records for audits and comments when both of them are not selected. + """ + # Mock the necessary data + state = {} + bookmark = "2023-01-01T00:00:00Z" + tickets = [ + {"id": 1, "generated_timestamp": 1672531200, "fields": "duplicate"}, + {"id": 2, "generated_timestamp": 1672531300, "fields": "duplicate"}, + {"id": 3, "generated_timestamp": 1672531200, "fields": "duplicate"}, + {"id": 4, "generated_timestamp": 1672531300, "fields": "duplicate"}, + {"id": 5, "generated_timestamp": 1672531300, "fields": "duplicate"}, + {"id": 6, "generated_timestamp": 1672531300, "fields": "duplicate"}, + {"id": 7, "generated_timestamp": 1672531300, "fields": "duplicate"}, + {"id": 8, "generated_timestamp": 1672531300, "fields": "duplicate"}, + {"id": 9, "generated_timestamp": 1672531300, "fields": "duplicate"} + ] + mock_get_bookmark.return_value = bookmark + mock_get_objects.return_value = tickets + streams.AUDITS_REQUEST_PER_MINUTE = 4 + streams.CONCURRENCY_LIMIT = 2 + + # Create an instance of the Tickets class + instance = streams.Tickets(None, {}) + instance.sync_ticket_audits_and_comments = MagicMock(return_value=[ + (['audit1', 'audit2'], ['comment1', 'comment2']), + (['audit3', 'audit4'], ['comment3', 'comment4']), + ]) + + # Run the sync method + result = list(instance.sync(state)) + + # Assertions + self.assertEqual(mock_write_state.call_count, 5) + self.assertEqual(mock_sleep.call_count, 2) + @patch("tap_zendesk.streams.zendesk_metrics.capture") @patch("tap_zendesk.streams.LOGGER.warning") @patch( From 56c7afd1fdf8e451c9e2f21ef2aea4e29267b93d Mon Sep 17 00:00:00 2001 From: prijendev Date: Mon, 25 Nov 2024 12:10:35 +0530 Subject: [PATCH 08/11] Move forward inf condition --- tap_zendesk/streams.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 5cf7d19..317157f 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -307,16 +307,15 @@ def emit_sub_stream_metrics(sub_stream): # yielding stream name with record in a tuple as it is used for obtaining only the parent records while sync yield (self.stream, ticket) + # Skip deleted tickets because they don't have audits or comments + if ticket.get('status') == 'deleted': + continue if metrics_stream.is_selected() and ticket.get('metric_set'): zendesk_metrics.capture('ticket_metric') metrics_stream.count+=1 yield (metrics_stream.stream, ticket["metric_set"]) - # Skip deleted tickets because they don't have audits or comments - if ticket.get('status') == 'deleted': - continue - # Check if the number of ticket IDs has reached the batch size. ticket_ids.append(ticket["id"]) if len(ticket_ids) >= CONCURRENCY_LIMIT: From ecf4bf09efc921c75af5ba4f909643a6dacd44eb Mon Sep 17 00:00:00 2001 From: prijendev Date: Mon, 25 Nov 2024 14:15:49 +0530 Subject: [PATCH 09/11] Add unit tests for deleted tickets --- test/unittests/test_async_ticket_audits.py | 51 ++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/test/unittests/test_async_ticket_audits.py b/test/unittests/test_async_ticket_audits.py index 5123f54..da574c7 100644 --- a/test/unittests/test_async_ticket_audits.py +++ b/test/unittests/test_async_ticket_audits.py @@ -187,6 +187,57 @@ def test_sync_audits_comments_stream__both_not_selected( # Assertions self.assertEqual(len(result), 2) + @patch('tap_zendesk.streams.time.sleep') + @patch("tap_zendesk.streams.Tickets.update_bookmark") + @patch("tap_zendesk.streams.Tickets.get_bookmark") + @patch("tap_zendesk.streams.Tickets.get_objects") + @patch("tap_zendesk.streams.Tickets.check_access") + @patch("tap_zendesk.streams.singer.write_state") + @patch("tap_zendesk.streams.zendesk_metrics.capture") + @patch("tap_zendesk.streams.LOGGER.info") + def test_sync_for_deleted_tickets( + self, + mock_info, + mock_capture, + mock_write_state, + mock_check_access, + mock_get_objects, + mock_get_bookmark, + mock_update_bookmark, + mock_sleep + ): + """ + Test that sync does not extract records for audits and comments when both of them are not selected. + """ + # Mock the necessary data + state = {} + bookmark = "2023-01-01T00:00:00Z" + tickets = [ + {"id": 1, "generated_timestamp": 1672531200, "fields": "duplicate", "status": "deleted"}, + {"id": 2, "generated_timestamp": 1672531300, "fields": "duplicate"}, + {"id": 3, "generated_timestamp": 1672531200, "fields": "duplicate", "status": "deleted"}, + {"id": 4, "generated_timestamp": 1672531300, "fields": "duplicate"} + ] + mock_get_bookmark.return_value = bookmark + mock_get_objects.return_value = tickets + streams.AUDITS_REQUEST_PER_MINUTE = 4 + streams.CONCURRENCY_LIMIT = 2 + + # Create an instance of the Tickets class + instance = streams.Tickets(None, {}) + instance.sync_ticket_audits_and_comments = MagicMock(return_value=[ + (['audit1', 'audit2'], ['comment1', 'comment2']), + (['audit3'], ['comment3']), + ]) + + # Run the sync method + result = list(instance.sync(state)) + + # Assertions + self.assertEqual(mock_write_state.call_count, 2) + # 4 tickets, 3 audits, 3 comments + self.assertEqual(len(result), 10) + @patch('tap_zendesk.streams.time.sleep') @patch("tap_zendesk.streams.Tickets.update_bookmark") @patch("tap_zendesk.streams.Tickets.get_bookmark") From 8a366ea9b7826f42752df3b590d879e328e590a1 Mon Sep 17 00:00:00 2001 From: prijendev Date: Mon, 25 Nov 2024 17:13:11 +0530 Subject: [PATCH 10/11] Address review comments --- tap_zendesk/streams.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 317157f..c408e5a 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -21,7 +21,7 @@ REQUEST_TIMEOUT = 300 CONCURRENCY_LIMIT = 20 # Reference: https://developer.zendesk.com/api-reference/introduction/rate-limits/#endpoint-rate-limits:~:text=List%20Audits%20for,requests%20per%20minute -AUDITS_REQUEST_PER_MINUTE=450 +AUDITS_REQUEST_PER_MINUTE = 450 START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" HEADERS = { 'Content-Type': 'application/json', @@ -331,13 +331,18 @@ def emit_sub_stream_metrics(sub_stream): ticket_ids = [] # Write state after processing the batch. singer.write_state(state) - counter+=CONCURRENCY_LIMIT + counter += CONCURRENCY_LIMIT # Check if the number of records processed in a minute has reached the limit. if counter >= AUDITS_REQUEST_PER_MINUTE: - # Processed max number of records in a minute. Sleep for few seconds. - # Add 2 seconds of buffer time - time.sleep(max(0, 60 - (time.time() - start_time)+2)) + # Calculate elapsed time + elapsed_time = time.time() - start_time + + # Calculate remaining time until the next minute, plus buffer of 2 more seconds + remaining_time = max(0, 60 - elapsed_time + 2) + + # Sleep for the calculated time + time.sleep(remaining_time) start_time = time.time() counter = 0 From 4e114c9e118f4b3035c524d0a33c74d1c00112f4 Mon Sep 17 00:00:00 2001 From: Prijen Khokhani <88327452+prijendev@users.noreply.github.com> Date: Tue, 26 Nov 2024 17:25:30 +0530 Subject: [PATCH 11/11] Fix pylint --- tap_zendesk/streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index c408e5a..dbbf364 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -337,10 +337,10 @@ def emit_sub_stream_metrics(sub_stream): if counter >= AUDITS_REQUEST_PER_MINUTE: # Calculate elapsed time elapsed_time = time.time() - start_time - + # Calculate remaining time until the next minute, plus buffer of 2 more seconds remaining_time = max(0, 60 - elapsed_time + 2) - + # Sleep for the calculated time time.sleep(remaining_time) start_time = time.time()