Skip to content

Commit

Permalink
Revert "Add forms stream (#29)"
Browse files Browse the repository at this point in the history
This reverts commit 12c3d4e.
  • Loading branch information
luandy64 committed Jul 26, 2021
1 parent 27c51c3 commit 3c91a58
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 195 deletions.
6 changes: 3 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ ENV/

# Custom stuff
env.sh
*config.json
config.json
.autoenv.zsh

rsa-key
tags
singer-check-tap-data
state*.json
catalog*.json
state.json
catalog.json
6 changes: 3 additions & 3 deletions tap_typeform/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import singer
from singer import utils
from singer.catalog import Catalog, CatalogEntry, Schema
from tap_typeform import streams
from tap_typeform.context import Context
from tap_typeform import schemas
from . import streams
from .context import Context
from . import schemas

REQUIRED_CONFIG_KEYS = ["token", "forms", "incremental_range"]

Expand Down
48 changes: 12 additions & 36 deletions tap_typeform/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,34 @@ class MetricsRateLimitException(Exception):
pass

class Client(object):
BASE_URL = 'https://api.typeform.com'
BASE_URL = 'https://api.typeform.com/forms/FORM_ID/responses'

def __init__(self, config):
self.token = 'Bearer ' + config.get('token')
self.metric = config.get('metric')
self.session = requests.Session()

def build_url(self, endpoint):
return f"{self.BASE_URL}/{endpoint}"
def url(self, form_id):
return self.BASE_URL.replace("FORM_ID", form_id)

@backoff.on_exception(backoff.expo,
RateLimitException,
max_tries=10,
factor=2)
def request(self, method, url, **kwargs):
def request(self, method, form_id, **kwargs):
# note that typeform response api doesn't return limit headers

if 'headers' not in kwargs:
kwargs['headers'] = {}
if self.token:
kwargs['headers']['Authorization'] = self.token

response = requests.request(method, url, **kwargs)
# if we're just pulling the form definition, strip the rest of the url
if 'params' not in kwargs:
response = requests.request(method, self.url(form_id).replace('/responses', ''), **kwargs)
else:
response = requests.request(method, self.url(form_id), **kwargs)
#print('final3 url=',response.url)

if response.status_code in [429, 503]:
raise RateLimitException()
Expand All @@ -48,34 +53,5 @@ def request(self, method, url, **kwargs):
LOGGER.info('raw data items= {}'.format(response.json()['total_items']))
return response.json()

# Max page size for forms API is 200
def get_forms(self, page_size=200):
url = self.build_url(endpoint='forms')
return self._get_forms('get', url, page_size)

def _get_forms(self, method, url, page_size):
page = 1
paginate = True
records = []
params = {'page_size': page_size}

while paginate:
params['page'] = page
response = self.request(method, url, params=params)
page_count = response.get('page_count')
paginate = page_count > page
page += 1

records += response.get('items')

return records

def get_form_definition(self, form_id, **kwargs):
endpoint = f"forms/{form_id}"
url = self.build_url(endpoint=endpoint)
return self.request('get', url, **kwargs)

def get_form_responses(self, form_id, **kwargs):
endpoint = f"forms/{form_id}/responses"
url = self.build_url(endpoint)
return self.request('get', url, **kwargs)
def get(self, form_id, **kwargs):
return self.request('get', form_id, **kwargs)
5 changes: 1 addition & 4 deletions tap_typeform/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,17 @@ class IDS(object):
LANDINGS = 'landings'
ANSWERS = 'answers'
QUESTIONS = 'questions'
FORMS = 'forms'

STATIC_SCHEMA_STREAM_IDS = [
IDS.LANDINGS,
IDS.ANSWERS,
IDS.QUESTIONS,
IDS.FORMS,
IDS.QUESTIONS
]

PK_FIELDS = {
IDS.LANDINGS: ['landing_id'],
IDS.ANSWERS: ['landing_id', 'question_id'],
IDS.QUESTIONS: ['form_id', 'question_id'],
IDS.FORMS: ['id']
}

def normalize_fieldname(fieldname):
Expand Down
94 changes: 0 additions & 94 deletions tap_typeform/schemas/forms.json

This file was deleted.

57 changes: 2 additions & 55 deletions tap_typeform/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def select_fields(mdata, obj):
@sleep_and_retry
@limits(calls=1, period=6) # 5 seconds needed to be padded by 1 second to work
def get_form_definition(atx, form_id):
return atx.client.get_form_definition(form_id)
return atx.client.get(form_id)

@on_exception(constant, MetricsRateLimitException, max_tries=5, interval=60)
@on_exception(expo, RateLimitException, max_tries=5)
Expand All @@ -86,15 +86,7 @@ def get_form(atx, form_id, start_date, end_date):
# the api doesn't have a means of paging through responses if the number is greater than 1000,
# so since the order of data retrieved is by submitted_at we have
# to take the last submitted_at date and use it to cycle through
return atx.client.get_form_responses(form_id, params={'since': start_date, 'until': end_date, 'page_size': 1000})

@on_exception(constant, MetricsRateLimitException, max_tries=5, interval=60)
@on_exception(expo, RateLimitException, max_tries=5)
@sleep_and_retry
@limits(calls=1, period=6) # 5 seconds needed to be padded by 1 second to work
def get_forms(atx):
LOGGER.info('All forms query')
return atx.client.get_forms()
return atx.client.get(form_id, params={'since': start_date, 'until': end_date, 'page_size': 1000})

def sync_form_definition(atx, form_id):
with singer.metrics.job_timer('form definition '+form_id):
Expand Down Expand Up @@ -202,44 +194,6 @@ def write_forms_state(atx, form, date_to_resume):
write_bookmark(atx.state, form, 'date_to_resume', date_to_resume.to_datetime_string())
atx.write_state()


def sync_latest_forms(atx):
replication_key = 'last_updated_at'
tap_id = 'forms'
with singer.metrics.job_timer('all forms'):
start = time.monotonic()
while True:
if (time.monotonic() - start) >= MAX_METRIC_JOB_TIME:
raise Exception('Metric job timeout ({} secs)'.format(
MAX_METRIC_JOB_TIME))
forms = get_forms(atx)
if forms != '':
break
else:
time.sleep(METRIC_JOB_POLL_SLEEP)

# Using an older version of singer
bookmark_date = singer.get_bookmark(atx.state, tap_id, replication_key) or atx.config['start_date']
bookmark_datetime = singer.utils.strptime_to_utc(bookmark_date)
max_datetime = bookmark_datetime

records = []
for form in forms:
record_datetime = singer.utils.strptime_to_utc(form[replication_key])
if record_datetime >= bookmark_datetime:
records.append(form)
max_datetime = max(record_datetime, max_datetime)

write_records(atx, tap_id, records)
bookmark_date = singer.utils.strftime(max_datetime)
state = singer.write_bookmark(atx.state,
tap_id,
replication_key,
bookmark_date)

return state


def sync_forms(atx):
incremental_range = atx.config.get('incremental_range')

Expand Down Expand Up @@ -323,10 +277,3 @@ def sync_forms(atx):
reset_stream(atx.state, 'questions')
reset_stream(atx.state, 'landings')
reset_stream(atx.state, 'answers')

if 'forms'in atx.selected_stream_ids:
state = sync_latest_forms(atx)

singer.write_state(state)

reset_stream(atx.state, 'forms')

0 comments on commit 3c91a58

Please sign in to comment.