Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added retry logic for users stream #45

Merged
merged 5 commits on
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 1.5.2
* Add retry logic for users stream

## 1.5.1
* Add error message to go along with assert

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from setuptools import setup

setup(name='tap-zendesk',
version='1.5.1',
version='1.5.2',
description='Singer.io tap for extracting data from the Zendesk API',
author='Stitch',
url='https://singer.io',
Expand Down
25 changes: 22 additions & 3 deletions tap_zendesk/streams.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import json
import datetime
import time
import pytz
import zenpy
from zenpy.lib.exception import RecordNotFoundException
Expand Down Expand Up @@ -170,6 +171,7 @@ def sync(self, state):
parsed_sync_end = singer.strftime(sync_end, "%Y-%m-%dT%H:%M:%SZ")

# ASSUMPTION: updated_at value always comes back in UTC
num_retries = 0
while start < sync_end:
parsed_start = singer.strftime(start, "%Y-%m-%dT%H:%M:%SZ")
parsed_end = min(singer.strftime(end, "%Y-%m-%dT%H:%M:%SZ"), parsed_sync_end)
Expand All @@ -182,17 +184,34 @@ def sync(self, state):
if users.count > 1000:
search_window_size = search_window_size // 2
end = start + datetime.timedelta(seconds=search_window_size)
LOGGER.info("Detected Search API response size too large. Cutting search window in half to %s seconds.", search_window_size)
LOGGER.info("users - Detected Search API response size too large. Cutting search window in half to %s seconds.", search_window_size)
continue

# Consume the records to account for dates lower than window start
users = [user for user in users] # pylint: disable=unnecessary-comprehension

if not all(parsed_start <= user.updated_at for user in users):
# Only retry up to 30 minutes (60 attempts at 30 seconds each)
if num_retries < 60:
LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency. (Retry #%s)", num_retries + 1)
time.sleep(30)
num_retries += 1
continue
raise AssertionError("users - Record found before date window start and did not resolve after 30 minutes of retrying. Details: window start ({}) is not less than or equal to updated_at value(s) {}".format(
parsed_start, [str(user.updated_at) for user in users if user.updated_at < parsed_start]))

# If we make it here, all quality checks have passed. Reset retry count.
num_retries = 0
for user in users:
assert parsed_start <= user.updated_at, "users - Record found before date window start. Details: window start ({}) is not less than or equal to updated_at ({})".format(parsed_start, user.updated_at)
if bookmark < utils.strptime_with_tz(user.updated_at) <= end:
# NB: We don't trust that the records come back ordered by
# updated_at (we've observed out-of-order records),
# so we can't save state until we've seen all records
self.update_bookmark(state, user.updated_at)
if parsed_start <= user.updated_at <= parsed_end:
yield (self.stream, user)

# Assumes that the for loop got everything
singer.write_state(state)
if search_window_size <= original_search_window_size // 2:
search_window_size = search_window_size * 2
Expand Down Expand Up @@ -373,7 +392,7 @@ def sync(self, state):
if satisfaction_ratings.count > 50000:
search_window_size = search_window_size // 2
end = start + datetime.timedelta(seconds=search_window_size)
LOGGER.info("Detected Search API response size for this window is too large (> 50k). Cutting search window in half to %s seconds.", search_window_size)
LOGGER.info("satisfaction_ratings - Detected Search API response size for this window is too large (> 50k). Cutting search window in half to %s seconds.", search_window_size)
continue
for satisfaction_rating in satisfaction_ratings:
assert parsed_start <= satisfaction_rating.updated_at, "satisfaction_ratings - Record found before date window start. Details: window start ({}) is not less than or equal to updated_at ({})".format(parsed_start, satisfaction_rating.updated_at)
Expand Down