diff --git a/.circleci/config.yml b/.circleci/config.yml index fd0ac65..c889104 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -20,6 +20,23 @@ jobs: command: | source /usr/local/share/virtualenvs/tap-tester/bin/activate stitch-validate-json tap_freshdesk/schemas/*.json + - run: + name: 'pylint' + command: | + source /usr/local/share/virtualenvs/tap-freshdesk/bin/activate + pylint tap_freshdesk --disable 'missing-module-docstring,line-too-long,invalid-name,too-many-lines,consider-using-f-string,too-many-arguments,too-many-locals' + - run: + name: 'Unit Tests' + command: | + source /usr/local/share/virtualenvs/tap-freshdesk/bin/activate + pip install nose coverage parameterized + nosetests --with-coverage --cover-erase --cover-package=tap_freshdesk --cover-html-dir=htmlcov tests/unittests + coverage html + when: always + - store_test_results: + path: test_output/report.xml + - store_artifacts: + path: htmlcov - run: name: 'Integration Tests' command: | diff --git a/setup.py b/setup.py index b4f3170..cbbc079 100644 --- a/setup.py +++ b/setup.py @@ -10,10 +10,15 @@ classifiers=['Programming Language :: Python :: 3 :: Only'], py_modules=['tap_freshdesk'], install_requires=[ - 'singer-python==5.2.3', + 'singer-python==5.12.2', 'requests==2.20.0', - 'backoff==1.3.2' + 'backoff==1.8.0' ], + extras_require={ + 'dev': [ + 'pylint', + ] + }, entry_points=''' [console_scripts] tap-freshdesk=tap_freshdesk:main diff --git a/tap_freshdesk/__init__.py b/tap_freshdesk/__init__.py index a79b60c..1ca6c9d 100644 --- a/tap_freshdesk/__init__.py +++ b/tap_freshdesk/__init__.py @@ -1,259 +1,31 @@ #!/usr/bin/env python3 - -import sys -import time - -import backoff -import requests -from requests.exceptions import HTTPError import singer +from singer import utils +from tap_freshdesk.discover import discover as _discover +from tap_freshdesk.sync import sync as _sync +from tap_freshdesk.client import FreshdeskClient -from tap_freshdesk import utils - - -REQUIRED_CONFIG_KEYS = ['api_key', 'domain', 'start_date'] -PER_PAGE = 100 -BASE_URL = "https://{}.freshdesk.com" -CONFIG = {} -STATE = {} - -endpoints = { - "tickets": "/api/v2/tickets", - "sub_ticket": "/api/v2/tickets/{id}/{entity}", - "agents": "/api/v2/agents", - "roles": "/api/v2/roles", - "groups": "/api/v2/groups", - "companies": "/api/v2/companies", - "contacts": "/api/v2/contacts", -} - -logger = singer.get_logger() -session = requests.Session() - - -def get_url(endpoint, **kwargs): - return BASE_URL.format(CONFIG['domain']) + endpoints[endpoint].format(**kwargs) - - -@backoff.on_exception(backoff.expo, - (requests.exceptions.RequestException), - max_tries=5, - giveup=lambda e: e.response is not None and 400 <= e.response.status_code < 500, - factor=2) -@utils.ratelimit(1, 2) -def request(url, params=None): - params = params or {} - headers = {} - if 'user_agent' in CONFIG: - headers['User-Agent'] = CONFIG['user_agent'] - - req = requests.Request('GET', url, params=params, auth=(CONFIG['api_key'], ""), headers=headers).prepare() - logger.info("GET {}".format(req.url)) - resp = session.send(req) - - if 'Retry-After' in resp.headers: - retry_after = int(resp.headers['Retry-After']) - logger.info("Rate limit reached. 
Sleeping for {} seconds".format(retry_after)) - time.sleep(retry_after) - return request(url, params) - - resp.raise_for_status() - - return resp - - -def get_start(entity): - if entity not in STATE: - STATE[entity] = CONFIG['start_date'] - - return STATE[entity] - - -def gen_request(url, params=None): - params = params or {} - params["per_page"] = PER_PAGE - page = 1 - while True: - params['page'] = page - data = request(url, params).json() - for row in data: - yield row - - if len(data) == PER_PAGE: - page += 1 - else: - break - - -def transform_dict(d, key_key="name", value_key="value", force_str=False): - # Custom fields are expected to be strings, but sometimes the API sends - # booleans. We cast those to strings to match the schema. - rtn = [] - for k, v in d.items(): - if force_str: - v = str(v).lower() - rtn.append({key_key: k, value_key: v}) - return rtn - - -def sync_tickets(): - bookmark_property = 'updated_at' - - singer.write_schema("tickets", - utils.load_schema("tickets"), - ["id"], - bookmark_properties=[bookmark_property]) - - singer.write_schema("conversations", - utils.load_schema("conversations"), - ["id"], - bookmark_properties=[bookmark_property]) - - singer.write_schema("satisfaction_ratings", - utils.load_schema("satisfaction_ratings"), - ["id"], - bookmark_properties=[bookmark_property]) +REQUIRED_CONFIG_KEYS = ["start_date", "domain", "api_key"] - singer.write_schema("time_entries", - utils.load_schema("time_entries"), - ["id"], - bookmark_properties=[bookmark_property]) - - sync_tickets_by_filter(bookmark_property) - sync_tickets_by_filter(bookmark_property, "deleted") - sync_tickets_by_filter(bookmark_property, "spam") - - -def sync_tickets_by_filter(bookmark_property, predefined_filter=None): - endpoint = "tickets" - - state_entity = endpoint - if predefined_filter: - state_entity = state_entity + "_" + predefined_filter - - start = get_start(state_entity) - - params = { - 'updated_since': start, - 'order_by': bookmark_property, - 'order_type': "asc", - 'include': "requester,company,stats" - } - - if predefined_filter: - logger.info("Syncing tickets with filter {}".format(predefined_filter)) - - if predefined_filter: - params['filter'] = predefined_filter - - for i, row in enumerate(gen_request(get_url(endpoint), params)): - logger.info("Ticket {}: Syncing".format(row['id'])) - row.pop('attachments', None) - row['custom_fields'] = transform_dict(row['custom_fields'], force_str=True) - - # get all sub-entities and save them - logger.info("Ticket {}: Syncing conversations".format(row['id'])) - - try: - for subrow in gen_request(get_url("sub_ticket", id=row['id'], entity="conversations")): - subrow.pop("attachments", None) - subrow.pop("body", None) - if subrow[bookmark_property] >= start: - singer.write_record("conversations", subrow, time_extracted=singer.utils.now()) - except HTTPError as e: - if e.response.status_code == 403: - logger.info('Invalid ticket ID requested from Freshdesk {0}'.format(row['id'])) - else: - raise - - try: - logger.info("Ticket {}: Syncing satisfaction ratings".format(row['id'])) - for subrow in gen_request(get_url("sub_ticket", id=row['id'], entity="satisfaction_ratings")): - subrow['ratings'] = transform_dict(subrow['ratings'], key_key="question") - if subrow[bookmark_property] >= start: - singer.write_record("satisfaction_ratings", subrow, time_extracted=singer.utils.now()) - except HTTPError as e: - if e.response.status_code == 403: - logger.info("The Surveys feature is unavailable. 
Skipping the satisfaction_ratings stream.") - else: - raise - - try: - logger.info("Ticket {}: Syncing time entries".format(row['id'])) - for subrow in gen_request(get_url("sub_ticket", id=row['id'], entity="time_entries")): - if subrow[bookmark_property] >= start: - singer.write_record("time_entries", subrow, time_extracted=singer.utils.now()) - - except HTTPError as e: - if e.response.status_code == 403: - logger.info("The Timesheets feature is unavailable. Skipping the time_entries stream.") - elif e.response.status_code == 404: - # 404 is being returned for deleted tickets and spam - logger.info("Could not retrieve time entries for ticket id {}. This may be caused by tickets " - "marked as spam or deleted.".format(row['id'])) - else: - raise - - utils.update_state(STATE, state_entity, row[bookmark_property]) - singer.write_record(endpoint, row, time_extracted=singer.utils.now()) - singer.write_state(STATE) - - -def sync_time_filtered(entity): - bookmark_property = 'updated_at' - - singer.write_schema(entity, - utils.load_schema(entity), - ["id"], - bookmark_properties=[bookmark_property]) - start = get_start(entity) - - logger.info("Syncing {} from {}".format(entity, start)) - for row in gen_request(get_url(entity)): - if row[bookmark_property] >= start: - if 'custom_fields' in row: - row['custom_fields'] = transform_dict(row['custom_fields'], force_str=True) - - utils.update_state(STATE, entity, row[bookmark_property]) - singer.write_record(entity, row, time_extracted=singer.utils.now()) - - singer.write_state(STATE) - - -def do_sync(): - logger.info("Starting FreshDesk sync") - - try: - sync_tickets() - sync_time_filtered("agents") - sync_time_filtered("roles") - sync_time_filtered("groups") - # commenting out this high-volume endpoint for now - #sync_time_filtered("contacts") - sync_time_filtered("companies") - except HTTPError as e: - logger.critical( - "Error making request to Freshdesk API: GET %s: [%s - %s]", - e.request.url, e.response.status_code, e.response.content) - sys.exit(1) - - logger.info("Completed sync") - - -def main_impl(): - config, state = utils.parse_args(REQUIRED_CONFIG_KEYS) - CONFIG.update(config) - STATE.update(state) - do_sync() +LOGGER = singer.get_logger() +@utils.handle_top_exception(LOGGER) def main(): - try: - main_impl() - except Exception as exc: - logger.critical(exc) - raise exc + """ + Run discover mode or sync mode. + """ + args = utils.parse_args(REQUIRED_CONFIG_KEYS) + config = args.config + with FreshdeskClient(config) as client: + if args.discover: + catalog = _discover() + catalog.dump() + else: + catalog = args.catalog \ + if args.catalog else _discover() + _sync(client, config, args.state, catalog.to_dict()) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/tap_freshdesk/client.py b/tap_freshdesk/client.py new file mode 100644 index 0000000..9ac440c --- /dev/null +++ b/tap_freshdesk/client.py @@ -0,0 +1,83 @@ +import time +import backoff +import requests +import singer +from singer import utils + + +LOGGER = singer.get_logger() +BASE_URL = "https://{}.freshdesk.com" +DEFAULT_PAGE_SIZE = 100 + + +class FreshdeskClient: + """ + The client class is used for making REST calls to the Freshdesk API. 
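+ It owns the requests session, applies back-off and Retry-After handling on every call, and reads an optional page_size from the config.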
+ """ + + def __init__(self, config): + self.config = config + self.session = requests.Session() + self.base_url = BASE_URL.format(config.get("domain")) + self.page_size = self.get_page_size() + + def __enter__(self): + self.check_access_token() + return self + + def __exit__(self, exception_type, exception_value, traceback): + # Kill the session instance. + self.session.close() + + def get_page_size(self): + """ + This function will get page size from config, + and will return the default value if an invalid page size is given. + """ + page_size = self.config.get('page_size') + + # return a default value if no page size is given in the config + if page_size is None: + return DEFAULT_PAGE_SIZE + + # Return integer value if the valid value is given + if (type(page_size) in [int, float] and page_size > 0) or \ + (isinstance(page_size, str) and page_size.replace('.', '', 1).isdigit() and (float(page_size) > 0)): + return int(float(page_size)) + # Raise an exception for 0, "0" or invalid value of page_size + raise Exception("The entered page size is invalid, it should be a valid integer.") + + def check_access_token(self): + """ + Check if the access token is valid. + """ + self.request(self.base_url+"/api/v2/roles", {"per_page": 1, "page": 1}) + + @backoff.on_exception(backoff.expo, + (requests.exceptions.RequestException), + max_tries=5, + giveup=lambda e: e.response is not None and 400 <= e.response.status_code < 500, + factor=2) + @utils.ratelimit(1, 2) + def request(self, url, params=None): + """ + Call rest API and return the response in case of status code 200. + """ + headers = {} + if 'user_agent' in self.config: + headers['User-Agent'] = self.config['user_agent'] + + req = requests.Request('GET', url, params=params, auth=(self.config['api_key'], ""), headers=headers).prepare() + LOGGER.info("GET %s", req.url) + response = self.session.send(req) + + # Call the function again if the rate limit is exceeded + if 'Retry-After' in response.headers: + retry_after = int(response.headers['Retry-After']) + LOGGER.info("Rate limit reached. Sleeping for %s seconds", retry_after) + time.sleep(retry_after) + return self.request(url, params) + + response.raise_for_status() + + return response.json() diff --git a/tap_freshdesk/discover.py b/tap_freshdesk/discover.py new file mode 100644 index 0000000..c7fc073 --- /dev/null +++ b/tap_freshdesk/discover.py @@ -0,0 +1,34 @@ +import singer +from singer.catalog import Catalog, CatalogEntry, Schema +from tap_freshdesk.schema import get_schemas + +LOGGER = singer.get_logger() + + +def discover(): + """ + Run the discovery mode, prepare the catalog file and return the catalog. 
+ """ + schemas, field_metadata = get_schemas() + catalog = Catalog([]) + + for stream_name, schema_dict in schemas.items(): + try: + schema = Schema.from_dict(schema_dict) + mdata = field_metadata[stream_name] + except Exception as err: + LOGGER.error(err) + LOGGER.error('stream_name: %s', stream_name) + LOGGER.error('type schema_dict: %s', type(schema_dict)) + raise err + + key_properties = mdata[0]['metadata'].get('table-key-properties') + catalog.streams.append(CatalogEntry( + stream=stream_name, + tap_stream_id=stream_name, + key_properties=key_properties, + schema=schema, + metadata=mdata + )) + + return catalog diff --git a/tap_freshdesk/schema.py b/tap_freshdesk/schema.py new file mode 100644 index 0000000..d980ab5 --- /dev/null +++ b/tap_freshdesk/schema.py @@ -0,0 +1,51 @@ +import os +import json +from singer import metadata +import singer +from tap_freshdesk.streams import STREAMS + + +def get_abs_path(path): + """ + Get the absolute path for the schema files. + """ + return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) + + +def get_schemas(): + """ + Load the schema references, prepare metadata for each stream and return schema and metadata for the catalog. + """ + schemas = {} + field_metadata = {} + + refs = {} + for stream_name, stream_metadata in STREAMS.items(): + schema_path = get_abs_path('schemas/{}.json'.format(stream_name)) + + with open(schema_path) as file: # pylint: disable=unspecified-encoding + schema = json.load(file) + + schemas[stream_name] = schema + schema = singer.resolve_schema_references(schema, refs) + + replication_keys = (hasattr(stream_metadata, 'replication_keys') or None) and stream_metadata.replication_keys + mdata = metadata.new() + mdata = metadata.get_standard_metadata( + schema=schema, + key_properties=(hasattr(stream_metadata, 'key_properties') or None) and stream_metadata.key_properties, + valid_replication_keys=replication_keys, + replication_method=(hasattr(stream_metadata, 'replication_method') or None) + and stream_metadata.replication_method) + mdata = metadata.to_map(mdata) + + # Loop through all keys and make replication keys of automatic inclusion + for field_name in schema['properties'].keys(): + + if replication_keys and field_name in replication_keys: + mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'automatic') + + mdata = metadata.to_list(mdata) + field_metadata[stream_name] = mdata + + return schemas, field_metadata diff --git a/tap_freshdesk/streams.py b/tap_freshdesk/streams.py new file mode 100644 index 0000000..edfd1cc --- /dev/null +++ b/tap_freshdesk/streams.py @@ -0,0 +1,344 @@ +import copy +from datetime import datetime as dt +import singer +from singer.bookmarks import get_bookmark + + +LOGGER = singer.get_logger() +DEFAULT_PAGE_SIZE = 100 +DATETIME_FMT = "%Y-%m-%dT%H:%M:%SZ" + + +def get_min_bookmark(stream, selected_streams, bookmark, start_date, state, bookmark_key, predefined_filter=None): + """ + Get the minimum bookmark from the parent and its corresponding child bookmarks. + """ + + stream_obj = STREAMS[stream]() + min_bookmark = bookmark + if stream in selected_streams: + # Get minimum of stream's bookmark(start date in case of no bookmark) and min_bookmark + if predefined_filter: + stream = stream + '_' + predefined_filter + min_bookmark = min(min_bookmark, get_bookmark(state, stream, bookmark_key, start_date)) + + # Iterate through all children and return minimum bookmark among all. 
+ for child in stream_obj.children: + min_bookmark = min(min_bookmark, get_min_bookmark( + child, selected_streams, bookmark, start_date, state, bookmark_key)) + + return min_bookmark + + +def get_schema(catalog, stream_id): + """ + Return the catalog of the specified stream. + """ + stream_catalog = [cat for cat in catalog if cat['tap_stream_id'] == stream_id][0] + return stream_catalog + + +def write_bookmark(stream, selected_streams, bookmark_value, state, predefined_filter=None): + """ + Write the bookmark in case the stream is selected. + """ + stream_obj = STREAMS[stream]() + stream_id = stream_obj.tap_stream_id + if stream in selected_streams: + if predefined_filter: + stream_id = stream_id + '_' + predefined_filter + singer.write_bookmark(state, stream_id, stream_obj.replication_keys[0], bookmark_value) + + +class Stream: + """ + Base class representing tap-freshdesk streams. + """ + tap_stream_id = None + replication_method = 'INCREMENTAL' + replication_keys = ['updated_at'] + key_properties = ['id'] + endpoint = None + filter_param = False + children = [] + path = '' + headers = {} + params = {"per_page": DEFAULT_PAGE_SIZE, "page": 1} + paginate = True + parent = None + id_key = None + records_count = {} + force_str = False + date_filter = '' + parent_id = None + filters = [] + filter_keyword = '' + + def transform_dict(self, d, key_key="name", value_key="value", force_str=False): + """ + Custom fields are expected to be strings, but sometimes the API sends + booleans. We cast those to strings to match the schema. + """ + rtn = [] + for k, v in d.items(): + if force_str: + v = str(v).lower() + rtn.append({key_key: k, value_key: v}) + return rtn + + def build_url(self, base_url, *args): + """ + Build the full url with parameters and attributes. + """ + return base_url + '/api/v2/' + self.path.format(*args) + + def write_records(self, catalog, state, selected_streams, start_date, data, max_bookmark, + client, streams_to_sync, child_max_bookmarks, predefined_filter=None): + """ + Transform the chunk of records according to the schema and write the records based on the bookmark. + """ + stream_catalog = get_schema(catalog, self.tap_stream_id) + stream_id = self.tap_stream_id + + # Append the predefined filter in case it's present + if predefined_filter: + stream_id = stream_id + '_' + predefined_filter + bookmark = get_bookmark(state, stream_id, self.replication_keys[0], start_date) + # The max bookmark so far for the child stream + child_max_bookmark = None + + with singer.metrics.record_counter(self.tap_stream_id) as counter: + with singer.Transformer() as transformer: + extraction_time = singer.utils.now() + stream_metadata = singer.metadata.to_map(stream_catalog['metadata']) + for row in data: + if self.tap_stream_id in selected_streams and row[self.replication_keys[0]] >= bookmark: + # Custom fields are expected to be strings, but sometimes the API sends + # booleans. We cast those to strings to match the schema. 
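+ # transform_dict() reshapes the custom_fields dict into a list of {name, value} pairs expected by the schema.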
+ if 'custom_fields' in row: + row['custom_fields'] = self.transform_dict(row['custom_fields'], force_str=self.force_str) + + rec = transformer.transform(row, stream_catalog['schema'], stream_metadata) + singer.write_record(self.tap_stream_id, rec, time_extracted=extraction_time) + max_bookmark = max(max_bookmark, rec[self.replication_keys[0]]) + counter.increment(1) + + # Sync the child streams if they are selected + for child in self.children: + child_obj = STREAMS[child]() + if child in selected_streams: + child_obj.parent_id = row['id'] + child_max_bookmark = get_bookmark(state, child_obj.tap_stream_id, + child_obj.replication_keys[0], start_date) + # Update the child's max_bookmark as the max of the already present max value and the return value + child_max_bookmark = max(child_max_bookmarks.get(child, child_max_bookmark), child_obj.sync_obj( + state, start_date, client, catalog, selected_streams, streams_to_sync)) + child_max_bookmarks[child] = child_max_bookmark + return max_bookmark, child_max_bookmarks + + def sync_obj(self, state, start_date, client, catalog, selected_streams, streams_to_sync, predefined_filter=None): + """ + The base stream class sync_obj() function to fetch records. + """ + params = {**self.params, "per_page": client.page_size} + full_url = self.build_url(client.base_url, self.parent_id) + + # Update the filter keyword in the params for date-filtered streams + if predefined_filter: + LOGGER.info("Syncing %s with filter %s", self.tap_stream_id, predefined_filter) + params[self.filter_keyword] = predefined_filter + + current_time = dt.strftime(dt.now(), DATETIME_FMT) + # Get the minimum bookmark from the parent and the child streams + min_bookmark = get_min_bookmark(self.tap_stream_id, selected_streams, current_time, + start_date, state, self.replication_keys[0], predefined_filter) + max_bookmark = min_bookmark + # Initialize the child_max_bookmarks dictionary + child_max_bookmarks = {} + + # Add the `updated_since` param if the date_filter attribute is True + if self.date_filter: + params[self.date_filter] = min_bookmark + params['page'] = 1 + self.paginate = True + + LOGGER.info("Syncing %s from %s", self.tap_stream_id, min_bookmark) + # Paginate through the request + while self.paginate: + data = client.request(full_url, params) + self.paginate = len(data) >= client.page_size + params['page'] += 1 + max_bookmark, child_max_bookmarks = self.write_records( + catalog, state, selected_streams, start_date, data, max_bookmark, client, streams_to_sync, + child_max_bookmarks, predefined_filter) + write_bookmark(self.tap_stream_id, selected_streams, max_bookmark, state, predefined_filter) + + # Write the max_bookmark for the child streams in the state files if they are selected. + for key, value in child_max_bookmarks.items(): + write_bookmark(key, selected_streams, value, state, None) + return state + + +class Agents(Stream): + """ + https://developer.freshdesk.com/api/#list_all_agents + """ + tap_stream_id = 'agents' + path = 'agents' + + +class Companies(Stream): + """ + https://developer.freshdesk.com/api/#list_all_companies + """ + tap_stream_id = 'companies' + path = 'companies' + + +class Groups(Stream): + """ + https://developer.freshdesk.com/api/#list_all_groups + """ + tap_stream_id = 'groups' + path = 'groups' + + +class Roles(Stream): + """ + https://developer.freshdesk.com/api/#list_all_roles + """ + tap_stream_id = 'roles' + path = 'roles' + + +class DateFilteredStream(Stream): + """ + Base class for all the streams that can be filtered by date. 
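+ The sync runs once for each value in the `filters` list (e.g. None, 'deleted', 'spam' for tickets) and keeps a separate bookmark per filter.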
+ """ + + def sync_obj(self, state, start_date, client, catalog, selected_streams, streams_to_sync, predefined_filter=None): + """ + The overridden sync_obj() method to fetch the records with different filters. + """ + dup_state = copy.deepcopy(state) + max_child_bms = {} + for each_filter in self.filters: + # Update child bookmark to original_state + for child in filter(lambda s: s in selected_streams, self.children): + singer.write_bookmark(state, child, "updated_at", get_bookmark( + dup_state, child, "updated_at", start_date)) + + super().sync_obj(state, start_date, client, catalog, selected_streams, streams_to_sync, each_filter) + + # Update the max child bookmarks dictionary with the maximum from the child and the existing bookmark + max_child_bms.update({child: max(max_child_bms.get(child, ""), get_bookmark( + state, child, "updated_at", start_date)) for child in self.children if child in selected_streams}) + + # Write the child stream bookmarks with the max value found + for child, bm in max_child_bms.items(): + singer.write_bookmark(state, child, "updated_at", bm) + + +class Tickets(DateFilteredStream): + """ + https://developer.freshdesk.com/api/#list_all_tickets + """ + tap_stream_id = 'tickets' + path = 'tickets' + children = ['conversations', 'satisfaction_ratings', 'time_entries'] + id_key = 'id' + date_filter = 'updated_since' + params = { + "per_page": DEFAULT_PAGE_SIZE, + 'order_by': "updated_at", + 'order_type': "asc", + 'include': "requester,company,stats" + } + filter_keyword = 'filter' + filters = [None, 'deleted', 'spam'] + + +class Contacts(DateFilteredStream): + """ + https://developer.freshdesk.com/api/#list_all_contacts + """ + tap_stream_id = 'contacts' + path = 'contacts' + id_key = 'id' + date_filter = '_updated_since' + filter_keyword = 'state' + filters = [None, 'deleted', 'blocked'] + + +class ChildStream(Stream): + """ + Base class for all the child streams. 
+ """ + + def sync_obj(self, state, start_date, client, catalog, selected_streams, streams_to_sync, predefined_filter=None): + """ + The child stream sync_obj() method to sync the child records + """ + params = {**self.params, "per_page": client.page_size} + # Build the url for the request + full_url = self.build_url(client.base_url, self.parent_id) + + current_time = dt.strftime(dt.now(), DATETIME_FMT) + # Get the min bookmark from the parent and the child streams + min_bookmark = get_min_bookmark(self.tap_stream_id, selected_streams, current_time, + start_date, state, self.replication_keys[0], None) + max_bookmark = min_bookmark + params['page'] = 1 + self.paginate = True + + LOGGER.info("Syncing %s from %s", self.tap_stream_id, min_bookmark) + # Paginate through the records + while self.paginate: + data = client.request(full_url, params) + self.paginate = len(data) >= client.page_size + params['page'] += 1 + # Write the records based on the bookmark and return the max_bookmark for the page + bookmark, _ = self.write_records(catalog, state, selected_streams, start_date, + data, max_bookmark, client, streams_to_sync, None) + max_bookmark = max(max_bookmark, bookmark) + return max_bookmark + + +class Conversations(ChildStream): + """ + https://developer.freshdesk.com/api/#list_all_ticket_notes + """ + tap_stream_id = 'conversations' + path = 'tickets/{}/conversations' + parent = 'tickets' + + +class SatisfactionRatings(ChildStream): + """ + https://developer.freshdesk.com/api/#view_ticket_satisfaction_ratings + """ + tap_stream_id = 'satisfaction_ratings' + path = 'tickets/{}/satisfaction_ratings' + parent = 'tickets' + + +class TimeEntries(ChildStream): + """ + https://developer.freshdesk.com/api/#list_all_ticket_timeentries + """ + tap_stream_id = 'time_entries' + path = 'tickets/{}/time_entries' + parent = 'tickets' + + +STREAMS = { + "agents": Agents, + "companies": Companies, + "contacts": Contacts, + "conversations": Conversations, + "groups": Groups, + "roles": Roles, + "satisfaction_ratings": SatisfactionRatings, + "tickets": Tickets, + "time_entries": TimeEntries +} diff --git a/tap_freshdesk/sync.py b/tap_freshdesk/sync.py new file mode 100644 index 0000000..0c604fb --- /dev/null +++ b/tap_freshdesk/sync.py @@ -0,0 +1,106 @@ +import singer +from tap_freshdesk.streams import STREAMS + +LOGGER = singer.get_logger() + + +def write_schemas(stream_id, catalog, selected_streams): + """ + Write the schemas for each stream. + """ + stream_obj = STREAMS[stream_id]() + + if stream_id in selected_streams: + # Get catalog object for a particular stream. + stream = [cat for cat in catalog['streams'] if cat['tap_stream_id'] == stream_id][0] + singer.write_schema(stream_id, stream['schema'], stream['key_properties']) + + for child in stream_obj.children: + write_schemas(child, catalog, selected_streams) + + +def get_selected_streams(catalog): + ''' + Gets selected streams. Checks schema's 'selected' + first -- and then checks metadata, looking for an empty + breadcrumb and mdata with a 'selected' entry + ''' + selected_streams = [] + for stream in catalog['streams']: + stream_metadata = stream['metadata'] + for entry in stream_metadata: + # Stream metadata will have an empty breadcrumb + if not entry['breadcrumb'] and entry['metadata'].get('selected', None): + selected_streams.append(stream['tap_stream_id']) + return selected_streams + + +def update_currently_syncing(state, stream_name): + """ + Updates currently syncing stream in the state. 
+ """ + if not stream_name and singer.get_currently_syncing(state): + del state['currently_syncing'] + else: + singer.set_currently_syncing(state, stream_name) + singer.write_state(state) + + +def get_ordered_stream_list(currently_syncing, streams_to_sync): + """ + Get an ordered list of remaining streams to sync other streams followed by synced streams. + """ + stream_list = list(sorted(streams_to_sync)) + if currently_syncing in stream_list: + index = stream_list.index(currently_syncing) + stream_list = stream_list[index:] + stream_list[:index] + return stream_list + + +def get_stream_to_sync(selected_streams): + """ + Get the streams for which the sync function should be called + (the parent in case of selected child streams). + """ + streams_to_sync = [] + + # Loop thru all selected streams + for stream_name in selected_streams: + stream_obj = STREAMS[stream_name] + # If the stream has a parent_stream, then it is a child stream + parent_stream = hasattr(stream_obj, 'parent') and stream_obj.parent + + # Append selected parent streams + if not parent_stream: + streams_to_sync.append(stream_name) + else: + # Append un-selected parent streams of selected children + if parent_stream not in selected_streams and parent_stream not in streams_to_sync: + streams_to_sync.append(parent_stream) + return streams_to_sync + + +def sync(client, config, state, catalog): + """ + Sync selected streams. + """ + + # Get selected streams, make sure stream dependencies are met + selected_streams = get_selected_streams(catalog) + streams_to_sync = get_stream_to_sync(selected_streams) + LOGGER.info("Selected Streams: %s", selected_streams) + LOGGER.info("Syncing Streams: %s", streams_to_sync) + + singer.write_state(state) + currently_syncing = singer.get_currently_syncing(state) + streams_to_sync = get_ordered_stream_list(currently_syncing, streams_to_sync) + for stream in streams_to_sync: + stream_obj = STREAMS[stream]() + + write_schemas(stream, catalog, selected_streams) + update_currently_syncing(state, stream) + + stream_obj.sync_obj(state, config["start_date"], client, catalog['streams'], + selected_streams, streams_to_sync) + singer.write_state(state) + update_currently_syncing(state, None) diff --git a/tap_freshdesk/utils.py b/tap_freshdesk/utils.py deleted file mode 100644 index ffa3b6f..0000000 --- a/tap_freshdesk/utils.py +++ /dev/null @@ -1,93 +0,0 @@ -import argparse -import collections -import datetime -import functools -import json -import os -import time - -DATETIME_FMT = "%Y-%m-%dT%H:%M:%SZ" - - -def strptime(dt): - return datetime.datetime.strptime(dt, DATETIME_FMT) - - -def strftime(dt): - return dt.strftime(DATETIME_FMT) - - -def ratelimit(limit, every): - def limitdecorator(fn): - times = collections.deque() - - @functools.wraps(fn) - def wrapper(*args, **kwargs): - if len(times) >= limit: - t0 = times.pop() - t = time.time() - sleep_time = every - (t - t0) - if sleep_time > 0: - time.sleep(sleep_time) - - times.appendleft(time.time()) - return fn(*args, **kwargs) - - return wrapper - - return limitdecorator - - -def chunk(l, n): - for i in range(0, len(l), n): - yield l[i:i + n] - - -def get_abs_path(path): - return os.path.join(os.path.dirname(os.path.realpath(__file__)), path) - - -def load_json(path): - with open(path) as f: - return json.load(f) - - -def load_schema(entity): - return load_json(get_abs_path("schemas/{}.json".format(entity))) - - -def update_state(state, entity, dt): - if dt is None: - return - - if isinstance(dt, datetime.datetime): - dt = strftime(dt) - - if entity not 
in state: - state[entity] = dt - - if dt >= state[entity]: - state[entity] = dt - - -def parse_args(required_config_keys): - parser = argparse.ArgumentParser() - parser.add_argument('-c', '--config', help='Config file', required=True) - parser.add_argument('-s', '--state', help='State file') - args = parser.parse_args() - - config = load_json(args.config) - check_config(config, required_config_keys) - - if args.state: - state = load_json(args.state) - else: - state = {} - - return config, state - - -def check_config(config, required_keys): - missing_keys = [key for key in required_keys if key not in config] - if missing_keys: - raise Exception("Config is missing required keys: {}".format(missing_keys)) diff --git a/tests/base.py b/tests/base.py index cb3698d..f250677 100644 --- a/tests/base.py +++ b/tests/base.py @@ -19,9 +19,10 @@ class FreshdeskBaseTest(unittest.TestCase): FULL = "FULL_TABLE" start_date = "" - START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" # %H:%M:%SZ + PAGE_SIZE = 100 + START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z" # %H:%M:%SZ BOOKMARK_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" - + RECORD_REPLICATION_KEY_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" OBEYS_START_DATE = "obey-start-date" PAGE_SIZE = 100 @@ -51,7 +52,8 @@ def get_properties(self, original: bool = True): :param original: set to false to change the start_date or end_date """ return_value = { - 'start_date': '2019-01-04T00:00:00Z', + 'start_date' : '2019-01-04T00:00:00Z', + 'page_size': self.PAGE_SIZE } if original: return return_value diff --git a/tests/test_freshdesk_interrupted_sync.py b/tests/test_freshdesk_interrupted_sync.py new file mode 100644 index 0000000..f1ddf1a --- /dev/null +++ b/tests/test_freshdesk_interrupted_sync.py @@ -0,0 +1,157 @@ +from tap_tester import connections, runner, menagerie +from base import FreshdeskBaseTest + + +class TestFreshdeskInterruptedSync(FreshdeskBaseTest): + """Test tap's ability to recover from an interrupted sync""" + + @staticmethod + def name(): + return "tt_freshdesk_interrupted_sync_test" + + def get_properties(self): + """ + Maintain states for start_date and end_date + """ + return { + 'start_date' : '2021-10-01T00:00:00Z', + } + + def test_run(self): + """ + Testing that if a sync job is interrupted and state is saved with `currently_syncing`(stream), + the next sync job kicks off and the tap picks back up on that `currently_syncing` stream of `currently_syncing_repo`. + """ + streams_to_test = {"roles", "agents", "groups", "companies"} + conn_id = connections.ensure_connection(self) + expected_replication_methods = self.expected_replication_method() + expected_replication_keys = self.expected_replication_keys() + + start_date = self.dt_to_ts(self.get_properties().get("start_date"), self.START_DATE_FORMAT) + + # Run a discovery job + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Partition catalogs for use in table/field selection + test_catalogs = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in streams_to_test] + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs, select_all_fields=True) + + # Run a sync + self.run_and_verify_sync(conn_id) + + # Acquire records from the target output + full_sync_records = runner.get_records_from_target_output() + full_sync_state = menagerie.get_state(conn_id) + + # Set state in which all streams of one repo(singer-io/singer-python) have completed a sync. + # And one stream (pull_requests) of other repo(singer-io/test-repo) is syncing currently. 
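+ # For tap-freshdesk this means agents, companies, and groups hold completed bookmarks while "roles" was the stream in progress when the sync stopped.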
+ + interrupted_state = { + "currently_syncing": "roles", + "bookmarks": { + "agents": { + "updated_at": "2022-08-25T04:35:47.000000Z" + }, + "companies": { + "updated_at": "2022-08-22T13:58:07.000000Z" + }, + "groups": { + "updated_at": "2022-08-18T05:13:56.000000Z" + }, + "roles": { + "updated_at": "2022-07-19T11:49:58.000000Z" + } + } + } + + menagerie.set_state(conn_id, interrupted_state) + + # Run another sync + self.run_and_verify_sync(conn_id) + + # acquire records from target output + interrupted_sync_records = runner.get_records_from_target_output() + final_state = menagerie.get_state(conn_id) + currently_syncing = final_state.get('currently_syncing') + + # Checking resuming sync resulted in a successfully saved state + with self.subTest(): + + # Verify sync is not interrupted by checking currently_syncing in the state for sync + self.assertIsNone(currently_syncing) + + # Verify bookmarks are saved + self.assertIsNotNone(final_state.get('bookmarks')) + + # Verify final_state is equal to uninterrupted sync's state + # (This is what the value would have been without an interruption and proves resuming succeeds) + self.assertDictEqual(final_state, full_sync_state) + + full_sync_bookmark = full_sync_state["bookmarks"] + final_bookmark = final_state["bookmarks"] + interrupted_repo_bookmark = interrupted_state["bookmarks"] + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # Expected values + expected_replication_method = expected_replication_methods[stream] + expected_primary_keys = list(self.expected_primary_keys()[stream]) + + # Gather results + full_records = [message['data'] for message in + full_sync_records.get(stream, {}).get('messages', [])] + full_record_count = len(full_records) + + interrupted_records = [message['data'] for message in + interrupted_sync_records.get(stream, {}).get('messages', [])] + interrupted_record_count = len(interrupted_records) + + if expected_replication_method == self.INCREMENTAL: + expected_replication_key = next(iter(expected_replication_keys[stream])) + + if stream in interrupted_repo_bookmark.keys(): + interrupted_bookmark = self.dt_to_ts(interrupted_repo_bookmark[stream]["updated_at"], self.BOOKMARK_FORMAT) + + if stream == interrupted_state['currently_syncing']: + + for record in interrupted_records: + rec_time = self.dt_to_ts(record[expected_replication_key], self.RECORD_REPLICATION_KEY_FORMAT) + self.assertGreaterEqual(rec_time, interrupted_bookmark) + + # Verify all interrupted recs are in full recs + self.assertIn(record, full_records, msg='incremental table record in interrupted sync not found in full sync') + + # Record count for all streams of interrupted sync match expectations + full_records_after_interrupted_bookmark = 0 + + for record in full_records: + rec_time = self.dt_to_ts(record[expected_replication_key], self.RECORD_REPLICATION_KEY_FORMAT) + self.assertGreaterEqual(rec_time, start_date) + + if (rec_time >= interrupted_bookmark): + full_records_after_interrupted_bookmark += 1 + + self.assertEqual(full_records_after_interrupted_bookmark, len(interrupted_records), \ + msg="Expected {} records in each sync".format(full_records_after_interrupted_bookmark)) + else: + # Verify we collected records that have the same replication value as a bookmark for streams that are already synced + self.assertGreaterEqual(interrupted_record_count, 0) + else: + # Verify resuming sync replicates all records that were found in the full sync (uninterrupted) + for record in interrupted_records: + with 
self.subTest(record_primary_key=record[expected_primary_keys[0]]): + self.assertIn(record, full_records, msg='Unexpected record replicated in resuming sync.') + for record in full_records: + with self.subTest(record_primary_key=record[expected_primary_keys[0]]): + self.assertIn(record, interrupted_records, msg='Record missing from resuming sync.' ) + else: + # Verify full table streams do not save bookmarked values at the conclusion of a successful sync + self.assertNotIn(stream, full_sync_bookmark.keys()) + self.assertNotIn(stream, final_bookmark.keys()) + + # Verify first and second sync have the same records + self.assertEqual(full_record_count, interrupted_record_count) + for rec in interrupted_records: + self.assertIn(rec, full_records, msg='full table record in interrupted sync not found in full sync') diff --git a/tests/test_freshdesk_interrupted_sync_add_stream.py b/tests/test_freshdesk_interrupted_sync_add_stream.py new file mode 100644 index 0000000..3b55c04 --- /dev/null +++ b/tests/test_freshdesk_interrupted_sync_add_stream.py @@ -0,0 +1,161 @@ +from tap_tester import connections, runner, menagerie +from base import FreshdeskBaseTest + + +class TestFreshdeskInterruptedSyncAddStream(FreshdeskBaseTest): + """Test tap's ability to recover from an interrupted sync""" + + @staticmethod + def name(): + return "tt_freshdesk_interrupted_sync_add_stream_test" + + def get_properties(self): + """ + Maintain states for start_date and end_date + """ + return { + 'start_date' : '2022-07-19T00:00:00Z' + } + + def test_run(self): + """ + Testing that if a sync job is interrupted and state is saved with `currently_syncing`(stream) and `currently_syncing_repo`, + the next sync job kicks off and the tap picks back up on that `currently_syncing` stream of `currently_syncing_repo`. + - Verify behavior is consistent when an added stream is selected between initial and resuming sync + """ + streams_to_test = {"agents", "groups", "companies"} + conn_id = connections.ensure_connection(self) + expected_replication_methods = self.expected_replication_method() + expected_replication_keys = self.expected_replication_keys() + + start_date = self.dt_to_ts(self.get_properties().get("start_date"), self.START_DATE_FORMAT) + + # Run a discovery job + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Partition catalogs for use in table/field selection + test_catalogs = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in streams_to_test] + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs, select_all_fields=True) + + # Run a sync + self.run_and_verify_sync(conn_id) + + # Acquire records from the target output + full_sync_records = runner.get_records_from_target_output() + full_sync_state = menagerie.get_state(conn_id) + + # Add a stream between syncs + added_stream = 'roles' + streams_to_test.add(added_stream) + test_catalogs = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in streams_to_test] + # Add new stream to selected list + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs, select_all_fields=True) + + # Set state in which one stream (roles) is syncing currently. 
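+ # "roles" was selected only after the first sync, so the interrupted state carries no bookmark for it yet.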
+ + interrupted_state = { + "currently_syncing": "roles", + "bookmarks": { + "agents": { + "updated_at": "2022-08-25T04:35:47.000000Z" + }, + "companies": { + "updated_at": "2022-08-22T13:58:07.000000Z" + }, + "groups": { + "updated_at": "2022-08-18T05:13:56.000000Z" + } + } + } + + menagerie.set_state(conn_id, interrupted_state) + + # Run another sync + self.run_and_verify_sync(conn_id) + + # acquire records from target output + interrupted_sync_records = runner.get_records_from_target_output() + final_state = menagerie.get_state(conn_id) + currently_syncing = final_state.get('currently_syncing') + + # Checking resuming sync resulted in a successfully saved state + with self.subTest(): + + # Verify sync is not interrupted by checking currently_syncing in the state for sync + self.assertIsNone(currently_syncing) + + # Verify bookmarks are saved + self.assertIsNotNone(final_state.get('bookmarks')) + + full_sync_bookmark = full_sync_state["bookmarks"] + final_bookmark = final_state["bookmarks"] + interrupted_repo_bookmark = interrupted_state["bookmarks"] + + for stream in streams_to_test: + with self.subTest(stream=stream): + + # Expected values + expected_replication_method = expected_replication_methods[stream] + + # Gather results + if stream != added_stream: + full_records = [message['data'] for message in + full_sync_records.get(stream, {}).get('messages', [])] + full_record_count = len(full_records) + + interrupted_records = [message['data'] for message in + interrupted_sync_records.get(stream, {}).get('messages', [])] + interrupted_record_count = len(interrupted_records) + + if expected_replication_method == self.INCREMENTAL: + expected_replication_key = next(iter(expected_replication_keys[stream])) + + if stream in full_sync_bookmark.keys(): + full_sync_stream_bookmark = self.dt_to_ts(full_sync_bookmark.get(stream, {}).get("updated_at"), self.BOOKMARK_FORMAT) + final_sync_stream_bookmark = self.dt_to_ts(final_bookmark.get(stream, {}).get("updated_at"), self.BOOKMARK_FORMAT) + + if stream in interrupted_repo_bookmark.keys(): + interrupted_bookmark = self.dt_to_ts(interrupted_repo_bookmark[stream]["updated_at"], self.BOOKMARK_FORMAT) + + for record in interrupted_records: + rec_time = self.dt_to_ts(record[expected_replication_key], self.RECORD_REPLICATION_KEY_FORMAT) + self.assertGreaterEqual(rec_time, interrupted_bookmark) + + else: + # verify we collected records that have the same replication value as a bookmark for streams that are already synced + self.assertGreater(interrupted_record_count, 0) + + if stream != added_stream: + + # Verify state ends with the same value for common streams after both full and interrupted syncs + self.assertEqual(full_sync_stream_bookmark, final_sync_stream_bookmark) + + for record in interrupted_records: + + # Verify all interrupted recs are in full recs + self.assertIn(record, full_records, msg='incremental table record in interrupted sync not found in full sync') + + # Record count for all streams of interrupted sync match expectations + full_records_after_interrupted_bookmark = 0 + + for record in full_records: + rec_time = self.dt_to_ts(record[expected_replication_key], self.RECORD_REPLICATION_KEY_FORMAT) + self.assertGreater(rec_time, start_date, msg=f"{expected_replication_key} {stream} {record}") + + if (rec_time >= interrupted_bookmark): + full_records_after_interrupted_bookmark += 1 + + self.assertGreaterEqual(full_records_after_interrupted_bookmark, interrupted_record_count, \ + msg="Expected max {} records in each 
sync".format(full_records_after_interrupted_bookmark)) + + else: + # Verify full table streams do not save bookmarked values after a successful sync + self.assertNotIn(stream, full_sync_bookmark.keys()) + self.assertNotIn(stream, final_bookmark.keys()) + + # Verify first and second sync have the same records + self.assertEqual(full_record_count, interrupted_record_count) + for rec in interrupted_records: + self.assertIn(rec, full_records, msg='full table record in interrupted sync not found in full sync') \ No newline at end of file diff --git a/tests/test_freshdesk_interrupted_sync_remove_stream.py b/tests/test_freshdesk_interrupted_sync_remove_stream.py new file mode 100644 index 0000000..cff92e5 --- /dev/null +++ b/tests/test_freshdesk_interrupted_sync_remove_stream.py @@ -0,0 +1,187 @@ +from tap_tester import connections, runner, menagerie +from base import FreshdeskBaseTest + + +class TestFreshdeskInterruptedSyncRemoveStream(FreshdeskBaseTest): + """Test tap's ability to recover from an interrupted sync""" + + @staticmethod + def name(): + return "tt_freshdesk_interrupted_sync_remove_stream_test" + + def get_properties(self): + """ + Maintain states for start_date and end_date + """ + return { + 'start_date' : '2022-07-19T00:00:00Z' + } + + def test_run(self): + + # Test for removing any stream from state + self.run_interrupted_sync("groups") + + # Test for removing currently syncing stream from state + self.run_interrupted_sync("roles") + + def run_interrupted_sync(self, removed_stream): + """ + Testing that if a sync job is interrupted and state is saved with `currently_syncing`(stream), + the next sync job kicks off and the tap picks back up on that `currently_syncing` stream. + - Verify behavior is consistent when a stream is removed from the selected list between initial and resuming sync. 
+ """ + streams_to_test = {"roles", "agents", "groups", "companies"} + conn_id = connections.ensure_connection(self) + expected_replication_methods = self.expected_replication_method() + expected_replication_keys = self.expected_replication_keys() + + start_date = self.dt_to_ts(self.get_properties().get("start_date"), self.START_DATE_FORMAT) + + # Run a discovery job + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # Partition catalogs for use in table/field selection + test_catalogs = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in streams_to_test] + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs, select_all_fields=True) + + # Run a sync + self.run_and_verify_sync(conn_id) + + # Acquire records from target output + full_sync_records = runner.get_records_from_target_output() + full_sync_state = menagerie.get_state(conn_id) + + # Create new connection for another sync + conn_id_2 = connections.ensure_connection(self) + + # Add a stream between syncs + streams_to_test = streams_to_test - {removed_stream} + found_catalogs = self.run_and_verify_check_mode(conn_id_2) + + test_catalogs = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in streams_to_test] + + # Add new stream to selected list + self.perform_and_verify_table_and_field_selection(conn_id_2, test_catalogs, select_all_fields=True) + + # Set state in with a currently syncing stream + + interrupted_state = { + "currently_syncing": "roles", + "bookmarks": { + "agents": { + "updated_at": "2022-08-25T04:35:47.000000Z" + }, + "companies": { + "updated_at": "2022-08-22T13:58:07.000000Z" + }, + "groups": { + "updated_at": "2022-08-18T05:13:56.000000Z" + }, + "roles": { + "updated_at": "2022-07-19T11:49:58.000000Z" + } + } + } + + menagerie.set_state(conn_id_2, interrupted_state) + + # Run another sync + self.run_and_verify_sync(conn_id_2) + + # Acquire records from target output + interrupted_sync_records = runner.get_records_from_target_output() + final_state = menagerie.get_state(conn_id_2) + currently_syncing = final_state.get('currently_syncing') + + # Checking resuming sync resulted in a successfully saved state + with self.subTest(): + + # Verify sync is not interrupted by checking currently_syncing in the state for sync + self.assertIsNone(currently_syncing) + + # Verify bookmarks are saved + self.assertIsNotNone(final_state.get('bookmarks')) + + + full_sync_bookmark = full_sync_state["bookmarks"] + final_bookmark = final_state["bookmarks"] + interrupted_repo_bookmark = interrupted_state["bookmarks"] + + for stream in list(streams_to_test) + [removed_stream]: + with self.subTest(stream=stream): + + # Expected values + expected_replication_method = expected_replication_methods[stream] + expected_primary_keys = list(self.expected_primary_keys()[stream]) + + # Gather results + full_records = [message['data'] for message in + full_sync_records.get(stream, {}).get('messages', []) ] + full_record_count = len(full_records) + + if stream != removed_stream: + interrupted_records = [message['data'] for message in + interrupted_sync_records.get(stream, {}).get('messages', [])] + interrupted_record_count = len(interrupted_records) + else: + self.assertNotIn(stream, interrupted_sync_records.keys()) + + if expected_replication_method == self.INCREMENTAL: + expected_replication_key = next(iter(expected_replication_keys[stream])) + full_sync_stream_bookmark = self.dt_to_ts(full_sync_bookmark.get(stream, {}).get("updated_at"), self.BOOKMARK_FORMAT) + + if stream in 
interrupted_repo_bookmark.keys(): + interrupted_bookmark = self.dt_to_ts(interrupted_repo_bookmark[stream]["updated_at"], self.BOOKMARK_FORMAT) + final_sync_stream_bookmark = self.dt_to_ts(final_bookmark.get(stream, {}).get("updated_at"), self.BOOKMARK_FORMAT) + + if stream != removed_stream: + + # Verify state ends with the same value for common streams after both full and interrupted syncs + self.assertEqual(full_sync_stream_bookmark, final_sync_stream_bookmark) + + # Verify resuming sync only replicates records with replication key values greater or equal to + # the interrupted_state for streams that were completed, replicated during the interrupted sync. + for record in interrupted_records: + with self.subTest(record_primary_key=record[expected_primary_keys[0]]): + rec_time = self.dt_to_ts(record[expected_replication_key], self.RECORD_REPLICATION_KEY_FORMAT) + self.assertGreaterEqual(rec_time, interrupted_bookmark) + + # Verify all interrupted recs are in full recs + self.assertIn(record, full_records, msg='Incremental table record in interrupted sync not found in full sync') + + # Record count for all streams of interrupted sync match expectations + full_records_after_interrupted_bookmark = 0 + for record in full_records: + rec_time = self.dt_to_ts(record[expected_replication_key], self.RECORD_REPLICATION_KEY_FORMAT) + self.assertGreater(rec_time, start_date, msg=f"{expected_replication_key} {stream} {record}") + + if (rec_time >= interrupted_bookmark): + full_records_after_interrupted_bookmark += 1 + + self.assertGreaterEqual(full_records_after_interrupted_bookmark, interrupted_record_count, \ + msg="Expected max {} records in each sync".format(full_records_after_interrupted_bookmark)) + else: + # Verify the bookmark has not advanced for the removed stream + self.assertEqual(final_sync_stream_bookmark, interrupted_bookmark) + else: + # verify we collected records that have the same replication value as a bookmark for streams that are already synced + self.assertGreater(interrupted_record_count, 0) + + else: + # Verify full table streams do not save bookmarked values after a successful sync + self.assertNotIn(stream, full_sync_bookmark.keys()) + self.assertNotIn(stream, final_bookmark.keys()) + + # Verify first and second sync have the same records + self.assertEqual(full_record_count, interrupted_record_count) + for rec in interrupted_records: + self.assertIn(rec, full_records, msg='Full table record in interrupted sync not found in full sync') + + # Verify at least 1 record was replicated for each stream + if stream != removed_stream: + self.assertGreater(interrupted_record_count, 0) + + print(f"{stream} resumed sync records replicated: {interrupted_record_count}") \ No newline at end of file diff --git a/tests/unittests/test_bookmark_handling.py b/tests/unittests/test_bookmark_handling.py new file mode 100644 index 0000000..b33a8c3 --- /dev/null +++ b/tests/unittests/test_bookmark_handling.py @@ -0,0 +1,86 @@ +import unittest +from parameterized import parameterized +from tap_freshdesk.streams import get_min_bookmark, get_schema, write_bookmark + +START_DATE = '2022-09-00T00:00:00.000000Z' + + +class TestGetMinBookmark(unittest.TestCase): + """ + Test `get_min_bookmark` method of the stream class + """ + + start_date = "2020-04-01T00:00:00Z" + state = { + "bookmarks": { + "tickets": {"updated_at": "2022-03-29T00:00:00Z"}, + "conversations": {"updated_at": "2022-03-01T00:00:00Z"}, + "satisfaction_ratings": {"updated_at": "2022-03-14T00:00:00Z"}, + "time_entries": {"updated_at": 
"2022-04-01T00:00:00Z"}, + } + } + + @parameterized.expand([ + # ["test_name", "selected_streams", "state", "expected_bookmark"] + ['test_parent_only_with_state', ['tickets'], {'bookmarks': {'tickets': {'updated_at': '2022-08-30T00:00:00.000000Z'}}}, '2022-08-30T00:00:00.000000Z'], + ['test_child_only_with_state', ['conversations'], {'bookmarks': {'conversations': {'updated_at': '2022-08-30T00:00:00.000000Z'}}}, '2022-08-30T00:00:00.000000Z'], + ['test_parent_only_without_state', ['tickets'], {}, START_DATE], + ['test_child_only_without_state', ['tickets'], {}, START_DATE], + ['test_min_parent_bookmark_single_child', ['tickets', 'conversations'], + {'bookmarks': {'tickets': {'updated_at': '2022-07-30T00:00:00.000000Z'}, 'conversations': {'updated_at': '2022-08-30T00:00:00.000000Z'}}}, '2022-07-30T00:00:00.000000Z'], + ['test_min_child_bookmark_single_child', ['tickets', 'conversations'], + {'bookmarks': {'tickets': {'updated_at': '2022-08-30T00:00:00.000000Z'}, 'conversations': {'updated_at': '2022-07-30T00:00:00.000000Z'}}}, '2022-07-30T00:00:00.000000Z'], + ['test_min_child_bookmark_multiple_child', ['tickets', 'conversations', 'time_entries'], + {'bookmarks': {'tickets': {'updated_at': '2022-09-30T00:00:00.000000Z'}, 'conversations': {'updated_at': '2022-09-30T00:00:00.000000Z'}}}, START_DATE], + ['test_multiple_child_only_bookmark', ['tickets', 'conversations', 'time_entries'], + {'bookmarks': {'time_entries': {'updated_at': '2022-09-30T00:00:00.000000Z'}, 'conversations': {'updated_at': '2022-09-30T00:00:00.000000Z'}}}, START_DATE], + ['test_multiple_child_bookmark', ['tickets', 'conversations', 'time_entries'], + {'bookmarks': {'time_entries': {'updated_at': '2022-06-30T00:00:00.000000Z'}, 'tickets': {'updated_at': '2022-08-30T00:00:00.000000Z'}, 'conversations': {'updated_at': '2022-11-30T00:00:00.000000Z'}}}, '2022-06-30T00:00:00.000000Z'] + + ]) + def test_min_bookmark(self, test_name, selected_streams, state, expected_bookmark): + """ + Test that `get_min_bookmark` method returns the minimum bookmark from the parent and its corresponding child bookmarks. + """ + current_time = '2022-09-30T00:00:00.000000Z' + actual_bookmark = get_min_bookmark('tickets', selected_streams, current_time, START_DATE, state, 'updated_at') + self.assertEqual(actual_bookmark, expected_bookmark) + + +class TestGetSchema(unittest.TestCase): + """ + Test `get_schema` method of the stream class. 
+ """ + + def test_get_schema(self): + """Verify function returns expected schema""" + catalog = [ + {"tap_stream_id": "roles"}, + {"tap_stream_id": "agents"}, + {"tap_stream_id": "time_entries"}, + ] + expected_schema = {"tap_stream_id": "agents"} + + # Verify returned schema is same as expected schema + self.assertEqual(get_schema(catalog, "agents"), expected_schema) + + +class TestWriteBookmark(unittest.TestCase): + """ + Test the `write_bookmark` method of the stream class + """ + + @parameterized.expand([ + # ["test_name", "stream", "expected_state"] + ["stream_not_selected", "agents", {"bookmarks": {}}], + ["stream_not_selected", "groups", {"bookmarks": {"groups": {"updated_at": "BOOKMARK_VALUE"}}}], + ]) + def test_write_bookmark(self, test_name, stream, expected_state): + """ + Test that bookmark is written only if the stream is selected + """ + state = {"bookmarks": {}} + write_bookmark(stream, ["roles", "groups"], "BOOKMARK_VALUE", state) + + # Verify that the final state is as expected + self.assertEqual(state, expected_state) diff --git a/tests/unittests/test_check_access_token.py b/tests/unittests/test_check_access_token.py new file mode 100644 index 0000000..8d60121 --- /dev/null +++ b/tests/unittests/test_check_access_token.py @@ -0,0 +1,22 @@ +import unittest +from unittest import mock +from tap_freshdesk import client + + +class TestAccessToken(unittest.TestCase): + """ + Test `check_access_token` method of client class + """ + + @mock.patch("tap_freshdesk.client.FreshdeskClient.request") + def test_access_token(self, mock_request): + """ + Test that to check the access token a request call is made. + """ + config = {"domain": "sampleDomain"} + _client = client.FreshdeskClient(config) + _client.check_access_token() + + # Verify that for check access token, `request` method was called + self.assertTrue(mock_request.called) + mock_request.assert_called_with("https://sampleDomain.freshdesk.com/api/v2/roles", mock.ANY) diff --git a/tests/unittests/test_client.py b/tests/unittests/test_client.py new file mode 100644 index 0000000..de677a8 --- /dev/null +++ b/tests/unittests/test_client.py @@ -0,0 +1,36 @@ +import unittest +from unittest import mock +from tap_freshdesk import client +import requests +import json + + +def get_response(status_code, json_resp={}, headers=None): + """ + Returns mock response + """ + response = requests.Response() + response.status_code = status_code + response._content = json.dumps(json_resp).encode() + if headers: + response.headers = headers + return response + + +class TestAccessToken(unittest.TestCase): + """ + Test `check_access_token` method of client class + """ + + @mock.patch("tap_freshdesk.client.FreshdeskClient.request") + def test_access_token(self, mock_request): + """ + Test that to check access token a request call is made. 
+ """ + config = {"domain": "sampleDomain"} + _client = client.FreshdeskClient(config) + _client.check_access_token() + + # Verify that for check access token, `request` method was called + self.assertTrue(mock_request.called) + mock_request.assert_called_with("https://sampleDomain.freshdesk.com/api/v2/roles", mock.ANY) diff --git a/tests/unittests/test_currently_syncing.py b/tests/unittests/test_currently_syncing.py new file mode 100644 index 0000000..8cd4db1 --- /dev/null +++ b/tests/unittests/test_currently_syncing.py @@ -0,0 +1,61 @@ +import unittest +from tap_freshdesk.sync import update_currently_syncing, get_ordered_stream_list + + +class TestGetOrderedStreamList(unittest.TestCase): + """ + Test `get_ordered_stream_list` function to get ordered list od streams + """ + + streams_to_sync = ["agents", "companies", "tickets", + "conversations", "groups", "satisfaction_ratings", "time_entries"] + + def test_currently_syncing_not_in_list(self): + """Test if currently syncing is not available in `streams_to_sync` list, function returns sorted streams_to_sync list.""" + expected_list = ["agents", "companies", "conversations", + "groups", "satisfaction_ratings", "tickets", "time_entries"] + final_list = get_ordered_stream_list("roles", self.streams_to_sync) + + # Verify with expected ordered list of streams + self.assertEqual(final_list, expected_list) + + def test_for_interrupted_sync(self): + """Test when the sync was interrupted, the function returns ordered list of streams starting with 'currently_syncing' stream.""" + expected_list = ["groups", "satisfaction_ratings", "tickets", + "time_entries", "agents", "companies", "conversations"] + final_list = get_ordered_stream_list("groups", self.streams_to_sync) + + # Verify with expected ordered list of streams + self.assertEqual(final_list, expected_list) + + def test_for_completed_sync(self): + """Test when sync was not interrupted, the function returns sorted streams_to_sync list.""" + expected_list = ["agents", "companies", "conversations", + "groups", "satisfaction_ratings", "tickets", "time_entries"] + final_list = get_ordered_stream_list(None, self.streams_to_sync) + + # Verify with expected ordered list of streams + self.assertEqual(final_list, expected_list) + + +class TestUpdateCurrentlySyncing(unittest.TestCase): + + """ + Test `update_currently_syncing` function of sync. 
+ """ + + def test_update_syncing_stream(self): + """Test for adding currently syncing stream in state.""" + state = {"currently_syncing": "groups"} + update_currently_syncing(state, "groups") + + # Verify with expected state + self.assertEqual(state, {"currently_syncing": "groups"}) + + def test_flush_currently_syncing(self): + """Test for removing currently syncing stream from state.""" + state = {"currently_syncing": "agents"} + update_currently_syncing(state, None) + + # Verify with expected state + self.assertEqual(state, {}) diff --git a/tests/unittests/test_main.py b/tests/unittests/test_main.py new file mode 100644 index 0000000..d5a802e --- /dev/null +++ b/tests/unittests/test_main.py @@ -0,0 +1,108 @@ +import unittest +from unittest import mock +from singer.catalog import Catalog +from tap_freshdesk import main +from tap_freshdesk.discover import discover + + +class MockArgs: + """Mock args object class""" + + def __init__(self, config=None, catalog=None, state={}, discover=False) -> None: + self.config = config + self.catalog = catalog + self.state = state + self.discover = discover + + +@mock.patch("tap_freshdesk.FreshdeskClient") +@mock.patch("singer.utils.parse_args") +class TestDiscoverMode(unittest.TestCase): + """ + Test main function for discover mode + """ + + mock_config = {"start_date": "", "access_token": ""} + + @mock.patch("tap_freshdesk._discover") + def test_discover_with_config(self, mock_discover, mock_args, mock_verify_access): + """Test `_discover` function is called for discover mode""" + mock_discover.return_value = Catalog([]) + mock_args.return_value = MockArgs( + discover=True, config=self.mock_config) + main() + + # Verify that `discover` was called + self.assertTrue(mock_discover.called) + + +@mock.patch("tap_freshdesk.FreshdeskClient.check_access_token") +@mock.patch("singer.utils.parse_args") +@mock.patch("tap_freshdesk._sync") +class TestSyncMode(unittest.TestCase): + """ + Test main function for sync mode + """ + + mock_config = {"start_date": "", "access_token": ""} + mock_catalog = {"streams": [{"stream": "teams", "schema": {}, "metadata": {}}]} + + @mock.patch("tap_freshdesk._discover") + def test_sync_with_catalog(self, mock_discover, mock_sync, mock_args, mock_check_access_token): + """Test sync mode with catalog given in args""" + + mock_args.return_value = MockArgs(config=self.mock_config, + catalog=Catalog.from_dict(self.mock_catalog)) + main() + + # Verify `_sync` is called with expected arguments + mock_sync.assert_called_with(mock.ANY, self.mock_config, {}, self.mock_catalog) + + # verify `_discover` function is not called + self.assertFalse(mock_discover.called) + + @mock.patch("tap_freshdesk._discover") + def test_sync_without_catalog(self, mock_discover, mock_sync, mock_args, mock_check_access_token): + """Test sync mode without catalog given in args""" + + mock_discover.return_value = Catalog.from_dict(self.mock_catalog) + mock_args.return_value = MockArgs(config=self.mock_config) + main() + + # Verify `_sync` is called with expected arguments + mock_sync.assert_called_with(mock.ANY, self.mock_config, {}, self.mock_catalog) + + # verify `_discover` function is called + self.assertTrue(mock_discover.called) + + def test_sync_with_state(self, mock_sync, mock_args, mock_check_access_token): + """Test sync mode with state given in args""" + mock_state = {"bookmarks": {"projec ts": ""}} + mock_args.return_value = MockArgs(config=self.mock_config, + catalog=Catalog.from_dict(self.mock_catalog), + state=mock_state) + main() + + # Verify 
`_sync` is called with expected arguments + mock_sync.assert_called_with(mock.ANY, self.mock_config, mock_state, self.mock_catalog) + + +class TestDiscover(unittest.TestCase): + """Test `discover` function.""" + + def test_discover(self): + return_catalog = discover() + + # Verify discover function returns `Catalog` type object. + self.assertIsInstance(return_catalog, Catalog) + + @mock.patch("tap_freshdesk.discover.Schema") + @mock.patch("tap_freshdesk.discover.LOGGER.error") + def test_discover_error_handling(self, mock_logger, mock_schema): + """Test discover function if exception arises.""" + mock_schema.from_dict.side_effect = Exception + with self.assertRaises(Exception): + discover() + + # Verify logger called 3 times when an exception arises. + self.assertEqual(mock_logger.call_count, 3) diff --git a/tests/unittests/test_page_size.py b/tests/unittests/test_page_size.py new file mode 100644 index 0000000..846b439 --- /dev/null +++ b/tests/unittests/test_page_size.py @@ -0,0 +1,64 @@ +import unittest +from parameterized import parameterized +import tap_freshdesk.client as client_ + +PAGE_SIZE_INT = 50 +PAGE_SIZE_STR_INT = "50" +PAGE_SIZE_STR_FLOAT = "50.0" +PAGE_SIZE_FLOAT = 50.0 +PAGE_SIZE_ZERO = 0 +PAGE_SIZE_STR_ZERO = "0" +PAGE_SIZE_INVALID_STRING = "abc" + + +class TestPageSizeValue(unittest.TestCase): + + @parameterized.expand([ + # ["page_size_value", "expected_value"] + [PAGE_SIZE_INT, PAGE_SIZE_INT], + [PAGE_SIZE_STR_INT, PAGE_SIZE_INT], + [PAGE_SIZE_STR_FLOAT, PAGE_SIZE_INT], + [PAGE_SIZE_FLOAT, PAGE_SIZE_INT], + ]) + def test_page_size_for_valid_values(self, page_size_value, expected_value): + """ + Test the various values of page_size: + - For string, integer, float type of values, converts to float + - For null string, zero(string), zero(integer), takes default integer value + """ + config = {'domain': 'abc', "page_size": page_size_value} + client = client_.FreshdeskClient(config) + + # Verify the page_size is the same as the expected value + self.assertEqual(client.page_size, expected_value) + + @parameterized.expand([ + # ["page_size_value"] + [PAGE_SIZE_INVALID_STRING], + [PAGE_SIZE_STR_ZERO], + [PAGE_SIZE_ZERO], + ]) + def test_page_size_for_invalid_values(self, page_size_value): + """ + Test the various values of page_size: + - For string, integer, float type of values, converts to float + - For null string, zero(string), zero(integer), takes default integer value + """ + + config = {'domain': 'abc', "page_size": page_size_value} + # Verify the tap raises Exception + with self.assertRaises(Exception) as e: + client_.FreshdeskClient(config) + + # Verify the tap raises an error with expected error message + self.assertEqual(str(e.exception), "The entered page size is invalid, it should be a valid integer.") + + def test_without_page_size(self): + """ + Test if no page size is given in config, default page_size will be considered. + """ + config = {'domain': 'abc'} + client = client_.FreshdeskClient(config) + + # Verify the page_size is the same as the default value + self.assertEqual(client.page_size, client_.DEFAULT_PAGE_SIZE) diff --git a/tests/unittests/test_streams.py b/tests/unittests/test_streams.py new file mode 100644 index 0000000..0ac97ee --- /dev/null +++ b/tests/unittests/test_streams.py @@ -0,0 +1,167 @@ +import unittest +from unittest import mock +from parameterized import parameterized +from tap_freshdesk.streams import Agents, Tickets + + +class TestSyncObj(unittest.TestCase): + """ + Test `sync_obj` method of stream. 
+ """ + + start_date = "2019-06-01T00:00:00Z" + only_parent_response = [ + [{"id": i, "updated_at": f"2020-0{i}-01T00:00:00Z"} for i in [1, 5, 2]], # Tickets Response + [{"id": "33", "updated_at": f"2020-03-01T00:00:00Z"}], # Deleted tickets Response + [{"id": "55", "updated_at": f"2020-04-01T00:00:00Z"}], # Spam tickets Response + ] + written_states_1 = { + "tickets": "2020-05-01T00:00:00Z", + "tickets_deleted": "2020-03-01T00:00:00Z", + "tickets_spam": "2020-04-01T00:00:00Z", + } + with_child_response = [ + [{"id": i, "updated_at": f"2020-0{i}-01T00:00:00Z"} for i in [1, 5, 2]], # Tickets Response + [{"id": i, "updated_at": f"2020-0{i}-01T00:00:00Z"} for i in [2, 4]], # conversations Response + [{"id": "33", "updated_at": "2020-03-01T00:00:00Z"}], # conversations Response + [{"id": "55", "updated_at": "2020-04-01T00:00:00Z"}], # conversations Response + [], [] # Deleted/Spam tickets response + ] + written_states_2 = { + "conversations": "2020-04-01T00:00:00Z", + } + written_states_3 = { + "tickets": "2020-05-01T00:00:00Z", + "conversations": "2020-04-01T00:00:00Z", + } + expected_state_1 = { + "conversations": {"updated_at": "2020-04-01T00:00:00Z"}, + "tickets": {"updated_at": "2020-03-15T00:00:00Z"}, + "tickets_deleted": {"updated_at": "2020-05-01T00:00:00Z"}, + "tickets_spam": {"updated_at": "2020-04-01T00:00:00Z"} + } + expected_state_2 = {'conversations': {'updated_at': '2020-04-01T00:00:00Z'}, + 'tickets': {'updated_at': '2019-06-01T00:00:00Z'}, + 'tickets_deleted': {'updated_at': '2020-05-01T00:00:00Z'}, + 'tickets_spam': {'updated_at': '2020-04-01T00:00:00Z'}} + expected_state_3 = { + **expected_state_1, + "tickets": {"updated_at": "2020-03-16T00:00:00Z"}, + } + + @parameterized.expand([ + # ["test_name", "selected_streams", "streams_to_sync", "responses", "written_records", "written_states"] + ["parent_selected", ["tickets"], ["tickets"], + only_parent_response, 5, written_states_1], + ["child_selected", ["conversations"], ["tickets", "conversations"], + with_child_response, 4, written_states_2], + ["parent_child_both_selected", ["tickets", "conversations"], ["tickets", "conversations"], + with_child_response, 7, written_states_3], + ]) + @mock.patch("singer.write_record") + @mock.patch("singer.write_bookmark") + def test_stream_sync_obj( + self, test_name, selected_streams, streams_to_sync, responses, written_records, written_states, + mock_write_bookmark, mock_write_record): + """ + Test that stream is writing records and bookmarks only if selected. 
+ """ + stream = Tickets() + state = {} + client = mock.Mock() + client.base_url = "" + client.page_size = 100 + client.request.side_effect = responses + catalog = [ + {"schema": {}, "tap_stream_id": "tickets", "metadata": []}, + {"schema": {}, "tap_stream_id": "conversations", "metadata": []} + ] + + stream.sync_obj(state, self.start_date, client, catalog, selected_streams, streams_to_sync) + + # Verify expected records are written + self.assertEqual(mock_write_record.call_count, written_records) + + # Verify max bookmark is updated for all selected streams + for stream, bookmark in written_states.items(): + mock_write_bookmark.assert_any_call({}, stream, "updated_at", bookmark) + + @parameterized.expand([ + # ["test_name", "state", "expected_state", "written_records"] + ["without_state", dict(), expected_state_1, 13], + ["with_parent_state", {"bookmarks": {"tickets": {"updated_at": "2020-03-16T00:00:00Z"}}}, expected_state_2, 10], + ["with_child_state", {"bookmarks": {"conversations": { + "updated_at": "2020-03-23T00:00:00Z"}}}, expected_state_1, 8], + ["with_both_state", {"bookmarks": {"tickets": {"updated_at": "2020-03-16T00:00:00Z"}, + "conversations": {"updated_at": "2020-03-23T00:00:00Z"}}}, expected_state_3, 5], + ]) + @mock.patch("singer.write_record") + def test_parent_child_both_selected(self, test_name, state, expected_state, written_records, mock_write_record): + """ + Test parent and child streams both selected in given conditions: + - Without a state, all the records will be written. + - With only parent bookmark parent records with replication value < bookmark will not be written. + - With only a child bookmark child records with replication value < bookmark will not be written. + - With both parent and child bookmarks, both follow bookmarks. + """ + stream = Tickets() + client = mock.Mock() + client.base_url = "" + client.page_size = 100 + client.request.side_effect = [ + [{"id": i, "updated_at": f"2020-03-{i}T00:00:00Z"} for i in [11, 15, 12]], # Tickets Response + [{"id": 10+i, "updated_at": f"2020-03-{i}T00:00:00Z"} for i in [13, 24]], # conversations Response + [{"id": 13, "updated_at": "2020-03-01T00:00:00Z"}], # conversations Response + [{"id": 95, "updated_at": "2020-04-01T00:00:00Z"}], # conversations Response + [{"id": 73, "updated_at": "2020-05-01T00:00:00Z"}], # Deleted tickets response + [{"id": 30+i, "updated_at": f"2020-03-{i}T00:00:00Z"}for i in [22, 10]], # conversations response + [{"id": 43, "updated_at": "2020-04-01T00:00:00Z"}], # Spam tickets response + [{"id": 50+i, "updated_at": f"2020-03-{i}T00:00:00Z"}for i in [12, 25]], # conversations response + ] + catalog = [ + {"schema": {}, "tap_stream_id": "tickets", "metadata": []}, + {"schema": {}, "tap_stream_id": "conversations", "metadata": []} + ] + + stream.sync_obj(state=state, + start_date=self.start_date, + client=client, + catalog=catalog, + selected_streams=["tickets", "conversations"], + streams_to_sync=["tickets", "conversations"]) + self.assertEqual(mock_write_record.call_count, written_records) + self.assertDictEqual(state, {"bookmarks": expected_state}) + + +class TestSyncTransformDict(unittest.TestCase): + """ + Test `transform_dict` method of stream class. 
+ """ + + stream = Agents() + expected_list_1 = [{"name": "Agency", "value": "Justice League"}, + {"name": "Department", "value": "Superhero"}] + expected_list_2 = [{"key": "Agency", "data": "Justice League"}, + {"key": "Department", "data": "Superhero"}] + expected_list_3 = [{"name": "Agency", "value": "justice league"}, + {"name": "Department", "value": "superhero"}] + + @ parameterized.expand([ + # ["test_name", "dictionary", "expected_list", "kwargs"] + ["coverting_dict_to_list", {"Agency": "Justice League", "Department": "Superhero"}, + expected_list_1, {}], + ["With_custom_keys", {"Agency": "Justice League", "Department": "Superhero"}, + expected_list_2, {"key_key": "key", "value_key": "data"}], + ["With_string_value", {"Agency": "Justice League", "Department": "Superhero"}, + expected_list_3, {"force_str": True}], + ]) + def test_transform(self, test_name, dictionary, expected_list, kwargs): + """ + Test that the dictionary is transformed as per given conditions: + - Value is a lowercase string when force_str: True + - Key-Values can be customized by passing in args + """ + returned_list = self.stream.transform_dict(dictionary, **kwargs) + + # Verify returned list is expected + self.assertEqual(returned_list, expected_list) diff --git a/tests/unittests/test_sync.py b/tests/unittests/test_sync.py new file mode 100644 index 0000000..6f1e434 --- /dev/null +++ b/tests/unittests/test_sync.py @@ -0,0 +1,134 @@ +import unittest +from unittest import mock +from parameterized import parameterized +from tap_freshdesk.sync import (write_schemas, get_selected_streams, + get_stream_to_sync, sync) + + +def get_stream_catalog(stream_name, is_selected=False): + """Return catalog for stream""" + return { + "schema": {}, + "tap_stream_id": stream_name, + "metadata": [ + { + "breadcrumb": [], + "metadata":{"selected": is_selected} + } + ], + "key_properties": [] + } + + +def get_catalog(parent=False, child=False): + """Return complete catalog""" + + return { + "streams": [ + get_stream_catalog("agents"), + get_stream_catalog("companies", parent), + get_stream_catalog("conversations", child), + get_stream_catalog("tickets", parent), + get_stream_catalog("time_entries", child), + get_stream_catalog("groups", parent), + ] + } + + +class TestSyncFunctions(unittest.TestCase): + """ + Test `sync` function. + """ + + # NOTE: For `tickets` stream `sync_obj` is called 3 times + @parameterized.expand([ + # ["test_name", "mock_catalog", "selected_streams", "synced_streams"] + ["only_parent_selected", get_catalog(parent=True), + ["companies", "tickets", "groups"], 5], + ["only_child_selected", get_catalog(child=True), + ["conversations", "time_entries"], 3], + ["both_selected", get_catalog(parent=True, child=True), + ["companies", "tickets", "groups", "conversations", "time_entries"], 5], + ["No_streams_selected", get_catalog(), + [], 0], + ]) + @mock.patch("singer.write_state") + @mock.patch("singer.write_schema") + @mock.patch("tap_freshdesk.streams.Stream.sync_obj") + def test_sync(self, test_name, mock_catalog, selected_streams, synced_streams, + mock_sync_endpoint, mock_write_schemas, mock_write_state): + """ + Test sync function that for child streams selected parent sync_obj is called and, + schema is written only for selected streams. 
+ """ + client = mock.Mock() + sync(client, {'start_date': ""}, {}, mock_catalog) + + # Verify write schema is called for selected streams + self.assertEqual(mock_write_schemas.call_count, len(selected_streams)) + for stream in selected_streams: + mock_write_schemas.assert_any_call(stream, mock.ANY, mock.ANY) + + # Verify sync object was called for syncing parent streams + self.assertEqual(mock_sync_endpoint.call_count, synced_streams) + + +class TestUtilsFunction(unittest.TestCase): + """ + Test functions used in `sync`. + """ + + mock_catalog = {"streams": [ + get_stream_catalog("tickets"), + get_stream_catalog("time_entries"), + get_stream_catalog("conversations") + ]} + + @parameterized.expand([ + # [test_name, selected_streams, mock_write_schema] + ["parents_selected", ["tickets"]], + ["child_selected", ["time_entries"]], + ["parent_and_child_selected", ["tickets", "conversations"]], + ]) + @mock.patch("singer.write_schema") + def test_write_schema(self, test_name, selected_streams, mock_write_schema): + """ + Test that `write_schemas` function writes schema of only selected streams. + """ + write_schemas("tickets", self.mock_catalog, selected_streams) + for stream in selected_streams: + # Verify write_schema function is called. + mock_write_schema.assert_any_call(stream, mock.ANY, mock.ANY) + + @parameterized.expand([ + # ["test_name", "selected_streams", "expected_streams"] + ['test_parent_selected', ["tickets"], ["tickets"]], + ['test_child_selected', ["conversations", "satisfaction_ratings"], ["tickets"]], + ['test_both_selected', ["conversations", "roles", "tickets"], ["roles", "tickets"]] + ]) + def test_get_sync_streams(self, test_name, selected_streams, expected_streams): + """ + Test that if an only child is selected in the catalog, + then `get_stream_to_sync` returns the parent streams if selected stream is child. + """ + sync_streams = get_stream_to_sync(selected_streams) + + # Verify that the expected list of streams is returned + self.assertEqual(sync_streams, expected_streams) + + def test_streams_selection(self): + """ + Test that `get_selected_streams` returns the list of selected streams. + """ + catalog = {"streams": [ + get_stream_catalog("tickets", is_selected=True), + get_stream_catalog("roles", is_selected=True), + get_stream_catalog("contacts"), + get_stream_catalog("groups", is_selected=True), + get_stream_catalog("agents"), + ]} + expected_streams = ["tickets", "roles", "groups"] + selected_streams = get_selected_streams(catalog) + + # Verify expected list is returned + self.assertEqual(selected_streams, expected_streams)