TDL-20724: Use Incremental Export endpoint for Users stream #127

Merged 31 commits on May 25, 2023.

Changes from 5 commits

Commits (31)
431ac25
Added code for export API for Users stream
shantanu73 May 10, 2023
8f67ff4
removed unused import
shantanu73 May 10, 2023
1bc0692
Added logic to update bookmark for every user record synced.
shantanu73 May 11, 2023
6821682
Added space
shantanu73 May 11, 2023
9f5def7
Fixed the expected keys for all fields test.
shantanu73 May 12, 2023
d203575
TDL-17980 add satisfaction_ratings stream to test coverage (#128)
bhtowles May 22, 2023
78872f1
TDL-6756: Fix Infinite Loop for Users (#103)
karanpanchal-crest May 25, 2023
7c530c2
Added code for export API for Users stream
shantanu73 May 10, 2023
8aabcb6
removed unused import
shantanu73 May 10, 2023
401d984
Added logic to update bookmark for every user record synced.
shantanu73 May 11, 2023
848f6a4
Added space
shantanu73 May 11, 2023
ba1cc4f
Fixed the expected keys for all fields test.
shantanu73 May 12, 2023
de2d7ee
Merge branch 'TDL-20724-incremental-export-users-stream' of github.co…
shantanu73 May 25, 2023
c7a411f
Removed tests related to infinite loop as -DEFAULT_SEARCH_WINDOW_SIZE…
shantanu73 May 25, 2023
eca4b31
Added code for export API for Users stream
shantanu73 May 10, 2023
3021238
removed unused import
shantanu73 May 10, 2023
1d11985
Added logic to update bookmark for every user record synced.
shantanu73 May 11, 2023
f8d6461
Added space
shantanu73 May 11, 2023
bf55562
Fixed the expected keys for all fields test.
shantanu73 May 12, 2023
f68a7e8
Removed tests related to infinite loop as -DEFAULT_SEARCH_WINDOW_SIZE…
shantanu73 May 25, 2023
d963559
Merge branch 'TDL-20724-incremental-export-users-stream' of github.co…
shantanu73 May 25, 2023
b04061a
Added code for export API for Users stream
shantanu73 May 10, 2023
cc4c19e
removed unused import
shantanu73 May 10, 2023
5ae0d86
Added logic to update bookmark for every user record synced.
shantanu73 May 11, 2023
d910821
Added space
shantanu73 May 11, 2023
d9e1856
Fixed the expected keys for all fields test.
shantanu73 May 12, 2023
33ed0ac
Removed tests related to infinite loop as -DEFAULT_SEARCH_WINDOW_SIZE…
shantanu73 May 25, 2023
b888a8f
Added code for export API for Users stream
shantanu73 May 10, 2023
ee62157
Added logic to update bookmark for every user record synced.
shantanu73 May 11, 2023
79c7fce
Added space
shantanu73 May 11, 2023
9720a72
Merge branch 'TDL-20724-incremental-export-users-stream' of github.co…
shantanu73 May 25, 2023
5 changes: 4 additions & 1 deletion tap_zendesk/http.py
@@ -223,7 +223,10 @@ def get_incremental_export(url, access_token, request_timeout, start_time):
         'Authorization': 'Bearer {}'.format(access_token),
     }
 
-    params = {'start_time': start_time.timestamp()}
+    params = {'start_time': start_time}
+
+    if not isinstance(start_time, int):
+        params = {'start_time': start_time.timestamp()}
 
     response = call_api(url, request_timeout, params=params, headers=headers)
     response_json = response.json()
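With this change, get_incremental_export accepts start_time either as epoch seconds (an int) or as a datetime. A minimal sketch of the normalization in isolation, assuming a hypothetical helper name (normalize_start_time is illustrative, not part of the tap):

    import datetime

    def normalize_start_time(start_time):
        # Mirrors the patched logic: integers are assumed to already be
        # epoch seconds; anything else is treated as a datetime and
        # converted with .timestamp().
        if isinstance(start_time, int):
            return start_time
        return start_time.timestamp()

    # Both calls yield a value usable as the 'start_time' query param:
    normalize_start_time(1684972800)
    normalize_start_time(datetime.datetime(2023, 5, 25, tzinfo=datetime.timezone.utc))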
71 changes: 12 additions & 59 deletions tap_zendesk/streams.py
@@ -1,7 +1,6 @@
 import os
 import json
 import datetime
-import time
 import pytz
 import zenpy
 import singer
@@ -214,10 +213,12 @@ def check_access(self):
         start_time = datetime.datetime.utcnow().strftime(START_DATE_FORMAT)
         self.client.organizations.incremental(start_time=start_time)
 
-class Users(Stream):
+class Users(CursorBasedExportStream):
     name = "users"
     replication_method = "INCREMENTAL"
     replication_key = "updated_at"
+    item_key = "users"
+    endpoint = "https://{}.zendesk.com/api/v2/incremental/users/cursor.json"
 
     def _add_custom_fields(self, schema):
         try:
@@ -231,64 +232,16 @@ def _add_custom_fields(self, schema):
         return schema
 
     def sync(self, state):
-        original_search_window_size = int(self.config.get('search_window_size', DEFAULT_SEARCH_WINDOW_SIZE))
-        search_window_size = original_search_window_size
         bookmark = self.get_bookmark(state)
-        start = bookmark - datetime.timedelta(seconds=1)
-        end = start + datetime.timedelta(seconds=search_window_size)
-        sync_end = singer.utils.now() - datetime.timedelta(minutes=1)
-        parsed_sync_end = singer.strftime(sync_end, "%Y-%m-%dT%H:%M:%SZ")
-
-        # ASSUMPTION: updated_at value always comes back in utc
-        num_retries = 0
-        while start < sync_end:
-            parsed_start = singer.strftime(start, "%Y-%m-%dT%H:%M:%SZ")
-            parsed_end = min(singer.strftime(end, "%Y-%m-%dT%H:%M:%SZ"), parsed_sync_end)
-            LOGGER.info("Querying for users between %s and %s", parsed_start, parsed_end)
-            users = self.client.search("", updated_after=parsed_start, updated_before=parsed_end, type="user")
-
-            # NB: Zendesk will return an error on the 1001st record, so we
-            # need to check total response size before iterating
-            # See: https://develop.zendesk.com/hc/en-us/articles/360022563994--BREAKING-New-Search-API-Result-Limits
-            if users.count > 1000:
-                if search_window_size > 1:
-                    search_window_size = search_window_size // 2
-                    end = start + datetime.timedelta(seconds=search_window_size)
-                    LOGGER.info("users - Detected Search API response size too large. Cutting search window in half to %s seconds.", search_window_size)
-                    continue
-
-                raise Exception("users - Unable to get all users within minimum window of a single second ({}), found {} users within this timestamp. Zendesk can only provide a maximum of 1000 users per request. See: https://develop.zendesk.com/hc/en-us/articles/360022563994--BREAKING-New-Search-API-Result-Limits".format(parsed_start, users.count))
-
-            # Consume the records to account for dates lower than window start
-            users = [user for user in users] # pylint: disable=unnecessary-comprehension
-
-            if not all(parsed_start <= user.updated_at for user in users):
-                # Only retry up to 30 minutes (60 attempts at 30 seconds each)
-                if num_retries < 60:
-                    LOGGER.info("users - Record found before date window start. Waiting 30 seconds, then retrying window for consistency. (Retry #%s)", num_retries + 1)
-                    time.sleep(30)
-                    num_retries += 1
-                    continue
-                bad_users = [user for user in users if user.updated_at < parsed_start]
-                raise AssertionError("users - Record (user-id: {}) found before date window start and did not resolve after 30 minutes of retrying. Details: window start ({}) is not less than or equal to updated_at value(s) {}".format(
-                    [user.id for user in bad_users],
-                    parsed_start,
-                    [str(user.updated_at) for user in bad_users]))
-
-            # If we make it here, all quality checks have passed. Reset retry count.
-            num_retries = 0
-            for user in users:
-                if parsed_start <= user.updated_at <= parsed_end:
-                    yield (self.stream, user)
-            self.update_bookmark(state, parsed_end)
-
-            # Assumes that the for loop got everything
-            singer.write_state(state)
-            if search_window_size <= original_search_window_size // 2:
-                search_window_size = search_window_size * 2
-                LOGGER.info("Successfully requested records. Doubling search window to %s seconds", search_window_size)
-            start = end - datetime.timedelta(seconds=1)
-            end = start + datetime.timedelta(seconds=search_window_size)
+        epoch_bookmark = int(bookmark.timestamp())
+        users = self.get_objects(epoch_bookmark)
+
+        for user in users:
+            self.update_bookmark(state, user["updated_at"])
+            yield (self.stream, user)
+
+        singer.write_state(state)
 
     def check_access(self):
         '''
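The new Users stream delegates fetching to get_objects on CursorBasedExportStream, whose implementation is not part of this diff. As a rough, self-contained sketch of how Zendesk's cursor-based incremental export paginates against the endpoint declared above (iter_incremental_users is a hypothetical name; the users, after_url, and end_of_stream response fields come from Zendesk's incremental export documentation):

    import requests

    def iter_incremental_users(subdomain, access_token, start_time):
        # Illustrative sketch, not the tap's actual CursorBasedExportStream code.
        url = "https://{}.zendesk.com/api/v2/incremental/users/cursor.json".format(subdomain)
        headers = {'Authorization': 'Bearer {}'.format(access_token)}
        params = {'start_time': start_time}  # epoch seconds, first page only
        while True:
            data = requests.get(url, params=params, headers=headers, timeout=300).json()
            yield from data.get('users', [])
            # Zendesk marks the final page with end_of_stream; until then,
            # after_url carries the cursor for the next page.
            if data.get('end_of_stream'):
                break
            url, params = data['after_url'], None

Because pagination is driven by a server-side cursor rather than updated_at windows, this path avoids the 1,000-record Search API ceiling and makes the window-halving and retry logic deleted above unnecessary.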
2 changes: 1 addition & 1 deletion test/test_all_fields.py
@@ -75,7 +75,7 @@ def test_run(self):
         if stream == "ticket_fields":
             expected_all_keys = expected_all_keys - {'system_field_options', 'sub_type_id'}
         elif stream == "users":
-            expected_all_keys = expected_all_keys - {'permanently_deleted'}
+            expected_all_keys = expected_all_keys - {'chat_only'}
         elif stream == "ticket_metrics":
             expected_all_keys = expected_all_keys - {'status', 'instance_id', 'metric', 'type', 'time'}

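A side effect of the sync() rewrite above worth noting: the bookmark now advances per record rather than once per search window, so an interrupted extraction resumes from the last user emitted. A minimal sketch of that pattern with singer-python primitives (the stream name and record source are illustrative):

    import singer

    def emit_with_per_record_bookmark(state, records, stream='users'):
        # Illustrative only: advance the bookmark as each record is
        # written so a resumed run starts at the last emitted updated_at.
        for record in records:
            state = singer.write_bookmark(state, stream, 'updated_at',
                                          record['updated_at'])
            singer.write_record(stream, record)
        # Persist state once after the loop, matching the single
        # singer.write_state(state) call at the end of the new sync().
        singer.write_state(state)
        return state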