diff --git a/.circleci/config.yml b/.circleci/config.yml index 0cf1a65..7430186 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -14,12 +14,19 @@ jobs: virtualenv -p python3 /usr/local/share/virtualenvs/tap-zendesk source /usr/local/share/virtualenvs/tap-zendesk/bin/activate pip install .[test] + pip install coverage - run: name: 'pylint' command: | source /usr/local/share/virtualenvs/tap-zendesk/bin/activate - make test + pylint tap_zendesk -d missing-docstring,invalid-name,line-too-long,too-many-locals,too-few-public-methods,fixme,stop-iteration-return,too-many-branches,useless-import-alias,no-else-return,logging-not-lazy + nosetests --with-coverage --cover-erase --cover-package=tap_zendesk --cover-html-dir=htmlcov test/unittests + coverage html - add_ssh_keys + - store_test_results: + path: test_output/report.xml + - store_artifacts: + path: htmlcov - run: name: 'Integration Tests' command: | diff --git a/setup.py b/setup.py index 9085ff7..6bd0117 100644 --- a/setup.py +++ b/setup.py @@ -23,6 +23,7 @@ 'pylint==2.8.3', 'nose', 'nose-watch', + 'pytest' ] }, entry_points=''' diff --git a/tap_zendesk/__init__.py b/tap_zendesk/__init__.py index 6982a0c..048ee55 100755 --- a/tap_zendesk/__init__.py +++ b/tap_zendesk/__init__.py @@ -15,6 +15,7 @@ LOGGER = singer.get_logger() + REQUIRED_CONFIG_KEYS = [ "start_date", "subdomain", @@ -46,9 +47,9 @@ def request_metrics_patch(self, method, url, **kwargs): Session.request = request_metrics_patch # end patch -def do_discover(client): +def do_discover(client, config): LOGGER.info("Starting discover") - catalog = {"streams": discover_streams(client)} + catalog = {"streams": discover_streams(client, config)} json.dump(catalog, sys.stdout, indent=2) LOGGER.info("Finished discover") @@ -199,7 +200,8 @@ def main(): LOGGER.error("""No suitable authentication keys provided.""") if parsed_args.discover: - do_discover(client) + # passing the config to check the authentication in the do_discover method + do_discover(client, parsed_args.config) elif parsed_args.catalog: state = parsed_args.state do_sync(client, parsed_args.catalog, state, parsed_args.config) diff --git a/tap_zendesk/discover.py b/tap_zendesk/discover.py index 0bac705..99f31b0 100644 --- a/tap_zendesk/discover.py +++ b/tap_zendesk/discover.py @@ -1,7 +1,11 @@ import os import json import singer +import zenpy from tap_zendesk.streams import STREAMS +from tap_zendesk.http import ZendeskForbiddenError + +LOGGER = singer.get_logger() def get_abs_path(path): return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) @@ -20,12 +24,44 @@ def load_shared_schema_refs(): return shared_schema_refs -def discover_streams(client): +def discover_streams(client, config): streams = [] + error_list = [] refs = load_shared_schema_refs() - for s in STREAMS.values(): - s = s(client) - schema = singer.resolve_schema_references(s.load_schema(), refs) - streams.append({'stream': s.name, 'tap_stream_id': s.name, 'schema': schema, 'metadata': s.load_metadata()}) + + for stream in STREAMS.values(): + # for each stream in the `STREAMS` check if the user has the permission to access the data of that stream + stream = stream(client, config) + schema = singer.resolve_schema_references(stream.load_schema(), refs) + try: + # Here it call the check_access method to check whether stream have read permission or not. + # If stream does not have read permission then append that stream name to list and at the end of all streams + # raise forbidden error with proper message containing stream names. 
+ stream.check_access() + except ZendeskForbiddenError as e: + error_list.append(stream.name) # Append stream name to the error_list + except zenpy.lib.exception.APIException as e: + args0 = json.loads(e.args[0]) + err = args0.get('error') + + # check if the error is of type dictionary and the message retrieved from the dictionary + # is the expected message. If so, only then print the logger message and return the schema + if isinstance(err, dict): + if err.get('message', None) == "You do not have access to this page. Please contact the account owner of this help desk for further help.": + error_list.append(stream.name) + elif args0.get('description') == "You are missing the following required scopes: read": + error_list.append(stream.name) + else: + raise e from None # raise error if it is other than 403 forbidden error + + streams.append({'stream': stream.name, 'tap_stream_id': stream.name, 'schema': schema, 'metadata': stream.load_metadata()}) + + if error_list: + streams_name = ", ".join(error_list) + message = "HTTP-error-code: 403, Error: You are missing the following required scopes: read. "\ + "The account credentials supplied do not have read access for the following stream(s): {}".format(streams_name) + raise ZendeskForbiddenError(message) + + return streams diff --git a/tap_zendesk/http.py b/tap_zendesk/http.py index 469e060..b4d2b4a 100644 --- a/tap_zendesk/http.py +++ b/tap_zendesk/http.py @@ -2,32 +2,154 @@ import backoff import requests import singer +from requests.exceptions import Timeout, HTTPError + LOGGER = singer.get_logger() +class ZendeskError(Exception): + def __init__(self, message=None, response=None): + super().__init__(message) + self.message = message + self.response = response + +class ZendeskBackoffError(ZendeskError): + pass + +class ZendeskBadRequestError(ZendeskError): + pass + +class ZendeskUnauthorizedError(ZendeskError): + pass + +class ZendeskForbiddenError(ZendeskError): + pass + +class ZendeskNotFoundError(ZendeskError): + pass + +class ZendeskConflictError(ZendeskError): + pass + +class ZendeskUnprocessableEntityError(ZendeskError): + pass + +class ZendeskRateLimitError(ZendeskBackoffError): + pass + +class ZendeskInternalServerError(ZendeskBackoffError): + pass + +class ZendeskNotImplementedError(ZendeskBackoffError): + pass + +class ZendeskBadGatewayError(ZendeskBackoffError): + pass + +class ZendeskServiceUnavailableError(ZendeskBackoffError): + pass + +ERROR_CODE_EXCEPTION_MAPPING = { + 400: { + "raise_exception": ZendeskBadRequestError, + "message": "A validation exception has occurred." + }, + 401: { + "raise_exception": ZendeskUnauthorizedError, + "message": "The access token provided is expired, revoked, malformed or invalid for other reasons." + }, + 403: { + "raise_exception": ZendeskForbiddenError, + "message": "You are missing the following required scopes: read" + }, + 404: { + "raise_exception": ZendeskNotFoundError, + "message": "The resource you have specified cannot be found." + }, + 409: { + "raise_exception": ZendeskConflictError, + "message": "The API request cannot be completed because the requested operation would conflict with an existing item." + }, + 422: { + "raise_exception": ZendeskUnprocessableEntityError, + "message": "The request content itself is not processable by the server." + }, + 429: { + "raise_exception": ZendeskRateLimitError, + "message": "The API rate limit for your organisation/application pairing has been exceeded." 
+ },
+ 500: {
+ "raise_exception": ZendeskInternalServerError,
+ "message": "The server encountered an unexpected condition which prevented" \
+ " it from fulfilling the request."
+ },
+ 501: {
+ "raise_exception": ZendeskNotImplementedError,
+ "message": "The server does not support the functionality required to fulfill the request."
+ },
+ 502: {
+ "raise_exception": ZendeskBadGatewayError,
+ "message": "Server received an invalid response."
+ },
+ 503: {
+ "raise_exception": ZendeskServiceUnavailableError,
+ "message": "API service is currently unavailable."
+ }
+}
 def is_fatal(exception): status_code = exception.response.status_code
- if status_code == 429:
- sleep_time = int(exception.response.headers['Retry-After'])
- LOGGER.info("Caught HTTP 429, retrying request in %s seconds", sleep_time)
- sleep(sleep_time)
- return False
-
- return 400 <= status_code < 500
+ if status_code in [429, 503]:
+ # If the status code is 429 or 503, check whether the response header includes a 'Retry-After' value.
+ # If it does, sleep for that long and retry the request; otherwise raise the error directly.
+ retry_after = exception.response.headers.get('Retry-After')
+ if retry_after:
+ sleep_time = int(retry_after)
+ LOGGER.info("Caught HTTP %s, retrying request in %s seconds", status_code, sleep_time)
+ sleep(sleep_time)
+ return False
+ else:
+ return True
+
+ return 400 <= status_code < 500
+
+def raise_for_error(response):
+ """ Error handling method which raises a custom error. A class for each error is defined above, each extending `ZendeskError`.
+ This method maps the status code to an entry in the `ERROR_CODE_EXCEPTION_MAPPING` dictionary and raises the corresponding error.
+ If the status code is 200, no error is raised.
+ """
+ try:
+ response_json = response.json()
+ except Exception: # pylint: disable=broad-except
+ response_json = {}
+ if response.status_code != 200:
+ if response_json.get('error'):
+ message = "HTTP-error-code: {}, Error: {}".format(response.status_code, response_json.get('error'))
+ else:
+ message = "HTTP-error-code: {}, Error: {}".format(
+ response.status_code,
+ response_json.get("message", ERROR_CODE_EXCEPTION_MAPPING.get(
+ response.status_code, {}).get("message", "Unknown Error")))
+ exc = ERROR_CODE_EXCEPTION_MAPPING.get(
+ response.status_code, {}).get("raise_exception", ZendeskError)
+ raise exc(message, response) from None
 @backoff.on_exception(backoff.expo,
- requests.exceptions.HTTPError,
+ (HTTPError, ZendeskBackoffError),
 max_tries=10, giveup=is_fatal)
-def call_api(url, params, headers):
- response = requests.get(url, params=params, headers=headers)
- response.raise_for_status()
+@backoff.on_exception(backoff.expo,
+ (ConnectionError, Timeout), # As ConnectionError and Timeout exceptions do not have a status_code attribute,
+ max_tries=10, # a separate backoff expression is added here for them.
+ factor=2) +def call_api(url, request_timeout, params, headers): + response = requests.get(url, params=params, headers=headers, timeout=request_timeout) # Pass request timeout + raise_for_error(response) return response -def get_cursor_based(url, access_token, cursor=None, **kwargs): +def get_cursor_based(url, access_token, request_timeout, cursor=None, **kwargs): headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', @@ -43,7 +165,7 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs): if cursor: params['page[after]'] = cursor - response = call_api(url, params=params, headers=headers) + response = call_api(url, request_timeout, params=params, headers=headers) response_json = response.json() yield response_json @@ -54,13 +176,13 @@ def get_cursor_based(url, access_token, cursor=None, **kwargs): cursor = response_json['meta']['after_cursor'] params['page[after]'] = cursor - response = call_api(url, params=params, headers=headers) + response = call_api(url, request_timeout, params=params, headers=headers) response_json = response.json() yield response_json has_more = response_json['meta']['has_more'] -def get_offset_based(url, access_token, **kwargs): +def get_offset_based(url, access_token, request_timeout, **kwargs): headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', @@ -73,7 +195,7 @@ def get_offset_based(url, access_token, **kwargs): **kwargs.get('params', {}) } - response = call_api(url, params=params, headers=headers) + response = call_api(url, request_timeout, params=params, headers=headers) response_json = response.json() yield response_json @@ -81,13 +203,13 @@ def get_offset_based(url, access_token, **kwargs): next_url = response_json.get('next_page') while next_url: - response = call_api(next_url, params=None, headers=headers) + response = call_api(next_url, request_timeout, params=None, headers=headers) response_json = response.json() yield response_json next_url = response_json.get('next_page') -def get_incremental_export(url, access_token, start_time): +def get_incremental_export(url, access_token, request_timeout, start_time): headers = { 'Content-Type': 'application/json', 'Accept': 'application/json', @@ -96,7 +218,7 @@ def get_incremental_export(url, access_token, start_time): params = {'start_time': start_time.timestamp()} - response = call_api(url, params=params, headers=headers) + response = call_api(url, request_timeout, params=params, headers=headers) response_json = response.json() yield response_json @@ -107,8 +229,13 @@ def get_incremental_export(url, access_token, start_time): cursor = response_json['after_cursor'] params = {'cursor': cursor} - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() + # Replaced below line of code with call_api method + # response = requests.get(url, params=params, headers=headers) + # response.raise_for_status() + # Because it doing the same as call_api. 
So, now error handling will work properly with backoff + # as earlier backoff was not possible + response = call_api(url, request_timeout, params=params, headers=headers) + response_json = response.json() yield response_json diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index c707d51..1877d3c 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -4,7 +4,6 @@ import time import pytz import zenpy -from requests.exceptions import HTTPError import singer from singer import metadata from singer import utils @@ -16,6 +15,12 @@ LOGGER = singer.get_logger() KEY_PROPERTIES = ['id'] +REQUEST_TIMEOUT = 300 +START_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" +HEADERS = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', +} CUSTOM_TYPES = { 'text': 'string', 'textarea': 'string', @@ -59,10 +64,18 @@ class Stream(): replication_key = None key_properties = KEY_PROPERTIES stream = None + endpoint = None + request_timeout = None def __init__(self, client=None, config=None): self.client = client self.config = config + # Set and pass request timeout to config param `request_timeout` value. + config_request_timeout = self.config.get('request_timeout') + if config_request_timeout and float(config_request_timeout): + self.request_timeout = float(config_request_timeout) + else: + self.request_timeout = REQUEST_TIMEOUT # If value is 0,"0","" or not passed then it set default to 300 seconds. def get_bookmark(self, state): return utils.strptime_with_tz(singer.get_bookmark(state, self.name, self.replication_key)) @@ -103,6 +116,15 @@ def load_metadata(self): def is_selected(self): return self.stream is not None + def check_access(self): + ''' + Check whether the permission was given to access stream resources or not. + ''' + url = self.endpoint.format(self.config['subdomain']) + HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"]) + + http.call_api(url, self.request_timeout, params={'per_page': 1}, headers=HEADERS) + class CursorBasedStream(Stream): item_key = None endpoint = None @@ -112,8 +134,8 @@ def get_objects(self, **kwargs): Cursor based object retrieval ''' url = self.endpoint.format(self.config['subdomain']) - - for page in http.get_cursor_based(url, self.config['access_token'], **kwargs): + # Pass `request_timeout` parameter + for page in http.get_cursor_based(url, self.config['access_token'], self.request_timeout, **kwargs): yield from page[self.item_key] class CursorBasedExportStream(Stream): @@ -125,8 +147,8 @@ def get_objects(self, start_time): Retrieve objects from the incremental exports endpoint using cursor based pagination ''' url = self.endpoint.format(self.config['subdomain']) - - for page in http.get_incremental_export(url, self.config['access_token'], start_time): + # Pass `request_timeout` parameter + for page in http.get_incremental_export(url, self.config['access_token'], self.request_timeout, start_time): yield from page[self.item_key] @@ -137,7 +159,16 @@ def raise_or_log_zenpy_apiexception(schema, stream, e): # it doesn't have access. if not isinstance(e, zenpy.lib.exception.APIException): raise ValueError("Called with a bad exception type") from e - if json.loads(e.args[0])['error']['message'] == "You do not have access to this page. Please contact the account owner of this help desk for further help.": + + #If read permission is not available in OAuth access_token, then it returns the below error. 
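# Illustrative sketch (not part of the patch): the branch below distinguishes two 403 payload shapes
# that zenpy surfaces in e.args[0]. The helper and sample payload are assumed examples built from the
# messages checked in this function, not captured API responses.
import json

def _is_missing_read_permission(raw_error):
    """Return True when a zenpy APIException body looks like a 403 caused by missing read access."""
    body = json.loads(raw_error)
    # OAuth token is missing the read scope
    if body.get('description') == "You are missing the following required scopes: read":
        return True
    # Account does not have access to the page at all
    error = body.get('error')
    return isinstance(error, dict) and error.get('message') == (
        "You do not have access to this page. "
        "Please contact the account owner of this help desk for further help.")

# Example usage with an assumed OAuth-scope payload:
# _is_missing_read_permission('{"description": "You are missing the following required scopes: read"}') -> True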
+ if json.loads(e.args[0]).get('description') == "You are missing the following required scopes: read": + LOGGER.warning("The account credentials supplied do not have access to `%s` custom fields.", + stream) + return schema + error = json.loads(e.args[0]).get('error') + # check if the error is of type dictionary and the message retrieved from the dictionary + # is the expected message. If so, only then print the logger message and return the schema + if isinstance(error, dict) and error.get('message', None) == "You do not have access to this page. Please contact the account owner of this help desk for further help.": LOGGER.warning("The account credentials supplied do not have access to `%s` custom fields.", stream) return schema @@ -174,6 +205,14 @@ def sync(self, state): self.update_bookmark(state, organization.updated_at) yield (self.stream, organization) + def check_access(self): + ''' + Check whether the permission was given to access stream resources or not. + ''' + # Convert datetime object to standard format with timezone. Used utcnow to reduce API call burden at discovery time. + # Because API will return records from now which will be very less + start_time = datetime.datetime.utcnow().strftime(START_DATE_FORMAT) + self.client.organizations.incremental(start_time=start_time) class Users(Stream): name = "users" @@ -251,6 +290,14 @@ def sync(self, state): start = end - datetime.timedelta(seconds=1) end = start + datetime.timedelta(seconds=search_window_size) + def check_access(self): + ''' + Check whether the permission was given to access stream resources or not. + ''' + # Convert datetime object to standard format with timezone. Used utcnow to reduce API call burden at discovery time. + # Because API will return records from now which will be very less + start_time = datetime.datetime.utcnow().strftime(START_DATE_FORMAT) + self.client.search("", updated_after=start_time, updated_before='2000-01-02T00:00:00Z', type="user") class Tickets(CursorBasedExportStream): name = "tickets" @@ -259,32 +306,10 @@ class Tickets(CursorBasedExportStream): item_key = "tickets" endpoint = "https://{}.zendesk.com/api/v2/incremental/tickets/cursor.json" - last_record_emit = {} - buf = {} - buf_time = 60 - def _buffer_record(self, record): - stream_name = record[0].tap_stream_id - if self.last_record_emit.get(stream_name) is None: - self.last_record_emit[stream_name] = utils.now() - - if self.buf.get(stream_name) is None: - self.buf[stream_name] = [] - self.buf[stream_name].append(record) - - if (utils.now() - self.last_record_emit[stream_name]).total_seconds() > self.buf_time: - self.last_record_emit[stream_name] = utils.now() - return True - - return False - - def _empty_buffer(self): - for stream_name, stream_buf in self.buf.items(): - for rec in stream_buf: - yield rec - self.buf[stream_name] = [] - def sync(self, state): #pylint: disable=too-many-statements + bookmark = self.get_bookmark(state) + tickets = self.get_objects(bookmark) audits_stream = TicketAudits(self.client, self.config) @@ -304,59 +329,65 @@ def emit_sub_stream_metrics(sub_stream): for ticket in tickets: zendesk_metrics.capture('ticket') + generated_timestamp_dt = datetime.datetime.utcfromtimestamp(ticket.get('generated_timestamp')).replace(tzinfo=pytz.UTC) + self.update_bookmark(state, utils.strftime(generated_timestamp_dt)) ticket.pop('fields') # NB: Fields is a duplicate of custom_fields, remove before emitting - should_yield = self._buffer_record((self.stream, ticket)) + # yielding stream name with record in a tuple as it is 
used to obtain only the parent records during the sync
+ yield (self.stream, ticket)
 if audits_stream.is_selected(): try: for audit in audits_stream.sync(ticket["id"]):
- self._buffer_record(audit)
- except HTTPError as e:
- if e.response.status_code == 404:
- LOGGER.warning("Unable to retrieve audits for ticket (ID: %s), record not found", ticket['id'])
- else:
- raise e
+ yield audit
+ except http.ZendeskNotFoundError:
+ # Skip the record if no ticket_audit is found for the particular ticket_id. Earlier this raised an HTTPError,
+ # but with the updated error handling it now raises ZendeskNotFoundError.
+ message = "Unable to retrieve audits for ticket (ID: {}), record not found".format(ticket['id'])
+ LOGGER.warning(message)
 if metrics_stream.is_selected(): try: for metric in metrics_stream.sync(ticket["id"]):
- self._buffer_record(metric)
- except HTTPError as e:
- if e.response.status_code == 404:
- LOGGER.warning("Unable to retrieve metrics for ticket (ID: %s), record not found", ticket['id'])
- else:
- raise e
+ yield metric
+ except http.ZendeskNotFoundError:
+ # Skip the record if no ticket_metric is found for the particular ticket_id. Earlier this raised an HTTPError,
+ # but with the updated error handling it now raises ZendeskNotFoundError.
+ message = "Unable to retrieve metrics for ticket (ID: {}), record not found".format(ticket['id'])
+ LOGGER.warning(message)
 if comments_stream.is_selected(): try: # add ticket_id to ticket_comment so the comment can # be linked back to it's corresponding ticket for comment in comments_stream.sync(ticket["id"]):
- self._buffer_record(comment)
- except HTTPError as e:
- if e.response.status_code == 404:
- LOGGER.warning("Unable to retrieve comments for ticket (ID: %s), record not found", ticket['id'])
- else:
- raise e
-
- if should_yield:
- for rec in self._empty_buffer():
- yield rec
- emit_sub_stream_metrics(audits_stream)
- emit_sub_stream_metrics(metrics_stream)
- emit_sub_stream_metrics(comments_stream)
- singer.write_state(state)
-
- for rec in self._empty_buffer():
- yield rec
+ yield comment
+ except http.ZendeskNotFoundError:
+ # Skip the record if no ticket_comment is found for the particular ticket_id. Earlier this raised an HTTPError,
+ # but with the updated error handling it now raises ZendeskNotFoundError.
+ message = "Unable to retrieve comments for ticket (ID: {}), record not found".format(ticket['id'])
+ LOGGER.warning(message)
+
+ singer.write_state(state)
 emit_sub_stream_metrics(audits_stream) emit_sub_stream_metrics(metrics_stream) emit_sub_stream_metrics(comments_stream) singer.write_state(state)
+ def check_access(self):
+ '''
+ Check whether the permission was given to access stream resources or not.
+ '''
+ url = self.endpoint.format(self.config['subdomain'])
+ # Convert start_date parameter to timestamp to pass with request param
+ start_time = datetime.datetime.strptime(self.config['start_date'], START_DATE_FORMAT).timestamp()
+ HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"])
+
+ http.call_api(url, self.request_timeout, params={'start_time': start_time, 'per_page': 1}, headers=HEADERS)
+
+
 class TicketAudits(Stream): name = "ticket_audits" replication_method = "INCREMENTAL"
@@ -366,7 +397,8 @@ class TicketAudits(Stream): def get_objects(self, ticket_id): url = self.endpoint.format(self.config['subdomain'], ticket_id)
- pages = http.get_offset_based(url, self.config['access_token'])
+ # Pass `request_timeout` parameter
+ pages = http.get_offset_based(url, self.config['access_token'], self.request_timeout)
 for page in pages: yield from page[self.item_key]
@@ -377,6 +409,19 @@ def sync(self, ticket_id): self.count += 1 yield (self.stream, ticket_audit)
+ def check_access(self):
+ '''
+ Check whether the permission was given to access stream resources or not.
+ '''
+
+ url = self.endpoint.format(self.config['subdomain'], '1')
+ HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"])
+ try:
+ http.call_api(url, self.request_timeout, params={'per_page': 1}, headers=HEADERS)
+ except http.ZendeskNotFoundError:
+ # Skip the 404 ZendeskNotFoundError, as the goal is only to check whether TicketAudits has read permission
+ pass
+
 class TicketMetrics(CursorBasedStream): name = "ticket_metrics" replication_method = "INCREMENTAL"
@@ -387,12 +432,25 @@ class TicketMetrics(CursorBasedStream): def sync(self, ticket_id): # Only 1 ticket metric per ticket url = self.endpoint.format(self.config['subdomain'], ticket_id)
- pages = http.get_offset_based(url, self.config['access_token'])
+ # Pass `request_timeout`
+ pages = http.get_offset_based(url, self.config['access_token'], self.request_timeout)
 for page in pages: zendesk_metrics.capture('ticket_metric') self.count += 1 yield (self.stream, page[self.item_key])
+ def check_access(self):
+ '''
+ Check whether the permission was given to access stream resources or not.
+ '''
+ url = self.endpoint.format(self.config['subdomain'], '1')
+ HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"])
+ try:
+ http.call_api(url, self.request_timeout, params={'per_page': 1}, headers=HEADERS)
+ except http.ZendeskNotFoundError:
+ # Skip the 404 ZendeskNotFoundError, as the goal is only to check whether TicketMetrics has read permission
+ pass
+
 class TicketComments(Stream): name = "ticket_comments" replication_method = "INCREMENTAL"
@@ -402,7 +460,8 @@ class TicketComments(Stream): def get_objects(self, ticket_id): url = self.endpoint.format(self.config['subdomain'], ticket_id)
- pages = http.get_offset_based(url, self.config['access_token'])
+ # Pass `request_timeout` parameter
+ pages = http.get_offset_based(url, self.config['access_token'], self.request_timeout)
 for page in pages: yield from page[self.item_key]
@@ -414,6 +473,18 @@ def sync(self, ticket_id): ticket_comment['ticket_id'] = ticket_id yield (self.stream, ticket_comment)
+ def check_access(self):
+ '''
+ Check whether the permission was given to access stream resources or not.
+ ''' + url = self.endpoint.format(self.config['subdomain'], '1') + HEADERS['Authorization'] = 'Bearer {}'.format(self.config["access_token"]) + try: + http.call_api(url, self.request_timeout, params={'per_page': 1}, headers=HEADERS) + except http.ZendeskNotFoundError: + #Skip 404 ZendeskNotFoundError error as goal is to just check to whether TicketComments have read permission or not + pass + class SatisfactionRatings(CursorBasedStream): name = "satisfaction_ratings" replication_method = "INCREMENTAL" @@ -519,6 +590,12 @@ def sync(self, state): self.update_bookmark(state, form.updated_at) yield (self.stream, form) + def check_access(self): + ''' + Check whether the permission was given to access stream resources or not. + ''' + self.client.ticket_forms() + class GroupMemberships(CursorBasedStream): name = "group_memberships" replication_method = "INCREMENTAL" @@ -556,6 +633,12 @@ def sync(self, state): # pylint: disable=unused-argument for policy in self.client.sla_policies(): yield (self.stream, policy) + def check_access(self): + ''' + Check whether the permission was given to access stream resources or not. + ''' + self.client.sla_policies() + STREAMS = { "tickets": Tickets, "groups": Groups, diff --git a/test/base.py b/test/base.py index ed31d45..0953a3a 100644 --- a/test/base.py +++ b/test/base.py @@ -1,57 +1,352 @@ import unittest import os import tap_tester.connections as connections -import tap_tester.menagerie as menagerie - +import tap_tester.menagerie as menagerie +import tap_tester.runner as runner +from datetime import datetime as dt +from datetime import timedelta +import dateutil.parser +import pytz class ZendeskTest(unittest.TestCase): + start_date = "" + DATETIME_FMT = { + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%dT%H:%M:%S.%fZ" + } + START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" + PRIMARY_KEYS = "table-key-properties" + REPLICATION_METHOD = "forced-replication-method" + REPLICATION_KEYS = "valid-replication-keys" + FULL_TABLE = "FULL_TABLE" + INCREMENTAL = "INCREMENTAL" + OBEYS_START_DATE = "obey-start-date" + def tap_name(self): return "tap-zendesk" + def setUp(self): + required_env = { + "TAP_ZENDESK_CLIENT_ID", + "TAP_ZENDESK_CLIENT_SECRET", + "TAP_ZENDESK_ACCESS_TOKEN", + } + missing_envs = [v for v in required_env if not os.getenv(v)] + if missing_envs: + raise Exception("set " + ", ".join(missing_envs)) + def get_type(self): return "platform.zendesk" + def get_credentials(self): + return { + 'access_token': os.getenv('TAP_ZENDESK_ACCESS_TOKEN'), + 'client_id': os.getenv('TAP_ZENDESK_CLIENT_ID'), + 'client_secret': os.getenv('TAP_ZENDESK_CLIENT_SECRET') + } - def required_environment_variables(self): - return set(['TAP_ZENDESK_CLIENT_ID', - 'TAP_ZENDESK_CLIENT_SECRET', - 'TAP_ZENDESK_ACCESS_TOKEN', - ]) - - def setUp(self): - missing_envs = [x for x in self.required_environment_variables() if os.getenv(x) is None] - if missing_envs: - raise Exception("Missing environment variables, please set {}." 
.format(missing_envs)) - - def get_properties(self): - return { - 'start_date' : '2017-01-01T00:00:00Z', - 'subdomain': 'rjmdev', - 'marketplace_app_id': int(os.getenv('TAP_ZENDESK_MARKETPLACE_APP_ID')) or 0, - 'marketplace_name': os.getenv('TAP_ZENDESK_MARKETPLACE_NAME') or "", - 'marketplace_organization_id': int(os.getenv('TAP_ZENDESK_MARKETPLACE_ORGANIZATION_ID')) or 0, - 'search_window_size': '2592000'# seconds in a month + def get_properties(self, original: bool = True): + return_value = { + "start_date" : "2017-01-01T00:00:00Z", + "subdomain": "rjmdev", + "marketplace_app_id": int(os.getenv("TAP_ZENDESK_MARKETPLACE_APP_ID")) or 0, + "marketplace_name": os.getenv("TAP_ZENDESK_MARKETPLACE_NAME") or "", + "marketplace_organization_id": int(os.getenv("TAP_ZENDESK_MARKETPLACE_ORGANIZATION_ID")) or 0, + "search_window_size": "2592000"# seconds in a month } + if original: + return return_value - def get_credentials(self): - return {'access_token': os.getenv('TAP_ZENDESK_ACCESS_TOKEN'), - 'client_id': os.getenv('TAP_ZENDESK_CLIENT_ID'), - 'client_secret': os.getenv('TAP_ZENDESK_CLIENT_SECRET')} + # Reassign start date + return_value["start_date"] = self.start_date + return return_value - def expected_check_streams(self): + def expected_metadata(self): return { - 'groups', - 'group_memberships', - 'macros', - 'organizations', - 'satisfaction_ratings', - 'sla_policies', - 'tags', - 'ticket_comments', - 'ticket_fields', - 'ticket_forms', - 'ticket_metrics', - 'tickets', - 'users', - 'ticket_audits' + "groups": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True + }, + "group_memberships": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True + }, + "macros": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True + }, + "organizations": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True + }, + "satisfaction_ratings": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True + }, + "sla_policies": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.OBEYS_START_DATE: False + }, + "tags": { + self.PRIMARY_KEYS: {"name"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.OBEYS_START_DATE: False + }, + "ticket_comments": { + # ticket_comments is child stream of tickets, and tickets is incremental stream. + # But it does not save its own bookmark. It fetches records based on the record of the parent stream. + # That's why make it FULL_TABLE + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.OBEYS_START_DATE: False + }, + "ticket_fields": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True + }, + "ticket_forms": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True + }, + "ticket_metrics": { + # ticket_metrics is child stream of tickets, and tickets is incremental stream. + # But it does not save its own bookmark. It fetches records based on the record of the parent stream. 
+ # That's why make it FULL_TABLE + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.OBEYS_START_DATE: False + }, + "tickets": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"generated_timestamp"}, + self.OBEYS_START_DATE: True + }, + "users": { + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.INCREMENTAL, + self.REPLICATION_KEYS: {"updated_at"}, + self.OBEYS_START_DATE: True + }, + "ticket_audits": { + # ticket_audits is child stream of tickets, and tickets is incremental stream. + # But it does not save its own bookmark. It fetches records based on the record of the parent stream. + # That's why make it FULL_TABLE + self.PRIMARY_KEYS: {"id"}, + self.REPLICATION_METHOD: self.FULL_TABLE, + self.OBEYS_START_DATE: False + } } + + def expected_check_streams(self): + return set(self.expected_metadata().keys()) + + def expected_replication_keys(self): + return {table: properties.get(self.REPLICATION_KEYS, set()) for table, properties + in self.expected_metadata().items()} + + def expected_primary_keys(self): + return {table: properties.get(self.PRIMARY_KEYS, set()) for table, properties + in self.expected_metadata().items()} + + def expected_replication_method(self): + return {table: properties.get(self.REPLICATION_METHOD, set()) for table, properties + in self.expected_metadata().items()} + + def expected_automatic_fields(self): + """return a dictionary with key of table name and set of value of automatic(primary key and bookmark field) fields""" + auto_fields = {} + for k, v in self.expected_metadata().items(): + auto_fields[k] = v.get(self.PRIMARY_KEYS, set()) | v.get(self.REPLICATION_KEYS, set()) + return auto_fields + + def run_and_verify_check_mode(self, conn_id): + """ + Run the tap in check mode and verify it succeeds. + This should be ran prior to field selection and initial sync. + Return the connection id and found catalogs from menagerie. + """ + # run in check mode + check_job_name = runner.run_check_mode(self, conn_id) + + # verify check exit codes + exit_status = menagerie.get_exit_status(conn_id, check_job_name) + menagerie.verify_check_exit_status(self, exit_status, check_job_name) + + found_catalogs = menagerie.get_catalogs(conn_id) + self.assertGreater(len(found_catalogs), 0, msg="unable to locate schemas for connection {}".format(conn_id)) + + found_catalog_names = set(map(lambda c: c['stream_name'], found_catalogs)) + self.assertSetEqual(self.expected_check_streams(), found_catalog_names, msg="discovered schemas do not match") + print("discovered schemas are OK") + + return found_catalogs + + def run_and_verify_sync(self, conn_id): + sync_job_name = runner.run_sync_mode(self, conn_id) + + # verify tap and target exit codes + exit_status = menagerie.get_exit_status(conn_id, sync_job_name) + menagerie.verify_sync_exit_status(self, exit_status, sync_job_name) + + sync_record_count = runner.examine_target_output_file(self, + conn_id, + self.expected_check_streams(), + self.expected_primary_keys()) + + self.assertGreater( + sum(sync_record_count.values()), 0, + msg="failed to replicate any data: {}".format(sync_record_count) + ) + print("total replicated row count: {}".format(sum(sync_record_count.values()))) + + return sync_record_count + + def perform_and_verify_table_and_field_selection(self, + conn_id, + test_catalogs, + select_all_fields=True): + """ + Perform table and field selection based off of the streams to select + set and field selection parameters. 
+ Verify this results in the expected streams selected and all or no + fields selected for those streams. + """ + + # Select all available fields or select no fields from all testable streams + self.select_all_streams_and_fields( + conn_id=conn_id, catalogs=test_catalogs, select_all_fields=select_all_fields + ) + + catalogs = menagerie.get_catalogs(conn_id) + + # Ensure our selection affects the catalog + expected_selected = [tc.get('stream_name') for tc in test_catalogs] + for cat in catalogs: + catalog_entry = menagerie.get_annotated_schema(conn_id, cat['stream_id']) + + # Verify all testable streams are selected + selected = catalog_entry.get('annotated-schema').get('selected') + print("Validating selection on {}: {}".format(cat['stream_name'], selected)) + if cat['stream_name'] not in expected_selected: + self.assertFalse(selected, msg="Stream selected, but not testable.") + continue # Skip remaining assertions if we aren't selecting this stream + self.assertTrue(selected, msg="Stream not selected.") + + if select_all_fields: + # Verify all fields within each selected stream are selected + for field, field_props in catalog_entry.get('annotated-schema').get('properties').items(): + field_selected = field_props.get('selected') + print("\tValidating selection on {}.{}: {}".format( + cat['stream_name'], field, field_selected)) + self.assertTrue(field_selected, msg="Field not selected.") + else: + # Verify only automatic fields are selected + expected_automatic_fields = self.expected_automatic_fields().get(cat['stream_name']) + selected_fields = self.get_selected_fields_from_metadata(catalog_entry['metadata']) + self.assertEqual(expected_automatic_fields, selected_fields) + + @staticmethod + def get_selected_fields_from_metadata(metadata): + selected_fields = set() + for field in metadata: + is_field_metadata = len(field['breadcrumb']) > 1 + inclusion_automatic_or_selected = ( + field['metadata']['selected'] is True or \ + field['metadata']['inclusion'] == 'automatic' + ) + if is_field_metadata and inclusion_automatic_or_selected: + selected_fields.add(field['breadcrumb'][1]) + return selected_fields + + + @staticmethod + def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = True): + """Select all streams and all fields within streams""" + for catalog in catalogs: + schema = menagerie.get_annotated_schema(conn_id, catalog['stream_id']) + + non_selected_properties = [] + if not select_all_fields: + # get a list of all properties so that none are selected + non_selected_properties = schema.get('annotated-schema', {}).get( + 'properties', {}).keys() + + connections.select_catalog_and_fields_via_metadata( + conn_id, catalog, schema, [], non_selected_properties) + + def parse_date(self, date_value): + """ + Pass in string-formatted-datetime, parse the value, and return it as an unformatted datetime object. 
+ """ + date_formats = { + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S.%f+00:00", + "%Y-%m-%dT%H:%M:%S+00:00", + "%Y-%m-%d" + } + for date_format in date_formats: + try: + date_stripped = dt.strptime(date_value, date_format) + return date_stripped + except ValueError: + continue + + raise NotImplementedError( + "Tests do not account for dates of this format: {}".format(date_value)) + + def calculated_states_by_stream(self, current_state): + timedelta_by_stream = {stream: [0,1,1] # {stream_name: [days, hours, minutes], ...} + for stream in self.expected_check_streams()} + + stream_to_calculated_state = {stream: "" for stream in current_state['bookmarks'].keys()} + for stream, state in current_state['bookmarks'].items(): + state_key, state_value = next(iter(state.keys())), next(iter(state.values())) + state_as_datetime = dateutil.parser.parse(state_value) + + days, hours, minutes = timedelta_by_stream[stream] + calculated_state_as_datetime = state_as_datetime - timedelta(days=days, hours=hours, minutes=minutes) + + state_format = '%Y-%m-%dT%H:%M:%SZ' + calculated_state_formatted = dt.strftime(calculated_state_as_datetime, state_format) + + stream_to_calculated_state[stream] = {state_key: calculated_state_formatted} + + return stream_to_calculated_state + + def timedelta_formatted(self, dtime, days=0): + try: + date_stripped = dt.strptime(dtime, self.START_DATE_FORMAT) + return_date = date_stripped + timedelta(days=days) + + return dt.strftime(return_date, self.START_DATE_FORMAT) + + except ValueError: + return Exception("Datetime object is not of the format: {}".format(self.START_DATE_FORMAT)) + + def convert_state_to_utc(self, date_str): + """ + Convert a saved bookmark value of the form '2020-08-25T13:17:36-07:00' to + a string formatted utc datetime, + in order to compare aginast json formatted datetime values + """ + date_object = dateutil.parser.parse(date_str) + date_object_utc = date_object.astimezone(tz=pytz.UTC) + return dt.strftime(date_object_utc, "%Y-%m-%dT%H:%M:%SZ") diff --git a/test/test_all_fields.py b/test/test_all_fields.py new file mode 100644 index 0000000..35dd876 --- /dev/null +++ b/test/test_all_fields.py @@ -0,0 +1,83 @@ +import tap_tester.connections as connections +import tap_tester.runner as runner +import tap_tester.menagerie as menagerie +from base import ZendeskTest + +class ZendeskAllFields(ZendeskTest): + """Ensure running the tap with all streams and fields selected results in the replication of all fields.""" + + def name(self): + return "zendesk_all_fields" + + def test_run(self): + """ + • Verify no unexpected streams were replicated + • Verify that more than just the automatic fields are replicated for each stream. 
+ • verify all fields for each stream are replicated + """ + + + # Streams to verify all fields tests + expected_streams = self.expected_check_streams() + + expected_automatic_fields = self.expected_automatic_fields() + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_all_fields = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_all_fields) + + # grab metadata after performing table-and-field selection to set expectations + # used for asserting all fields are replicated + stream_to_all_catalog_fields = dict() + for catalog in test_catalogs_all_fields: + stream_id, stream_name = catalog['stream_id'], catalog['stream_name'] + catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id) + fields_from_field_level_md = [md_entry['breadcrumb'][1] + for md_entry in catalog_entry['metadata'] + if md_entry['breadcrumb'] != []] + stream_to_all_catalog_fields[stream_name] = set( + fields_from_field_level_md) + + self.run_and_verify_sync(conn_id) + + synced_records = runner.get_records_from_target_output() + + # Verify no unexpected streams were replicated + synced_stream_names = set(synced_records.keys()) + self.assertSetEqual(expected_streams, synced_stream_names) + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected values + expected_all_keys = stream_to_all_catalog_fields[stream] + expected_automatic_keys = expected_automatic_fields.get( + stream, set()) + + # Verify that more than just the automatic fields are replicated for each stream. + self.assertTrue(expected_automatic_keys.issubset( + expected_all_keys), msg='{} is not in "expected_all_keys"'.format(expected_automatic_keys-expected_all_keys)) + + messages = synced_records.get(stream) + # collect actual values + actual_all_keys = set() + for message in messages['messages']: + if message['action'] == 'upsert': + actual_all_keys.update(message['data'].keys()) + + # As we can't generate following fields by zendesk APIs now so expected. + if stream == "ticket_fields": + expected_all_keys = expected_all_keys - {'system_field_options', 'sub_type_id'} + elif stream == "users": + expected_all_keys = expected_all_keys - {'permanently_deleted'} + elif stream == "ticket_metrics": + expected_all_keys = expected_all_keys - {'status', 'instance_id', 'metric', 'type', 'time'} + + # verify all fields for each stream are replicated + self.assertSetEqual(expected_all_keys, actual_all_keys) diff --git a/test/test_automatic_fields.py b/test/test_automatic_fields.py new file mode 100644 index 0000000..ab43c3a --- /dev/null +++ b/test/test_automatic_fields.py @@ -0,0 +1,66 @@ +import tap_tester.connections as connections +import tap_tester.runner as runner +from base import ZendeskTest + +class ZendeskAutomaticFields(ZendeskTest): + """ + Ensure running the tap with all streams selected and all fields deselected results in the replication of just the + primary keys and replication keys (automatic fields). + """ + + def name(self): + return "zendesk_automatic_fields" + + def test_run(self): + """ + Verify we can deselect all fields except when inclusion=automatic, which is handled by base.py methods + Verify that only the automatic fields are sent to the target. + Verify that all replicated records have unique primary key values. 
+ """ + + streams_to_test = self.expected_check_streams() + + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_automatic_fields = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in streams_to_test] + + # Select all streams and no fields within streams + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_automatic_fields, select_all_fields=False) + + record_count_by_stream = self.run_and_verify_sync(conn_id) + synced_records = runner.get_records_from_target_output() + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # expected values + expected_keys = self.expected_automatic_fields().get(stream) + expected_primary_keys = self.expected_primary_keys()[stream] + + # collect actual values + data = synced_records.get(stream, {}) + record_messages_keys = [set(row['data'].keys()) + for row in data.get('messages', [])] + primary_keys_list = [tuple(message.get('data', {}).get(expected_pk) for expected_pk in expected_primary_keys) + for message in data.get('messages', []) + if message.get('action') == 'upsert'] + unique_primary_keys_list = set(primary_keys_list) + + # Verify that you get some records for each stream + self.assertGreater( + record_count_by_stream.get(stream, -1), 0, + msg="The number of records is not over the stream min limit") + + # Verify that only the automatic fields are sent to the target + for actual_keys in record_messages_keys: + self.assertSetEqual(expected_keys, actual_keys) + + #Verify that all replicated records have unique primary key values. + self.assertEqual(len(primary_keys_list), + len(unique_primary_keys_list), + msg="Replicated record does not have unique primary key values.") \ No newline at end of file diff --git a/test/test_discovery.py b/test/test_discovery.py new file mode 100644 index 0000000..ba936a5 --- /dev/null +++ b/test/test_discovery.py @@ -0,0 +1,129 @@ +import re + +import tap_tester.connections as connections +from base import ZendeskTest +from tap_tester import menagerie + +class ZendeskDiscover(ZendeskTest): + """ + Testing that discovery creates the appropriate catalog with valid metadata. + • Verify number of actual streams discovered match expected + • Verify the stream names discovered were what we expect + • Verify stream names follow naming convention + streams should only have lowercase alphas and underscores + • verify there is only 1 top level breadcrumb + • verify replication key(s) + • verify primary key(s) + • verify that if there is a replication key we are doing INCREMENTAL otherwise FULL + • verify the actual replication matches our expected replication method + • verify that primary, replication keys are given the inclusion of automatic. + • verify that all other fields have inclusion of available metadata. 
+ """ + + def name(self): + return "zendesk_discover_test" + + def test_run(self): + streams_to_test = self.expected_check_streams() + + conn_id = connections.ensure_connection(self, payload_hook=None) + + # Verify that there are catalogs found + found_catalogs = self.run_and_verify_check_mode( + conn_id) + + # Verify stream names follow naming convention + # streams should only have lowercase alphas and underscores + found_catalog_names = {c['tap_stream_id'] for c in found_catalogs} + self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]), + msg="One or more streams don't follow standard naming") + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # Verify ensure the caatalog is found for a given stream + catalog = next(iter([catalog for catalog in found_catalogs + if catalog["stream_name"] == stream])) + self.assertIsNotNone(catalog) + + # collecting expected values + expected_primary_keys = self.expected_primary_keys()[stream] + expected_replication_keys = self.expected_replication_keys()[ + stream] + expected_automatic_fields = self.expected_automatic_fields().get(stream) + expected_replication_method = self.expected_replication_method()[ + stream] + + # collecting actual values... + schema_and_metadata = menagerie.get_annotated_schema( + conn_id, catalog['stream_id']) + metadata = schema_and_metadata["metadata"] + stream_properties = [ + item for item in metadata if item.get("breadcrumb") == []] + actual_primary_keys = set( + stream_properties[0].get( + "metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, []) + ) + actual_replication_keys = set( + stream_properties[0].get( + "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []) + ) + actual_replication_method = stream_properties[0].get( + "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD) + actual_automatic_fields = set( + item.get("breadcrumb", ["properties", None])[1] for item in metadata + if item.get("metadata").get("inclusion") == "automatic" + ) + + ########################################################################## + # metadata assertions + ########################################################################## + + # verify there is only 1 top level breadcrumb in metadata + self.assertTrue(len(stream_properties) == 1, + msg="There is NOT only one top level breadcrumb for {}".format(stream) + + "\nstream_properties | {}".format(stream_properties)) + + # verify primary key(s) match expectations + self.assertSetEqual( + expected_primary_keys, actual_primary_keys, + ) + + # verify that primary keys and replication keys + # are given the inclusion of automatic in metadata. + self.assertSetEqual(expected_automatic_fields, + actual_automatic_fields) + + # verify that all other fields have inclusion of available + # This assumes there are no unsupported fields for SaaS sources + self.assertTrue( + all({item.get("metadata").get("inclusion") == "available" + for item in metadata + if item.get("breadcrumb", []) != [] + and item.get("breadcrumb", ["properties", None])[1] + not in actual_automatic_fields}), + msg="Not all non key properties are set to available in metadata") + + # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL + # Given below streams are child stremas of parent stream `tickets` and tickets is incremental streams + # so, child streams also behave as incremental streams but does not save it's own state. So, skipping it. 
+ if not stream in ["ticket_comments", "ticket_audits", "ticket_metrics"]: + + if actual_replication_keys: + self.assertTrue(actual_replication_method == self.INCREMENTAL, + msg="Expected INCREMENTAL replication " + "since there is a replication key") + else: + self.assertTrue(actual_replication_method == self.FULL_TABLE, + msg="Expected FULL replication " + "since there is no replication key") + + # verify the actual replication matches our expected replication method + self.assertEqual(expected_replication_method, actual_replication_method, + msg="The actual replication method {} doesn't match the expected {}".format( + actual_replication_method, expected_replication_method)) + + # verify replication key(s) + self.assertEqual(expected_replication_keys, actual_replication_keys, + msg="expected replication key {} but actual is {}".format( + expected_replication_keys, actual_replication_keys)) diff --git a/test/test_pagination.py b/test/test_pagination.py new file mode 100644 index 0000000..3b0cf8a --- /dev/null +++ b/test/test_pagination.py @@ -0,0 +1,70 @@ +import tap_tester.connections as connections +import tap_tester.runner as runner +import tap_tester.menagerie as menagerie +from base import ZendeskTest + + +class ZendeskPagination(ZendeskTest): + """ + Ensure tap can replicate multiple pages of data for streams that use pagination. + """ + API_LIMIT = 100 + def name(self): + return "zendesk_pagination_test" + + def test_run(self): + """ + • Verify that for each stream you can get multiple pages of data. + This requires we ensure more than 1 page of data exists at all times for any given stream. + • Verify by pks that the data replicated matches the data we expect. + + """ + + # Streams to verify all fields tests + expected_streams = self.expected_check_streams() + #Skip satisfaction_ratings streams as only end user of tickets can create satisfaction_ratings + expected_streams = expected_streams - {"satisfaction_ratings"} + + conn_id = connections.ensure_connection(self) + + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs_all_fields = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, test_catalogs_all_fields) + + record_count_by_stream = self.run_and_verify_sync(conn_id) + + synced_records = runner.get_records_from_target_output() + + # Verify no unexpected streams were replicated + synced_stream_names = set(synced_records.keys()) + self.assertSetEqual(expected_streams, synced_stream_names) + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected values + expected_primary_keys = self.expected_primary_keys()[stream] + + # verify that we can paginate with all fields selected + record_count_sync = record_count_by_stream.get(stream, 0) + self.assertGreater(record_count_sync, self.API_LIMIT, + msg="The number of records is not over the stream max limit") + + primary_keys_list = [tuple([message.get('data').get(expected_pk) for expected_pk in expected_primary_keys]) + for message in synced_records.get(stream).get('messages') + if message.get('action') == 'upsert'] + + primary_keys_list_1 = primary_keys_list[:self.API_LIMIT] + primary_keys_list_2 = primary_keys_list[self.API_LIMIT:2*self.API_LIMIT] + + primary_keys_page_1 = set(primary_keys_list_1) + primary_keys_page_2 = set(primary_keys_list_2) + + # Verify by primary keys that data is unique for page + self.assertTrue( + 
primary_keys_page_1.isdisjoint(primary_keys_page_2)) diff --git a/test/test_standard_bookmark.py b/test/test_standard_bookmark.py new file mode 100644 index 0000000..cb3d90e --- /dev/null +++ b/test/test_standard_bookmark.py @@ -0,0 +1,196 @@ +import tap_tester.connections as connections +import tap_tester.runner as runner +from base import ZendeskTest +from tap_tester import menagerie +from datetime import datetime +import uuid +import os +import time +from zenpy import Zenpy +from zenpy.lib.api_objects import User + +class ZendeskBookMark(ZendeskTest): + """Test tap sets a bookmark and respects it for the next sync of a stream""" + + def name(self): + return "zendesk_bookmark_test" + + def test_run(self): + """ + Verify that for each stream you can do a sync which records bookmarks. + That the bookmark is the maximum value sent to the target for the replication key. + That a second sync respects the bookmark + All data of the second sync is >= the bookmark from the first sync + The number of records in the 2nd sync is less then the first (This assumes that + new data added to the stream is done at a rate slow enough that you haven't + doubled the amount of data from the start date to the first sync between + the first sync and second sync run in this test) + + Verify that for full table stream, all data replicated in sync 1 is replicated again in sync 2. + + PREREQUISITE + For EACH stream that is incrementally replicated there are multiple rows of data with + different values for the replication key + """ + + + expected_streams = self.expected_check_streams() + + expected_replication_keys = self.expected_replication_keys() + expected_replication_methods = self.expected_replication_method() + + ########################################################################## + # First Sync + ########################################################################## + conn_id = connections.ensure_connection(self) + + # Run in check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + catalog_entries = [catalog for catalog in found_catalogs + if catalog.get('tap_stream_id') in expected_streams] + + self.perform_and_verify_table_and_field_selection( + conn_id, catalog_entries) + + # Run a first sync job using orchestrator + first_sync_record_count = self.run_and_verify_sync(conn_id) + first_sync_records = runner.get_records_from_target_output() + first_sync_bookmarks = menagerie.get_state(conn_id) + + ########################################################################## + # Update State Between Syncs + ########################################################################## + + new_states = {'bookmarks': dict()} + simulated_states = self.calculated_states_by_stream( + first_sync_bookmarks) + for stream, new_state in simulated_states.items(): + new_states['bookmarks'][stream] = new_state + menagerie.set_state(conn_id, new_states) + + ########################################################################## + # Second Sync + ########################################################################## + + second_sync_record_count = self.run_and_verify_sync(conn_id) + second_sync_records = runner.get_records_from_target_output() + second_sync_bookmarks = menagerie.get_state(conn_id) + + ########################################################################## + # Test By Stream + ########################################################################## + + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected values + 
expected_replication_method = expected_replication_methods[stream] + + # collect information for assertions from syncs 1 & 2 base on expected values + first_sync_count = first_sync_record_count.get(stream, 0) + second_sync_count = second_sync_record_count.get(stream, 0) + first_sync_messages = [record.get('data') for record in + first_sync_records.get( + stream, {}).get('messages', []) + if record.get('action') == 'upsert'] + second_sync_messages = [record.get('data') for record in + second_sync_records.get( + stream, {}).get('messages', []) + if record.get('action') == 'upsert'] + first_bookmark_key_value = first_sync_bookmarks.get('bookmarks', {stream: None}).get(stream) + second_bookmark_key_value = second_sync_bookmarks.get('bookmarks', {stream: None}).get(stream) + + + if expected_replication_method == self.INCREMENTAL: + + # collect information specific to incremental streams from syncs 1 & 2 + replication_key = next( + iter(expected_replication_keys[stream])) + first_bookmark_value = first_bookmark_key_value.get(replication_key) + second_bookmark_value = second_bookmark_key_value.get(replication_key) + first_bookmark_value_utc = self.convert_state_to_utc( + first_bookmark_value) + second_bookmark_value_utc = self.convert_state_to_utc( + second_bookmark_value) + + + simulated_bookmark_value = self.convert_state_to_utc(new_states['bookmarks'][stream][replication_key]) + + # Verify the first sync sets a bookmark of the expected form + self.assertIsNotNone(first_bookmark_key_value) + self.assertIsNotNone(first_bookmark_value) + + # Verify the second sync sets a bookmark of the expected form + self.assertIsNotNone(second_bookmark_key_value) + self.assertIsNotNone(second_bookmark_value) + + # Verify the second sync bookmark is Equal to the first sync bookmark + # assumes no changes to data during test + if not stream == "users": + self.assertEqual(second_bookmark_value, + first_bookmark_value) + else: + # For `users` stream it stores bookmark as 1 minute less than current time if `updated_at` of + # last records less than it. So, if there is no data change then second_bookmark_value will be + # 1 minute less than current time. Therefore second_bookmark_value will always be + # greater or equal to first_bookmark_value + self.assertGreaterEqual(second_bookmark_value, + first_bookmark_value) + + for record in first_sync_messages: + + # Verify the first sync bookmark value is the max replication key value for a given stream + replication_key_value = record.get(replication_key) + # For `ticket` stream it stores bookmarks as int timestamp. So, converting it to the string. + if stream == "tickets": + replication_key_value = datetime.utcfromtimestamp(replication_key_value).strftime('%Y-%m-%dT%H:%M:%SZ') + + self.assertLessEqual( + replication_key_value, first_bookmark_value_utc, + msg="First sync bookmark was set incorrectly, a record with a greater replication-key value was synced." 
+                        )
+
+                    for record in second_sync_messages:
+                        # Verify the second sync replication key value is Greater or Equal to the first sync bookmark
+                        replication_key_value = record.get(replication_key)
+
+                        if stream == "tickets":
+                            replication_key_value = datetime.utcfromtimestamp(replication_key_value).strftime('%Y-%m-%dT%H:%M:%SZ')
+
+                        self.assertGreaterEqual(replication_key_value, simulated_bookmark_value,
+                                                msg="Second sync records do not respect the previous bookmark.")
+
+                        # Verify the second sync bookmark value is the max replication key value for a given stream
+                        self.assertLessEqual(
+                            replication_key_value, second_bookmark_value_utc,
+                            msg="Second sync bookmark was set incorrectly, a record with a greater replication-key value was synced."
+                        )
+
+                elif expected_replication_method == self.FULL_TABLE:
+
+                    # Verify the syncs do not set a bookmark for full table streams
+                    self.assertIsNone(first_bookmark_key_value)
+                    self.assertIsNone(second_bookmark_key_value)
+
+                    # Verify the number of records in the second sync is the same as the first
+
+                    # The streams below are child streams of the incremental parent stream `tickets`.
+                    # Child streams behave like incremental streams but do not save their own state, so the
+                    # first and second syncs may not return the same number of records for them.
+                    if not stream in ["ticket_comments", "ticket_audits", "ticket_metrics"]:
+                        self.assertEqual(second_sync_count, first_sync_count)
+
+                else:
+
+                    raise NotImplementedError(
+                        "INVALID EXPECTATIONS\t\tSTREAM: {} REPLICATION_METHOD: {}".format(
+                            stream, expected_replication_method)
+                    )
+
+                # Verify at least 1 record was replicated in the second sync
+                self.assertGreater(
+                    second_sync_count, 0, msg="We are not fully testing bookmarking for {}".format(stream))
+
diff --git a/test/test_start_date.py b/test/test_start_date.py
new file mode 100644
index 0000000..0042da3
--- /dev/null
+++ b/test/test_start_date.py
@@ -0,0 +1,165 @@
+import tap_tester.connections as connections
+import tap_tester.runner as runner
+from base import ZendeskTest
+from datetime import datetime
+
+class ZendeskStartDate(ZendeskTest):
+    """
+    Ensure all expected streams respect the start date. Run the tap in check mode,
+    run the 1st sync with a start date a few days ago, then run check mode and a 2nd sync on a new connection with start date = today.
+ """ + + + start_date_1 = "" + start_date_2 = "" + + def name(self): + return "zendesk_start_date_test" + + def test_run(self): + """ + Test that the start_date configuration is respected + • verify that a sync with a later start date has at least one record synced + and less records than the 1st sync with a previous start date + • verify that each stream has less records than the earlier start date sync + • verify all data from later start data has bookmark values >= start_date + """ + self.run_test(days=1172, expected_streams=self.expected_check_streams()-{"ticket_forms"}) + self.run_test(days=1774, expected_streams={"ticket_forms"}) + + def run_test(self, days, expected_streams): + self.start_date_1 = self.get_properties().get('start_date') + self.start_date_2 = self.timedelta_formatted(self.start_date_1, days=days) + self.start_date = self.start_date_1 + + expected_streams = expected_streams + + ########################################################################## + # First Sync + ########################################################################## + + # instantiate connection + conn_id_1 = connections.ensure_connection(self) + + # run check mode + found_catalogs_1 = self.run_and_verify_check_mode(conn_id_1) + + # table and field selection + test_catalogs_1_all_fields = [catalog for catalog in found_catalogs_1 + if catalog.get('tap_stream_id') in expected_streams] + self.perform_and_verify_table_and_field_selection( + conn_id_1, test_catalogs_1_all_fields, select_all_fields=True) + + # run initial sync + record_count_by_stream_1 = self.run_and_verify_sync(conn_id_1) + synced_records_1 = runner.get_records_from_target_output() + + ########################################################################## + # Update START DATE Between Syncs + ########################################################################## + + print("REPLICATION START DATE CHANGE: {} ===>>> {} ".format( + self.start_date, self.start_date_2)) + self.start_date = self.start_date_2 + + ########################################################################## + # Second Sync + ########################################################################## + + # create a new connection with the new start_date + conn_id_2 = connections.ensure_connection( + self, original_properties=False) + + # run check mode + found_catalogs_2 = self.run_and_verify_check_mode(conn_id_2) + + # table and field selection + test_catalogs_2_all_fields = [catalog for catalog in found_catalogs_2 + if catalog.get('tap_stream_id') in expected_streams] + self.perform_and_verify_table_and_field_selection( + conn_id_2, test_catalogs_2_all_fields, select_all_fields=True) + + # run sync + record_count_by_stream_2 = self.run_and_verify_sync(conn_id_2) + synced_records_2 = runner.get_records_from_target_output() + + for stream in expected_streams: + with self.subTest(stream=stream): + + # expected values + expected_primary_keys = self.expected_primary_keys()[stream] + + # collect information for assertions from syncs 1 & 2 base on expected values + record_count_sync_1 = record_count_by_stream_1.get(stream, 0) + record_count_sync_2 = record_count_by_stream_2.get(stream, 0) + + primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + for message in synced_records_1.get(stream, {}).get('messages', []) + if message.get('action') == 'upsert'] + primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys) + for message in 
synced_records_2.get(stream, {}).get('messages', []) + if message.get('action') == 'upsert'] + + primary_keys_sync_1 = set(primary_keys_list_1) + primary_keys_sync_2 = set(primary_keys_list_2) + + if self.expected_metadata()[stream][self.OBEYS_START_DATE]: + + # collect information specific to incremental streams from syncs 1 & 2 + expected_replication_key = next( + iter(self.expected_replication_keys().get(stream, []))) + replication_dates_1 = [row.get('data').get(expected_replication_key) for row in + synced_records_1.get(stream, {'messages': []}).get('messages', []) + if row.get('data')] + replication_dates_2 = [row.get('data').get(expected_replication_key) for row in + synced_records_2.get(stream, {'messages': []}).get('messages', []) + if row.get('data')] + + # Verify replication key is greater or equal to start_date for sync 1 + for replication_date in replication_dates_1: + if stream == "tickets": + replication_date = datetime.utcfromtimestamp(replication_date).strftime('%Y-%m-%dT%H:%M:%SZ') + + self.assertGreaterEqual( + self.parse_date(replication_date), self.parse_date( + self.start_date_1), + msg="Report pertains to a date prior to our start date.\n" + + "Sync start_date: {}\n".format(self.start_date_1) + + "Record date: {} ".format(replication_date) + ) + + # Verify replication key is greater or equal to start_date for sync 2 + for replication_date in replication_dates_2: + if stream == "tickets": + replication_date = datetime.utcfromtimestamp(replication_date).strftime('%Y-%m-%dT%H:%M:%SZ') + + self.assertGreaterEqual( + self.parse_date(replication_date), self.parse_date( + self.start_date_2), + msg="Report pertains to a date prior to our start date.\n" + + "Sync start_date: {}\n".format(self.start_date_2) + + "Record date: {} ".format(replication_date) + ) + + # Verify the number of records replicated in sync 1 is greater than the number + # of records replicated in sync 2 + self.assertGreater(record_count_sync_1, + record_count_sync_2) + + # Verify the records replicated in sync 2 were also replicated in sync 1 + self.assertTrue( + primary_keys_sync_2.issubset(primary_keys_sync_1)) + + else: + # Given below streams are child stremas of parent stream `tickets` and tickets is incremental streams + # Child streams also behave like incremental streams but does not save it's own state. So, it don't + # have same no of record on second sync and first sync. + + # Verify that the 2nd sync with a later start date replicates the same number of + # records as the 1st sync. + if not stream in ["ticket_comments", "ticket_audits", "ticket_metrics"]: + self.assertEqual(record_count_sync_2, record_count_sync_1) + + # Verify by primary key the same records are replicated in the 1st and 2nd syncs + self.assertSetEqual(primary_keys_sync_1, + primary_keys_sync_2) \ No newline at end of file diff --git a/test/unittests/test_discovery_mode.py b/test/unittests/test_discovery_mode.py new file mode 100644 index 0000000..0c7c266 --- /dev/null +++ b/test/unittests/test_discovery_mode.py @@ -0,0 +1,276 @@ +import unittest +from unittest.mock import MagicMock, Mock, patch +from tap_zendesk import discover, http +import tap_zendesk +import requests +import zenpy + +ACCSESS_TOKEN_ERROR = '{"error": "Forbidden", "description": "You are missing the following required scopes: read"}' +API_TOKEN_ERROR = '{"error": {"title": "Forbidden",'\ + '"message": "You do not have access to this page. 
Please contact the account owner of this help desk for further help."}}' +AUTH_ERROR = '{"error": "Could not authenticate you"}' +START_DATE = "2021-10-30T00:00:00Z" + +def mocked_get(*args, **kwargs): + fake_response = requests.models.Response() + fake_response.headers.update(kwargs.get('headers', {})) + fake_response.status_code = kwargs['status_code'] + + # We can't set the content or text of the Response directly, so we mock a function + fake_response.json = Mock() + fake_response.json.side_effect = lambda:kwargs.get('json', {}) + + return fake_response + +class TestDiscovery(unittest.TestCase): + ''' + Test that we can call api for each stream in discovey mode and handle forbidden error. + ''' + @patch('tap_zendesk.streams.Organizations.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.streams.Users.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.streams.TicketForms.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.streams.SLAPolicies.check_access',side_effect=[mocked_get(status_code=200, json={"key1": "val1"})]) + @patch('tap_zendesk.discover.load_shared_schema_refs', return_value={}) + @patch('tap_zendesk.streams.Stream.load_metadata', return_value={}) + @patch('tap_zendesk.streams.Stream.load_schema', return_value={}) + @patch('singer.resolve_schema_references', return_value={}) + @patch('requests.get', + side_effect=[ + mocked_get(status_code=200, json={"tickets": [{"id": "t1"}]}), # Response of the 1st get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 2nd get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 3rd get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 4th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 5th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 6th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 7th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 8th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 9th get request call + mocked_get(status_code=403, json={"key1": "val1"}) # Response of the 10th get request call + ]) + def test_discovery_handles_403__raise_tap_zendesk_forbidden_error(self, mock_get, mock_resolve_schema_references, + mock_load_metadata, mock_load_schema,mock_load_shared_schema_refs, mocked_sla_policies, + mocked_ticket_forms, mock_users, mock_organizations): + ''' + Test that we handle forbidden error for child streams. discover_streams calls check_access for each stream to + check the read perission. discover_streams call many other methods including load_shared_schema_refs, load_metadata, + load_schema, resolve_schema_references also which we mock to test forbidden error. We mock check_access method of + some of stream method which call request of zenpy module and also mock get method of requests module with 200, 403 error. + + ''' + try: + responses = discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) + except tap_zendesk.http.ZendeskForbiddenError as e: + expected_error_message = "HTTP-error-code: 403, Error: You are missing the following required scopes: read. 
"\ + "The account credentials supplied do not have read access for the following stream(s): groups, users, "\ + "organizations, ticket_audits, ticket_comments, ticket_fields, ticket_forms, group_memberships, macros, "\ + "satisfaction_ratings, tags, ticket_metrics" + + # Verifying the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + expected_call_count = 10 + actual_call_count = mock_get.call_count + self.assertEqual(expected_call_count, actual_call_count) + + + @patch('tap_zendesk.streams.Organizations.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.streams.Users.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.streams.TicketForms.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.streams.SLAPolicies.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.discover.load_shared_schema_refs', return_value={}) + @patch('tap_zendesk.streams.Stream.load_metadata', return_value={}) + @patch('tap_zendesk.streams.Stream.load_schema', return_value={}) + @patch('singer.resolve_schema_references', return_value={}) + @patch('requests.get', + side_effect=[ + mocked_get(status_code=200, json={"tickets": [{"id": "t1"}]}), # Response of the 1st get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 2nd get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 3rd get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 4th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 5th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 6th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 7th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 8th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 9th get request call + mocked_get(status_code=403, json={"key1": "val1"}) # Response of the 10th get request call + ]) + def test_discovery_handles_403_raise_zenpy_forbidden_error_for_access_token(self, mock_get, mock_resolve_schema_references, mock_load_metadata, + mock_load_schema,mock_load_shared_schema_refs, mocked_sla_policies, mocked_ticket_forms, + mock_users, mock_organizations): + ''' + Test that we handle forbidden error received from last failed request which we called from zenpy module and + raised zenpy.lib.exception.APIException. discover_streams calls check_access for each stream to check the + read perission. discover_streams call many other methods including load_shared_schema_refs, load_metadata, + load_schema, resolve_schema_references also which we mock to test forbidden error. We mock check_access method of + some of stream method which call request of zenpy module and also mock get method of requests module with 200, 403 error. + ''' + try: + responses = discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) + except tap_zendesk.http.ZendeskForbiddenError as e: + expected_error_message = "HTTP-error-code: 403, Error: You are missing the following required scopes: read. 
"\ + "The account credentials supplied do not have read access for the following stream(s): groups, users, "\ + "organizations, ticket_audits, ticket_comments, ticket_fields, ticket_forms, group_memberships, macros, "\ + "satisfaction_ratings, tags, ticket_metrics, sla_policies" + + self.assertEqual(str(e), expected_error_message) + + expected_call_count = 10 + actual_call_count = mock_get.call_count + self.assertEqual(expected_call_count, actual_call_count) + + + @patch('tap_zendesk.streams.Organizations.check_access',side_effect=zenpy.lib.exception.APIException(API_TOKEN_ERROR)) + @patch('tap_zendesk.streams.Users.check_access',side_effect=zenpy.lib.exception.APIException(API_TOKEN_ERROR)) + @patch('tap_zendesk.streams.TicketForms.check_access',side_effect=zenpy.lib.exception.APIException(API_TOKEN_ERROR)) + @patch('tap_zendesk.streams.SLAPolicies.check_access',side_effect=[mocked_get(status_code=200, json={"key1": "val1"})]) + @patch('tap_zendesk.discover.load_shared_schema_refs', return_value={}) + @patch('tap_zendesk.streams.Stream.load_metadata', return_value={}) + @patch('tap_zendesk.streams.Stream.load_schema', return_value={}) + @patch('singer.resolve_schema_references', return_value={}) + @patch('requests.get', + side_effect=[ + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 2nd get request call + mocked_get(status_code=404, json={"key1": "val1"}), # Response of the 3rd get request call + mocked_get(status_code=404, json={"key1": "val1"}), # Response of the 4th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 5th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 6th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 7th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 8th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 9th get request call + mocked_get(status_code=404, json={"key1": "val1"}) # Response of the 10th get request call + ]) + def test_discovery_handles_403_raise_zenpy_forbidden_error_for_api_token(self, mock_get, mock_resolve_schema_references, + mock_load_metadata, mock_load_schema,mock_load_shared_schema_refs, mocked_sla_policies, + mocked_ticket_forms, mock_users, mock_organizations): + ''' + Test that we handle forbidden error received from last failed request which we called from zenpy module and + raised zenpy.lib.exception.APIException. discover_streams calls check_access for each stream to check the + read perission. discover_streams call many other methods including load_shared_schema_refs, load_metadata, + load_schema, resolve_schema_references also which we mock to test forbidden error. We mock check_access method of + some of stream method which call request of zenpy module and also mock get method of requests module with 200, 403 error. + ''' + try: + responses = discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) + except tap_zendesk.http.ZendeskForbiddenError as e: + expected_error_message = "HTTP-error-code: 403, Error: You are missing the following required scopes: read. 
"\ + "The account credentials supplied do not have read access for the following stream(s): tickets, groups, users, "\ + "organizations, ticket_fields, ticket_forms, group_memberships, macros, satisfaction_ratings, tags" + + self.assertEqual(str(e), expected_error_message) + + expected_call_count = 10 + actual_call_count = mock_get.call_count + self.assertEqual(expected_call_count, actual_call_count) + + + @patch('tap_zendesk.streams.Organizations.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.streams.Users.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.streams.TicketForms.check_access',side_effect=zenpy.lib.exception.APIException(ACCSESS_TOKEN_ERROR)) + @patch('tap_zendesk.streams.SLAPolicies.check_access',side_effect=[mocked_get(status_code=200, json={"key1": "val1"})]) + @patch('tap_zendesk.discover.load_shared_schema_refs', return_value={}) + @patch('tap_zendesk.streams.Stream.load_metadata', return_value={}) + @patch('tap_zendesk.streams.Stream.load_schema', return_value={}) + @patch('singer.resolve_schema_references', return_value={}) + @patch('requests.get', + side_effect=[ + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 2nd get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 3rd get request call + mocked_get(status_code=400, json={"key1": "val1"}), # Response of the 4th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 5th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 6th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 7th get request call + ]) + def test_discovery_handles_except_403_error_requests_module(self, mock_get, mock_resolve_schema_references, + mock_load_metadata, mock_load_schema,mock_load_shared_schema_refs, mocked_sla_policies, + mocked_ticket_forms, mock_users, mock_organizations): + ''' + Test that function raises error directly if error code is other than 403. discover_streams calls check_access for each + stream to check the read perission. discover_streams call many other methods including load_shared_schema_refs, load_metadata, + load_schema, resolve_schema_references also which we mock to test forbidden error. We mock check_access method of + some of stream method which call request of zenpy module and also mock get method of requests module with 200, 403 error. + ''' + try: + responses = discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) + except http.ZendeskBadRequestError as e: + expected_error_message = "HTTP-error-code: 400, Error: A validation exception has occurred." 
+ # Verifying the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + expected_call_count = 4 + actual_call_count = mock_get.call_count + self.assertEqual(expected_call_count, actual_call_count) + + + @patch('tap_zendesk.streams.Organizations.check_access',side_effect=zenpy.lib.exception.APIException(AUTH_ERROR)) + @patch('tap_zendesk.streams.Users.check_access',side_effect=zenpy.lib.exception.APIException(AUTH_ERROR)) + @patch('tap_zendesk.streams.TicketForms.check_access',side_effect=zenpy.lib.exception.APIException(AUTH_ERROR)) + @patch('tap_zendesk.streams.SLAPolicies.check_access',side_effect=[mocked_get(status_code=200, json={"key1": "val1"})]) + @patch('tap_zendesk.discover.load_shared_schema_refs', return_value={}) + @patch('tap_zendesk.streams.Stream.load_metadata', return_value={}) + @patch('tap_zendesk.streams.Stream.load_schema', return_value={}) + @patch('singer.resolve_schema_references', return_value={}) + @patch('requests.get', + side_effect=[ + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 2nd get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 3rd get request call + mocked_get(status_code=400, json={"key1": "val1"}), # Response of the 4th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 5th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 6th get request call + mocked_get(status_code=403, json={"key1": "val1"}), # Response of the 7th get request call + ]) + def test_discovery_handles_except_403_error_zenpy_module(self, mock_get, mock_resolve_schema_references, + mock_load_metadata, mock_load_schema,mock_load_shared_schema_refs, mocked_sla_policies, + mocked_ticket_forms, mock_users, mock_organizations): + ''' + Test that discovery mode raise error direclty if it is rather than 403 for request zenpy module. discover_streams call + many other methods including load_shared_schema_refs, load_metadata, load_schema, resolve_schema_references + also which we mock to test forbidden error. We mock check_access method of some of stream method which + call request of zenpy module and also mock get method of requests module with 400, 403 error. 
+ ''' + try: + responses = discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) + except zenpy.lib.exception.APIException as e: + expected_error_message = AUTH_ERROR + # Verifying the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + expected_call_count = 2 + actual_call_count = mock_get.call_count + self.assertEqual(expected_call_count, actual_call_count) + + + @patch('tap_zendesk.streams.Organizations.check_access',side_effect=[mocked_get(status_code=200, json={"key1": "val1"})]) + @patch('tap_zendesk.streams.Users.check_access',side_effect=[mocked_get(status_code=200, json={"key1": "val1"})]) + @patch('tap_zendesk.streams.TicketForms.check_access',side_effect=[mocked_get(status_code=200, json={"key1": "val1"})]) + @patch('tap_zendesk.streams.SLAPolicies.check_access',side_effect=[mocked_get(status_code=200, json={"key1": "val1"})]) + @patch('tap_zendesk.discover.load_shared_schema_refs', return_value={}) + @patch('tap_zendesk.streams.Stream.load_metadata', return_value={}) + @patch('tap_zendesk.streams.Stream.load_schema', return_value={}) + @patch('singer.resolve_schema_references', return_value={}) + @patch('requests.get', + side_effect=[ + mocked_get(status_code=200, json={"tickets": [{"id": "t1"}]}), # Response of the 1st get request call + mocked_get(status_code=200, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=200, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=200, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=200, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=200, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=200, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=200, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=200, json={"key1": "val1"}), # Response of the 1st get request call + mocked_get(status_code=200, json={"key1": "val1"}) # Response of the 1st get request call + ]) + def test_discovery_handles_200_response(self, mock_get, mock_resolve_schema_references, + mock_load_metadata, mock_load_schema,mock_load_shared_schema_refs, mocked_sla_policies, + mocked_ticket_forms, mock_users, mock_organizations): + ''' + Test that discovery mode does not raise any error in case of all streams have read permission + ''' + discover.discover_streams('dummy_client', {'subdomain': 'arp', 'access_token': 'dummy_token', 'start_date':START_DATE}) + + expected_call_count = 10 + actual_call_count = mock_get.call_count + self.assertEqual(expected_call_count, actual_call_count) diff --git a/test/unittests/test_exception.py b/test/unittests/test_exception.py new file mode 100644 index 0000000..60a4dc4 --- /dev/null +++ b/test/unittests/test_exception.py @@ -0,0 +1,55 @@ +import unittest +from tap_zendesk import get_session +from unittest import mock +from pytest import raises +from tap_zendesk.streams import raise_or_log_zenpy_apiexception, zenpy, json, LOGGER + +class ValueError(Exception): + def __init__(self, m): + self.message = m + + def __str__(self): + return self.message + + +class TestException(unittest.TestCase): + @mock.patch("tap_zendesk.streams.LOGGER.warning") + def test_exception_logger(self, mocked_logger): + """ + Test whether the specific logger message is correctly printed when access 
error occurs and the error is a dict + """ + schema = {} + stream = 'test_stream' + error_string = '{"error":{"message": "You do not have access to this page. Please contact the account owner of this help desk for further help."}' + "}" + e = zenpy.lib.exception.APIException(error_string) + raise_or_log_zenpy_apiexception(schema, stream, e) + mocked_logger.assert_called_with( + "The account credentials supplied do not have access to `%s` custom fields.", + stream) + + def test_zenpy_exception_raised(self): + """ + Test whether the no logger message is printed in case of errors other then access error and the exception is raised + """ + try: + schema = {} + stream = 'test_stream' + error_string = '{"error": "invalid_token", "error_description": "The access token provided is expired, revoked, malformed or invalid for other reasons."}' + e = zenpy.lib.exception.APIException(error_string) + raise_or_log_zenpy_apiexception(schema, stream, e) + except zenpy.lib.exception.APIException as ex: + self.assertEqual(str(ex), error_string) + + + def test_zenpy_exception_but_different_message_raised(self): + """ + Test whether the exception is raised when the error is a dict but with different error message + """ + try: + schema = {} + stream = 'test_stream' + error_string = '{"error":{"message": "Could not authenticate you"}' + "}" + e = zenpy.lib.exception.APIException(error_string) + raise_or_log_zenpy_apiexception(schema, stream, e) + except zenpy.lib.exception.APIException as ex: + self.assertEqual(str(ex), error_string) diff --git a/test/unittests/test_http.py b/test/unittests/test_http.py index 2328f55..00f0c36 100644 --- a/test/unittests/test_http.py +++ b/test/unittests/test_http.py @@ -1,8 +1,9 @@ import unittest from unittest.mock import MagicMock, Mock, patch -from tap_zendesk import http +from tap_zendesk import http, streams import requests +import zenpy SINGLE_RESPONSE = { 'meta': {'has_more': False} @@ -24,7 +25,7 @@ def mocked_get(*args, **kwargs): return fake_response - +@patch("time.sleep") class TestBackoff(unittest.TestCase): """Test that we can make single requests to the API and handle cursor based pagination. 
@@ -34,9 +35,9 @@ class TestBackoff(unittest.TestCase): @patch('requests.get', side_effect=[mocked_get(status_code=200, json=SINGLE_RESPONSE)]) - def test_get_cursor_based_gets_one_page(self, mock_get): + def test_get_cursor_based_gets_one_page(self, mock_get, mock_sleep): responses = [response for response in http.get_cursor_based(url='some_url', - access_token='some_token')] + access_token='some_token', request_timeout=300)] actual_response = responses[0] self.assertDictEqual(SINGLE_RESPONSE, actual_response) @@ -50,10 +51,10 @@ def test_get_cursor_based_gets_one_page(self, mock_get): mocked_get(status_code=200, json={"key1": "val1", **PAGINATE_RESPONSE}), mocked_get(status_code=200, json={"key2": "val2", **SINGLE_RESPONSE}), ]) - def test_get_cursor_based_can_paginate(self, mock_get): + def test_get_cursor_based_can_paginate(self, mock_get, mock_sleep): responses = [response for response in http.get_cursor_based(url='some_url', - access_token='some_token')] + access_token='some_token', request_timeout=300)] self.assertDictEqual({"key1": "val1", **PAGINATE_RESPONSE}, responses[0]) @@ -71,14 +72,14 @@ def test_get_cursor_based_can_paginate(self, mock_get): mocked_get(status_code=429, headers={'retry-after': 1}, json={"key2": "val2", **SINGLE_RESPONSE}), mocked_get(status_code=200, json={"key1": "val1", **SINGLE_RESPONSE}), ]) - def test_get_cursor_based_handles_429(self, mock_get): + def test_get_cursor_based_handles_429(self, mock_get, mock_sleep): """Test that the tap: - can handle 429s - requests uses a case insensitive dict for the `headers` - can handle either a string or an integer for the retry header """ responses = [response for response in http.get_cursor_based(url='some_url', - access_token='some_token')] + access_token='some_token', request_timeout=300)] actual_response = responses[0] self.assertDictEqual({"key1": "val1", **SINGLE_RESPONSE}, actual_response) @@ -86,3 +87,224 @@ def test_get_cursor_based_handles_429(self, mock_get): expected_call_count = 3 actual_call_count = mock_get.call_count self.assertEqual(expected_call_count, actual_call_count) + + @patch('requests.get',side_effect=[mocked_get(status_code=400, json={"key1": "val1"})]) + def test_get_cursor_based_handles_400(self,mock_get, mock_sleep): + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + + except http.ZendeskBadRequestError as e: + expected_error_message = "HTTP-error-code: 400, Error: A validation exception has occurred." 
+ # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request calls only 1 time + self.assertEqual(mock_get.call_count, 1) + + @patch('requests.get',side_effect=[mocked_get(status_code=400, json={"error": "Couldn't authenticate you"})]) + def test_get_cursor_based_handles_400_api_error_message(self,mock_get, mock_sleep): + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + + except http.ZendeskBadRequestError as e: + expected_error_message = "HTTP-error-code: 400, Error: Couldn't authenticate you" + # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request calls only 1 time + self.assertEqual(mock_get.call_count, 1) + + @patch('requests.get',side_effect=[mocked_get(status_code=401, json={"key1": "val1"})]) + def test_get_cursor_based_handles_401(self,mock_get, mock_sleep): + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except http.ZendeskUnauthorizedError as e: + expected_error_message = "HTTP-error-code: 401, Error: The access token provided is expired, revoked,"\ + " malformed or invalid for other reasons." + # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request calls only 1 time + self.assertEqual(mock_get.call_count, 1) + + @patch('requests.get',side_effect=[mocked_get(status_code=404, json={"key1": "val1"})]) + def test_get_cursor_based_handles_404(self,mock_get, mock_sleep): + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except http.ZendeskNotFoundError as e: + expected_error_message = "HTTP-error-code: 404, Error: The resource you have specified cannot be found." + # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request calls only 1 time + self.assertEqual(mock_get.call_count, 1) + + + @patch('requests.get',side_effect=[mocked_get(status_code=409, json={"key1": "val1"})]) + def test_get_cursor_based_handles_409(self,mock_get, mock_sleep): + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except http.ZendeskConflictError as e: + expected_error_message = "HTTP-error-code: 409, Error: The API request cannot be completed because the requested operation would conflict with an existing item." + # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request calls only 1 time + self.assertEqual(mock_get.call_count, 1) + + @patch('requests.get',side_effect=[mocked_get(status_code=422, json={"key1": "val1"})]) + def test_get_cursor_based_handles_422(self,mock_get, mock_sleep): + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except http.ZendeskUnprocessableEntityError as e: + expected_error_message = "HTTP-error-code: 422, Error: The request content itself is not processable by the server." 
+ # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request calls only 1 time + self.assertEqual(mock_get.call_count, 1) + + @patch('requests.get',side_effect=10*[mocked_get(status_code=500, json={"key1": "val1"})]) + def test_get_cursor_based_handles_500(self,mock_get, mock_sleep): + """ + Test that the tap can handle 500 error and retry it 10 times + """ + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except http.ZendeskInternalServerError as e: + expected_error_message = "HTTP-error-code: 500, Error: The server encountered an unexpected condition which prevented" \ + " it from fulfilling the request." + # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request retry 10 times + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get',side_effect=10*[mocked_get(status_code=501, json={"key1": "val1"})]) + def test_get_cursor_based_handles_501(self,mock_get, mock_sleep): + """ + Test that the tap can handle 501 error and retry it 10 times + """ + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except http.ZendeskNotImplementedError as e: + expected_error_message = "HTTP-error-code: 501, Error: The server does not support the functionality required to fulfill the request." + # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request retry 10 times + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get',side_effect=10*[mocked_get(status_code=502, json={"key1": "val1"})]) + def test_get_cursor_based_handles_502(self,mock_get, mock_sleep): + """ + Test that the tap can handle 502 error and retry it 10 times + """ + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except http.ZendeskBadGatewayError as e: + expected_error_message = "HTTP-error-code: 502, Error: Server received an invalid response." 
+ # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request retry 10 times + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get', + side_effect=[ + mocked_get(status_code=503, headers={'Retry-After': '1'}, json={"key3": "val3", **SINGLE_RESPONSE}), + mocked_get(status_code=503, headers={'retry-after': 1}, json={"key2": "val2", **SINGLE_RESPONSE}), + mocked_get(status_code=200, json={"key1": "val1", **SINGLE_RESPONSE}), + ]) + def test_get_cursor_based_handles_503(self,mock_get, mock_sleep): + """Test that the tap: + - can handle 503s + - requests uses a case insensitive dict for the `headers` + - can handle either a string or an integer for the retry header + """ + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + actual_response = responses[0] + self.assertDictEqual({"key1": "val1", **SINGLE_RESPONSE}, + actual_response) + + #Verify the request calls only 3 times + self.assertEqual(mock_get.call_count, 3) + + @patch('requests.get', + side_effect=[mocked_get(status_code=503)]) + def test_get_cursor_based_handles_503_without_retry_after(self,mock_get, mock_sleep): + """Test that the tap can handle 503 without retry-after headers + """ + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except http.ZendeskServiceUnavailableError as e: + expected_error_message = 'HTTP-error-code: 503, Error: API service is currently unavailable.' + # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + #Verify the request calls only 1 time + self.assertEqual(mock_get.call_count, 1) + + @patch('requests.get') + def test_get_cursor_based_handles_444(self,mock_get, mock_sleep): + fake_response = requests.models.Response() + fake_response.status_code = 444 + + mock_get.side_effect = [fake_response] + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except http.ZendeskError as e: + expected_error_message = 'HTTP-error-code: 444, Error: Unknown Error' + # Verify the message formed for the custom exception + self.assertEqual(str(e), expected_error_message) + + self.assertEqual(mock_get.call_count, 1) + + @patch("tap_zendesk.streams.LOGGER.warning") + def test_raise_or_log_zenpy_apiexception(self, mocked_logger, mock_sleep): + schema = {} + stream = 'test_stream' + error_string = '{"error": "Forbidden", "description": "You are missing the following required scopes: read"}' + e = zenpy.lib.exception.APIException(error_string) + streams.raise_or_log_zenpy_apiexception(schema, stream, e) + # Verify the raise_or_log_zenpy_apiexception Log expected message + mocked_logger.assert_called_with( + "The account credentials supplied do not have access to `%s` custom fields.", + stream) + + @patch('requests.get') + def test_call_api_handles_timeout_error(self,mock_get, mock_sleep): + mock_get.side_effect = requests.exceptions.Timeout + + try: + responses = http.call_api(url='some_url', request_timeout=300, params={}, headers={}) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 5 times on timeout + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get') + def test_call_api_handles_connection_error(self,mock_get, mock_sleep): + mock_get.side_effect = ConnectionError + + try: + responses = 
http.call_api(url='some_url', request_timeout=300, params={}, headers={})
+        except ConnectionError as e:
+            pass
+
+        # Verify the request is retried 10 times on connection error
+        self.assertEqual(mock_get.call_count, 10)
+
diff --git a/test/unittests/test_request_timeout.py b/test/unittests/test_request_timeout.py
new file mode 100644
index 0000000..8f3778b
--- /dev/null
+++ b/test/unittests/test_request_timeout.py
@@ -0,0 +1,194 @@
+import unittest
+from unittest.mock import MagicMock, Mock, patch
+from tap_zendesk import http, streams
+import requests
+import datetime
+
+PAGINATE_RESPONSE = {
+    'meta': {'has_more': True,
+             'after_cursor': 'some_cursor'},
+    'end_of_stream': False,
+    'after_cursor': 'some_cursor',
+    'next_page': '3'
+}
+
+SINGLE_RESPONSE = {
+    'meta': {'has_more': False}
+}
+START_TIME = datetime.datetime.strptime("2021-10-30T00:00:00Z", "%Y-%m-%dT%H:%M:%SZ")
+def mocked_get(*args, **kwargs):
+    fake_response = requests.models.Response()
+    fake_response.headers.update(kwargs.get('headers', {}))
+    fake_response.status_code = kwargs['status_code']
+
+    # We can't set the content or text of the Response directly, so we mock a function
+    fake_response.json = Mock()
+    fake_response.json.side_effect = lambda:kwargs.get('json', {})
+
+    return fake_response
+
+@patch("time.sleep")
+class TestRequestTimeoutBackoff(unittest.TestCase):
+    """
+    A set of unit tests to ensure that requests are retried properly on Timeout errors.
+    """
+
+    @patch('requests.get')
+    def test_call_api_handles_timeout_error(self, mock_get, mock_sleep):
+        """We mock the request method to raise a `Timeout` and expect the tap to retry up to 10 times.
+        """
+        mock_get.side_effect = requests.exceptions.Timeout
+
+        try:
+            responses = http.call_api(url='some_url', request_timeout=300, params={}, headers={})
+        except requests.exceptions.Timeout as e:
+            pass
+
+        # Verify the request is retried 10 times on timeout
+        self.assertEqual(mock_get.call_count, 10)
+
+    @patch('requests.get', side_effect=10*[requests.exceptions.Timeout])
+    def test_get_cursor_based_handles_timeout_error(self, mock_get, mock_sleep):
+        """We mock the request method to raise a `Timeout` and expect the tap to retry up to 10 times.
+        """
+
+        try:
+            responses = [response for response in http.get_cursor_based(url='some_url',
+                                                                        access_token='some_token', request_timeout=300)]
+        except requests.exceptions.Timeout as e:
+            pass
+
+        # Verify the request is retried 10 times on timeout
+        self.assertEqual(mock_get.call_count, 10)
+
+    @patch('requests.get', side_effect=[mocked_get(status_code=200, json={"key1": "val1", **PAGINATE_RESPONSE}),
+                                        requests.exceptions.Timeout, requests.exceptions.Timeout,
+                                        mocked_get(status_code=200, json={"key1": "val1", **SINGLE_RESPONSE})])
+    def test_get_cursor_based_handles_timeout_error_in_pagination_call(self, mock_get, mock_sleep):
+        """We mock the request method to raise a `Timeout` on the next-page call and expect the tap to retry the timed-out request.
+ """ + + try: + responses = [response for response in http.get_cursor_based(url='some_url', + access_token='some_token', request_timeout=300)] + except requests.exceptions.Timeout as e: + pass + + # Verify the request call total 4 times(2 time retry call, 2 time 200 call) + self.assertEqual(mock_get.call_count, 4) + + @patch('requests.get', side_effect=10*[requests.exceptions.Timeout]) + def test_get_offset_based_handles_timeout_error(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 10 times, + """ + + try: + responses = [response for response in http.get_offset_based(url='some_url', + access_token='some_token', request_timeout=300)] + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 10 times on timeout + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get', side_effect=[mocked_get(status_code=200, json={"key1": "val1", **PAGINATE_RESPONSE}), + requests.exceptions.Timeout, requests.exceptions.Timeout, + mocked_get(status_code=200, json={"key1": "val1", **SINGLE_RESPONSE})]) + def test_get_offset_based_handles_timeout_error_in_pagination_call(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout`. In next page call the tap should retry request timeout error. + """ + + try: + responses = [response for response in http.get_offset_based(url='some_url', + access_token='some_token', request_timeout=300)] + except requests.exceptions.Timeout as e: + pass + + # Verify the request call total 4 times(2 time retry call, 2 time 200 call) + self.assertEqual(mock_get.call_count, 4) + + @patch('requests.get', side_effect=10*[requests.exceptions.Timeout]) + def test_get_incremental_export_handles_timeout_error(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 10 times, + """ + + try: + responses = [response for response in http.get_incremental_export(url='some_url',access_token='some_token', + request_timeout=300, start_time= START_TIME)] + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 10 times on timeout + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get') + def test_cursor_based_stream_timeout_error(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 10 times, + """ + mock_get.side_effect = requests.exceptions.Timeout + cursor_based_stream = streams.CursorBasedStream(config={'subdomain': '34', 'access_token': 'df'}) + cursor_based_stream.endpoint = 'https://{}' + try: + responses = list(cursor_based_stream.get_objects()) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 10 times on timeout + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get') + def test_cursor_based_export_stream_timeout_error(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 10 times, + """ + mock_get.side_effect = requests.exceptions.Timeout + cursor_based_export_stream = streams.CursorBasedExportStream(config={'subdomain': '34', 'access_token': 'df'}) + cursor_based_export_stream.endpoint = 'https://{}' + try: + responses = list(cursor_based_export_stream.get_objects(START_TIME)) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 10 times on timeout + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get') + def test_ticket_audits_timeout_error(self, mock_get, 
mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 10 times, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_audits = streams.TicketAudits(config={'subdomain': '34', 'access_token': 'df'}) + try: + responses = list(ticket_audits.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 10 times on timeout + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get') + def test_ticket_metrics_timeout_error(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 10 times, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_metrics = streams.TicketMetrics(config={'subdomain': '34', 'access_token': 'df'}) + try: + responses = list(ticket_metrics.sync('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 10 times on timeout + self.assertEqual(mock_get.call_count, 10) + + @patch('requests.get') + def test_ticket_comments_timeout_error(self, mock_get, mock_sleep): + """We mock request method to raise a `Timeout` and expect the tap to retry this up to 10 times, + """ + mock_get.side_effect = requests.exceptions.Timeout + ticket_comments = streams.TicketComments(config={'subdomain': '34', 'access_token': 'df'}) + try: + responses = list(ticket_comments.get_objects('i1')) + except requests.exceptions.Timeout as e: + pass + + # Verify the request retry 10 times on timeout + self.assertEqual(mock_get.call_count, 10) \ No newline at end of file diff --git a/test/unittests/test_tickets_child_stream_skip_404_error.py b/test/unittests/test_tickets_child_stream_skip_404_error.py new file mode 100644 index 0000000..8cb9a71 --- /dev/null +++ b/test/unittests/test_tickets_child_stream_skip_404_error.py @@ -0,0 +1,82 @@ +import unittest +from unittest.mock import patch +from tap_zendesk import streams, http + + +class MockClass: + def is_selected(self): + return True + + def sync(self, ticket_id): + raise http.ZendeskNotFoundError + +@patch("tap_zendesk.streams.LOGGER.warning") +@patch('tap_zendesk.streams.Stream.update_bookmark') +@patch('tap_zendesk.metrics.capture') +@patch('tap_zendesk.streams.CursorBasedExportStream.get_objects') +@patch('tap_zendesk.streams.Stream.get_bookmark') +class TestSkip404Error(unittest.TestCase): + """ + Test that child stream of tickets - ticket_audits, ticket_metrics, ticket_comments skip the 404 error. + To raise the 404 error some of the method including _empty_buffer, LOGGER.warning, _buffer_record, update_bookmark, + metrics.capture, get_objects, get_bookmark mocked. 
+ """ + @patch('tap_zendesk.streams.TicketAudits') + def test_ticket_audits_skip_404_error(self, mock_ticket_audits, mock_get_bookmark, mock_get_object, mock_metrics, + mock_update_bookmark, mock_logger): + + ''' + Test that ticket_audits stream skip the 404 error + ''' + mock_ticket_audits.return_value = MockClass() + mock_get_object.return_value = [{'generated_timestamp': 12457845, 'fields': {}, 'id': 'i1'}] + tickets = streams.Tickets(config={'subdomain': '34', 'access_token': 'df'}) + + try: + responses = list(tickets.sync(state={})) + except AttributeError: + pass + + # verify if the LOGGER.warning was called and verify the message + mock_logger.assert_called_with("Unable to retrieve audits for ticket (ID: i1), record not found") + + @patch('tap_zendesk.streams.TicketComments') + def test_ticket_comments_skip_404_error(self, mock_ticket_comments, mock_get_bookmark, mock_get_object, mock_metrics, + mock_update_bookmark, mock_logger): + + ''' + Test that ticket_audits stream skip the 404 error + ''' + mock_ticket_comments.return_value = MockClass() + mock_get_object.return_value = [{'generated_timestamp': 12457845, 'fields': {}, 'id': 'i1'}] + tickets = streams.Tickets(config={'subdomain': '34', 'access_token': 'df'}) + + try: + responses = list(tickets.sync(state={})) + except AttributeError: + pass + + # verify if the LOGGER.warning was called and verify the message + mock_logger.assert_called_with("Unable to retrieve comments for ticket (ID: i1), record not found") + + @patch('tap_zendesk.streams.TicketMetrics') + def test_ticket_metrics_skip_404_error(self, mock_ticket_metrics, mock_get_bookmark, mock_get_object, mock_metrics, + mock_update_bookmark, mock_logger): + + ''' + Test that ticket_audits stream skip the 404 error + ''' + mock_ticket_metrics.return_value = MockClass() + mock_get_object.return_value = [{'generated_timestamp': 12457845, 'fields': {}, 'id': 'i1'}] + tickets = streams.Tickets(config={'subdomain': '34', 'access_token': 'df'}) + + try: + responses = list(tickets.sync(state={})) + self.assertEqual(responses, 1) + except AttributeError: + pass + + # verify if the LOGGER.warning was called and verify the message + mock_logger.assert_called_with("Unable to retrieve metrics for ticket (ID: i1), record not found") + + diff --git a/test/unittests/test_yield_records.py b/test/unittests/test_yield_records.py new file mode 100644 index 0000000..b9dfb6e --- /dev/null +++ b/test/unittests/test_yield_records.py @@ -0,0 +1,193 @@ +from tap_zendesk import LOGGER, oauth_auth +import unittest +from unittest import mock +from unittest.mock import patch +from tap_zendesk.streams import Stream, TicketAudits, Tickets, zendesk_metrics +from tap_zendesk.sync import sync_stream +from tap_zendesk import Zenpy +import json + +class Zenpy(): + def __init__(self) -> None: + pass + +def mocked_sync_audits(ticket_id=None): + """ + Mock the audit records which are retrieved in the sync function of the Audits stream + """ + ticket_audits = [ + { + "author_id":387494208358, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732098, + "ticket_id":2 + }, + { + "author_id":387494208358, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204898, + "ticket_id":2, + } + ] + for audit in ticket_audits: + yield ('ticket_audits', audit) + +def mocked_sync_metrics(ticket_id=None): + """ + Mock the metric records which are retrieved in the sync function of the Audits stream + """ + ticket_metrics = [ + { + "author_id":387494208358, + "created_at":"2021-10-11T12:23:20.000000Z", + 
"id":910518732090, + "ticket_id":2 + }, + { + "author_id":387494208358, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204892, + "ticket_id":2, + } + ] + for metric in ticket_metrics: + yield ('ticket_metrics', metric) + +def mocked_sync_comments(ticket_id=None): + """ + Mock the comment records which are retrieved in the sync function of the Audits stream + """ + ticket_comments = [ + { + "author_id":387494208356, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732090, + "ticket_id":2 + }, + { + "author_id":387494208354, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204892, + "ticket_id":2, + } + ] + for comment in ticket_comments: + yield ('ticket_comments', comment) + +def logger(logger, point): + return "test stream" + +@mock.patch('tap_zendesk.streams.Stream.update_bookmark') +@mock.patch('tap_zendesk.streams.Stream.get_bookmark') +@mock.patch('tap_zendesk.streams.TicketAudits.is_selected') +@mock.patch('tap_zendesk.streams.TicketMetrics.is_selected') +@mock.patch('tap_zendesk.streams.TicketComments.is_selected') +@mock.patch('tap_zendesk.streams.TicketAudits.sync') +@mock.patch('tap_zendesk.streams.TicketMetrics.sync') +@mock.patch('tap_zendesk.streams.TicketComments.sync') +@mock.patch('tap_zendesk.streams.CursorBasedExportStream.get_objects') +@mock.patch('tap_zendesk.streams.TicketAudits.stream') +@mock.patch('tap_zendesk.streams.TicketComments.stream') +@mock.patch('tap_zendesk.streams.TicketMetrics.stream') +@mock.patch('singer.metrics.log') +def test_yield_records(mocked_log, mocked_audits_stream, mocked_comments_stream, mocked_metrics_stream, mock_objects, mock_comments_sync, mock_metrics_sync, mock_audits_sync, mock_comments, mock_metrics, mock_audits, mock_get_bookmark, mock_update_bookmark): + """ + This function tests that the Tickets and its substreams' records are yielded properly. 
+ """ + ticket_stream = Tickets(Zenpy(), {}) + # mocked ticket record for get_objects() function + tickets = [{ + "url":"https://talend1234.zendesk.com/api/v2/tickets/1.json", + "id":2, + "external_id":"None", + "created_at":"2021-10-11T12:12:31Z", + "updated_at":"2021-10-12T08:37:28Z", + "requester_id":387331462257, + "submitter_id":387494208358, + "assignee_id":387494208358, + "organization_id":"None", + "group_id":360010350357, + "due_at":"None", + "ticket_form_id":360003740737, + "brand_id":360004806057, + "generated_timestamp":1634027848, + "fields": [] + }] + mock_objects.return_value = tickets + + # expected audit records after yield + expected_audits = [ + { + "author_id":387494208358, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732098, + "ticket_id":2 + }, + { + "author_id":387494208358, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204898, + "ticket_id":2, + } + ] + + # expected metric records after yield + expected_metrics = [ + { + "author_id":387494208358, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732090, + "ticket_id":2 + }, + { + "author_id":387494208358, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204892, + "ticket_id":2, + } + ] + + # expected comment records after yield + expected_comments = [ + { + "author_id":387494208356, + "created_at":"2021-10-11T12:23:20.000000Z", + "id":910518732090, + "ticket_id":2 + }, + { + "author_id":387494208354, + "created_at":"2021-10-11T12:24:05.000000Z", + "id":910519204892, + "ticket_id":2, + } + ] + mock_metrics.return_value = True + mock_audits.return_value = True + mock_comments.return_value = True + mock_update_bookmark.side_effect = None + mock_metrics_sync.side_effect = mocked_sync_metrics + mock_audits_sync.side_effect = mocked_sync_audits + mock_comments_sync.side_effect = mocked_sync_comments + + expected_tickets = list(ticket_stream.sync(state={})) + audits = [] + metrics = [] + comments = [] + + # the yield returns a list with the first element as the parent stream tickets record + # and other elements as a tuple with the first element as the name of the stream and the second element + # as the record of that stream. Hence we are checking if each element of the stream and appending in our + # custom list and asserting all the lists at last. + for count, each in enumerate(expected_tickets): + if count == 0: + continue + if each[0] == 'ticket_audits': + audits.append(each[1]) + if each[0] == 'ticket_metrics': + metrics.append(each[1]) + if each[0] == 'ticket_comments': + comments.append(each[1]) + assert expected_audits == audits + assert expected_metrics == metrics + assert expected_comments == comments