diff --git a/README.md b/README.md
index cf45c43e..0778ae64 100755
--- a/README.md
+++ b/README.md
@@ -67,7 +67,8 @@ Create a config file containing the database connection credentials, e.g.:
   "port": "3306",
   "user": "root",
   "password": "password",
-  "include_schemas_in_destination_stream_name": true
+  "include_schemas_in_destination_stream_name": true,
+  "fastsync_batch_rows": 100000
 }
 ```
 
@@ -296,6 +297,10 @@ specified in the table's metadata as well.
 
 Log based replication works in conjunction with a state file to extract new and changed records that have been recorded by SQL Server Change Tracking each time the tap is invoked. This requires change tracking to be enabled on the source database as well as each table to be replicated with this method. The initial sync with this method will default to full table, and log based replication will occur on subsequent runs.
 
+## FastSync
+
+Any Full Table replication, or the initial full sync of a Log Based extract, will use FastSync, an alternative method that bypasses the Singer spec (specifically RECORD messages) to reduce sync duration. The `fastsync_batch_rows` config value sets the number of records synced per batch for each table.
+
 #### Examples
 
 Let's sync the `animals` table again, but this time using incremental
diff --git a/setup.py b/setup.py
index 9ed14437..44a49208 100755
--- a/setup.py
+++ b/setup.py
@@ -22,6 +22,7 @@
         "backoff==1.8.0",
         "MarkupSafe==2.0.1",
         "jinja2==2.11.3",
+        "pandas==1.2.2",
     ],
     entry_points="""
     [console_scripts]
diff --git a/tap_mssql/sync_strategies/common.py b/tap_mssql/sync_strategies/common.py
index 6fd8d3c6..86c453ab 100755
--- a/tap_mssql/sync_strategies/common.py
+++ b/tap_mssql/sync_strategies/common.py
@@ -101,6 +101,25 @@ def generate_select_sql(catalog_entry, columns):
     select_sql = select_sql.replace("%", "%%")
     return select_sql
 
+def fast_sync_generate_select_sql(catalog_entry, columns):
+    database_name = get_database_name(catalog_entry)
+    escaped_db = escape(database_name)
+    escaped_table = escape(catalog_entry.table)
+    escaped_columns = [escape(c) for c in columns]
+
+    time_extracted = utils.now()
+    _sdc_extracted_at = f"'{time_extracted}' as _SDC_EXTRACTED_AT"
+    _sdc_deleted_at = "NULL as _SDC_DELETED_AT"
+    _sdc_batched_at = f"'{time_extracted}' as _SDC_BATCHED_AT"
+
+    select_sql = "SELECT {}, {}, {}, {} FROM {}.{}".format(
+        ",".join(escaped_columns), _sdc_extracted_at, _sdc_deleted_at, _sdc_batched_at, escaped_db, escaped_table
+    )
+
+    # escape percent signs
+    select_sql = select_sql.replace("%", "%%")
+    return select_sql
+
 
 def row_to_singer_record(
     catalog_entry, version, table_stream, row, columns, time_extracted
diff --git a/tap_mssql/sync_strategies/full_table.py b/tap_mssql/sync_strategies/full_table.py
index ad37dd1d..92080711 100755
--- a/tap_mssql/sync_strategies/full_table.py
+++ b/tap_mssql/sync_strategies/full_table.py
@@ -2,8 +2,18 @@
 # pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression
 
 import copy
+import csv
+import datetime
+import json
+import pandas as pd
+import os
+import secrets
 import singer
 from singer import metadata
+from singer import utils
+import singer.metrics as metrics
+import string
+import sys
 
 import tap_mssql.sync_strategies.common as common
 
@@ -32,7 +42,19 @@ def generate_bookmark_keys(catalog_entry):
         bookmark_keys = base_bookmark_keys
 
     return bookmark_keys
+
+def write_dataframe_record(row, catalog_entry, stream_version, columns, table_stream, time_extracted):
+    rec = row.to_dict()
+
+    record_message = singer.RecordMessage(
+        stream=table_stream,
+        record=rec,
+        version=stream_version,
+        time_extracted=time_extracted,
+    )
+
+    singer.write_message(record_message)
 
 
 def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):
     mssql_conn = get_azure_sql_engine(config)
@@ -64,23 +86,42 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):
     with mssql_conn.connect() as open_conn:
         LOGGER.info("Generating select_sql")
-        select_sql = common.generate_select_sql(catalog_entry, columns)
 
         params = {}
 
         if catalog_entry.tap_stream_id == "dbo-InputMetadata":
             prev_converter = modify_ouput_converter(open_conn)
 
-        common.sync_query(
-            open_conn,
-            catalog_entry,
-            state,
-            select_sql,
-            columns,
-            stream_version,
-            table_stream,
-            params,
-        )
+
+        columns.sort()
+        select_sql = common.fast_sync_generate_select_sql(catalog_entry, columns)
+
+        columns.extend(['_SDC_EXTRACTED_AT', '_SDC_DELETED_AT', '_SDC_BATCHED_AT'])
+
+        query_df = df = pd.DataFrame(columns=columns)  # TODO: delete?
+        time_extracted = utils.now()  # TODO: delete?
+
+        conn = mssql_conn.connect().execution_options(stream_results=True)
+
+        csv_saved = 0
+
+        chunk_size = config.get("fastsync_batch_rows")  # TODO: update this so that it's not required (if not set, fastsync is disabled)
+        files = []
+        for chunk_dataframe in pd.read_sql(select_sql, conn, chunksize=chunk_size):
+            csv_saved += 1
+
+            filename = gen_export_filename(table=table_stream)
+            filepath = os.path.join('fastsync', filename)
+            chunk_dataframe.to_csv(f'{filepath}', sep=',', encoding='utf-8', index=False, header=False, compression='gzip')
+
+            files.append(filename)
+
+        # creating a singer-like record to signify FASTSYNC for the initial sync
+        singer_message = {'type': 'FASTSYNC', 'stream': table_stream, 'version': stream_version, 'files': files}
+        LOGGER.info(singer_message)
+        json_object = json.dumps(singer_message)
+        sys.stdout.write(str(json_object) + '\n')
+        sys.stdout.flush()
 
         if catalog_entry.tap_stream_id == "dbo-InputMetadata":
             revert_ouput_converter(open_conn, prev_converter)
 
@@ -88,5 +129,52 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):
     # clear max pk value and last pk fetched upon successful sync
     singer.clear_bookmark(state, catalog_entry.tap_stream_id, "max_pk_values")
     singer.clear_bookmark(state, catalog_entry.tap_stream_id, "last_pk_fetched")
 
-    singer.write_message(activate_version_message)
+
+def generate_random_string(length: int = 8) -> str:
+    """
+    Generate cryptographically secure random strings
+    Uses best practice from Python doc https://docs.python.org/3/library/secrets.html#recipes-and-best-practices
+    Args:
+        length: length of the string to generate
+    Returns: random string
+    """
+
+    if length < 1:
+        raise Exception('Length must be at least 1!')
+
+    if 0 < length < 8:
+        LOGGER.warning('Length is too small! consider 8 or more characters')
+
+    return ''.join(
+        secrets.choice(string.ascii_uppercase + string.digits) for _ in range(length)
+    )
+
+def gen_export_filename(
+    table: str, suffix: str = None, postfix: str = None, ext: str = None
+) -> str:
+    """
+    Generates a unique filename used for exported fastsync data that avoids file name collision
+    Default pattern:
+        pipelinewise_<tap_id>_<table>_<timestamp>_fastsync_<random_string>.csv.gz
+    Args:
+        tap_id: Unique tap id
+        table: Name of the table to export
+        suffix: Generated filename suffix. Defaults to current timestamp in milliseconds
+        postfix: Generated filename postfix. Defaults to a random 8 character length string
+        ext: Filename extension. Defaults to .csv.gz
+    Returns:
+        Unique filename as a string
+    """
+    if not suffix:
+        suffix = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')
+
+    if not postfix:
+        postfix = generate_random_string()
+
+    if not ext:
+        ext = 'csv.gz'
+
+    return 'pipelinewise_{}_{}_batch_{}.{}'.format(
+        table, suffix, postfix, ext
+    )
\ No newline at end of file
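
Based on the `sync_table` changes above, each table synced via FastSync writes gzipped CSV chunks to a `fastsync/` directory and, in place of Singer RECORD messages, emits a single JSON line to stdout built from the `singer_message` dict. A minimal sketch of that line, with an illustrative stream name, version, and file name (not taken from the diff):

```
{"type": "FASTSYNC", "stream": "dbo-animals", "version": 1622121234567, "files": ["pipelinewise_dbo-animals_20210527-134512-123456_batch_X4KTJD9Z.csv.gz"]}
```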