Source twilio: implement rolling windows #12555

Closed
Changes from 7 commits
42 commits
7b104a5
set lookback
Lex-MUTTDATA May 2, 2022
9ab1abb
lookback in minutes
Lex-MUTTDATA May 4, 2022
2c851f4
delete comments
Lex-MUTTDATA May 4, 2022
9c66dae
add IncrementalMixin function
Lex-MUTTDATA May 13, 2022
ab26add
lookback description
Lex-MUTTDATA May 13, 2022
0852e61
modify lookback __init__
Lex-MUTTDATA May 13, 2022
449eaf2
pagesize parameter
Lex-MUTTDATA May 13, 2022
fd62791
chore(twilio-connector): changelog and bump version
gvillafanetapia May 19, 2022
fea98cb
fixed dates format
Lex-MUTTDATA May 27, 2022
a34e61b
delete comments lines and prints
Lex-MUTTDATA May 27, 2022
7c304ec
Merge branch 'master' into refactor/source-twilio-muttdata
Lex-MUTTDATA May 27, 2022
7519a71
Update streams.py
Lex-MUTTDATA May 27, 2022
4dd0f30
delete line 79
Lex-MUTTDATA May 30, 2022
6e95ae2
Update lookback spec to integer
Lex-MUTTDATA Jun 3, 2022
ef1bfe9
Update lookback spec name
Lex-MUTTDATA Jun 3, 2022
eecdfce
Update page_size spec
Lex-MUTTDATA Jun 3, 2022
d84bd06
Update page_size spec type
Lex-MUTTDATA Jun 3, 2022
f7ec32d
Update request_params lookback window type
Lex-MUTTDATA Jun 3, 2022
cbb0891
Update lookback explicit name
Lex-MUTTDATA Jun 3, 2022
581af4f
Update state object declaration
Lex-MUTTDATA Jun 3, 2022
f66a5d7
Update request_params start_date
Lex-MUTTDATA Jun 3, 2022
4a8c3d0
update page_size var name
Lex-MUTTDATA Jun 5, 2022
3c026ff
Update logic read_records incremental function
Lex-MUTTDATA Jun 5, 2022
0e7fdf3
add/fix suggestions
Lex-MUTTDATA Jun 5, 2022
e62399f
to merge
Lex-MUTTDATA Jun 5, 2022
5f3f5bd
rollback bumped up version manually
Lex-MUTTDATA Jun 9, 2022
5f37b11
rollback page_size source.py
Lex-MUTTDATA Jun 9, 2022
c57a7a6
rollback page_size source.py - 2
Lex-MUTTDATA Jun 9, 2022
c9e5c36
rollback page_size spec.json
Lex-MUTTDATA Jun 9, 2022
e80ad3a
rollback page_size streams.py
Lex-MUTTDATA Jun 9, 2022
ffac4c3
setting default max page_size value
Lex-MUTTDATA Jun 9, 2022
e4067ee
change page_size variable name
Lex-MUTTDATA Jun 9, 2022
f4dc6e2
nit to avoid timedelta import
Lex-MUTTDATA Jun 9, 2022
87e5618
nit for readability
Lex-MUTTDATA Jun 9, 2022
70bc769
Update docker-compose.yaml
Lex-MUTTDATA Jun 9, 2022
fb01bde
Update source_specs.yaml
Lex-MUTTDATA Jun 9, 2022
31acf22
lookback_window description update
Lex-MUTTDATA Jun 10, 2022
220f16d
delete comma spec.json
Lex-MUTTDATA Jun 10, 2022
630cbd0
incremental_stream_kwargs - lookback_window config
Lex-MUTTDATA Jun 10, 2022
5107ca8
update streams-py - state.setter
Lex-MUTTDATA Jun 10, 2022
85dc8ae
update streams.py - request params start_date
Lex-MUTTDATA Jun 10, 2022
182d56e
update stream_slices fn in media_messages stream
Lex-MUTTDATA Jun 13, 2022
@@ -60,8 +60,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
config["auth_token"],
),
)
full_refresh_stream_kwargs = {"authenticator": auth}
incremental_stream_kwargs = {"authenticator": auth, "start_date": config["start_date"]}
full_refresh_stream_kwargs = {"authenticator": auth, "pagesize": config["page_size"]}
incremental_stream_kwargs = {"authenticator": auth, "start_date": config["start_date"], "lookback": config["lookback"], "pagesize": config["page_size"]}

streams = [
Accounts(**full_refresh_stream_kwargs),
@@ -4,30 +4,63 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Twilio Spec",
"type": "object",
"required": ["account_sid", "auth_token", "start_date"],
"required": [
"account_sid",
"auth_token",
"start_date"
],
"additionalProperties": false,
"properties": {
"account_sid": {
"title": "Account ID",
"description": "Twilio account SID",
"airbyte_secret": true,
"type": "string"
"type": "string",
"order": 1
},
"auth_token": {
"title": "Auth Token",
"description": "Twilio Auth Token.",
"airbyte_secret": true,
"type": "string"
"type": "string",
"order": 2
},
"start_date": {
"title": "Replication Start Date",
"description": "UTC date and time in the format 2020-10-01T00:00:00Z. Any data before this date will not be replicated.",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
"examples": ["2020-10-01T00:00:00Z"],
"type": "string"
"examples": [
"2020-10-01T00:00:00Z"
],
"type": "string",
"order": 3
},
"lookback": {
"title": "Lookback",
"description": "format in minutes. Any data between sync time and sync time minus lookback setting in minutes will be replicated.",
"examples": [
"60"
],
"default": 0,
"type": "string",
"order": 4
},
"page_size": {
Contributor
Could you explain why you want to allow users to set the page_size? IMO it should not be a custom parameter and should default to the allowed maximum.

Author
Hello @alafanechere, we agree. Since it was set to the minimum possible value and the API lets us modify it, we thought of exposing it as an attribute the user can change. If you think that is not correct, we can leave it at the maximum allowed, as you suggest. Cheers

Contributor
@alejandroRosano-muttdata yes please use the maximum value and do not expose the parameter to the user. Thanks!

"title": "Page Size",
"description": "How many resources to return in each list page. The default is 50, and the maximum is 1000.",
"examples": [
"500"
],
"minimum": 50,
"maximum": 1000,
"default": 50,
"type": "string",
"order": 5
}
}
},
"supportsIncremental": true,
"supported_destination_sync_modes": ["append"]
}
"supported_destination_sync_modes": [
"append"
]
}
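
A minimal sketch of the direction the review thread asks for, assuming the page size is fixed at the maximum quoted in the spec description (1000) and only the lookback stays configurable. `build_incremental_kwargs` and `MAX_PAGE_SIZE` are hypothetical names, not part of the connector. The sketch also coerces `lookback` to an integer, since the spec in this diff declares it as a string while giving it an integer default.

```python
from typing import Any, Dict, Mapping

# Assumption: 1000 is the maximum, as stated in the "page_size" description above.
MAX_PAGE_SIZE = 1000


def build_incremental_kwargs(config: Mapping[str, Any]) -> Dict[str, Any]:
    """Build keyword arguments for incremental streams from the user config."""
    # "lookback" is optional; a missing value means no lookback window.
    lookback = int(config.get("lookback", 0))
    if lookback < 0:
        raise ValueError("lookback must be a non-negative number of minutes")
    return {
        "start_date": config["start_date"],
        "lookback": lookback,
        # Not read from the config: the page size is not exposed to the user.
        "page_size": MAX_PAGE_SIZE,
    }
```

Reading the optional key with `config.get` avoids the `KeyError` that `config["lookback"]` would raise for configurations saved before the field existed.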
@@ -2,15 +2,21 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#


from abc import ABC, abstractmethod
from typing import Any, Iterable, Mapping, MutableMapping, Optional
from asyncio.log import logger
from datetime import datetime, timedelta
from distutils.command.config import config
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from urllib.parse import parse_qsl, urlparse

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import IncrementalMixin
from airbyte_cdk.sources.streams.http import HttpStream
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
from typing_extensions import Self

TWILIO_API_URL_BASE = "https://api.twilio.com"
TWILIO_API_URL_BASE_VERSIONED = f"{TWILIO_API_URL_BASE}/2010-04-01/"
@@ -20,9 +26,12 @@
class TwilioStream(HttpStream, ABC):
url_base = TWILIO_API_URL_BASE
primary_key = "sid"
page_size = 100
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization | TransformConfig.CustomSchemaNormalization)


def __init__(self, pagesize: int = 50, **kwargs):
super().__init__(**kwargs)
self._page_size = pagesize

@property
def data_field(self):
return self.name
@@ -41,6 +50,7 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
stream_data = response.json()
next_page_uri = stream_data.get("next_page_uri")
if next_page_uri:
print (next_page_uri)
next_url = urlparse(next_page_uri)
next_page_params = dict(parse_qsl(next_url.query))
return next_page_params
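
For reference, the pagination step in this hunk can be sketched on its own, assuming the response body carries a relative `next_page_uri` as the code above reads it. `next_page_params` is a hypothetical standalone helper and the sample URI is made up for illustration.

```python
from typing import Dict, Optional
from urllib.parse import parse_qsl, urlparse


def next_page_params(payload: dict) -> Optional[Dict[str, str]]:
    """Return the query parameters of `next_page_uri`, or None on the last page."""
    next_page_uri = payload.get("next_page_uri")
    if not next_page_uri:
        return None
    # Only the query string matters; it is replayed as request parameters.
    return dict(parse_qsl(urlparse(next_page_uri).query))


# Hypothetical response body with the single field the helper reads.
sample = {"next_page_uri": "/2010-04-01/Accounts/ACxxx/Messages.json?PageSize=1000&Page=1&PageToken=PAabc"}
```

Because the returned parameters include `PageSize`, the later `params.update(**next_page_token)` call overrides whatever page size the stream set for the first request.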
@@ -67,12 +77,11 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
backoff_time = response.headers.get("Retry-After")
if backoff_time is not None:
return float(backoff_time)
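
A small sketch of the same `Retry-After` handling outside the stream class, assuming the header normally holds a number of seconds. HTTP also allows a date in that header, which `float()` would reject, so this hypothetical helper (`backoff_seconds` is not a connector function) returns `None` in that case instead of raising.

```python
from typing import Mapping, Optional


def backoff_seconds(headers: Mapping[str, str]) -> Optional[float]:
    """Return the wait time from Retry-After, or None if absent or not numeric."""
    value = headers.get("Retry-After")
    if value is None:
        return None
    try:
        return float(value)
    except ValueError:
        # Retry-After may also be an HTTP-date; this sketch does not parse it.
        return None
```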

def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:


def request_params(self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
params["PageSize"] = self.page_size
params["PageSize"] = self._page_size
if next_page_token:
params.update(**next_page_token)
return params
@@ -90,15 +99,18 @@ def custom_transform_function(original_value: Any, field_schema: Mapping[str, An
# is no need in transforming anything.
pass
return original_value



class IncrementalTwilioStream(TwilioStream, ABC):
class IncrementalTwilioStream(TwilioStream, IncrementalMixin):
cursor_field = "date_updated"
time_filter_template = "%Y-%m-%dT%H:%M:%SZ"

def __init__(self, start_date: str = None, **kwargs):
def __init__(self, start_date: str = None, lookback: int = 0, **kwargs):
super().__init__(**kwargs)
self._start_date = start_date
self._start_date = start_date
self._lookback = lookback
self._cursor_value = None

@property
@abstractmethod
@@ -107,28 +119,29 @@ def incremental_filter_field(self) -> str:
return: date filter query parameter name
"""

def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]:
"""
Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object
and returning an updated state object.
"""
latest_benchmark = pendulum.parse(latest_record[self.cursor_field], strict=False).strftime(self.time_filter_template)
if current_stream_state.get(self.cursor_field):
return {self.cursor_field: max(latest_benchmark, current_stream_state[self.cursor_field])}
return {self.cursor_field: latest_benchmark}
@property
def state(self) -> Mapping[str, Any]:
if self._cursor_value:
return {self.cursor_field: self._cursor_value.strftime(self.time_filter_template)}
else:
return {self.cursor_field: self._start_date.strftime(self.time_filter_template)}

@state.setter
def state(self, value: Mapping[str, Any]):
self._cursor_value = datetime.strptime(value[self.cursor_field], self.time_filter_template)

def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, **kwargs)
start_date = stream_state.get(self.cursor_field) or self._start_date
if start_date:
params.update({self.incremental_filter_field: pendulum.parse(start_date, strict=False).strftime(self.time_filter_template)})
params.update({self.incremental_filter_field: (pendulum.parse(start_date, strict=False)-timedelta(minutes=int(self._lookback))).strftime(self.time_filter_template)})
return params

def read_records(self, stream_state: Mapping[str, Any] = None, **kwargs):
stream_state = stream_state or {}
records = super().read_records(stream_state=stream_state, **kwargs)
for record in records:
record[self.cursor_field] = pendulum.parse(record[self.cursor_field], strict=False).strftime(self.time_filter_template)
def read_records(self, *args, **kwargs) -> Iterable[Mapping[str, Any]]:
for record in super().read_records(*args, **kwargs):
if self._cursor_value:
latest_record_date = datetime.strptime(record[self.cursor_field], self.time_filter_template)
self._cursor_value = max(self._cursor_value, latest_record_date)
yield record
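
The rolling-window logic in this hunk can be sketched without the Airbyte CDK. `CursorTracker` is a hypothetical stand-in, not the connector's class. It assumes `start_date` arrives as a string (as in the constructor above) and parses it up front, and it sets the cursor from the first record seen, whereas the `read_records` in this diff only updates a cursor that already has a value. Whether that guard is intended is not clear from the diff alone.

```python
from datetime import datetime, timedelta
from typing import Optional

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # same template as time_filter_template above


class CursorTracker:
    """Hypothetical stand-in for the stream's state handling."""

    def __init__(self, start_date: str, lookback_minutes: int = 0):
        self._start = datetime.strptime(start_date, TIME_FORMAT)
        self._lookback = timedelta(minutes=int(lookback_minutes))
        self._cursor: Optional[datetime] = None

    @property
    def state(self) -> dict:
        # Fall back to the start date until a record has been observed.
        return {"date_updated": (self._cursor or self._start).strftime(TIME_FORMAT)}

    @state.setter
    def state(self, value: dict) -> None:
        self._cursor = datetime.strptime(value["date_updated"], TIME_FORMAT)

    def filter_start(self) -> str:
        """Lower bound for the next request: cursor (or start date) minus lookback."""
        return ((self._cursor or self._start) - self._lookback).strftime(TIME_FORMAT)

    def observe(self, record: dict) -> None:
        """Advance the cursor, including when no cursor has been stored yet."""
        seen = datetime.strptime(record["date_updated"], TIME_FORMAT)
        self._cursor = seen if self._cursor is None else max(self._cursor, seen)
```

With a 60-minute lookback and a saved cursor of `2022-06-02T10:30:00Z`, the next request filters from `2022-06-02T09:30:00Z`, so records updated inside that hour are fetched again and the destination needs to tolerate duplicates.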


@@ -333,6 +346,7 @@ class Messages(TwilioNestedStream, IncrementalTwilioStream):
parent_stream = Accounts
incremental_filter_field = "DateSent>"
cursor_field = "date_sent"
page_size = config


class MessageMedia(TwilioNestedStream, IncrementalTwilioStream):
5 changes: 4 additions & 1 deletion docker-compose.yaml
@@ -1,6 +1,7 @@
version: "3.7"
#https://github.com/compose-spec/compose-spec/blob/master/spec.md#using-extensions-as-fragments
x-logging: &default-logging
x-logging:
&default-logging
options:
max-size: "100m"
max-file: "5"
@@ -48,6 +49,8 @@ services:
- POSTGRES_USER=${DATABASE_USER}
volumes:
- db:/var/lib/postgresql/data
ports:
- 5432:5432
scheduler:
image: airbyte/scheduler:${VERSION}
logging: *default-logging