Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TDL-19959 added missing fields #63

Merged
merged 60 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
41e0404
added missing fields
namrata270998 Jul 21, 2022
39dbb50
TDL-19957 update dict based to class based
namrata270998 Jul 22, 2022
8e00cbd
updated while condition
namrata270998 Jul 22, 2022
f94f68e
updated while condition
namrata270998 Jul 22, 2022
f299b8d
removed incremental_range from REQUIRED_CONFIG_KEYS
namrata270998 Jul 22, 2022
999d32d
TDL-19964 Added missing tap-tester commits
namrata270998 Jul 22, 2022
5f8e433
updated discover and schema file
namrata270998 Jul 22, 2022
b76f9cf
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Jul 25, 2022
86ffe72
updated tap-tester tests
namrata270998 Jul 25, 2022
4353def
updated to replication_key instead of keys
namrata270998 Jul 25, 2022
e0f5029
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Jul 26, 2022
69692b7
updated schemas and added comments
namrata270998 Jul 26, 2022
89d4e4a
added unittests for code coverage
namrata270998 Jul 26, 2022
348262e
updated bookmarks tap-tester test
namrata270998 Jul 26, 2022
140cd09
added unittests for sync.py
namrata270998 Jul 26, 2022
9e27ff2
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Jul 26, 2022
c3e40e2
added parameterized in setup.py
namrata270998 Jul 26, 2022
3249d74
added parameterized in config.yml and updated unittests
namrata270998 Jul 27, 2022
c025e28
Merge branch 'TDL-19957-update-dict-based-to-class-based' of https://…
namrata270998 Jul 27, 2022
8b7163c
updated the replication key to list instead of a single key
namrata270998 Jul 27, 2022
cd435d4
Updated and removed extra comments
namrata270998 Jul 27, 2022
f9989bd
fixed the issue for Keyerror of form_id in answers stream
namrata270998 Jul 27, 2022
bc91c4b
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Jul 27, 2022
3a63e1e
updated unittests
namrata270998 Jul 27, 2022
6621815
updated start date as per format
namrata270998 Jul 27, 2022
8888391
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Jul 27, 2022
4289539
added configurable page size param
namrata270998 Jul 28, 2022
677eea7
updated pagination test case
namrata270998 Jul 28, 2022
dcc12f2
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Jul 28, 2022
26d8dce
handled page_size for 0 and updated unittests
namrata270998 Jul 28, 2022
0dc281b
resolved review comments
namrata270998 Jul 29, 2022
828a5ad
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Jul 29, 2022
ba12410
resolved bug fixes for pagination and removed incompleted_forms_only …
namrata270998 Jul 29, 2022
2600a42
Merge branch 'TDL-19957-update-dict-based-to-class-based' of https://…
namrata270998 Jul 29, 2022
89fae83
updated array type schema
namrata270998 Jul 29, 2022
e3725e6
added new stream unsubmitted landings
namrata270998 Jul 29, 2022
e645182
added new stream in tap-tester
namrata270998 Jul 29, 2022
cd94d7b
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Jul 29, 2022
cfa13f6
updated indentation
namrata270998 Aug 1, 2022
e63152f
resolved PR review comments
namrata270998 Aug 1, 2022
856049f
updated indentation to use 2 spaces
namrata270998 Aug 2, 2022
a15099f
added page_size in example config
namrata270998 Aug 2, 2022
728da16
added back incremental_range in the tap-tester
namrata270998 Aug 2, 2022
d0089aa
fixed the cci issues
namrata270998 Aug 2, 2022
651f0cc
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Aug 2, 2022
aa583be
Merge branch 'TDL-19957-update-dict-based-to-class-based' of https://…
namrata270998 Aug 2, 2022
a5b52bb
added missing fields to a dict
namrata270998 Aug 2, 2022
e580805
added missing assertion for all fields
namrata270998 Aug 2, 2022
5cc0a73
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Aug 2, 2022
c585570
Merge branch 'TDL-19957-update-dict-based-to-class-based' of https://…
namrata270998 Aug 2, 2022
2ab8bdb
updated comment
namrata270998 Aug 2, 2022
c92dd35
raised exc instead of fatal error message and updated unittests
namrata270998 Aug 2, 2022
75e4566
removed get_logger()
namrata270998 Aug 3, 2022
7924ef9
added logger instead of print
namrata270998 Aug 3, 2022
1eb6324
Merge branch 'TDL-19964-add-missing-tap-tester-tests' of https://gith…
namrata270998 Aug 3, 2022
0b76fce
Merge branch 'TDL-19957-update-dict-based-to-class-based' of https://…
namrata270998 Aug 3, 2022
7e39001
Merge branch 'crest-master' into TDL-19959-add-missing-fields
prijendev Aug 29, 2022
ec3546e
Merge branch 'crest-master' into TDL-19959-add-missing-fields
prijendev Aug 29, 2022
3a9ddd1
Updated schema for questions.
prijendev Aug 29, 2022
ebcbe4d
Removed duplicate assertion in all_fields test.
prijendev Sep 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
name: 'Unit Tests'
command: |
source /usr/local/share/virtualenvs/tap-tester/bin/activate
pip install nose coverage
pip install nose coverage parameterized
nosetests --with-coverage --cover-erase --cover-package=tap_typeform --cover-html-dir=htmlcov tests/unittests
coverage html
- store_test_results:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ python3 ./setup.py install

## Configuration

This tap requires a `config.json` which specifies details regarding an [Authentication token](https://developer.typeform.com/get-started/convert-keys-to-access-tokens/), a list of form ids, a start date for syncing historical data (date format of YYYY-MM-DDTHH:MI:SSZ), request_timeout for which request should wait to get response(It is an optional parameter and default request_timeout is 300 seconds) and a time period range [daily,hourly] to control what incremental extract date ranges are. See [example.config.json](example.config.json) for an example.
This tap requires a `config.json` which specifies details regarding an [Authentication token](https://developer.typeform.com/get-started/convert-keys-to-access-tokens/), a list of form ids, a start date for syncing historical data (date format of YYYY-MM-DDTHH:MI:SSZ), request_timeout for which request should wait to get response(It is an optional parameter and default request_timeout is 300 seconds). See [example.config.json](example.config.json) for an example.

Create the catalog:

Expand Down
5 changes: 2 additions & 3 deletions example.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"token": "<myapitoken>",
"start_date": "2018-01-01T00:00:00Z",
"forms": "ZFuC6U,bFPlvG,WFBGBZ,WF0XE6,xFWoCE,OFHRwO,QFh3FI",
"request_timeout": 300,
"incremental_range": "daily"
"page_size": 100,
"request_timeout": 300
}

1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"ratelimit",
"backoff",
"requests",
"parameterized",
],
extras_require={
'dev': [
Expand Down
115 changes: 19 additions & 96 deletions tap_typeform/__init__.py
Original file line number Diff line number Diff line change
@@ -1,131 +1,54 @@
#!/usr/bin/env python3
import singer
from singer import utils
from singer.catalog import Catalog, metadata_module as metadata
from tap_typeform import streams
from tap_typeform.context import Context
from tap_typeform.http import Client
from tap_typeform import schemas
from tap_typeform.discover import discover as _discover
from tap_typeform.sync import _forms_to_list, sync as _sync
from tap_typeform.client import Client
from tap_typeform.streams import Forms

REQUIRED_CONFIG_KEYS = ["token", "forms", "incremental_range"]
REQUIRED_CONFIG_KEYS = ["start_date", "token", "forms"]

LOGGER = singer.get_logger()

#def check_authorization(atx):
# atx.client.get('/settings')
class FormMistmatchError(Exception):
pass

class NoFormsProvidedError(Exception):
pass


# Some taps do discovery dynamically where the catalog is read in from a
# call to the api but with the typeform structure, we won't do that here
# because it's always the same so we just pull it from file we never use
# atx in here since the schema is from file but we would use it if we
# pulled schema from the API def discover(atx):
def discover():
streams = []
for tap_stream_id in schemas.STATIC_SCHEMA_STREAM_IDS:
#print("tap stream id=",tap_stream_id)
key_properties = schemas.PK_FIELDS[tap_stream_id]
schema = schemas.load_schema(tap_stream_id)
replication_method = schemas.REPLICATION_METHODS[tap_stream_id].get("replication_method")
replication_keys = schemas.REPLICATION_METHODS[tap_stream_id].get("replication_keys")
meta = metadata.get_standard_metadata(schema=schema,
key_properties=key_properties,
replication_method=replication_method,
valid_replication_keys=replication_keys)

meta = metadata.to_map(meta)

if replication_keys:
meta = metadata.write(meta, ('properties', replication_keys[0]), 'inclusion', 'automatic')

meta = metadata.to_list(meta)

streams.append({
'stream': tap_stream_id,
'tap_stream_id': tap_stream_id,
'key_properties': key_properties,
'schema': schema,
'metadata': meta,
'replication_method': replication_method,
'replication_key': replication_keys[0] if replication_keys else None
})
return Catalog.from_dict({'streams': streams})


# this is already defined in schemas.py though w/o dependencies. do we keep this for the sync?
def load_schema(tap_stream_id):
path = "schemas/{}.json".format(tap_stream_id)
schema = utils.load_json(get_abs_path(path))
dependencies = schema.pop("tap_schema_dependencies", [])
refs = {}
for sub_stream_id in dependencies:
refs[sub_stream_id] = load_schema(sub_stream_id)
if refs:
singer.resolve_schema_references(schema, refs)
return schema


def sync(atx):

# write schemas for selected streams\
for stream in atx.catalog.streams:
if stream.tap_stream_id in atx.selected_stream_ids:
schemas.load_and_write_schema(stream.tap_stream_id)

# since there is only one set of schemas for all forms, they will always be selected
streams.sync_forms(atx)

LOGGER.info('--------------------')
for stream_name, stream_count in atx.counts.items():
LOGGER.info('%s: %d', stream_name, stream_count)
LOGGER.info('--------------------')


def _compare_forms(config_forms, api_forms):
return config_forms.difference(api_forms)


def _forms_to_list(config, keyword='forms'):
"""Splits entries into a list and strips out surrounding blank spaces"""
return list(map(str.strip, config.get(keyword).split(',')))


def validate_form_ids(config):
def validate_form_ids(client, config):
"""Validate the form ids passed in the config"""
client = Client(config)
form_stream = Forms()

if not config.get('forms'):
LOGGER.fatal("No forms were provided in config")
raise NoFormsProvidedError

config_forms = set(_forms_to_list(config))
api_forms = {form.get('id') for form in client.get_forms()}
config_forms = _forms_to_list(config)
api_forms = {form.get('id') for res in form_stream.get_forms(client) for form in res}

mismatched_forms = _compare_forms(config_forms, api_forms)
mismatched_forms = config_forms.difference(api_forms)

if len(mismatched_forms) > 0:
LOGGER.fatal(f"FormMistmatchError: forms {mismatched_forms} not returned by API")
# Raise an error if any form-id from config is not matching
# from ids from API response
raise FormMistmatchError


@utils.handle_top_exception(LOGGER)
def main():
args = utils.parse_args(REQUIRED_CONFIG_KEYS)
atx = Context(args.config, args.state)
config = args.config
client = Client(config)
validate_form_ids(client, config)
if args.discover:
validate_form_ids(args.config)
# the schema is static from file so we don't need to pass in atx for connection info.
catalog = discover()
catalog = _discover()
catalog.dump()
else:
atx.catalog = args.catalog \
if args.catalog else discover()
sync(atx)
catalog = args.catalog \
if args.catalog else _discover()
_sync(client, config, args.state, catalog.to_dict())

if __name__ == "__main__":
main()
166 changes: 75 additions & 91 deletions tap_typeform/http.py → tap_typeform/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
LOGGER = singer.get_logger()

REQUEST_TIMEOUT = 300
MAX_RESPONSES_PAGE_SIZE = 1000
FORMS_PAGE_SIZE = 200

class TypeformError(Exception):
def __init__(self, message=None, response=None):
Expand Down Expand Up @@ -67,95 +69,10 @@ class TypeformNotAvailableError(TypeformError):
}
}

class Client(object):
BASE_URL = 'https://api.typeform.com'

def __init__(self, config):
self.token = 'Bearer ' + config.get('token')
self.metric = config.get('metric')
self.session = requests.Session()
# Set and pass request timeout to config param `request_timeout` value.
config_request_timeout = config.get('request_timeout')
if config_request_timeout and float(config_request_timeout):
self.request_timeout = float(config_request_timeout)
else:
self.request_timeout = REQUEST_TIMEOUT # If value is 0,"0","" or not passed then it set default to 300 seconds.

def build_url(self, endpoint):
return f"{self.BASE_URL}/{endpoint}"

@backoff.on_exception(backoff.expo,
(Timeout, ConnectionError), # Backoff for Timeout and ConnectionError.
max_tries=5,
factor=2)
@backoff.on_exception(backoff.expo,
(TypeformInternalError, TypeformNotAvailableError,
TypeformTooManyError, ChunkedEncodingError),
max_tries=3,
factor=2)
def request(self, method, url, params=None, **kwargs):
# note that typeform response api doesn't return limit headers

if 'headers' not in kwargs:
kwargs['headers'] = {}
if self.token:
kwargs['headers']['Authorization'] = self.token

request = requests.Request(method, url, headers=kwargs['headers'], params=params)

response = self.session.send(request.prepare(), timeout=self.request_timeout)# Pass request timeout

if response.status_code != 200:
raise_for_error(response)
return None

if 'total_items' in response.json():
LOGGER.info('raw data items= {}'.format(response.json()['total_items']))
return response.json()

# Max page size for forms API is 200
def get_forms(self, page_size=200):
url = self.build_url(endpoint='forms')
with singer.metrics.http_request_timer(endpoint=url):
return self._get_forms('get', url, page_size)

def _get_forms(self, method, url, page_size):
page = 1
paginate = True
records = []
params = {'page_size': page_size}

while paginate:
params['page'] = page
response = self.request(method, url, params=params)
page_count = response.get('page_count')
paginate = page_count > page
page += 1

records += response.get('items')

return records

def get_form_definition(self, form_id, **kwargs):
endpoint = f"forms/{form_id}"
url = self.build_url(endpoint=endpoint)
with singer.metrics.http_request_timer(endpoint=url):
try:
return self.request('get', url, **kwargs)
except TypeformForbiddenError as err:
raise RuntimeError("Maybe add the Forms:Read scope to your token") from err

def get_form_responses(self, form_id, **kwargs):
endpoint = f"forms/{form_id}/responses"
url = self.build_url(endpoint)
with singer.metrics.http_request_timer(endpoint=url):
try:
return self.request('get', url, **kwargs)
except TypeformForbiddenError as err:
raise RuntimeError("Maybe add the Responses:Read scope to your token") from err


def raise_for_error(response):
"""
Retrieve the error code and the error message from the response and return custom exceptions accordingly.
"""
try:
response.raise_for_status()
except (requests.HTTPError, requests.ConnectionError) as error:
Expand All @@ -168,20 +85,20 @@ def raise_for_error(response):
api_rate_limit_message = ERROR_CODE_EXCEPTION_MAPPING[429]["message"]
message = "HTTP-error-code: 429, Error: {}. Please retry after {} seconds".format(api_rate_limit_message, resp_headers.get("Retry-After"))

# Handling status code 403 specially since response of API does not contain enough information
# Handling status code 403 specially since the response of API does not contain enough information
elif error_code in (403, 401):
api_message = ERROR_CODE_EXCEPTION_MAPPING[error_code]["message"]
message = "HTTP-error-code: {}, Error: {}".format(error_code, api_message)
else:
# Forming a response message for raising custom exception
# Forming a response message for raising a custom exception
try:
response_json = response.json()
except Exception:
response_json = {}

message = "HTTP-error-code: {}, Error: {}".format(
error_code,
response_json.get("description", "Uknown Error"))
response_json.get("description", "Unknown Error"))

exc = ERROR_CODE_EXCEPTION_MAPPING.get(error_code, {}).get("raise_exception", TypeformError)
message = ERROR_CODE_EXCEPTION_MAPPING.get(error_code, {}).get("message", "")
Expand All @@ -190,3 +107,70 @@ def raise_for_error(response):

except (ValueError, TypeError):
raise TypeformError(error) from None

class Client(object):
"""
The client class is used for making REST calls to the Github API.
"""
BASE_URL = 'https://api.typeform.com'

def __init__(self, config):
self.token = 'Bearer ' + config.get('token')
self.metric = config.get('metric')
self.session = requests.Session()
self.page_size = MAX_RESPONSES_PAGE_SIZE
self.form_page_size = FORMS_PAGE_SIZE
self.get_page_size(config)

# Set and pass request timeout to config param `request_timeout` value.
config_request_timeout = config.get('request_timeout')
if config_request_timeout and float(config_request_timeout):
self.request_timeout = float(config_request_timeout)
else:
self.request_timeout = REQUEST_TIMEOUT # If value is 0,"0","" or not passed then it set default to 300 seconds.

def get_page_size(self, config):
"""
This function will get page size from config,
and will return the default value if invalid page size is given.
"""
page_size = config.get('page_size')
try:
if page_size is None:
pass
elif int(float(page_size)) > 0:
self.page_size = int(float(page_size))
self.form_page_size = min(self.form_page_size, self.page_size)
else:
raise Exception
except Exception:
raise Exception(f"The entered page size is invalid, it should be a valid integer.") from None

def build_url(self, endpoint):
"""
Returns full URL for a given endpoint.
"""
return f"{self.BASE_URL}/{endpoint}"

@backoff.on_exception(backoff.expo,(Timeout, ConnectionError), # Backoff for Timeout and ConnectionError.
max_tries=5, factor=2, jitter=None)
@backoff.on_exception(backoff.expo, (TypeformInternalError, TypeformNotAvailableError, TypeformTooManyError, ChunkedEncodingError),
max_tries=3, factor=2)
def request(self, url, params={}, **kwargs):
"""
Call rest API and return the response in case of status code 200.
"""

if 'headers' not in kwargs:
kwargs['headers'] = {}
if self.token:
kwargs['headers']['Authorization'] = self.token

LOGGER.info("URL: %s and Params: %s", url, params)
response = self.session.get(url, params=params, headers=kwargs['headers'], timeout=self.request_timeout)
if response.status_code != 200:
raise_for_error(response)

if 'total_items' in response.json():
LOGGER.info('raw data items= {}'.format(response.json()['total_items']))
return response.json()
Loading