Skip to content

Commit

Permalink
🐛 Source Zendesk Support: fix incremental logic for ticket_comments s…
Browse files Browse the repository at this point in the history
…tream (#5787)

* fix negative backoff

* correct incremental logic for comments

* remove test logs

* bump version

* fix spec

* remove old non-relevant test

* correct backoff default value

* correction a test

* save addl export cursor field

* fix incorrect optinal date-time

* add format date-time again

* correction after review

* fix integration test

* fix unit test

Co-authored-by: Maksym Pavlenok <maksym.pavlenok@globallogic.com>
  • Loading branch information
antixar and gl-pix authored Sep 9, 2021
1 parent 9b71c28 commit 984cebe
Show file tree
Hide file tree
Showing 12 changed files with 140 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"sourceDefinitionId": "79c1aa37-dae3-42ae-b333-d1c105477715",
"name": "Zendesk Support",
"dockerRepository": "airbyte/source-zendesk-support",
"dockerImageTag": "0.1.0",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/zendesk-support",
"icon": "zendesk.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@
- sourceDefinitionId: 79c1aa37-dae3-42ae-b333-d1c105477715
name: Zendesk Support
dockerRepository: airbyte/source-zendesk-support
dockerImageTag: 0.1.0
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/zendesk-support
icon: zendesk.svg
- sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ COPY source_zendesk_support ./source_zendesk_support
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-zendesk-support
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
#!/usr/bin/env sh
image_name=$(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)
# Build latest connector image
echo "try to build: ${image_name}"
docker build . -t ${image_name}

# Pull latest acctest image
docker pull airbyte/source-acceptance-test:latest

docker run --rm -it \
-v /var/run/docker.sock:/var/run/docker.sock \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,18 @@ def prepare_stream_args():

def _test_export_stream(self, stream_cls: type):
stream = stream_cls(**self.prepare_stream_args())
stream.page_size = 1
record_timestamps = {}
for record in stream.read_records(sync_mode=None):
# save the first 5 records
if len(record_timestamps) > 5:
break
record_timestamps[record["id"]] = record[stream.cursor_field]
if stream._last_end_time not in record_timestamps.values():
record_timestamps[record["id"]] = stream._last_end_time

stream.page_size = 10
for record_id, timestamp in record_timestamps.items():
state = {stream.cursor_field: timestamp}
state = {"_last_end_time": timestamp}
for record in stream.read_records(sync_mode=None, stream_state=state):
assert record["id"] != record_id
break
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"email": "broken.email@invalid.config",
"api_token": "<broken token>",
"auth_method": {
"api_token": "<broken token>",
"email": "broken.email@invalid.config"
},
"subdomain": "test-failure-airbyte",
"start_date": "2030-01-01T00:00:00Z"
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

MAIN_REQUIREMENTS = ["airbyte-cdk", "pytz"]

TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests-mock==1.9.3", "timeout-decorator==0.5.0"]
TEST_REQUIREMENTS = ["pytest~=6.1", "source-acceptance-test", "requests-mock==1.9.3"]

setup(
version="0.1.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
},
"initially_assigned_at": {
"type": ["null", "string"],
"format": "date-time"
"format": "datetime"
},
"assigned_at": {
"type": ["null", "string"],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"auth_method": {
"title": "ZenDesk Authorization Method",
"type": "object",
"default": "api_token",
"description": "Zendesk service provides 2 auth method: API token and oAuth2. Now only the first one is available. Another one will be added in the future",
"oneOf": [
{
Expand All @@ -28,6 +29,10 @@
"required": ["email", "api_token"],
"additionalProperties": false,
"properties": {
"auth_method": {
"type": "string",
"const": "api_token"
},
"email": {
"type": "string",
"description": "The user email for your Zendesk account"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import time
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
from urllib.parse import parse_qsl, urlparse

import pytz
Expand Down Expand Up @@ -69,23 +69,24 @@ def _parse_next_page_number(response: requests.Response) -> Optional[int]:
return dict(parse_qsl(urlparse(next_page).query)).get("page")
return None

def backoff_time(self, response: requests.Response) -> int:
def backoff_time(self, response: requests.Response) -> Union[int, float]:
"""
The rate limit is 700 requests per minute
# monitoring-your-request-activity
See https://developer.zendesk.com/api-reference/ticketing/account-configuration/usage_limits/
The response has a Retry-After header that tells you for how many seconds to wait before retrying.
"""
retry_after = response.headers.get("Retry-After")
if retry_after:

retry_after = int(response.headers.get("Retry-After", 0))
if retry_after and retry_after > 0:
return int(retry_after)

# the header X-Rate-Limit returns a amount of requests per minute
# we try to wait twice as long
rate_limit = float(response.headers.get("X-Rate-Limit") or 0)
if rate_limit:
rate_limit = float(response.headers.get("X-Rate-Limit", 0))
if rate_limit and rate_limit > 0:
return (60.0 / rate_limit) * 2
# default value if there is not any headers
return 60
return super().backoff_time(response)

@staticmethod
def str2datetime(str_dt: str) -> datetime:
Expand Down Expand Up @@ -141,6 +142,12 @@ def __init__(self, start_date: str, **kwargs):
super().__init__(**kwargs)
# add the custom value for skiping of not relevant records
self._start_date = self.str2datetime(start_date) if isinstance(start_date, str) else start_date
# Flag for marking of completed process
self._finished = False

@property
def is_finished(self):
return self._finished

def path(self, **kwargs) -> str:
return f"{self.name}.json"
Expand All @@ -152,15 +159,12 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
if record.get(self.created_at_field) and self.str2datetime(record[self.created_at_field]) < self._start_date:
continue
yield record
yield from []

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
# try to save maximum value of a cursor field
return {
self.cursor_field: max(
str((latest_record or {}).get(self.cursor_field, "")), str((current_stream_state or {}).get(self.cursor_field, ""))
)
}
old_value = str((current_stream_state or {}).get(self.cursor_field, ""))
new_value = str((latest_record or {}).get(self.cursor_field, ""))
return {self.cursor_field: max(new_value, old_value)}


class IncrementalExportStream(IncrementalEntityStream, ABC):
Expand All @@ -178,6 +182,12 @@ class IncrementalExportStream(IncrementalEntityStream, ABC):
# this endpoint provides responces in ascending order.
state_checkpoint_interval = 100

def __init__(self, **kwargs):
super().__init__(**kwargs)
# for saving of last page cursor value
# endpoints can have different cursor format but incremental logic uses unixtime format only
self._last_end_time = None

@staticmethod
def str2unixtime(str_dt: str) -> int:
"""convert string to unixtime number
Expand All @@ -190,11 +200,9 @@ def str2unixtime(str_dt: str) -> int:
return calendar.timegm(dt.utctimetuple())

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
data = response.json()
if data["end_of_stream"]:
# true if the current request has returned all the results up to the current time; false otherwise
if self.is_finished:
return None
return {"start_time": data["end_time"]}
return {"start_time": self._last_end_time}

def path(self, *args, **kwargs) -> str:
return f"incremental/{self.name}.json"
Expand All @@ -205,12 +213,14 @@ def request_params(

params = {"per_page": self.page_size}
if not next_page_token:
# try to search all reconds with generated_timestamp > start_time
current_state = stream_state.get(self.cursor_field)
if current_state and isinstance(current_state, str) and not current_state.isdigit():
# try to save a stage with UnixTime format
current_state = self.str2unixtime(current_state)
start_time = int(current_state or time.mktime(self._start_date.timetuple())) + 1
current_state = stream_state.get("_last_end_time")
if not current_state:
# try to search all reconds with generated_timestamp > start_time
current_state = stream_state.get(self.cursor_field)
if current_state and isinstance(current_state, str) and not current_state.isdigit():
current_state = self.str2unixtime(current_state)

start_time = int(current_state or time.mktime(self._start_date.timetuple()))
# +1 because the API returns all records where generated_timestamp >= start_time

now = calendar.timegm(datetime.now().utctimetuple())
Expand All @@ -223,6 +233,26 @@ def request_params(
params.update(next_page_token)
return params

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
# try to save maximum value of a cursor field
state = super().get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record)

if self._last_end_time:
state["_last_end_time"] = self._last_end_time
current_stream_state.update(state)
return current_stream_state

def parse_response(
self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs
) -> Iterable[Mapping]:

data = response.json()
# save a last end time for the next attempt
self._last_end_time = data["end_time"]
# end_of_stream is true if the current request has returned all the results up to the current time; false otherwise
self._finished = data["end_of_stream"]
yield from super().parse_response(response, stream_state=stream_state, stream_slice=stream_slice, **kwargs)


class IncrementalUnsortedStream(IncrementalEntityStream, ABC):
"""Stream for loading without sorting
Expand All @@ -233,43 +263,29 @@ class IncrementalUnsortedStream(IncrementalEntityStream, ABC):

def __init__(self, **kwargs):
super().__init__(**kwargs)
# Flag for marking of completed process
self._finished = False
# For saving of a relevant last updated date
self._max_cursor_date = None

def _get_stream_date(self, stream_state: Mapping[str, Any], **kwargs) -> datetime:
"""Can change a date of comparison"""
return self.str2datetime((stream_state or {}).get(self.cursor_field))

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
"""try to select relevant data only"""

if not self.cursor_field:
yield from super().parse_response(response, stream_state=stream_state, **kwargs)
else:
send_cnt = 0
cursor_date = self._get_stream_date(stream_state, **kwargs)

cursor_date = (stream_state or {}).get(self.cursor_field)
for record in super().parse_response(response, stream_state=stream_state, **kwargs):
updated = self.str2datetime(record[self.cursor_field])
updated = record[self.cursor_field]
if not self._max_cursor_date or self._max_cursor_date < updated:
self._max_cursor_date = updated
if not cursor_date or updated > cursor_date:
send_cnt += 1
yield record
if not send_cnt:
self._finished = True
yield from []

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:

max_updated_at = self.datetime2str(self._max_cursor_date) if self._max_cursor_date else ""
return {self.cursor_field: max(max_updated_at, (current_stream_state or {}).get(self.cursor_field, ""))}

@property
def is_finished(self):
return self._finished
return {self.cursor_field: max(self._max_cursor_date or "", (current_stream_state or {}).get(self.cursor_field, ""))}

@abstractmethod
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
Expand Down Expand Up @@ -337,29 +353,46 @@ class TicketComments(IncrementalSortedPageStream):
response_list_name = "comments"
cursor_field = IncrementalSortedPageStream.created_at_field

def __init__(self, **kwargs):
super().__init__(**kwargs)
# need to save a slice ticket state
# because the function get_updated_state doesn't have a stream_slice as argument
self._slice_cursor_date = None

def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
ticket_id = stream_slice["id"]
return f"tickets/{ticket_id}/comments.json"

def parse_response(
self, response: requests.Response, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs
) -> Iterable[Mapping]:
# save a slice ticket state
self._cursor_ticket_date = stream_slice[Tickets.cursor_field]
yield from super().parse_response(response, stream_state=stream_state, stream_slice=stream_slice, **kwargs)

def stream_slices(
self, sync_mode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
"""Loads all updated tickets after last stream state"""
stream_state = stream_state or {}
# convert a comment state value to a ticket one
# Comment state: {"created_at": "2021-07-30T12:30:09Z"} => Ticket state {"generated_timestamp": 1627637409}
ticket_stream_value = Tickets.str2unixtime(stream_state.get(self.cursor_field))

tickets = Tickets(self._start_date, subdomain=self._subdomain, authenticator=self.authenticator).read_records(
# tickets and comments have different cursor formats. For example:
# Ticket state {"generated_timestamp": 1627637409}
# Comment state: {"created_at": "2021-07-30T12:30:09Z"}
# At the first try to find a ticket cursor value
ticket_stream_value = stream_state.get(Tickets.cursor_field)
if not ticket_stream_value:
# for backward compatibility because not all relevant states can have some last ticket state
ticket_stream_value = Tickets.str2unixtime(stream_state.get(self.cursor_field))

tickets = Tickets(start_date=self._start_date, subdomain=self._subdomain, authenticator=self.authenticator).read_records(
sync_mode=sync_mode, cursor_field=cursor_field, stream_state={Tickets.cursor_field: ticket_stream_value}
)
stream_state_dt = self.str2datetime(stream_state.get(self.cursor_field))

# selects all tickets what have at least one comment
ticket_ids = [
{
"id": ticket["id"],
"start_stream_state": stream_state_dt,
Tickets.cursor_field: ticket[Tickets.cursor_field],
}
for ticket in tickets
Expand All @@ -370,9 +403,12 @@ def stream_slices(
ticket_ids.sort(key=lambda ticket: ticket[Tickets.cursor_field])
return ticket_ids

def _get_stream_date(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any], **kwargs) -> datetime:
"""For each tickets all comments must be compared with a start value of stream state"""
return stream_slice["start_stream_state"]
def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""Adds a last cursor ticket updated time for a comment state"""
new_state = super().get_updated_state(current_stream_state=current_stream_state, latest_record=latest_record)
if new_state:
new_state[Tickets.cursor_field] = self._cursor_ticket_date
return new_state


# NOTE: all Zendesk endpoints can be splitted into several templates of data loading.
Expand Down Expand Up @@ -453,6 +489,8 @@ class Macros(IncrementalSortedPageStream):
class TicketAudits(IncrementalSortedCursorStream):
"""TicketAudits stream: https://developer.zendesk.com/api-reference/ticketing/tickets/ticket_audits/"""

# can request a maximum of 1,000 results
page_size = 1000
# ticket audits doesn't have the 'updated_by' field
cursor_field = "created_at"

Expand Down
Loading

0 comments on commit 984cebe

Please sign in to comment.