diff --git a/.circleci/config.yml b/.circleci/config.yml index 95798b9..5a6366b 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -39,8 +39,8 @@ jobs: when: always command: | source /usr/local/share/virtualenvs/tap-zendesk/bin/activate - pip install nose2 parameterized nose2[coverage_plugin]>=0.6.5 aioresponses - nose2 --with-coverage --coverage=tap_zendesk -v -s test/unittests + pip install nose2 parameterized nose2[coverage_plugin]>=0.6.5 + nose2 --with-coverage -v -s test/unittests - store_test_results: path: test_output/report.xml - store_artifacts: @@ -48,6 +48,7 @@ jobs: integration_tests: executor: docker-executor + parallelism: 10 steps: - checkout - attach_workspace: diff --git a/CHANGELOG.md b/CHANGELOG.md index cd42a25..2761de0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,5 @@ # Changelog -## 2.5.0 - * Performance improvement on ticket records sync [#150](https://github.com/singer-io/tap-zendesk/pull/150) - ## 2.4.0 * Upgrades to run on python 3.11.7 [#146](https://github.com/singer-io/tap-zendesk/pull/146) diff --git a/setup.py b/setup.py index 11594bd..d3243d5 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-zendesk', - version='2.5.0', + version='2.4.0', description='Singer.io tap for extracting data from the Zendesk API', author='Stitch', url='https://singer.io', @@ -14,7 +14,6 @@ 'zenpy==2.0.24', 'backoff==2.2.1', 'requests==2.31.0', - 'aiohttp==3.10.10' ], extras_require={ 'dev': [ diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 7e2b8f7..62e3028 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -1,16 +1,14 @@ from time import sleep -from asyncio import sleep as async_sleep import backoff import requests import singer from requests.exceptions import Timeout, HTTPError, ChunkedEncodingError, ConnectionError -from aiohttp import ContentTypeError from urllib3.exceptions import ProtocolError + LOGGER = singer.get_logger() -DEFAULT_WAIT = 60 # Default wait time for backoff -DEFAULT_WAIT_FOR_CONFLICT_ERROR = 10 # Default wait time for backoff for conflict error + class ZendeskError(Exception): def __init__(self, message=None, response=None): @@ -210,126 +208,7 @@ def get_offset_based(url, access_token, request_timeout, page_size, **kwargs): yield response_json next_url = response_json.get('next_page') - -async def raise_for_error_for_async(response): - """ - Error handling method which throws custom error. Class for each error defined above which extends `ZendeskError`. - """ - response_json = {} - try: - response_json = await response.json() - except ContentTypeError as e: - LOGGER.warning("Error decoding response from API: %s", str(e)) - except ValueError as e: - LOGGER.warning("Invalid response from API: %s", str(e)) - - if response.status == 200: - return response_json - elif response.status == 429: - # Get the 'Retry-After' header value, defaulting to 60 seconds if not present. - retry_after = response.headers.get("Retry-After", 1) - LOGGER.warning( - "Caught HTTP 429, retrying request in %s seconds", retry_after) - # Wait for the specified time before retrying the request. - await async_sleep(int(retry_after)) - # Check if the response status is 409 (Conflict). - elif response.status == 409: - LOGGER.warning( - "Caught HTTP 409, retrying request in %s seconds", - DEFAULT_WAIT_FOR_CONFLICT_ERROR, - ) - # Wait for the specified time before retrying the request. 
- await async_sleep(DEFAULT_WAIT_FOR_CONFLICT_ERROR) - - # Prepare the error message and raise the appropriate exception. - if response_json.get("error"): - message = "HTTP-error-code: {}, Error: {}".format( - response.status, response_json.get("error") - ) - else: - message = "HTTP-error-code: {}, Error: {}".format( - response.status, - response_json.get( - "message", - ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get( - "message", "Unknown Error" - ), - ), - ) - exc = ERROR_CODE_EXCEPTION_MAPPING.get(response.status, {}).get( - "raise_exception", ZendeskError - ) - LOGGER.error(message) - raise exc(message, response) from None - - -@backoff.on_exception( - backoff.constant, - (ZendeskRateLimitError, ZendeskConflictError), - max_tries=5, - interval=0 -) -@backoff.on_exception( - backoff.expo, - ( - ConnectionError, - ConnectionResetError, - Timeout, - ChunkedEncodingError, - ProtocolError, - ), - max_tries=5, - factor=2, -) -async def call_api_async(session, url, request_timeout, params, headers): - """ - Perform an asynchronous GET request - """ - async with session.get( - url, params=params, headers=headers, timeout=request_timeout - ) as response: - response_json = await raise_for_error_for_async(response) - - return response_json - - -async def paginate_ticket_audits(session, url, access_token, request_timeout, page_size, **kwargs): - """ - Paginate through the ticket audits API endpoint and return the aggregated results - """ - headers = { - 'Content-Type': 'application/json', - 'Accept': 'application/json', - 'Authorization': 'Bearer {}'.format(access_token), - **kwargs.get('headers', {}) - } - - params = { - 'per_page': page_size, - **kwargs.get('params', {}) - } - - # Make the initial asynchronous API call - final_response = await call_api_async(session, url, request_timeout, params=params, headers=headers) - - next_url = final_response.get('next_page') - - # Fetch next pages of results. - while next_url: - - # An asynchronous API call to fetch the next page of results. - response = await call_api_async(session, next_url, request_timeout, params=None, headers=headers) - - # Extend the final response with the audits from the current page. 
- final_response["audits"].extend(response["audits"]) - - # Get the URL for the next page - next_url = response.get('next_page') - - # Return the final aggregated response - return final_response - -def get_incremental_export(url, access_token, request_timeout, start_time, side_load): +def get_incremental_export(url, access_token, request_timeout, start_time): headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', @@ -340,7 +219,6 @@ def get_incremental_export(url, access_token, request_timeout, start_time, side_ if not isinstance(start_time, int): params = {'start_time': start_time.timestamp()} - params['include'] = side_load response = call_api(url, request_timeout, params=params, headers=headers) response_json = response.json() @@ -352,7 +230,7 @@ def get_incremental_export(url, access_token, request_timeout, start_time, side_ while not end_of_stream: cursor = response_json['after_cursor'] - params = {'cursor': cursor, "include": side_load} + params = {'cursor': cursor} # Replaced below line of code with call_api method # response = requests.get(url, params=params, headers=headers) # response.raise_for_status() diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 842991e..05f29d3 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -1,10 +1,8 @@ import os import json import datetime -import asyncio import pytz -from zenpy.lib.exception import APIException -from aiohttp import ClientSession +import zenpy import singer from singer import metadata from singer import utils @@ -18,7 +16,6 @@ DEFAULT_PAGE_SIZE = 100 REQUEST_TIMEOUT = 300 -DEFAULT_BATCH_SIZE = 700 START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" HEADERS = { 'Content-Type': 'application/json', @@ -149,13 +146,13 @@ class CursorBasedExportStream(Stream): endpoint = None item_key = None - def get_objects(self, start_time, side_load=None): + def get_objects(self, start_time): ''' Retrieve objects from the incremental exports endpoint using cursor based pagination ''' url = self.endpoint.format(self.config['subdomain']) # Pass `request_timeout` parameter - for page in http.get_incremental_export(url, self.config['access_token'], self.request_timeout, start_time, side_load): + for page in http.get_incremental_export(url, self.config['access_token'], self.request_timeout, start_time): yield from page[self.item_key] @@ -164,7 +161,7 @@ def raise_or_log_zenpy_apiexception(schema, stream, e): # access to `custom_fields` and some do not. This is the specific # error that appears to be return from the API call in the event that # it doesn't have access. - if not isinstance(e, APIException): + if not isinstance(e, zenpy.lib.exception.APIException): raise ValueError("Called with a bad exception type") from e #If read permission is not available in OAuth access_token, then it returns the below error. 
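For reference, a minimal sketch of the cursor-based incremental export loop that the hunks above restore (the start_time / after_cursor / end_of_stream flow). It is illustrative only and not part of the patch; the subdomain, token, timeout, and page handling shown here are assumptions for the example rather than code from the repository.

import requests

def iter_incremental_export(subdomain, access_token, start_time, timeout=300):
    """Yield pages from Zendesk's incremental ticket export (cursor variant)."""
    url = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json".format(subdomain)
    headers = {
        "Accept": "application/json",
        "Authorization": "Bearer {}".format(access_token),
    }
    # First request is keyed off start_time; later requests page with the cursor.
    params = {"start_time": start_time}
    while True:
        response = requests.get(url, params=params, headers=headers, timeout=timeout)
        response.raise_for_status()
        page = response.json()
        yield page
        if page.get("end_of_stream"):
            break
        params = {"cursor": page["after_cursor"]}

A caller such as CursorBasedExportStream.get_objects would then iterate the yielded pages and pull out page[self.item_key] (e.g. "tickets"), as the streams.py hunk below shows.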
@@ -197,7 +194,7 @@ def _add_custom_fields(self, schema): try: field_gen = self.client.organizations._query_zendesk(endpoint.organization_fields, # pylint: disable=protected-access 'organization_field') - except APIException as e: + except zenpy.lib.exception.APIException as e: return raise_or_log_zenpy_apiexception(schema, self.name, e) schema['properties']['organization_fields']['properties'] = {} for field in field_gen: @@ -231,7 +228,7 @@ class Users(CursorBasedExportStream): def _add_custom_fields(self, schema): try: field_gen = self.client.user_fields() - except APIException as e: + except zenpy.lib.exception.APIException as e: return raise_or_log_zenpy_apiexception(schema, self.name, e) schema['properties']['user_fields']['properties'] = {} for field in field_gen: @@ -266,18 +263,12 @@ class Tickets(CursorBasedExportStream): replication_key = "generated_timestamp" item_key = "tickets" endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json" - batch_size = DEFAULT_BATCH_SIZE def sync(self, state): #pylint: disable=too-many-statements bookmark = self.get_bookmark(state) - # Fetch tickets with side loaded metrics - # https://developer.zendesk.com/documentation/ticketing/using-the-zendesk-api/side_loading/#supported-endpoints - tickets = self.get_objects(bookmark, side_load='metric_sets') - - # Run this method to set batch size to fetch ticket audits and comments records in async way - self.check_access() + tickets = self.get_objects(bookmark) audits_stream = TicketAudits(self.client, self.config) metrics_stream = TicketMetrics(self.client, self.config) @@ -294,7 +285,6 @@ def emit_sub_stream_metrics(sub_stream): if audits_stream.is_selected(): LOGGER.info("Syncing ticket_audits per ticket...") - ticket_ids = [] for ticket in tickets: zendesk_metrics.capture('ticket') @@ -306,47 +296,44 @@ def emit_sub_stream_metrics(sub_stream): # yielding stream name with record in a tuple as it is used for obtaining only the parent records while sync yield (self.stream, ticket) - - if metrics_stream.is_selected() and ticket.get('metric_set'): - zendesk_metrics.capture('ticket_metric') - metrics_stream.count+=1 - yield (metrics_stream.stream, ticket["metric_set"]) - - # Check if the number of ticket IDs has reached the batch size. - ticket_ids.append(ticket["id"]) - if len(ticket_ids) >= self.batch_size: - # Process audits and comments in batches - records = self.sync_ticket_audits_and_comments( - comments_stream, audits_stream, ticket_ids) - for audits, comments in records: - for audit in audits: + if audits_stream.is_selected(): + try: + for audit in audits_stream.sync(ticket["id"]): yield audit - for comment in comments: + except http.ZendeskNotFoundError: + # Skip stream if ticket_audit does not found for particular ticekt_id. Earlier it throwing HTTPError + # but now as error handling updated, it throws ZendeskNotFoundError. + message = "Unable to retrieve audits for ticket (ID: {}), record not found".format(ticket['id']) + LOGGER.warning(message) + + if metrics_stream.is_selected(): + try: + for metric in metrics_stream.sync(ticket["id"]): + yield metric + except http.ZendeskNotFoundError: + # Skip stream if ticket_metric does not found for particular ticekt_id. Earlier it throwing HTTPError + # but now as error handling updated, it throws ZendeskNotFoundError. 
+ message = "Unable to retrieve metrics for ticket (ID: {}), record not found".format(ticket['id']) + LOGGER.warning(message) + + if comments_stream.is_selected(): + try: + # add ticket_id to ticket_comment so the comment can + # be linked back to it's corresponding ticket + for comment in comments_stream.sync(ticket["id"]): yield comment - # Reset the list of ticket IDs after processing the batch. - ticket_ids = [] - # Write state after processing the batch. - singer.write_state(state) - - # Check if there are any remaining ticket IDs after the loop. - if ticket_ids: - records = self.sync_ticket_audits_and_comments(comments_stream, audits_stream, ticket_ids) - for audits, comments in records: - for audit in audits: - yield audit - for comment in comments: - yield comment + except http.ZendeskNotFoundError: + # Skip stream if ticket_comment does not found for particular ticekt_id. Earlier it throwing HTTPError + # but now as error handling updated, it throws ZendeskNotFoundError. + message = "Unable to retrieve comments for ticket (ID: {}), record not found".format(ticket['id']) + LOGGER.warning(message) + singer.write_state(state) emit_sub_stream_metrics(audits_stream) emit_sub_stream_metrics(metrics_stream) emit_sub_stream_metrics(comments_stream) singer.write_state(state) - def sync_ticket_audits_and_comments(self, comments_stream, audits_stream, ticket_ids): - if comments_stream.is_selected() or audits_stream.is_selected(): - return asyncio.run(audits_stream.sync_in_bulk(ticket_ids, comments_stream)) - return [], [] - def check_access(self): ''' Check whether the permission was given to access stream resources or not. @@ -356,11 +343,7 @@ def check_access(self): start_time = datetime.datetime.strptime(self.config['start_date'], START_DATE_FORMAT).timestamp() HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"]) - response = http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS) - - # Rate limit are varies according to the zendesk account. So, we need to set the batch size dynamically. - # https://developer.zendesk.com/api-reference/introduction/rate-limits/ - self.batch_size = int(response.headers.get('x-rate-limit', DEFAULT_BATCH_SIZE)) + http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS) class TicketAudits(Stream): @@ -370,59 +353,19 @@ class TicketAudits(Stream): endpoint='https://{}.zendesk.com/api/v2/tickets/{}/audits.json' item_key='audits' - async def sync_in_bulk(self, ticket_ids, comments_stream): - """ - Asynchronously fetch ticket audits for multiple tickets - """ - # Create an asynchronous HTTP session - async with ClientSession() as session: - tasks = [self.sync(session, ticket_id, comments_stream) - for ticket_id in ticket_ids] - # Run all tasks concurrently and wait for them to complete - return await asyncio.gather(*tasks) - - async def get_objects(self, session, ticket_id): + def get_objects(self, ticket_id): url = self.endpoint.format(self.config['subdomain'], ticket_id) - # Fetch the ticket audits using pagination - records = await http.paginate_ticket_audits(session, url, self.config['access_token'], self.request_timeout, self.page_size) - - return records[self.item_key] - - async def sync(self, session, ticket_id, comments_stream): - """ - Fetch ticket audits for a single ticket. Also exctract comments for each audit. 
- """ - audit_records, comment_records = [], [] - try: - # Fetch ticket audits for the given ticket ID - ticket_audits = await self.get_objects(session, ticket_id) - for ticket_audit in ticket_audits: - if self.is_selected(): - zendesk_metrics.capture('ticket_audit') - self.count += 1 - audit_records.append((self.stream, ticket_audit)) - - if comments_stream.is_selected(): - # Extract comments from ticket audit - ticket_comments = ( - event for event in ticket_audit['events'] if event['type'] == 'Comment') - zendesk_metrics.capture('ticket_comments') - for ticket_comment in ticket_comments: - # Update the comment with additional information - ticket_comment.update({ - 'created_at': ticket_audit['created_at'], - 'via': ticket_audit['via'], - 'metadata': ticket_audit['metadata'], - 'ticket_id': ticket_id - }) - - comments_stream.count += 1 - comment_records.append( - (comments_stream.stream, ticket_comment)) - except http.ZendeskNotFoundError: - return audit_records, comment_records + # Pass `request_timeout` parameter + pages = http.get_offset_based(url, self.config['access_token'], self.request_timeout, self.page_size) + for page in pages: + yield from page[self.item_key] - return audit_records, comment_records + def sync(self, ticket_id): + ticket_audits = self.get_objects(ticket_id) + for ticket_audit in ticket_audits: + zendesk_metrics.capture('ticket_audit') + self.count += 1 + yield (self.stream, ticket_audit) def check_access(self): ''' @@ -441,12 +384,30 @@ class TicketMetrics(CursorBasedStream): name = "ticket_metrics" replication_method = "INCREMENTAL" count = 0 + endpoint = 'https://{}.zendesk.com/api/v2/tickets/{}/metrics' + item_key = 'ticket_metric' + + def sync(self, ticket_id): + # Only 1 ticket metric per ticket + url = self.endpoint.format(self.config['subdomain'], ticket_id) + # Pass `request_timeout` + pages = http.get_offset_based(url, self.config['access_token'], self.request_timeout, self.page_size) + for page in pages: + zendesk_metrics.capture('ticket_metric') + self.count += 1 + yield (self.stream, page[self.item_key]) def check_access(self): ''' Check whether the permission was given to access stream resources or not. 
''' - return # We load metrics as side load of tickets, so we don't need to check access + url = self.endpoint.format(self.config['subdomain'], '1') + HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"]) + try: + http.call_api(url, self.request_timeout, params={'per_page': 1}, headers=HEADERS) + except http.ZendeskNotFoundError: + #Skip 404 ZendeskNotFoundError error as goal is just to check whether TicketComments have read permission or not + pass class TicketMetricEvents(Stream): name = "ticket_metric_events" @@ -480,12 +441,35 @@ class TicketComments(Stream): name = "ticket_comments" replication_method = "INCREMENTAL" count = 0 + endpoint = "https://{}.zendesk.com/api/v2/tickets/{}/comments.json" + item_key='comments' + + def get_objects(self, ticket_id): + url = self.endpoint.format(self.config['subdomain'], ticket_id) + # Pass `request_timeout` parameter + pages = http.get_offset_based(url, self.config['access_token'], self.request_timeout, self.page_size) + + for page in pages: + yield from page[self.item_key] + + def sync(self, ticket_id): + for ticket_comment in self.get_objects(ticket_id): + self.count += 1 + zendesk_metrics.capture('ticket_comment') + ticket_comment['ticket_id'] = ticket_id + yield (self.stream, ticket_comment) def check_access(self): ''' Check whether the permission was given to access stream resources or not. ''' - return # We load comments as side load of ticket_audits, so we don't need to check access + url = self.endpoint.format(self.config['subdomain'], '1') + HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"]) + try: + http.call_api(url, self.request_timeout, params={'per_page': 1}, headers=HEADERS) + except http.ZendeskNotFoundError: + #Skip 404 ZendeskNotFoundError error as goal is to just check to whether TicketComments have read permission or not + pass class TalkPhoneNumbers(Stream): name = 'talk_phone_numbers' @@ -543,20 +527,13 @@ class Macros(CursorBasedStream): name = "macros" replication_method = "INCREMENTAL" replication_key = "updated_at" - endpoint = 'https://{}.zendesk.com/api/v2/macros/search.json' + endpoint = 'https://{}.zendesk.com/api/v2/macros' item_key = 'macros' - def get_objects(self, **kwargs): - url = self.endpoint.format(self.config['subdomain']) - # Pass `request_timeout` parameter - for page in http.get_offset_based(url, self.config['access_token'], self.request_timeout, self.page_size, **kwargs): - yield from page[self.item_key] - def sync(self, state): bookmark = self.get_bookmark(state) - params = {'query': 'updated_at>{}'.format(utils.strftime(bookmark, "%Y-%m-%dT%H:%M:%SZ"))} - macros = self.get_objects(params=params) + macros = self.get_objects() for macro in macros: if utils.strptime_with_tz(macro['updated_at']) >= bookmark: # NB: We don't trust that the records come back ordered by @@ -565,15 +542,6 @@ def sync(self, state): self.update_bookmark(state, macro['updated_at']) yield (self.stream, macro) - def check_access(self): - ''' - Check whether the permission was given to access stream resources or not. 
- ''' - url = self.endpoint.format(self.config['subdomain']) - HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"]) - - http.call_api(url, self.request_timeout, params={'per_page': 1, 'query': 'updated_at>=2021-10-12T04:03:53Z'}, headers=HEADERS) - class Tags(CursorBasedStream): name = "tags" replication_method = "FULL_TABLE" diff --git a/test/unittests/test_async_ticket_audits.py b/test/unittests/test_async_ticket_audits.py deleted file mode 100644 index ae2babd..0000000 --- a/test/unittests/test_async_ticket_audits.py +++ /dev/null @@ -1,221 +0,0 @@ -import unittest -from unittest.mock import patch, MagicMock -from aioresponses import aioresponses -import asyncio -from aiohttp import ClientSession - -from tap_zendesk import http, streams - - -class TestASyncTicketAudits(unittest.TestCase): - - @aioresponses() - @patch("tap_zendesk.streams.zendesk_metrics.capture") - @patch("tap_zendesk.streams.LOGGER.warning") - def test_sync_audit_comment_both_selected(self, mocked, mock_capture, mock_warning): - """ - Test that tap sync both ticket_audits and ticket_comments when both streams are selected. - """ - # Mock the necessary data - ticket_id = 1 - comments_stream = MagicMock() - comments_stream.is_selected.return_value = True - - # Mock the responses for get_objects - async def mock_get_objects(session, ticket_id): - return [ - { - "id": ticket_id, - "events": [{"type": "Comment", "id": f"comment_{ticket_id}"}], - "created_at": "2023-01-01T00:00:00Z", - "via": "web", - "metadata": {}, - } - ] - - instance = streams.TicketAudits(None, {}) - instance.stream = "ticket_audits" - - # Run the sync method - async def run_test(): - with patch.object( - streams.TicketAudits, "get_objects", side_effect=mock_get_objects - ): - async with ClientSession() as session: - audit_records, comment_records = await instance.sync( - session, ticket_id, comments_stream - ) - - # Assertions - self.assertEqual(len(audit_records), 1) - self.assertEqual(len(comment_records), 1) - self.assertEqual(audit_records[0][1]["id"], 1) - self.assertEqual( - comment_records[0][1]["id"], f"comment_{ticket_id}" - ) - self.assertEqual( - comment_records[0][1]["created_at"], "2023-01-01T00:00:00Z" - ) - self.assertEqual(comment_records[0][1]["via"], "web") - self.assertEqual(comment_records[0][1]["metadata"], {}) - self.assertEqual( - comment_records[0][1]["ticket_id"], ticket_id) - - asyncio.run(run_test()) - - @aioresponses() - @patch("tap_zendesk.streams.zendesk_metrics.capture") - @patch("tap_zendesk.streams.LOGGER.warning") - def test_sync_comment_only_selected(self, mocked, mock_capture, mock_warning): - """ - Test that tap sync just ticket_comments when only the comment stream is selected. 
- """ - # Mock the necessary data - ticket_id = 1 - comments_stream = MagicMock() - comments_stream.is_selected.return_value = True - - # Mock the responses for get_objects - async def mock_get_objects(session, ticket_id): - return [ - { - "id": ticket_id, - "events": [{"type": "Comment", "id": f"comment_{ticket_id}"}], - "created_at": "2023-01-01T00:00:00Z", - "via": "web", - "metadata": {}, - } - ] - - instance = streams.TicketAudits(None, {}) - - # Run the sync method - async def run_test(): - with patch.object( - streams.TicketAudits, "get_objects", side_effect=mock_get_objects - ): - async with ClientSession() as session: - audit_records, comment_records = await instance.sync( - session, ticket_id, comments_stream - ) - - # Assertions - self.assertEqual(len(audit_records), 0) - self.assertEqual(len(comment_records), 1) - - asyncio.run(run_test()) - - @aioresponses() - @patch("tap_zendesk.streams.zendesk_metrics.capture") - @patch("tap_zendesk.streams.LOGGER.warning") - def test_sync_audit_only_selected(self, mocked, mock_capture, mock_warning): - """ - Test that tap sync just ticket_audits when only the audit stream is selected. - """ - # Mock the necessary data - ticket_id = 1 - comments_stream = MagicMock() - comments_stream.is_selected.return_value = False - - # Mock the responses for get_objects - async def mock_get_objects(session, ticket_id): - return [ - { - "id": ticket_id, - "events": [{"type": "Comment", "id": f"comment_{ticket_id}"}], - "created_at": "2023-01-01T00:00:00Z", - "via": "web", - "metadata": {}, - } - ] - - instance = streams.TicketAudits(None, {}) - instance.stream = "ticket_audits" - - # Run the sync method - async def run_test(): - with patch.object( - streams.TicketAudits, "get_objects", side_effect=mock_get_objects - ): - async with ClientSession() as session: - audit_records, comment_records = await instance.sync( - session, ticket_id, comments_stream - ) - - # Assertions - self.assertEqual(len(audit_records), 1) - self.assertEqual(len(comment_records), 0) - self.assertEqual(audit_records[0][1]["id"], 1) - - asyncio.run(run_test()) - - @aioresponses() - @patch("tap_zendesk.streams.zendesk_metrics.capture") - @patch("tap_zendesk.streams.LOGGER.warning") - @patch( - "tap_zendesk.streams.TicketAudits.get_objects", - side_effect=http.ZendeskNotFoundError, - ) - def test_audit_not_found( - self, mocked, mock_capture, mock_warning, mock_get_objects - ): - """ - Test that sync handles the case where the ticket is not found. - """ - - # Mock the necessary data - ticket_id = 1 - - comments_stream = MagicMock() - comments_stream.is_selected.return_value = True - - instance = streams.TicketAudits("client", {}) - - async def run_test(): - # Run the sync method - async with ClientSession() as session: - audit_records, comment_records = await instance.sync( - session, ticket_id, comments_stream - ) - - # Assertions - self.assertEqual(len(audit_records), 0) - self.assertEqual(len(comment_records), 0) - - asyncio.run(run_test()) - - @aioresponses() - @patch("tap_zendesk.streams.zendesk_metrics.capture") - @patch("tap_zendesk.streams.LOGGER.warning") - @patch( - "tap_zendesk.streams.TicketAudits.get_objects", - side_effect=http.ZendeskInternalServerError( - "The server encountered an unexpected condition which prevented it from fulfilling the request." - ), - ) - def test_paginate_ticket_audits_exception( - self, mocked, mock_capture, mock_warning, mock_get_objects - ): - """ - Test that sync handles generic exceptions thrown by paginate_ticket_audits method. 
- """ - # Mock the necessary data - ticket_id = 1 - - comments_stream = MagicMock() - comments_stream.is_selected.return_value = True - - instance = streams.TicketAudits("client", {}) - - async def run_test(): - # Run the sync method - async with ClientSession() as session: - with self.assertRaises(http.ZendeskError) as context: - audit_records, comment_records = await instance.sync( - session, ticket_id, comments_stream - ) - - self.assertEqual(str( - context.exception), "The server encountered an unexpected condition which prevented it from fulfilling the request.") - - asyncio.run(run_test()) diff --git a/test/unittests/test_discovery_mode.py b/test/unittests/test_discovery_mode.py index 2d2552e..6e0b98f 100644 --- a/test/unittests/test_discovery_mode.py +++ b/test/unittests/test_discovery_mode.py @@ -61,14 +61,14 @@ def test_discovery_handles_403__raise_tap_zendesk_forbidden_error(self, mock_get ''' discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) - expected_call_count = 8 + expected_call_count = 10 actual_call_count = mock_get.call_count self.assertEqual(expected_call_count, actual_call_count) # Verifying the logger message mock_logger.assert_called_with("The account credentials supplied do not have 'read' access to the following stream(s): "\ - "groups, users, organizations, ticket_audits, ticket_fields, ticket_forms, group_memberships, macros, "\ - "satisfaction_ratings, tags. The data for these streams would not be collected due to lack of required "\ + "groups, users, organizations, ticket_audits, ticket_comments, ticket_fields, ticket_forms, group_memberships, macros, "\ + "satisfaction_ratings, tags, ticket_metrics. The data for these streams would not be collected due to lack of required "\ "permission.") @patch("tap_zendesk.discover.LOGGER.warning") @@ -107,14 +107,14 @@ def test_discovery_handles_403_raise_zenpy_forbidden_error_for_access_token(self ''' discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) - expected_call_count = 8 + expected_call_count = 10 actual_call_count = mock_get.call_count self.assertEqual(expected_call_count, actual_call_count) # Verifying the logger message mock_logger.assert_called_with("The account credentials supplied do not have 'read' access to the following stream(s): "\ - "groups, users, organizations, ticket_audits, ticket_fields, ticket_forms, group_memberships, macros, "\ - "satisfaction_ratings, tags, sla_policies. The data for these streams would not be collected due to "\ + "groups, users, organizations, ticket_audits, ticket_comments, ticket_fields, ticket_forms, group_memberships, macros, "\ + "satisfaction_ratings, tags, ticket_metrics, sla_policies. 
The data for these streams would not be collected due to "\ "lack of required permission.") @patch("tap_zendesk.discover.LOGGER.warning") @@ -132,6 +132,7 @@ def test_discovery_handles_403_raise_zenpy_forbidden_error_for_access_token(self side_effect=[ mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 1st get request call mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 2nd get request call + mocked_get(status_code=404, json={"key1": "val1"}), # Response of the 3rd get request call mocked_get(status_code=404, json={"key1": "val1"}), # Response of the 4th get request call mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 5th get request call mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 6th get request call @@ -152,7 +153,7 @@ def test_discovery_handles_403_raise_zenpy_forbidden_error_for_api_token(self, m ''' responses = discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) - expected_call_count = 8 + expected_call_count = 10 actual_call_count = mock_get.call_count self.assertEqual(expected_call_count, actual_call_count) @@ -274,7 +275,7 @@ def test_discovery_handles_200_response(self, mock_get, mock_resolve_schema_refe ''' discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) - expected_call_count = 8 + expected_call_count = 10 actual_call_count = mock_get.call_count self.assertEqual(expected_call_count, actual_call_count) diff --git a/test/unittests/test_exception.py b/test/unittests/test_exception.py index d4bad76..60a4dc4 100644 --- a/test/unittests/test_exception.py +++ b/test/unittests/test_exception.py @@ -2,7 +2,7 @@ from tap_zendesk import get_session from unittest import mock from pytest import raises -from tap_zendesk.streams import raise_or_log_zenpy_apiexception, APIException, json, LOGGER +from tap_zendesk.streams import raise_or_log_zenpy_apiexception, zenpy, json, LOGGER class ValueError(Exception): def __init__(self, m): @@ -21,7 +21,7 @@ def test_exception_logger(self, mocked_logger): schema = {} stream = 'test_stream' error_string = '{"error":{"message": "You do not have access to this page. 
Please contact the account owner of this help desk for further help."}' + "}" - e = APIException(error_string) + e = zenpy.lib.exception.APIException(error_string) raise_or_log_zenpy_apiexception(schema, stream, e) mocked_logger.assert_called_with( "The account credentials supplied do not have access to `%s` custom fields.", @@ -35,9 +35,9 @@ def test_zenpy_exception_raised(self): schema = {} stream = 'test_stream' error_string = '{"error": "invalid_token", "error_description": "The access token provided is expired, revoked, malformed or invalid for other reasons."}' - e = APIException(error_string) + e = zenpy.lib.exception.APIException(error_string) raise_or_log_zenpy_apiexception(schema, stream, e) - except APIException as ex: + except zenpy.lib.exception.APIException as ex: self.assertEqual(str(ex), error_string) @@ -49,7 +49,7 @@ def test_zenpy_exception_but_different_message_raised(self): schema = {} stream = 'test_stream' error_string = '{"error":{"message": "Could not authenticate you"}' + "}" - e = APIException(error_string) + e = zenpy.lib.exception.APIException(error_string) raise_or_log_zenpy_apiexception(schema, stream, e) - except APIException as ex: + except zenpy.lib.exception.APIException as ex: self.assertEqual(str(ex), error_string) diff --git a/test/unittests/test_http.py b/test/unittests/test_http.py index 77df5e0..99c9fcf 100644 --- a/test/unittests/test_http.py +++ b/test/unittests/test_http.py @@ -4,9 +4,7 @@ import requests from urllib3.exceptions import ProtocolError from requests.exceptions import ChunkedEncodingError, ConnectionError -from aioresponses import aioresponses -import asyncio -from aiohttp import ClientSession + import zenpy @@ -530,154 +528,3 @@ def test_call_api_handles_connection_reset_error(self, mock_get, mock_sleep): url="some_url", request_timeout=300, params={}, headers={} ) self.assertEqual(mock_get.call_count, 5) - - -class TestAPIAsync(unittest.TestCase): - - @aioresponses() - def test_call_api_async_success(self, mocked): - """ - Test that call_api_async successfully retrieves data when the response status is 200. - """ - url = 'https://api.example.com/resource' - response_data = {'key': 'value'} - mocked.get(url, status=200, payload=response_data) - - async def run_test(): - async with ClientSession() as session: - result = await http.call_api_async(session, url, 10, {}, {}) - self.assertEqual(result, response_data) - - asyncio.run(run_test()) - - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_call_api_async_rate_limit(self, mocked, mock_sleep): - """ - Test that call_api_async retries the request when the response status is 429 (Too Many Requests). 
- """ - url = 'https://api.example.com/resource' - retry_after = '1' - response_data = {'key': 'value'} - mocked.get(url, status=429, headers={ - 'Retry-After': retry_after}, payload={}) - mocked.get(url, status=429, headers={ - 'Retry-After': retry_after}, payload={}) - mocked.get(url, status=429, headers={ - 'Retry-After': retry_after}, payload={}) - mocked.get(url, status=429, headers={ - 'Retry-After': retry_after}, payload={}) - mocked.get(url, status=200, payload=response_data) - - async def run_test(): - async with ClientSession() as session: - result = await http.call_api_async(session, url, 10, {}, {}) - self.assertEqual(result, response_data) - self.assertEqual(mock_sleep.call_count, 4) - - asyncio.run(run_test()) - - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_call_api_async_rate_limit_exception_after_5_retries(self, mocked, mock_sleep): - """ - Test that call_api_async raises an exception after 5 retries when the response status is 429 (Too Many Requests). - """ - url = 'https://api.example.com/resource' - retry_after = '1' - response_data = {'key': 'value'} - mocked.get(url, status=429, headers={ - 'Retry-After': retry_after}, payload={}) - mocked.get(url, status=429, headers={ - 'Retry-After': retry_after}, payload={}) - mocked.get(url, status=429, headers={ - 'Retry-After': retry_after}, payload={}) - mocked.get(url, status=429, headers={ - 'Retry-After': retry_after}, payload={}) - mocked.get(url, status=429, headers={ - 'Retry-After': retry_after}, payload={}) - mocked.get(url, status=200, payload=response_data) - - async def run_test(): - async with ClientSession() as session: - with self.assertRaises(http.ZendeskError) as context: - await http.call_api_async(session, url, 10, {}, {}) - self.assertEqual(mock_sleep.call_count, 4) - self.assertEqual( - 'HTTP-error-code: 429, Error: The API rate limit for your organisation/application pairing has been exceeded.', str(context.exception)) - - asyncio.run(run_test()) - - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_call_api_async_conflict(self, mocked, mock_sleep): - """ - Test that call_api_async retries the request when the response status is 409 (Conflict). - """ - url = 'https://api.example.com/resource' - response_data = {'key': 'value'} - mocked.get(url, status=409, payload={}) - mocked.get(url, status=409, payload={}) - mocked.get(url, status=409, payload={}) - mocked.get(url, status=409, payload={}) - mocked.get(url, status=200, payload=response_data) - - async def run_test(): - async with ClientSession() as session: - result = await http.call_api_async(session, url, 10, {}, {}) - self.assertEqual(result, response_data) - self.assertEqual(mock_sleep.call_count, 4) - - asyncio.run(run_test()) - - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_call_api_async_other_error(self, mocked, mock_sleep): - """ - Test that call_api_async raises an exception for other HTTP errors (e.g., 500). 
- """ - url = 'https://api.example.com/resource' - error_message = 'Some error' - response_data = {'error': error_message} - mocked.get(url, status=500, payload=response_data) - - async def run_test(): - async with ClientSession() as session: - with self.assertRaises(http.ZendeskError) as context: - await http.call_api_async(session, url, 10, {}, {}) - self.assertIn(error_message, str(context.exception)) - self.assertEqual(mock_sleep.call_count, 0) - - asyncio.run(run_test()) - - @aioresponses() - def test_paginate_ticket_audits(self, mocked): - """ - Test that paginate_ticket_audits correctly paginates through multiple pages of results. - """ - url = 'https://api.example.com/resource' - access_token = 'test_token' - page_size = 2 - first_page = { - 'audits': [{'id': 1}, {'id': 2}], - 'next_page': 'https://api.example.com/resource?per_page=2' - } - second_page = { - 'audits': [{'id': 3}, {'id': 4}], - 'next_page': None - } - expected_result = { - 'audits': [{'id': 1}, {'id': 2}, {'id': 3}, {'id': 4}], - 'next_page': 'https://api.example.com/resource?per_page=2' - } - mocked.get('https://api.example.com/resource?per_page=2', - status=200, payload=first_page) - mocked.get('https://api.example.com/resource?per_page=2', - status=200, payload=second_page) - - async def run_test(): - async with ClientSession() as session: - result = await http.paginate_ticket_audits(session, url, access_token, 10, page_size) - self.assertEqual(result, expected_result) - - asyncio.run(run_test()) diff --git a/test/unittests/test_request_timeout.py b/test/unittests/test_request_timeout.py index e125dc1..785c06f 100644 --- a/test/unittests/test_request_timeout.py +++ b/test/unittests/test_request_timeout.py @@ -3,9 +3,6 @@ from tap_zendesk import http, streams import requests import datetime -from aioresponses import aioresponses -import asyncio -import aiohttp PAGINATE_RESPONSE = { 'meta': {'has_more': True, @@ -130,7 +127,7 @@ def test_get_incremental_export_handles_timeout_error(self, mock_get, mock_sleep try: responses = [response for response in http.get_incremental_export(url='some_url',access_token='some_token', - request_timeout=REQUEST_TIMEOUT, start_time= datetime.datetime.utcnow(), side_load=None)] + request_timeout=REQUEST_TIMEOUT, start_time= datetime.datetime.utcnow())] except requests.exceptions.Timeout as e: pass @@ -345,150 +342,296 @@ def test_cursor_based_export_stream_timeout_error_with_float_value(self, mock_ge # Verify the request retry 5 times on timeout self.assertEqual(mock_get.call_count, 5) - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_ticket_audits_timeout_error_without_parameter(self, mock_get, mock_async_sleep, mock_sleep): + @patch('requests.get') + def test_ticket_audits_timeout_error_without_parameter(self, mock_get, mock_sleep): """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when `request_timeout` does not passed, """ - url = 'https://test-zendesk.zendesk.com/api/v2/tickets/1/audits.json?per_page=100' - for _ in range(5): - mock_get.get(url, exception=requests.exceptions.Timeout) + mock_get.side_effect = requests.exceptions.Timeout + ticket_audits = streams.TicketAudits(config={'subdomain': '34', 'access_token': 'df'}) + try: + responses = list(ticket_audits.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) + + @patch('requests.get') + def test_ticket_audits_timeout_error_with_zero_str_value(self, 
mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when string "0" value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_audits = streams.TicketAudits(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': "0"}) + try: + responses = list(ticket_audits.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - ticket_audits = streams.TicketAudits(config={'subdomain': 'test-zendesk', 'access_token': 'df'}) - async def run_test(): - async with aiohttp.ClientSession() as session: - try: - await ticket_audits.get_objects(session, 1) - except requests.exceptions.Timeout as e: - pass - - # Verify the request retry 5 times on timeout - self.assertEqual(mock_async_sleep.call_count, 4) + @patch('requests.get') + def test_ticket_audits_timeout_error_with_zero_int_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when int 0 value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_audits = streams.TicketAudits(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': 0}) + try: + responses = list(ticket_audits.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) + + @patch('requests.get') + def test_ticket_audits_timeout_error_with_str_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when string value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_audits = streams.TicketAudits(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_STR}) + try: + responses = list(ticket_audits.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - asyncio.run(run_test()) + @patch('requests.get') + def test_ticket_audits_timeout_error_with_int_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when int value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_audits = streams.TicketAudits(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT}) + try: + responses = list(ticket_audits.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_ticket_audits_timeout_error_with_zero_str_value(self, mock_get, mock_async_sleep, mock_sleep): - """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when string "0" value of `request_timeout` passed, + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) + + @patch('requests.get') + def test_ticket_audits_timeout_error_with_float_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when float value of `request_timeout` passed, """ - url = 'https://test-zendesk.zendesk.com/api/v2/tickets/1/audits.json?per_page=100' - for _ in range(5): - mock_get.get(url, 
exception=requests.exceptions.Timeout) - ticket_audits = streams.TicketAudits(config={'subdomain': 'test-zendesk', 'access_token': 'df', 'request_timeout': "0"}) - async def run_test(): - async with aiohttp.ClientSession() as session: - try: - await ticket_audits.get_objects(session, 1) - except requests.exceptions.Timeout as e: - pass - - # Verify the request retry 5 times on timeout - self.assertEqual(mock_async_sleep.call_count, 4) + mock_get.side_effect = requests.exceptions.Timeout + ticket_audits = streams.TicketAudits(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_FLOAT}) + try: + responses = list(ticket_audits.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) + + @patch('requests.get') + def test_ticket_audits_timeout_error_with_empty_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when empty value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_audits = streams.TicketAudits(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_STR}) + try: + responses = list(ticket_audits.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) + + @patch('requests.get') + def test_ticket_metrics_timeout_error_with_empty_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when empty value of `request_timeout` passed,s + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_metrics = streams.TicketMetrics(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': ''}) + try: + responses = list(ticket_metrics.sync('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - asyncio.run(run_test()) + @patch('requests.get') + def test_ticket_metrics_timeout_error_without_parameter(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when `request_timeout` does not passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_metrics = streams.TicketMetrics(config={'subdomain': '34', 'access_token': 'df'}) + try: + responses = list(ticket_metrics.sync('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) + + @patch('requests.get') + def test_ticket_metrics_timeout_error_with_zero_str_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when string "0" value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_metrics = streams.TicketMetrics(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': "0"}) + try: + responses = list(ticket_metrics.sync('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_ticket_audits_timeout_error_with_zero_int_value(self, mock_get, mock_async_sleep, mock_sleep): + @patch('requests.get') + def 
test_ticket_metrics_timeout_error_with_zero_int_value(self, mock_get, mock_sleep): """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when int 0 value of `request_timeout` passed, """ - url = 'https://test-zendesk.zendesk.com/api/v2/tickets/1/audits.json?per_page=100' - for _ in range(5): - mock_get.get(url, exception=requests.exceptions.Timeout) - ticket_audits = streams.TicketAudits(config={'subdomain': 'test-zendesk', 'access_token': 'df', 'request_timeout': 0}) - async def run_test(): - async with aiohttp.ClientSession() as session: - try: - await ticket_audits.get_objects(session, 1) - except requests.exceptions.Timeout as e: - pass - - # Verify the request retry 5 times on timeout - self.assertEqual(mock_async_sleep.call_count, 4) + mock_get.side_effect = requests.exceptions.Timeout + ticket_metrics = streams.TicketMetrics(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': 0}) + try: + responses = list(ticket_metrics.sync('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - asyncio.run(run_test()) - - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_ticket_audits_timeout_error_with_str_value(self, mock_get, mock_async_sleep, mock_sleep): + @patch('requests.get') + def test_ticket_metrics_timeout_error_with_str_value(self, mock_get, mock_sleep): """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when string value of `request_timeout` passed, """ - url = 'https://test-zendesk.zendesk.com/api/v2/tickets/1/audits.json?per_page=100' - for _ in range(5): - mock_get.get(url, exception=requests.exceptions.Timeout) - ticket_audits = streams.TicketAudits(config={'subdomain': 'test-zendesk', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_STR}) - async def run_test(): - async with aiohttp.ClientSession() as session: - try: - await ticket_audits.get_objects(session, 1) - except requests.exceptions.Timeout as e: - pass - - # Verify the request retry 5 times on timeout - self.assertEqual(mock_async_sleep.call_count, 4) + mock_get.side_effect = requests.exceptions.Timeout + ticket_metrics = streams.TicketMetrics(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_STR}) + try: + responses = list(ticket_metrics.sync('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - asyncio.run(run_test()) + @patch('requests.get') + def test_ticket_metrics_timeout_error_with_int_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when int value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_metrics = streams.TicketMetrics(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT}) + try: + responses = list(ticket_metrics.sync('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_ticket_audits_timeout_error_with_int_value(self, mock_get, mock_async_sleep, mock_sleep): + @patch('requests.get') + def test_ticket_metrics_timeout_error_with_float_value(self, mock_get, mock_sleep): """We mock request method to raise a `Timeout` and expect the 
tap to retry this up to 5 times when int value of `request_timeout` passed, """ - url = 'https://test-zendesk.zendesk.com/api/v2/tickets/1/audits.json?per_page=100' - for _ in range(5): - mock_get.get(url, exception=requests.exceptions.Timeout) - ticket_audits = streams.TicketAudits(config={'subdomain': 'test-zendesk', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT}) - async def run_test(): - async with aiohttp.ClientSession() as session: - try: - await ticket_audits.get_objects(session, 1) - except requests.exceptions.Timeout as e: - pass - - # Verify the request retry 5 times on timeout - self.assertEqual(mock_async_sleep.call_count, 4) + mock_get.side_effect = requests.exceptions.Timeout + ticket_metrics = streams.TicketMetrics(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_FLOAT}) + try: + responses = list(ticket_metrics.sync('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - asyncio.run(run_test()) + @patch('requests.get') + def test_ticket_comments_timeout_error_without_parameter(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when `request_timeout` does not passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_comments = streams.TicketComments(config={'subdomain': '34', 'access_token': 'df'}) + try: + responses = list(ticket_comments.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_ticket_audits_timeout_error_with_float_value(self, mock_get, mock_async_sleep, mock_sleep): - """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when float value of `request_timeout` passed, + @patch('requests.get') + def test_ticket_comments_timeout_error_with_empty_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when empty value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_comments = streams.TicketComments(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': ''}) + try: + responses = list(ticket_comments.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) + + @patch('requests.get') + def test_ticket_comments_timeout_error_with_zero_str_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when string "0" value of `request_timeout` passed, """ - url = 'https://test-zendesk.zendesk.com/api/v2/tickets/1/audits.json?per_page=100' - for _ in range(5): - mock_get.get(url, exception=requests.exceptions.Timeout) - ticket_audits = streams.TicketAudits(config={'subdomain': 'test-zendesk', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_FLOAT}) - async def run_test(): - async with aiohttp.ClientSession() as session: - try: - await ticket_audits.get_objects(session, 1) - except requests.exceptions.Timeout as e: - pass - - # Verify the request retry 5 times on timeout - self.assertEqual(mock_async_sleep.call_count, 4) + mock_get.side_effect = requests.exceptions.Timeout + ticket_comments = 
streams.TicketComments(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': "0"}) + try: + responses = list(ticket_comments.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - asyncio.run(run_test()) + @patch('requests.get') + def test_ticket_comments_timeout_error_with_int_str_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when int 0 value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_comments = streams.TicketComments(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': 0}) + try: + responses = list(ticket_comments.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - @aioresponses() - @patch('asyncio.sleep', return_value=None) - def test_ticket_audits_timeout_error_with_empty_value(self, mock_get, mock_async_sleep, mock_sleep): - """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when empty value of `request_timeout` passed, + @patch('requests.get') + def test_ticket_comments_timeout_error_with_str_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when string value of `request_timeout` passed, """ - url = 'https://test-zendesk.zendesk.com/api/v2/tickets/1/audits.json?per_page=100' - for _ in range(5): - mock_get.get(url, exception=requests.exceptions.Timeout) - ticket_audits = streams.TicketAudits(config={'subdomain': 'test-zendesk', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_STR}) - async def run_test(): - async with aiohttp.ClientSession() as session: - try: - await ticket_audits.get_objects(session, 1) - except requests.exceptions.Timeout as e: - pass - - # Verify the request retry 5 times on timeout - self.assertEqual(mock_async_sleep.call_count, 4) + mock_get.side_effect = requests.exceptions.Timeout + ticket_comments = streams.TicketComments(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_STR}) + try: + responses = list(ticket_comments.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) - asyncio.run(run_test()) + @patch('requests.get') + def test_ticket_comments_timeout_error_with_float_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when float value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_comments = streams.TicketComments(config={'subdomain': '34', 'access_token': 'df', 'request_timeout': REQUEST_TIMEOUT_FLOAT}) + try: + responses = list(ticket_comments.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 5) + + @patch('requests.get') + def test_ticket_comments_timeout_error_with_int_value(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 5 times when int value of `request_timeout` passed, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_comments = streams.TicketComments(config={'subdomain': 
+        try:
+            responses = list(ticket_comments.get_objects('i1'))
+        except requests.exceptions.Timeout as e:
+            pass
+
+        # Verify the request is retried 5 times on timeout
+        self.assertEqual(mock_get.call_count, 5)
diff --git a/test/unittests/test_tickets_child_stream_skip_404_error.py b/test/unittests/test_tickets_child_stream_skip_404_error.py
new file mode 100644
index 0000000..8cb9a71
--- /dev/null
+++ b/test/unittests/test_tickets_child_stream_skip_404_error.py
@@ -0,0 +1,82 @@
+import unittest
+from unittest.mock import patch
+from tap_zendesk import streams, http
+
+
+class MockClass:
+    def is_selected(self):
+        return True
+
+    def sync(self, ticket_id):
+        raise http.ZendeskNotFoundError
+
+@patch("tap_zendesk.streams.LOGGER.warning")
+@patch('tap_zendesk.streams.Stream.update_bookmark')
+@patch('tap_zendesk.metrics.capture')
+@patch('tap_zendesk.streams.CursorBasedExportStream.get_objects')
+@patch('tap_zendesk.streams.Stream.get_bookmark')
+class TestSkip404Error(unittest.TestCase):
+    """
+    Test that the child streams of tickets - ticket_audits, ticket_metrics and ticket_comments - skip the 404 error.
+    To raise the 404 error, some of the methods including _empty_buffer, LOGGER.warning, _buffer_record, update_bookmark,
+    metrics.capture, get_objects and get_bookmark are mocked.
+    """
+    @patch('tap_zendesk.streams.TicketAudits')
+    def test_ticket_audits_skip_404_error(self, mock_ticket_audits, mock_get_bookmark, mock_get_object, mock_metrics,
+                                          mock_update_bookmark, mock_logger):
+
+        '''
+        Test that the ticket_audits stream skips the 404 error
+        '''
+        mock_ticket_audits.return_value = MockClass()
+        mock_get_object.return_value = [{'generated_timestamp': 12457845, 'fields': {}, 'id': 'i1'}]
+        tickets = streams.Tickets(config={'subdomain': '34', 'access_token': 'df'})
+
+        try:
+            responses = list(tickets.sync(state={}))
+        except AttributeError:
+            pass
+
+        # Verify that LOGGER.warning was called and check the message
+        mock_logger.assert_called_with("Unable to retrieve audits for ticket (ID: i1), record not found")
+
+    @patch('tap_zendesk.streams.TicketComments')
+    def test_ticket_comments_skip_404_error(self, mock_ticket_comments, mock_get_bookmark, mock_get_object, mock_metrics,
+                                            mock_update_bookmark, mock_logger):
+
+        '''
+        Test that the ticket_comments stream skips the 404 error
+        '''
+        mock_ticket_comments.return_value = MockClass()
+        mock_get_object.return_value = [{'generated_timestamp': 12457845, 'fields': {}, 'id': 'i1'}]
+        tickets = streams.Tickets(config={'subdomain': '34', 'access_token': 'df'})
+
+        try:
+            responses = list(tickets.sync(state={}))
+        except AttributeError:
+            pass
+
+        # Verify that LOGGER.warning was called and check the message
+        mock_logger.assert_called_with("Unable to retrieve comments for ticket (ID: i1), record not found")
+
+    @patch('tap_zendesk.streams.TicketMetrics')
+    def test_ticket_metrics_skip_404_error(self, mock_ticket_metrics, mock_get_bookmark, mock_get_object, mock_metrics,
+                                           mock_update_bookmark, mock_logger):
+
+        '''
+        Test that the ticket_metrics stream skips the 404 error
+        '''
+        mock_ticket_metrics.return_value = MockClass()
+        mock_get_object.return_value = [{'generated_timestamp': 12457845, 'fields': {}, 'id': 'i1'}]
+        tickets = streams.Tickets(config={'subdomain': '34', 'access_token': 'df'})
+
+        try:
+            responses = list(tickets.sync(state={}))
+            self.assertEqual(responses, 1)
+        except AttributeError:
+            pass
+
+        # Verify that LOGGER.warning was called and check the message
mock_logger.assert_called_with("Unable to retrieve metrics for ticket (ID: i1), record not found") + + diff --git a/test/unittests/test_yield_records.py b/test/unittests/test_yield_records.py new file mode 100644 index 0000000..b9dfb6e --- /dev/null +++ b/test/unittests/test_yield_records.py @@ -0,0 +1,193 @@ +from tap_zendesk import LOGGER, oauth_auth +import unittest +from unittest import mock +from unittest.mock import patch +from tap_zendesk.streams import Stream, TicketAudits, Tickets, zendesk_metrics +from tap_zendesk.sync import sync_stream +from tap_zendesk import Zenpy +import json + +class Zenpy(): + def __init__(self) -> None: + pass + +def mocked_sync_audits(ticket_id=None): + """ + Mock the audit records which are retrieved in the sync function of the Audits stream + """ + ticket_audits = [ + { + "author_id":387494208358, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732098, + "ticket_id":2 + }, + { + "author_id":387494208358, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204898, + "ticket_id":2, + } + ] + for audit in ticket_audits: + yield ('ticket_audits', audit) + +def mocked_sync_metrics(ticket_id=None): + """ + Mock the metric records which are retrieved in the sync function of the Audits stream + """ + ticket_metrics = [ + { + "author_id":387494208358, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732090, + "ticket_id":2 + }, + { + "author_id":387494208358, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204892, + "ticket_id":2, + } + ] + for metric in ticket_metrics: + yield ('ticket_metrics', metric) + +def mocked_sync_comments(ticket_id=None): + """ + Mock the comment records which are retrieved in the sync function of the Audits stream + """ + ticket_comments = [ + { + "author_id":387494208356, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732090, + "ticket_id":2 + }, + { + "author_id":387494208354, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204892, + "ticket_id":2, + } + ] + for comment in ticket_comments: + yield ('ticket_comments', comment) + +def logger(logger, point): + return "test stream" + +@mock.patch('tap_zendesk.streams.Stream.update_bookmark') +@mock.patch('tap_zendesk.streams.Stream.get_bookmark') +@mock.patch('tap_zendesk.streams.TicketAudits.is_selected') +@mock.patch('tap_zendesk.streams.TicketMetrics.is_selected') +@mock.patch('tap_zendesk.streams.TicketComments.is_selected') +@mock.patch('tap_zendesk.streams.TicketAudits.sync') +@mock.patch('tap_zendesk.streams.TicketMetrics.sync') +@mock.patch('tap_zendesk.streams.TicketComments.sync') +@mock.patch('tap_zendesk.streams.CursorBasedExportStream.get_objects') +@mock.patch('tap_zendesk.streams.TicketAudits.stream') +@mock.patch('tap_zendesk.streams.TicketComments.stream') +@mock.patch('tap_zendesk.streams.TicketMetrics.stream') +@mock.patch('singer.metrics.log') +def test_yield_records(mocked_log, mocked_audits_stream, mocked_comments_stream, mocked_metrics_stream, mock_objects, mock_comments_sync, mock_metrics_sync, mock_audits_sync, mock_comments, mock_metrics, mock_audits, mock_get_bookmark, mock_update_bookmark): + """ + This function tests that the Tickets and its substreams' records are yielded properly. 
+ """ + ticket_stream = Tickets(Zenpy(), {}) + # mocked ticket record for get_objects() function + tickets = [{ + "url":"https://talend1234.zendesk.com/api/v2/tickets/1.json", + "id":2, + "external_id":"None", + "created_at":"2021-10-11T12:12:31Z", + "updated_at":"2021-10-12T08:37:28Z", + "requester_id":387331462257, + "submitter_id":387494208358, + "assignee_id":387494208358, + "organization_id":"None", + "group_id":360010350357, + "due_at":"None", + "ticket_form_id":360003740737, + "brand_id":360004806057, + "generated_timestamp":1634027848, + "fields": [] + }] + mock_objects.return_value = tickets + + # expected audit records after yield + expected_audits = [ + { + "author_id":387494208358, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732098, + "ticket_id":2 + }, + { + "author_id":387494208358, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204898, + "ticket_id":2, + } + ] + + # expected metric records after yield + expected_metrics = [ + { + "author_id":387494208358, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732090, + "ticket_id":2 + }, + { + "author_id":387494208358, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204892, + "ticket_id":2, + } + ] + + # expected comment records after yield + expected_comments = [ + { + "author_id":387494208356, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732090, + "ticket_id":2 + }, + { + "author_id":387494208354, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204892, + "ticket_id":2, + } + ] + mock_metrics.return_value = True + mock_audits.return_value = True + mock_comments.return_value = True + mock_update_bookmark.side_effect = None + mock_metrics_sync.side_effect = mocked_sync_metrics + mock_audits_sync.side_effect = mocked_sync_audits + mock_comments_sync.side_effect = mocked_sync_comments + + expected_tickets = list(ticket_stream.sync(state={})) + audits = [] + metrics = [] + comments = [] + + # the yield returns a list with the first element as the parent stream tickets record + # and other elements as a tuple with the first element as the name of the stream and the second element + # as the record of that stream. Hence we are checking if each element of the stream and appending in our + # custom list and asserting all the lists at last. + for count, each in enumerate(expected_tickets): + if count == 0: + continue + if each[0] == 'ticket_audits': + audits.append(each[1]) + if each[0] == 'ticket_metrics': + metrics.append(each[1]) + if each[0] == 'ticket_comments': + comments.append(each[1]) + assert expected_audits == audits + assert expected_metrics == metrics + assert expected_comments == comments