Skip to content

Commit

Permalink
Merge pull request #4 from degreed-data-engineering/Update-state-each…
Browse files Browse the repository at this point in the history
…-elt

Update state if no full table sync
  • Loading branch information
brose7230 authored Mar 29, 2022
2 parents 1b4bccb + 82485eb commit 2fad2d4
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions tap_mssql/sync_strategies/logical.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def execute_log_based_sync(self):

if self.catalog_entry.tap_stream_id == "dbo-InputMetadata":
prev_converter = modify_ouput_converter(open_conn)

results = open_conn.execute(ct_sql_query)

row = results.fetchone()
Expand All @@ -243,7 +243,10 @@ def execute_log_based_sync(self):
counter.tags["database"] = self.database_name
counter.tags["table"] = self.table_name

rows_updated = False # Checks to see if there are any new records. If not then records state below

while row:
rows_updated = True
counter.increment()
desired_columns = []
ordered_row = []
Expand Down Expand Up @@ -294,11 +297,20 @@ def execute_log_based_sync(self):
# do more
row = results.fetchone()

if not rows_updated: # updates the state if no new records have been added
self.current_log_version = self._get_current_log_version()
self.state = singer.write_bookmark(
self.state,
self.catalog_entry.tap_stream_id,
"current_log_version",
self.current_log_version,
)

singer.write_message(singer.StateMessage(value=copy.deepcopy(self.state)))

if self.catalog_entry.tap_stream_id == "dbo-InputMetadata":
revert_ouput_converter(open_conn, prev_converter)

def _build_ct_sql_query(self, key_properties):
"""Using Selected columns, return an SQL query to select updated records from Change Tracking"""
# Order column list in alphabetical order starting with key_properties then other columns
Expand Down

0 comments on commit 2fad2d4

Please sign in to comment.