Merge pull request #11 from degreed-data-engineering/fastsync-new-branch
New fastsync branch
brose7230 authored Aug 11, 2022
2 parents af31b2e + 9b082a6 commit d0b098a
Showing 4 changed files with 126 additions and 13 deletions.
7 changes: 6 additions & 1 deletion README.md
@@ -67,7 +67,8 @@ Create a config file containing the database connection credentials, e.g.:
"port": "3306",
"user": "root",
"password": "password",
"include_schemas_in_destination_stream_name": true
"include_schemas_in_destination_stream_name": true,
"fastsync_batch_rows": 100000
}
```

@@ -296,6 +297,10 @@ specified in the table's metadata as well.
Log based replication works in conjunction with a state file to extract new and changed records, as recorded by SQL Server Change Tracking, each time the tap is invoked. This requires change tracking to be enabled on the source database and on each table replicated with this method. The initial sync with this method defaults to full table; log based replication occurs on subsequent runs.

## FastSync

Any Full Table replication, or the initial full sync for a Log Based extract, will use FastSync, an alternative method that bypasses the Singer spec (specifically, `RECORD` messages) in order to reduce sync duration. The `fastsync_batch_rows` config value sets the number of records synced per batch for each table.
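Based on the `full_table.py` changes in this commit, once a table's batches have been written to gzipped CSV files the tap emits a single FASTSYNC message to stdout instead of individual RECORD messages. The stream name, version, and file name below are illustrative:

```
{"type": "FASTSYNC", "stream": "dbo-animals", "version": 1660224000000, "files": ["pipelinewise_dbo-animals_20220811-143015-123456_batch_K3W9ZQ2M.csv.gz"]}
```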

#### Examples

Let's sync the `animals` table again, but this time using incremental
1 change: 1 addition & 0 deletions setup.py
@@ -22,6 +22,7 @@
"backoff==1.8.0",
"MarkupSafe==2.0.1",
"jinja2==2.11.3",
"pandas==1.2.2",
],
entry_points="""
[console_scripts]
19 changes: 19 additions & 0 deletions tap_mssql/sync_strategies/common.py
@@ -101,6 +101,25 @@ def generate_select_sql(catalog_entry, columns):
    select_sql = select_sql.replace("%", "%%")
    return select_sql

def fast_sync_generate_select_sql(catalog_entry, columns):
    # Build a SELECT that appends the standard _SDC metadata columns
    # (_SDC_EXTRACTED_AT, _SDC_DELETED_AT, _SDC_BATCHED_AT) so each exported
    # batch carries its extraction and batching timestamps.
    database_name = get_database_name(catalog_entry)
    escaped_db = escape(database_name)
    escaped_table = escape(catalog_entry.table)
    escaped_columns = [escape(c) for c in columns]

    time_extracted = utils.now()
    _sdc_extracted_at = f"'{time_extracted}' as _SDC_EXTRACTED_AT"
    _sdc_deleted_at = "NULL as _SDC_DELETED_AT"
    _sdc_batched_at = f"'{time_extracted}' as _SDC_BATCHED_AT"

    select_sql = "SELECT {}, {}, {}, {} FROM {}.{}".format(
        ",".join(escaped_columns),
        _sdc_extracted_at,
        _sdc_deleted_at,
        _sdc_batched_at,
        escaped_db,
        escaped_table,
    )

    # escape percent signs
    select_sql = select_sql.replace("%", "%%")
    return select_sql
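For illustration only (not part of this commit), a hedged sketch of how the generated statement feeds the FastSync path in `full_table.py`; the table and column names are hypothetical and the exact identifier quoting depends on `escape()`:

```python
# Hypothetical: build the FastSync SELECT for a dbo.animals stream with two columns.
select_sql = fast_sync_generate_select_sql(catalog_entry, ["id", "name"])
# Roughly: SELECT <id>,<name>, '<now>' as _SDC_EXTRACTED_AT,
#          NULL as _SDC_DELETED_AT, '<now>' as _SDC_BATCHED_AT FROM <db>.<table>
```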


def row_to_singer_record(
catalog_entry, version, table_stream, row, columns, time_extracted
112 changes: 100 additions & 12 deletions tap_mssql/sync_strategies/full_table.py
@@ -2,8 +2,18 @@
# pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression

import copy
import csv
import datetime
import json
import pandas as pd
import os
import secrets
import singer
from singer import metadata
from singer import utils
import singer.metrics as metrics
import string
import sys

import tap_mssql.sync_strategies.common as common

@@ -32,7 +42,19 @@ def generate_bookmark_keys(catalog_entry):
    bookmark_keys = base_bookmark_keys

    return bookmark_keys

def write_dataframe_record(row, catalog_entry, stream_version, columns, table_stream, time_extracted):
    # Convert a pandas row to a dict and emit it as a standard Singer RECORD message.
    rec = row.to_dict()

    record_message = singer.RecordMessage(
        stream=table_stream,
        record=rec,
        version=stream_version,
        time_extracted=time_extracted,
    )

    singer.write_message(record_message)
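A hedged usage sketch (not from this commit): the helper can be applied row by row to a fetched pandas chunk to emit conventional Singer RECORD messages instead of the FASTSYNC path:

```python
# Hypothetical: emit every row of a chunk as a Singer RECORD message.
chunk_dataframe.apply(
    write_dataframe_record,
    axis=1,
    args=(catalog_entry, stream_version, columns, table_stream, utils.now()),
)
```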

def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):
    mssql_conn = get_azure_sql_engine(config)
@@ -64,29 +86,95 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version

    with mssql_conn.connect() as open_conn:
        LOGGER.info("Generating select_sql")
        select_sql = common.generate_select_sql(catalog_entry, columns)

        params = {}

        if catalog_entry.tap_stream_id == "dbo-InputMetadata":
            prev_converter = modify_ouput_converter(open_conn)

        common.sync_query(
            open_conn,
            catalog_entry,
            state,
            select_sql,
            columns,
            stream_version,
            table_stream,
            params,
        )

        columns.sort()
        select_sql = common.fast_sync_generate_select_sql(catalog_entry, columns)

        columns.extend(['_SDC_EXTRACTED_AT', '_SDC_DELETED_AT', '_SDC_BATCHED_AT'])

        query_df = df = pd.DataFrame(columns=columns)  # TODO: delete?
        time_extracted = utils.now()  # TODO: delete?

        conn = mssql_conn.connect().execution_options(stream_results=True)

        csv_saved = 0

        chunk_size = config.get("fastsync_batch_rows")  # TODO: update this so that it's not required (if not set, fastsync disabled)
        files = []
        # Stream the table in chunks and write each chunk to a gzipped CSV under fastsync/.
        for chunk_dataframe in pd.read_sql(select_sql, conn, chunksize=chunk_size):
            csv_saved += 1

            filename = gen_export_filename(table=table_stream)
            filepath = os.path.join('fastsync', filename)
            chunk_dataframe.to_csv(f'{filepath}', sep=',', encoding='utf-8', index=False, header=False, compression='gzip')

            files.append(filename)

        # creating singer-like record to signify FASTSYNC for initial sync
        singer_message = {'type': 'FASTSYNC', 'stream': table_stream, 'version': stream_version, 'files': files}
        LOGGER.info(singer_message)
        json_object = json.dumps(singer_message)
        sys.stdout.write(str(json_object) + '\n')
        sys.stdout.flush()

        if catalog_entry.tap_stream_id == "dbo-InputMetadata":
            revert_ouput_converter(open_conn, prev_converter)

    # clear max pk value and last pk fetched upon successful sync
    singer.clear_bookmark(state, catalog_entry.tap_stream_id, "max_pk_values")
    singer.clear_bookmark(state, catalog_entry.tap_stream_id, "last_pk_fetched")

    singer.write_message(activate_version_message)

def generate_random_string(length: int = 8) -> str:
    """
    Generate cryptographically secure random strings
    Uses best practice from Python doc https://docs.python.org/3/library/secrets.html#recipes-and-best-practices
    Args:
        length: length of the string to generate
    Returns: random string
    """

    if length < 1:
        raise Exception('Length must be at least 1!')

    if 0 < length < 8:
        LOGGER.warn('Length is too small! consider 8 or more characters')

    return ''.join(
        secrets.choice(string.ascii_uppercase + string.digits) for _ in range(length)
    )

def gen_export_filename(
    table: str, suffix: str = None, postfix: str = None, ext: str = None
) -> str:
    """
    Generates a unique filename used for exported fastsync data that avoids file name collision
    Default pattern:
        pipelinewise_<table>_<timestamp_with_ms>_batch_<random_string>.csv.gz
    Args:
        table: Name of the table to export
        suffix: Generated filename suffix. Defaults to current timestamp in milliseconds
        postfix: Generated filename postfix. Defaults to a random 8 character length string
        ext: Filename extension. Defaults to .csv.gz
    Returns:
        Unique filename as a string
    """
    if not suffix:
        suffix = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')

    if not postfix:
        postfix = generate_random_string()

    if not ext:
        ext = 'csv.gz'

    return 'pipelinewise_{}_{}_batch_{}.{}'.format(
        table, suffix, postfix, ext
    )
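For illustration (the timestamp and random postfix below are hypothetical), the default pattern produces names like:

```python
gen_export_filename(table="dbo-animals")
# -> 'pipelinewise_dbo-animals_20220811-143015-123456_batch_K3W9ZQ2M.csv.gz'
```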
