TDL-24162 Log based inclusivity updates (#90)
* Uncomment all record count assertions, fix with pk count where needed, new method

* First round review comments (set comprehension, get() fallback) and some line length cleanup

* Delete commented out test_sync_full.py

* Review comments 2, make pk count method generic and update to use tuples to fix dupes

* Update log based int test to add new table pk to expected metadata

* Review comments 3, fail test if upsert format or value is incorrect

* Fix typo / bug for pk tuple iteration

* update comment
bhtowles authored Oct 16, 2023
1 parent 2279f42 commit a4c579f
Showing 9 changed files with 214 additions and 875 deletions.
23 changes: 23 additions & 0 deletions tests/base.py
@@ -210,6 +210,29 @@ def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = T
             connections.select_catalog_and_fields_via_metadata(
                 conn_id, catalog, schema, additional_md, non_selected_properties)
 
+    def unique_pk_count_by_stream(self, recs_by_stream):
+        """
+        Switch from upsert record count verification to unique pk count verification due to
+        tap-mssql inconsistency with log based inclusivity TDL-24162 (that will not be fixed)
+        """
+        pk_count_by_stream = {}
+        for strm, recs in recs_by_stream.items():
+            primary_keys = self.expected_primary_keys_by_stream_id()[strm]
+
+            # use tuple generator to handle arbitrary number of pks during set comprehension
+            stream_pks = {tuple(m.get('data', {}).get(pk) for pk in primary_keys)
+                          for m in recs['messages']
+                          if m['action'] == 'upsert'}
+
+            # fail the test if any upserts fail to return 'data' or a pk value
+            for pk in stream_pks:
+                for i in range(len(pk)):
+                    self.assertIsNotNone(pk[i])
+
+            pk_count_by_stream[strm] = len(stream_pks)
+
+        return pk_count_by_stream
+
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self.start_date = self.get_properties().get("start_date")
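
As a quick, self-contained illustration of what the new helper does, here is a minimal sketch of the same tuple-based de-duplication; the messages and pk values below are hypothetical, not taken from the test data:

def unique_pk_count(messages, primary_keys):
    # same set-of-tuples trick as unique_pk_count_by_stream: duplicate
    # upserts with the same pk values collapse to a single set entry
    return len({tuple(m.get('data', {}).get(pk) for pk in primary_keys)
                for m in messages
                if m['action'] == 'upsert'})

messages = [
    {'action': 'upsert', 'data': {'pk': 1, 'value': True}},
    {'action': 'upsert', 'data': {'pk': 2, 'value': False}},
    {'action': 'upsert', 'data': {'pk': 2, 'value': False}},  # duplicate row from log based inclusivity
    {'action': 'activate_version'},
]
assert unique_pk_count(messages, ['pk']) == 2  # three upserts, two unique pks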
90 changes: 41 additions & 49 deletions tests/test_log_based_interruped_replication.py
@@ -216,11 +216,8 @@ def test_run(self):
         # verify records match on the first sync
         records_by_stream = runner.get_records_from_target_output()
 
-        record_count_by_stream = runner.examine_target_output_file(
-            self, conn_id, self.expected_sync_streams(), self.expected_primary_keys_by_sync_stream_id())
-
-        # BUG : TDL-19583 log_based_interruptible_dbo_int_and_bool_data is replicating the last row twice
-        #self.assertEqual(record_count_by_stream, self.expected_count())
+        pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
+        self.assertEqual(pk_count_by_stream, self.expected_count())
 
         table_version = dict()
         initial_log_version = dict()
@@ -229,45 +226,34 @@ def test_run(self):
             stream_expected_data = self.expected_metadata()[stream]
             table_version[stream] = records_by_stream[stream]['table_version']
 
-            # BUG: TDL-19583 - 3 activate_version messages
-            # verify on the first sync you get
-            # activate version message before and after all data for the full table
-            # and before the logical replication part
-
-            # self.assertEqual(
-            #     records_by_stream[stream]['messages'][0]['action'],
-            #     'activate_version')
-            # self.assertEqual(
-            #     records_by_stream[stream]['messages'][-1]['action'],
-            #     'activate_version')
-            # self.assertEqual(
-            #     records_by_stream[stream]['messages'][-2]['action'],
-            #     'activate_version')
-
-            self.assertEqual(
-                len([m for m in records_by_stream[stream]['messages'][1:] if m["action"] == "activate_version"]),
-                2,
-                msg="Expect 2 more activate version messages for end of full table and beginning of log based")
+            # gather all actions then verify 3 activate versions, 1 at start, 2 in the last 3
+            actions = [rec['action'] for rec in records_by_stream[stream]['messages']]
+            self.assertEqual(actions[0], 'activate_version')
+            self.assertEqual(len([a for a in actions[-3:] if a == "activate_version"]), 2,
+                             msg=("Expected 2 of the last 3 messages to be activate version messages. 1 for "
                                   "end of full table and 1 for beginning of log based. Position can vary "
                                   "due to TDL-24162")
                             )
 
             # verify state and bookmarks
             initial_state = menagerie.get_state(conn_id)
             bookmark = initial_state['bookmarks'][stream]
 
-            self.assertIsNone(initial_state.get('currently_syncing'), msg="expected state's currently_syncing to be None")
-            self.assertIsNotNone(
-                bookmark.get('current_log_version'),
-                msg="expected bookmark to have current_log_version because we are using log replication")
-            self.assertTrue(bookmark['initial_full_table_complete'], msg="expected full table to be complete")
+            self.assertIsNone(initial_state.get('currently_syncing'),
                               msg="expected state's currently_syncing to be None")
+            self.assertIsNotNone(bookmark.get('current_log_version'),
                                  msg="expected bookmark to have current_log_version due to log replication")
+            self.assertTrue(bookmark['initial_full_table_complete'],
                             msg="expected full table to be complete")
             inital_log_version = bookmark['current_log_version']
 
             self.assertEqual(bookmark['version'], table_version[stream],
                              msg="expected bookmark for stream to match version")
 
             expected_schemas = self.expected_metadata()[stream]['schema']
-            self.assertEqual(records_by_stream[stream]['schema'],
-                             expected_schemas,
-                             msg="expected: {} != actual: {}".format(expected_schemas,
-                                                                     records_by_stream[stream]['schema']))
+            self.assertEqual(records_by_stream[stream]['schema'], expected_schemas,
                              msg="expected: {} != actual: {}".format(
                                  expected_schemas, records_by_stream[stream]['schema']))
             initial_log_version[stream] = bookmark['current_log_version']
 
         initial_log_version_value = set(initial_log_version.values()).pop()
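
To see why the new assertion only pins the first message and the last three, rather than exact positions, consider two hypothetical action sequences; per the msg text above, the closing activate_version can land on either side of the final upsert due to TDL-24162, and both orderings pass:

# both hypothetical orderings satisfy the relaxed assertion
for actions in (
    ['activate_version', 'upsert', 'upsert', 'activate_version', 'activate_version'],
    ['activate_version', 'upsert', 'activate_version', 'upsert', 'activate_version'],
):
    assert actions[0] == 'activate_version'
    assert len([a for a in actions[-3:] if a == 'activate_version']) == 2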
@@ -281,14 +267,18 @@ def test_run(self):
         # --> A table which is interrupted
 
         del interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['version']
-        interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['initial_full_table_complete'] = False
+        interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data'][
+            'initial_full_table_complete'] = False
 
         max_pk_values = {'max_pk_values': {'pk': 12}}
         last_pk_fetched = {'last_pk_fetched': {'pk': 10}}
 
-        interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(max_pk_values)
-        interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(last_pk_fetched)
-        interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data']['initial_full_table_complete'] = False
+        interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(
+            max_pk_values)
+        interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(
+            last_pk_fetched)
+        interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'][
+            'initial_full_table_complete'] = False
 
         menagerie.set_state(conn_id, interrupted_state)
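
For reference, the manipulated state handed to menagerie.set_state is shaped roughly like this; a sketch only, with placeholder log version numbers (the keys and the pk values 12/10 come from the code above):

interrupted_state = {
    'bookmarks': {
        'log_based_interruptible_dbo_int_data': {
            # 'version' deleted and full table flagged incomplete,
            # so this table restarts its initial full-table sync
            'initial_full_table_complete': False,
            'current_log_version': 123456,  # placeholder
        },
        'log_based_interruptible_dbo_int_and_bool_data': {
            'version': 123456,              # placeholder
            'current_log_version': 123456,  # placeholder
            'initial_full_table_complete': False,
            'max_pk_values': {'pk': 12},    # full table resumes up to pk 12...
            'last_pk_fetched': {'pk': 10},  # ...starting after pk 10
        },
    },
}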

@@ -310,20 +300,24 @@ def test_run(self):
         query_list.extend(insert(database_name, schema_name, table_name,
                                  int_after_values))
 
-
         mssql_cursor_context_manager(*query_list)
 
+        # add new table's pk to expected_metadata
+        self.EXPECTED_METADATA['log_based_interruptible_dbo_int_data_after'] = {
+            self.PRIMARY_KEYS: {'pk'}}
+
         # invoke the sync job AGAIN following various manipulations to the data
 
         # add the newly created stream in the expectations
         expected_sync_streams = self.expected_sync_streams()
         expected_sync_streams.add('log_based_interruptible_dbo_int_data_after')
         expected_primary_keys_by_sync_stream_id = self.expected_primary_keys_by_sync_stream_id()
-        expected_primary_keys_by_sync_stream_id['log_based_interruptible_dbo_int_data_after'] = {'pk'}
+        expected_primary_keys_by_sync_stream_id[
+            'log_based_interruptible_dbo_int_data_after'] = {'pk'}
         expected_count = self.expected_count()
         expected_count['log_based_interruptible_dbo_int_data_after'] = 6
-        expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 3
-        expected_count['log_based_interruptible_dbo_int_data'] = 0
+        expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 2
+        expected_count['log_based_interruptible_dbo_int_data'] = 14
 
         # run in check mode
         check_job_name = runner.run_check_mode(self, conn_id)
@@ -345,15 +339,14 @@ def test_run(self):
         records_by_stream = runner.get_records_from_target_output()
 
-        record_count_by_stream = runner.examine_target_output_file(
-            self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id)
+        pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
 
         second_state = menagerie.get_state(conn_id)
         bookmark_2 = second_state['bookmarks']
 
-        # validate the record count for all the streams after interruption recovery
-        # BUG: TDL-19583 Duplicate record within a sync with 3 activate_version messages
-        #self.assertEqual(record_count_by_stream, expected_count)
+        # validate the record count for all the streams after interruption recovery, use unique
+        # pks instead of all upserts to de-dupe and avoid inconsistency from TDL-24162
+        self.assertEqual(pk_count_by_stream, expected_count)
 
         second_log_version = dict()
         for stream in expected_sync_streams:
@@ -409,14 +402,13 @@ def test_run(self):
         records_by_stream = runner.get_records_from_target_output()
 
-        record_count_by_stream = runner.examine_target_output_file(
-            self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id)
+        pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
 
         expected_count['log_based_interruptible_dbo_int_data_after'] = 3
         expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 0
         expected_count['log_based_interruptible_dbo_int_data'] = 0
 
-        self.assertEqual(record_count_by_stream, expected_count)
+        self.assertEqual(pk_count_by_stream, expected_count)
 
         final_state = menagerie.get_state(conn_id)
         bookmark_3 = final_state['bookmarks']