TDL-19964 add missing tap tester tests #65

Merged: 13 commits, Aug 29, 2022
60 changes: 37 additions & 23 deletions tests/base.py
@@ -7,8 +7,7 @@
from datetime import timedelta
from datetime import datetime as dt

from singer import get_logger
from tap_tester import connections, menagerie, runner
from tap_tester import connections, menagerie, runner, LOGGER


class TypeformBaseTest(unittest.TestCase):
@@ -27,8 +26,11 @@ class TypeformBaseTest(unittest.TestCase):
INCREMENTAL = "INCREMENTAL"
FULL_TABLE = "FULL_TABLE"
START_DATE_FORMAT = "%Y-%m-%dT00:00:00Z"
RECORD_REPLICATION_KEY_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00"
LOGGER = get_logger()
OBEYS_START_DATE = "obey-start-date"
PAGE_SIZE = 1000
FORM_PAGE_SIZE = 200

start_date = '2021-05-10T00:00:00Z'

@@ -39,7 +41,7 @@ def tap_name():

@staticmethod
def get_type():
"""the expected url route ending"""
"""The expected url route ending"""
return "platform.typeform"

def get_properties(self, original: bool = True):
@@ -70,21 +72,31 @@ def expected_metadata(self):
"answers": {
self.PRIMARY_KEYS: {"landing_id", "question_id"},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {"landed_at"}
self.REPLICATION_KEYS: {"submitted_at"},
self.OBEYS_START_DATE: True
},
"landings": {
"submitted_landings": {
self.PRIMARY_KEYS: {"landing_id"},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {"landed_at"}
self.REPLICATION_KEYS: {"submitted_at"},
self.OBEYS_START_DATE: True
},
"unsubmitted_landings": {
self.PRIMARY_KEYS: {"landing_id"},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {"landed_at"},
self.OBEYS_START_DATE: True
},
"questions": {
self.PRIMARY_KEYS: {"form_id", "question_id"},
self.REPLICATION_METHOD: self.FULL_TABLE,
self.OBEYS_START_DATE: False
},
"forms": {
self.PRIMARY_KEYS: {"id"},
self.REPLICATION_METHOD: self.INCREMENTAL,
self.REPLICATION_KEYS: {"last_updated_at"}
self.REPLICATION_KEYS: {"last_updated_at"},
self.OBEYS_START_DATE: True
}
}

@@ -102,7 +114,7 @@ def child_streams(self):

def expected_primary_keys(self):
"""
return a dictionary with key of table name
return a dictionary with the key of table name
and value as a set of primary key fields
"""
return {table: properties.get(self.PRIMARY_KEYS, set())
@@ -111,7 +123,7 @@ def expected_primary_keys(self):

def expected_replication_keys(self):
"""
return a dictionary with key of table name
return a dictionary with the key of table name
and value as a set of replication key fields
"""
return {table: properties.get(self.REPLICATION_KEYS, set())
@@ -120,22 +132,24 @@ def expected_replication_keys(self):

def expected_foreign_keys(self):
"""
return a dictionary with key of table name
return a dictionary with the key of table name
and value as a set of foreign key fields
"""
return {table: properties.get(self.FOREIGN_KEYS, set())
for table, properties
in self.expected_metadata().items()}

def expected_automatic_fields(self):
auto_fields = {}
for k, v in self.expected_metadata().items():
auto_fields[k] = v.get(self.PRIMARY_KEYS, set()) | v.get(self.REPLICATION_KEYS, set()) \
| v.get(self.FOREIGN_KEYS, set())
return auto_fields
"""
Return a dictionary with the key of the table name
and value as a set of automatic key fields
"""
return {table: ((self.expected_primary_keys().get(table) or set()) |
(self.expected_replication_keys().get(table) or set()))
for table in self.expected_metadata()}

def expected_replication_method(self):
"""return a dictionary with key of table name nd value of replication method"""
"""return a dictionary with the key of table name and value of replication method"""
return {table: properties.get(self.REPLICATION_METHOD, None)
for table, properties
in self.expected_metadata().items()}
@@ -153,7 +167,7 @@ def setUp(self):
def run_and_verify_check_mode(self, conn_id):
"""
Run the tap in check mode and verify it succeeds.
This should be ran prior to field selection and initial sync.
This should run before field selection and initial sync.
Return the connection id and found catalogs from menagerie.
"""
# run in check mode
@@ -169,7 +183,7 @@ def run_and_verify_check_mode(self, conn_id):
found_catalog_names = set(map(lambda c: c['stream_name'], found_catalogs))

self.assertSetEqual(self.expected_streams(), found_catalog_names, msg="discovered schemas do not match")
print("discovered schemas are OK")
LOGGER.info("discovered schemas are OK")

return found_catalogs

@@ -193,7 +207,7 @@ def run_and_verify_sync(self, conn_id):
sum(sync_record_count.values()), 0,
msg="failed to replicate any data: {}".format(sync_record_count)
)
print("total replicated row count: {}".format(sum(sync_record_count.values())))
LOGGER.info("total replicated row count: {}".format(sum(sync_record_count.values())))

return sync_record_count

@@ -202,7 +216,7 @@ def perform_and_verify_table_and_field_selection(self,
test_catalogs,
select_all_fields=True):
"""
Perform table and field selection based off of the streams to select
Perform table and field selection based on the streams to select
set and field selection parameters.
Verify this results in the expected streams selected and all or no
fields selected for those streams.
@@ -222,7 +236,7 @@

# Verify all testable streams are selected
selected = catalog_entry.get('annotated-schema').get('selected')
print("Validating selection on {}: {}".format(cat['stream_name'], selected))
LOGGER.info("Validating selection on {}: {}".format(cat['stream_name'], selected))
if cat['stream_name'] not in expected_selected:
self.assertFalse(selected, msg="Stream selected, but not testable.")
continue # Skip remaining assertions if we aren't selecting this stream
@@ -232,7 +246,7 @@
# Verify all fields within each selected stream are selected
for field, field_props in catalog_entry.get('annotated-schema').get('properties').items():
field_selected = field_props.get('selected')
print("\tValidating selection on {}.{}: {}".format(
LOGGER.info("\tValidating selection on {}.{}: {}".format(
cat['stream_name'], field, field_selected))
self.assertTrue(field_selected, msg="Field not selected.")
else:
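As context for the expected_automatic_fields refactor in tests/base.py above: automatic fields are just the per-stream union of primary keys and replication keys. Below is a minimal standalone sketch of that union logic; the metadata dict and its "primary_keys"/"replication_keys" labels are hypothetical stand-ins for the base class's expected_metadata() structure and PRIMARY_KEYS/REPLICATION_KEYS constants.

# Hypothetical per-stream metadata; the real values come from expected_metadata().
EXPECTED_METADATA = {
    "forms": {"primary_keys": {"id"}, "replication_keys": {"last_updated_at"}},
    "questions": {"primary_keys": {"form_id", "question_id"}, "replication_keys": set()},
}

def expected_automatic_fields(metadata=EXPECTED_METADATA):
    # Automatic fields = primary keys | replication keys, stream by stream.
    return {
        stream: ((props.get("primary_keys") or set())
                 | (props.get("replication_keys") or set()))
        for stream, props in metadata.items()
    }

print(expected_automatic_fields())
# e.g. {'forms': {'id', 'last_updated_at'}, 'questions': {'form_id', 'question_id'}}
# (set ordering in the printed output may vary)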
79 changes: 79 additions & 0 deletions tests/test_typeform_all_fields.py
@@ -0,0 +1,79 @@
import tap_tester.connections as connections
import tap_tester.runner as runner
import tap_tester.menagerie as menagerie
from base import TypeformBaseTest


class TypeformAllFieldsTest(TypeformBaseTest):
"""Ensure running the tap with all streams and fields selected results in the replication of all fields."""

def name(self):
return "tap_tester_typeform_all_fields_test"

def test_run(self):
"""
• Verify no unexpected streams were replicated.
• Verify that more than just the automatic fields are replicated for each stream.
• Verify all fields for each stream are replicated.
"""

# Streams to run the all-fields test against
streams_to_test = self.expected_streams()

conn_id = connections.ensure_connection(self)

expected_automatic_fields = self.expected_automatic_fields()

# Verify that there are catalogs found
found_catalogs = self.run_and_verify_check_mode(conn_id)

# Table and field selection
test_catalogs_all_fields = [catalog for catalog in found_catalogs
if catalog.get('tap_stream_id') in streams_to_test]

self.perform_and_verify_table_and_field_selection(
conn_id, test_catalogs_all_fields)

# Grab metadata after performing table-and-field selection to set expectations
# Used for asserting all fields are replicated
stream_to_all_catalog_fields = dict()
for catalog in test_catalogs_all_fields:
stream_id, stream_name = catalog['stream_id'], catalog['stream_name']
catalog_entry = menagerie.get_annotated_schema(conn_id, stream_id)
fields_from_field_level_md = [md_entry['breadcrumb'][1]
for md_entry in catalog_entry['metadata']
if md_entry['breadcrumb'] != []]
stream_to_all_catalog_fields[stream_name] = set(
fields_from_field_level_md)

self.run_and_verify_sync(conn_id)

synced_records = runner.get_records_from_target_output()

# Verify no unexpected streams were replicated
synced_stream_names = set(synced_records.keys())
self.assertSetEqual(streams_to_test, synced_stream_names)

for stream in streams_to_test:
with self.subTest(stream=stream):

# Expected values
expected_all_keys = stream_to_all_catalog_fields[stream]
expected_automatic_keys = expected_automatic_fields.get(
stream, set())

# Verify that more than just the automatic fields are replicated for each stream.
self.assertTrue(expected_automatic_keys.issubset(
expected_all_keys), msg='{} is not in "expected_all_keys"'.format(expected_automatic_keys-expected_all_keys))

messages = synced_records.get(stream)
# Collect actual values
actual_all_keys = set()
for message in messages['messages']:
if message['action'] == 'upsert':
actual_all_keys.update(message['data'].keys())

# Verify all fields for each stream are replicated
self.assertGreater(len(expected_all_keys), len(expected_automatic_keys))
self.assertTrue(expected_automatic_keys.issubset(expected_all_keys), msg=f'{expected_automatic_keys-expected_all_keys} is not in "expected_all_keys"')
self.assertSetEqual(expected_all_keys, actual_all_keys)
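A note on the breadcrumb filtering in the test above: in a Singer annotated catalog, the stream-level metadata entry has an empty breadcrumb, while each field-level entry carries a breadcrumb of the form ["properties", "<field_name>"], so breadcrumb[1] is the field name. A minimal sketch with a hypothetical catalog entry:

# Hypothetical catalog entry; real ones come from menagerie.get_annotated_schema().
catalog_entry = {
    "metadata": [
        {"breadcrumb": [], "metadata": {"inclusion": "available"}},
        {"breadcrumb": ["properties", "id"], "metadata": {"inclusion": "automatic"}},
        {"breadcrumb": ["properties", "title"], "metadata": {"inclusion": "available"}},
    ]
}

# Keep breadcrumb[1] (the field name) from every field-level entry.
fields_from_field_level_md = {
    md_entry["breadcrumb"][1]
    for md_entry in catalog_entry["metadata"]
    if md_entry["breadcrumb"] != []
}
assert fields_from_field_level_md == {"id", "title"}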
30 changes: 22 additions & 8 deletions tests/test_typeform_automatic_fields.py
@@ -23,38 +23,46 @@ def test_run(self):
For EACH stream, add enough data that you surpass the limit of a single
fetch of data. For instance, if you have a limit of 250 records, ensure
that 251 (or more) records have been posted for that stream.

• Verify we can deselect all fields except when inclusion=automatic, which is handled by base.py methods.
• Verify that only the automatic fields are sent to the target.
• Verify that all replicated records have unique primary key values.
"""

expected_streams = self.expected_streams()

# instantiate connection
# Instantiate connection
conn_id = connections.ensure_connection(self)

# run check mode
# Run check mode
found_catalogs = self.run_and_verify_check_mode(conn_id)

# table and field selection
# Table and field selection
test_catalogs_automatic_fields = [catalog for catalog in found_catalogs
if catalog.get('stream_name') in expected_streams]

self.perform_and_verify_table_and_field_selection(
conn_id, test_catalogs_automatic_fields, select_all_fields=False,
)

# run initial sync
# Run initial sync
record_count_by_stream = self.run_and_verify_sync(conn_id)
synced_records = runner.get_records_from_target_output()

for stream in expected_streams:
with self.subTest(stream=stream):

# expected values
# Expected values
expected_keys = self.expected_automatic_fields().get(stream)
expected_primary_keys = self.expected_primary_keys()[stream]

# collect actual values
# Collect actual values
data = synced_records.get(stream)
record_messages_keys = [set(row['data'].keys()) for row in data['messages']]

primary_keys_list = [tuple(message.get('data', {}).get(expected_pk) for expected_pk in expected_primary_keys)
for message in data.get('messages', [])
if message.get('action') == 'upsert']
unique_primary_keys_list = set(primary_keys_list)

# Verify that you get some records for each stream
self.assertGreater(
@@ -63,4 +71,10 @@

# Verify that only the automatic fields are sent to the target
for actual_keys in record_messages_keys:
self.assertSetEqual(expected_keys, actual_keys)
self.assertSetEqual(expected_keys, actual_keys)

# Verify that all replicated records have unique primary key values.
self.assertEqual(
len(primary_keys_list),
len(unique_primary_keys_list),
msg="Replicated record does not have unique primary key values.")