Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Twilio: implement slicing #18423

Merged
merged 8 commits into from
Oct 26, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-twilio/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ COPY main.py ./
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.12
LABEL io.airbyte.version=0.1.13
LABEL io.airbyte.name=airbyte/source-twilio
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ tests:
configured_catalog_path: "integration_tests/no_empty_streams_catalog.json"
expect_records:
path: "integration_tests/expected_records.txt"
empty_streams: ["alerts"]
timeout_seconds: 600
incremental:
- config_path: "secrets/config.json"
# usage records stream produces an error if the cursor date is greater than or equal to the current date
Expand Down
158 changes: 97 additions & 61 deletions airbyte-integrations/connectors/source-twilio/source_twilio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import copy
from abc import ABC, abstractmethod
from functools import cached_property
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from urllib.parse import parse_qsl, urlparse

Expand All @@ -24,9 +26,6 @@ class TwilioStream(HttpStream, ABC):
page_size = 1000
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization)

def __init__(self, **kwargs):
super().__init__(**kwargs)

@property
def data_field(self):
return self.name
Expand Down Expand Up @@ -73,9 +72,12 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
return float(backoff_time)

def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
params["PageSize"] = self.page_size
if next_page_token:
params.update(**next_page_token)
Expand All @@ -98,6 +100,11 @@ def custom_transform_function(original_value: Any, field_schema: Mapping[str, An

class IncrementalTwilioStream(TwilioStream, IncrementalMixin):
time_filter_template = "YYYY-MM-DD HH:mm:ss[Z]"
# This attribute allows balancing sync speed against memory consumption.
# The larger a slice is, the more memory is consumed and the faster the sync runs, since fewer requests are made.
slice_step = pendulum.duration(years=1)
# time gap between the end of the previous slice and the start of the current slice
slice_granularity = pendulum.duration(microseconds=1)
state_checkpoint_interval = 1000

def __init__(self, start_date: str = None, lookback_window: int = 0, **kwargs):
Expand All @@ -108,7 +115,14 @@ def __init__(self, start_date: str = None, lookback_window: int = 0, **kwargs):

@property
@abstractmethod
def incremental_filter_field(self) -> str:
def lower_boundary_filter_field(self) -> str:
"""
return: name of the query parameter that sets the lower (start) date boundary of a slice
"""

@property
@abstractmethod
def upper_boundary_filter_field(self) -> str:
"""
return: name of the query parameter that sets the upper (end) date boundary of a slice
"""
Expand All @@ -123,7 +137,7 @@ def state(self) -> Mapping[str, Any]:
return {}

@state.setter
def state(self, value: Mapping[str, Any]):
def state(self, value: MutableMapping[str, Any]):
if self._lookback_window and value.get(self.cursor_field):
new_start_date = (
pendulum.parse(value[self.cursor_field]) - pendulum.duration(minutes=self._lookback_window)
Expand All @@ -132,12 +146,38 @@ def state(self, value: Mapping[str, Any]):
value[self.cursor_field] = new_start_date
self._cursor_value = value.get(self.cursor_field)

def generate_date_ranges(self, super_slice: MutableMapping[str, Any]) -> Iterable[Optional[MutableMapping[str, Any]]]:
    """
    Split the period between the current cursor (or the configured start date) and now
    into consecutive date windows of at most `slice_step` each.

    :param super_slice: slice produced by the parent class; every yielded window is a deep copy of it
        with the two boundary keys added (an empty dict is used when it is falsy)
    :return: slices whose `lower_boundary_filter_field` / `upper_boundary_filter_field` keys hold
        the window boundaries formatted with `time_filter_template`
    """
    # Use UTC explicitly: the formatted value may carry a literal "Z" suffix (see `time_filter_template`),
    # so a local-time "now" would yield a wrong upper boundary outside of UTC environments.
    end_datetime = pendulum.now("utc")
    start_datetime = min(end_datetime, pendulum.parse(self.state.get(self.cursor_field, self._start_date)))
    current_start = start_datetime
    current_end = start_datetime
    while current_end < end_datetime:
        current_end = min(end_datetime, current_start + self.slice_step)
        lower_bound = current_start.format(self.time_filter_template)
        upper_bound = current_end.format(self.time_filter_template)
        # With a coarse template (e.g. "YYYY-MM-DD") and a one-day granularity the last window can start
        # after "now": start=2021-11-14T00:00, now=2022-11-14T12:03 would give (2022-11-15, 2022-11-14).
        # Compare the boundaries as they will actually be sent and stop instead of emitting an inverted range.
        if pendulum.parse(lower_bound) > pendulum.parse(upper_bound):
            break
        slice_ = copy.deepcopy(super_slice) if super_slice else {}
        slice_[self.lower_boundary_filter_field] = lower_bound
        slice_[self.upper_boundary_filter_field] = upper_bound
        yield slice_
        # The next window starts one granularity step after this one ends.
        current_start = current_end + self.slice_granularity

def stream_slices(
    self, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
) -> Iterable[Optional[Mapping[str, Any]]]:
    """
    Expand every slice produced by the parent class into a series of date-window slices.

    Each parent slice is handed to `generate_date_ranges`, and the windows it produces are
    yielded in order before moving on to the next parent slice.
    """
    parent_slices = super().stream_slices(sync_mode=sync_mode, cursor_field=cursor_field, stream_state=stream_state)
    for parent_slice in parent_slices:
        for date_window in self.generate_date_ranges(parent_slice):
            yield date_window

def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
self,
stream_state: Mapping[str, Any],
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None,
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
start_date = self.state.get(self.cursor_field, self._start_date)
params[self.incremental_filter_field] = pendulum.parse(start_date).format(self.time_filter_template)
params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
lower_bound = stream_slice and stream_slice.get(self.lower_boundary_filter_field)
upper_bound = stream_slice and stream_slice.get(self.upper_boundary_filter_field)
if lower_bound:
params[self.lower_boundary_filter_field] = lower_bound
if upper_bound:
params[self.upper_boundary_filter_field] = upper_bound
return params

def read_records(
Expand Down Expand Up @@ -165,6 +205,7 @@ class TwilioNestedStream(TwilioStream):
"""

media_exist_validation = {}
uri_from_subresource = True

def path(self, stream_slice: Mapping[str, Any], **kwargs):
return stream_slice["subresource_uri"]
Expand All @@ -180,21 +221,30 @@ def parent_stream(self) -> TwilioStream:
:return: parent stream class
"""

# Parent stream object built with this stream's authenticator.
# `cached_property` computes it on first access and stores the result on the instance,
# so later accesses reuse the same parent object instead of constructing a new one.
@cached_property
def parent_stream_instance(self):
return self.parent_stream(authenticator=self.authenticator)

def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
    """Build the slice for one parent record: the URI stored under `subresource_uri_key` in its `subresource_uris`."""
    subresource_uris = record["subresource_uris"]
    return {"subresource_uri": subresource_uris[self.subresource_uri_key]}

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
stream_instance = self.parent_stream(authenticator=self.authenticator)
stream_instance = self.parent_stream_instance
stream_slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=stream_instance.cursor_field)
for stream_slice in stream_slices:
for item in stream_instance.read_records(
sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, cursor_field=stream_instance.cursor_field
):
if item.get("subresource_uris", {}).get(self.subresource_uri_key):
if not self.uri_from_subresource:
yield self.parent_record_to_stream_slice(item)
elif item.get("subresource_uris", {}).get(self.subresource_uri_key):
validated = True
for key, value in self.media_exist_validation.items():
validated = item.get(key) and item.get(key) != value
if not validated:
break
if validated:
yield {"subresource_uri": item["subresource_uris"][self.subresource_uri_key]}
yield self.parent_record_to_stream_slice(item)


class Accounts(TwilioStream):
Expand All @@ -214,18 +264,13 @@ class DependentPhoneNumbers(TwilioNestedStream):

parent_stream = Addresses
url_base = TWILIO_API_URL_BASE_VERSIONED
uri_from_subresource = False

def path(self, stream_slice: Mapping[str, Any], **kwargs):
    """Return the request path for the dependent phone numbers of the address described by `stream_slice`."""
    account_sid = stream_slice["account_sid"]
    address_sid = stream_slice["sid"]
    return f"Accounts/{account_sid}/Addresses/{address_sid}/DependentPhoneNumbers.json"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
stream_instance = self.parent_stream(authenticator=self.authenticator)
stream_slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=stream_instance.cursor_field)
for stream_slice in stream_slices:
for item in stream_instance.read_records(
sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, cursor_field=stream_instance.cursor_field
):
yield {"sid": item["sid"], "account_sid": item["account_sid"]}
def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
    """Build the slice for one parent record by keeping only its `sid` and `account_sid`."""
    return {key: record[key] for key in ("sid", "account_sid")}


class Applications(TwilioNestedStream):
Expand Down Expand Up @@ -287,22 +332,26 @@ class Keys(TwilioNestedStream):
parent_stream = Accounts


class Calls(TwilioNestedStream, IncrementalTwilioStream):
class Calls(IncrementalTwilioStream, TwilioNestedStream):
"""https://www.twilio.com/docs/voice/api/call-resource#create-a-call-resource"""

parent_stream = Accounts
incremental_filter_field = "EndTime>"
lower_boundary_filter_field = "EndTime>"
upper_boundary_filter_field = "EndTime<"
cursor_field = "end_time"
time_filter_template = "YYYY-MM-DD"
slice_granularity = pendulum.duration(days=1)


class Conferences(TwilioNestedStream, IncrementalTwilioStream):
class Conferences(IncrementalTwilioStream, TwilioNestedStream):
"""https://www.twilio.com/docs/voice/api/conference-resource#read-multiple-conference-resources"""

parent_stream = Accounts
incremental_filter_field = "DateCreated>"
lower_boundary_filter_field = "DateCreated>"
upper_boundary_filter_field = "DateCreated<"
cursor_field = "date_created"
time_filter_template = "YYYY-MM-DD"
slice_granularity = pendulum.duration(days=1)


class ConferenceParticipants(TwilioNestedStream):
Expand All @@ -324,11 +373,12 @@ class OutgoingCallerIds(TwilioNestedStream):
parent_stream = Accounts


class Recordings(TwilioNestedStream, IncrementalTwilioStream):
class Recordings(IncrementalTwilioStream, TwilioNestedStream):
"""https://www.twilio.com/docs/voice/api/recording#read-multiple-recording-resources"""

parent_stream = Accounts
incremental_filter_field = "DateCreated>"
lower_boundary_filter_field = "DateCreated>"
upper_boundary_filter_field = "DateCreated<"
cursor_field = "date_created"


Expand All @@ -344,46 +394,35 @@ class Queues(TwilioNestedStream):
parent_stream = Accounts


class Messages(TwilioNestedStream, IncrementalTwilioStream):
class Messages(IncrementalTwilioStream, TwilioNestedStream):
"""https://www.twilio.com/docs/sms/api/message-resource#read-multiple-message-resources"""

parent_stream = Accounts
incremental_filter_field = "DateSent>"
slice_step = pendulum.duration(days=1)
lower_boundary_filter_field = "DateSent>"
upper_boundary_filter_field = "DateSent<"
cursor_field = "date_sent"


class MessageMedia(TwilioNestedStream, IncrementalTwilioStream):
class MessageMedia(IncrementalTwilioStream, TwilioNestedStream):
"""https://www.twilio.com/docs/sms/api/media-resource#read-multiple-media-resources"""

parent_stream = Messages
data_field = "media_list"
subresource_uri_key = "media"
media_exist_validation = {"num_media": "0"}
incremental_filter_field = "DateCreated>"
lower_boundary_filter_field = "DateCreated>"
upper_boundary_filter_field = "DateCreated<"
cursor_field = "date_created"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
stream_instance = self.parent_stream(
authenticator=self.authenticator, start_date=self._start_date, lookback_window=self._lookback_window
)
stream_slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=stream_instance.cursor_field)
for stream_slice in stream_slices:
for item in stream_instance.read_records(
sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, cursor_field=stream_instance.cursor_field
):
if item.get("subresource_uris", {}).get(self.subresource_uri_key):
validated = True
for key, value in self.media_exist_validation.items():
validated = item.get(key) and item.get(key) != value
if not validated:
break
if validated:

yield {"subresource_uri": item["subresource_uris"][self.subresource_uri_key]}
# Parent stream object built with this stream's authenticator, start date and lookback window.
# `cached_property` computes it on first access and stores the result on the instance,
# so later accesses reuse the same parent object.
@cached_property
def parent_stream_instance(self):
return self.parent_stream(authenticator=self.authenticator, start_date=self._start_date, lookback_window=self._lookback_window)


class UsageNestedStream(TwilioNestedStream):
url_base = TWILIO_API_URL_BASE_VERSIONED
uri_from_subresource = False

@property
@abstractmethod
Expand All @@ -395,23 +434,19 @@ def path_name(self) -> str:
def path(self, stream_slice: Mapping[str, Any], **kwargs):
    """Return the usage request path for the account in `stream_slice`, ending with this stream's `path_name`."""
    account_sid = stream_slice["account_sid"]
    return f"Accounts/{account_sid}/Usage/{self.path_name}.json"

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
stream_instance = self.parent_stream(authenticator=self.authenticator)
stream_slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=stream_instance.cursor_field)
for stream_slice in stream_slices:
for item in stream_instance.read_records(
sync_mode=SyncMode.full_refresh, stream_slice=stream_slice, cursor_field=stream_instance.cursor_field
):
yield {"account_sid": item["sid"]}
def parent_record_to_stream_slice(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
    """Build the slice for one parent record: its `sid` exposed under the `account_sid` key."""
    account_sid = record["sid"]
    return {"account_sid": account_sid}


class UsageRecords(UsageNestedStream, IncrementalTwilioStream):
class UsageRecords(IncrementalTwilioStream, UsageNestedStream):
"""https://www.twilio.com/docs/usage/api/usage-record#read-multiple-usagerecord-resources"""

parent_stream = Accounts
incremental_filter_field = "StartDate"
lower_boundary_filter_field = "StartDate"
upper_boundary_filter_field = "EndDate"
cursor_field = "start_date"
time_filter_template = "YYYY-MM-DD"
slice_granularity = pendulum.duration(days=1)
path_name = "Records"
primary_key = [["account_sid"], ["category"]]
changeable_fields = ["as_of"]
Expand All @@ -429,7 +464,8 @@ class Alerts(IncrementalTwilioStream):
"""https://www.twilio.com/docs/usage/monitor-alert#read-multiple-alert-resources"""

url_base = TWILIO_MONITOR_URL_BASE
incremental_filter_field = "StartDate"
lower_boundary_filter_field = "StartDate="
upper_boundary_filter_field = "EndDate="
cursor_field = "date_generated"

def path(self, **kwargs):
Expand Down
Loading