From 71dcfe9b75fffa69442d890949d24c3ef94e9a0e Mon Sep 17 00:00:00 2001 From: Jacob Baca Date: Thu, 17 Sep 2020 20:08:16 +0000 Subject: [PATCH 1/5] added retry logic for users stream --- CHANGELOG.md | 3 +++ setup.py | 2 +- tap_zendesk/streams.py | 15 ++++++++++++--- 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e11ead..d08df2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 1.5.2 + * Add retry logic for users stream + ## 1.5.1 * Add error message to go along with assert diff --git a/setup.py b/setup.py index 76cf13a..8edbee6 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-zendesk', - version='1.5.1', + version='1.5.2', description='Singer.io tap for extracting data from the Zendesk API', author='Stitch', url='https://singer.io', diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index ffd7c0d..7953c6a 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -1,6 +1,7 @@ import os import json import datetime +import time import pytz import zenpy from zenpy.lib.exception import RecordNotFoundException @@ -182,10 +183,16 @@ def sync(self, state): if users.count > 1000: search_window_size = search_window_size // 2 end = start + datetime.timedelta(seconds=search_window_size) - LOGGER.info("Detected Search API response size too large. Cutting search window in half to %s seconds.", search_window_size) + LOGGER.info("users - Detected Search API response size too large. Cutting search window in half to %s seconds.", search_window_size) + continue + + # Consume the records to account for dates lower than window start + users = [user for user in users] + if not all(parsed_start <= user.updated_at for user in users): + LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency.".format(parsed_start, user.updated_at)) + time.sleep(30) continue for user in users: - assert parsed_start <= user.updated_at, "users - Record found before date window start. Details: window start ({}) is not less than or equal to updated_at ({})".format(parsed_start, user.updated_at) if bookmark < utils.strptime_with_tz(user.updated_at) <= end: # NB: We don't trust that the records come back ordered by # updated_at (we've observed out-of-order records), @@ -193,6 +200,8 @@ def sync(self, state): self.update_bookmark(state, user.updated_at) if parsed_start <= user.updated_at <= parsed_end: yield (self.stream, user) + + # Assumes that the for loop got everything singer.write_state(state) if search_window_size <= original_search_window_size // 2: search_window_size = search_window_size * 2 @@ -373,7 +382,7 @@ def sync(self, state): if satisfaction_ratings.count > 50000: search_window_size = search_window_size // 2 end = start + datetime.timedelta(seconds=search_window_size) - LOGGER.info("Detected Search API response size for this window is too large (> 50k). Cutting search window in half to %s seconds.", search_window_size) + LOGGER.info("satisfaction_ratings - Detected Search API response size for this window is too large (> 50k). Cutting search window in half to %s seconds.", search_window_size) continue for satisfaction_rating in satisfaction_ratings: assert parsed_start <= satisfaction_rating.updated_at, "satisfaction_ratings - Record found before date window start. Details: window start ({}) is not less than or equal to updated_at ({})".format(parsed_start, satisfaction_rating.updated_at) From 1e276144715c8b34eaf32d507de35c73f61980a5 Mon Sep 17 00:00:00 2001 From: Jacob Baca Date: Thu, 17 Sep 2020 20:23:59 +0000 Subject: [PATCH 2/5] cleaned up pylint errors --- tap_zendesk/streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 7953c6a..47c709c 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -187,9 +187,9 @@ def sync(self, state): continue # Consume the records to account for dates lower than window start - users = [user for user in users] + users = [user for user in users] # pylint: disable=unnecessary-comprehension if not all(parsed_start <= user.updated_at for user in users): - LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency.".format(parsed_start, user.updated_at)) + LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency.") time.sleep(30) continue for user in users: From 3dc61e3ec7309c6105aad8b2f5bc20df929bf493 Mon Sep 17 00:00:00 2001 From: Dan Mosora Date: Mon, 21 Sep 2020 18:37:40 +0000 Subject: [PATCH 3/5] Add limit on number of retries --- tap_zendesk/streams.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 47c709c..a152211 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -171,6 +171,7 @@ def sync(self, state): parsed_sync_end = singer.strftime(sync_end, "%Y-%m-%dT%H:%M:%SZ") # ASSUMPTION: updated_at value always comes back in utc + num_retries = 0 while start < sync_end: parsed_start = singer.strftime(start, "%Y-%m-%dT%H:%M:%SZ") parsed_end = min(singer.strftime(end, "%Y-%m-%dT%H:%M:%SZ"), parsed_sync_end) @@ -188,10 +189,19 @@ def sync(self, state): # Consume the records to account for dates lower than window start users = [user for user in users] # pylint: disable=unnecessary-comprehension + if not all(parsed_start <= user.updated_at for user in users): - LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency.") - time.sleep(30) - continue + # Only retry up to 30 minutes (60 attempts) + if num_retries < 60 + LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency.") + time.sleep(30) + num_retries += 1 + continue + else: + raise AssertionError("users - Record found before date window start and did not resolve after 30 minutes of retrying. Details: window start ({}) is not less than or equal to updated_at value(s) {}".format( + parsed_start, [str(user.updated_at) for user in users if user.updated_at < parsed_start])) + else: + num_retries = 0 for user in users: if bookmark < utils.strptime_with_tz(user.updated_at) <= end: # NB: We don't trust that the records come back ordered by From 41441387cfc64bf25568cf763c4012d9cd4fb27b Mon Sep 17 00:00:00 2001 From: Dan Mosora Date: Mon, 21 Sep 2020 18:42:19 +0000 Subject: [PATCH 4/5] Fix syntax error and clarify message --- tap_zendesk/streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index a152211..5378cee 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -192,8 +192,8 @@ def sync(self, state): if not all(parsed_start <= user.updated_at for user in users): # Only retry up to 30 minutes (60 attempts) - if num_retries < 60 - LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency.") + if num_retries < 60: + LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency. (Retry #%s)", num_retries + 1) time.sleep(30) num_retries += 1 continue From 288c9be4610b5088e911ebc8ccf7c99020fd38eb Mon Sep 17 00:00:00 2001 From: Dan Mosora Date: Mon, 21 Sep 2020 19:29:47 +0000 Subject: [PATCH 5/5] Fix pylint --- tap_zendesk/streams.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tap_zendesk/streams.py b/tap_zendesk/streams.py index 5378cee..9b86bd9 100644 --- a/tap_zendesk/streams.py +++ b/tap_zendesk/streams.py @@ -191,17 +191,17 @@ def sync(self, state): users = [user for user in users] # pylint: disable=unnecessary-comprehension if not all(parsed_start <= user.updated_at for user in users): - # Only retry up to 30 minutes (60 attempts) + # Only retry up to 30 minutes (60 attempts at 30 seconds each) if num_retries < 60: LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency. (Retry #%s)", num_retries + 1) time.sleep(30) num_retries += 1 continue - else: - raise AssertionError("users - Record found before date window start and did not resolve after 30 minutes of retrying. Details: window start ({}) is not less than or equal to updated_at value(s) {}".format( + raise AssertionError("users - Record found before date window start and did not resolve after 30 minutes of retrying. Details: window start ({}) is not less than or equal to updated_at value(s) {}".format( parsed_start, [str(user.updated_at) for user in users if user.updated_at < parsed_start])) - else: - num_retries = 0 + + # If we make it here, all quality checks have passed. Reset retry count. + num_retries = 0 for user in users: if bookmark < utils.strptime_with_tz(user.updated_at) <= end: # NB: We don't trust that the records come back ordered by