New fastsync branch #11
@@ -2,8 +2,18 @@
# pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression

import copy
import csv
import datetime
import json
import pandas as pd
import os
import secrets
import singer
from singer import metadata
from singer import utils
import singer.metrics as metrics
import string
import sys

import tap_mssql.sync_strategies.common as common

@@ -32,7 +42,19 @@ def generate_bookmark_keys(catalog_entry):
    bookmark_keys = base_bookmark_keys

    return bookmark_keys


def write_dataframe_record(row, catalog_entry, stream_version, columns, table_stream, time_extracted):
    rec = row.to_dict()

    record_message = singer.RecordMessage(
        stream=table_stream,
        record=rec,
        version=stream_version,
        time_extracted=time_extracted,
    )

    singer.write_message(record_message)
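
A hedged usage sketch may help here: the hunks in this diff define the helper but do not show its call site, so the invocation below is an assumption for illustration, applying the function row-wise over a DataFrame chunk.

    # Assumed call site (not shown in this diff): emit one RECORD message per row.
    chunk_dataframe.apply(
        write_dataframe_record,
        axis=1,  # pass each row as a pandas Series so row.to_dict() works
        args=(catalog_entry, stream_version, columns, table_stream, time_extracted),
    )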

def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):
    mssql_conn = get_azure_sql_engine(config)

@@ -64,29 +86,95 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):

    with mssql_conn.connect() as open_conn:
        LOGGER.info("Generating select_sql")
        select_sql = common.generate_select_sql(catalog_entry, columns)

        params = {}

        if catalog_entry.tap_stream_id == "dbo-InputMetadata":
            prev_converter = modify_ouput_converter(open_conn)

        common.sync_query(
            open_conn,
            catalog_entry,
            state,
            select_sql,
            columns,
            stream_version,
            table_stream,
            params,
        )

        columns.sort()
        select_sql = common.fast_sync_generate_select_sql(catalog_entry, columns)
Review comment: I think we already discussed this, but again just to confirm: we want full syncs to always default to fast sync, right? We don't even want the option to do a full sync without it?

Review comment (reply): It's OK if that's what we want to do, but I think my preference is to provide the option to use either fast sync or the Singer spec for the full table refresh. That could look something like moving all of this logic into another method. This way we could turn fast sync on on a table-by-table basis if we wanted.
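
To make the reviewer's suggestion concrete, here is a hypothetical sketch of a per-table toggle. The "fastsync_tables" config key and the helper name are assumptions for illustration, not code from this diff or the tap's actual config surface.

    def choose_full_table_strategy(config, catalog_entry):
        """Pick 'fastsync' or 'singer' for a stream's full-table refresh."""
        fastsync_tables = config.get("fastsync_tables")  # assumed config key
        if fastsync_tables is None:
            # Key absent: keep the current behavior of always fast-syncing.
            return "fastsync"
        if catalog_entry.tap_stream_id in fastsync_tables:
            return "fastsync"
        # Otherwise fall back to spec-compliant Singer RECORD messages.
        return "singer"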
        columns.extend(['_SDC_EXTRACTED_AT', '_SDC_DELETED_AT', '_SDC_BATCHED_AT'])

        query_df = df = pd.DataFrame(columns=columns)  # TODO: delete?
        time_extracted = utils.now()  # TODO: delete?

        conn = mssql_conn.connect().execution_options(stream_results=True)

        csv_saved = 0

        chunk_size = config.get("fastsync_batch_rows")  # TODO: make this optional (if not set, disable fastsync)
        files = []
        for chunk_dataframe in pd.read_sql(select_sql, conn, chunksize=chunk_size):
            csv_saved += 1

            filename = gen_export_filename(table=table_stream)
            filepath = os.path.join('fastsync', filename)
            chunk_dataframe.to_csv(filepath, sep=',', encoding='utf-8', index=False, header=False, compression='gzip')

            files.append(filename)

        # emit a singer-like message to signal FASTSYNC for the initial sync
        singer_message = {'type': 'FASTSYNC', 'stream': table_stream, 'version': stream_version, 'files': files}
        LOGGER.info(singer_message)
        json_object = json.dumps(singer_message)
        sys.stdout.write(str(json_object) + '\n')
        sys.stdout.flush()
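
For clarity, this is the shape of the line the block above writes to stdout. "FASTSYNC" is not a Singer spec message type, so any downstream target consuming this tap must be taught to handle it; the stream name and filename below are made-up examples.

    {"type": "FASTSYNC", "stream": "dbo-Orders", "version": 1690000000000, "files": ["pipelinewise_dbo-Orders_20230701-120000-000000_batch_K7QW2RZX.csv.gz"]}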
        if catalog_entry.tap_stream_id == "dbo-InputMetadata":
            revert_ouput_converter(open_conn, prev_converter)

    # clear max pk value and last pk fetched upon successful sync
    singer.clear_bookmark(state, catalog_entry.tap_stream_id, "max_pk_values")
    singer.clear_bookmark(state, catalog_entry.tap_stream_id, "last_pk_fetched")

    singer.write_message(activate_version_message)
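
As a hedged illustration of what the bookmark clearing accomplishes (the stream id and values here are invented): an interrupted full-table sync leaves resumption bookmarks in state, and clearing them on success means the next run starts fresh rather than resuming.

    # Before: state carries resumption bookmarks from an interruptible sync.
    state = {"bookmarks": {"dbo-Orders": {"max_pk_values": {"id": 100000},
                                          "last_pk_fetched": {"id": 42000}}}}
    # After the two singer.clear_bookmark(...) calls above:
    # {"bookmarks": {"dbo-Orders": {}}}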

def generate_random_string(length: int = 8) -> str:
    """
    Generate a cryptographically secure random string.
    Uses the recipe recommended by the Python docs:
    https://docs.python.org/3/library/secrets.html#recipes-and-best-practices
    Args:
        length: length of the string to generate
    Returns: random string
    """
    if length < 1:
        raise Exception('Length must be at least 1!')

    if 0 < length < 8:
        LOGGER.warning('Length is too small! Consider 8 or more characters.')

    return ''.join(
        secrets.choice(string.ascii_uppercase + string.digits) for _ in range(length)
    )
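
A quick usage note (outputs are random; the values shown are illustrative): the helper draws uniformly from uppercase letters and digits.

    print(generate_random_string())    # e.g. 'K7QW2RZX' (8 characters by default)
    print(generate_random_string(12))  # e.g. '4ZP0XN1QH2BD'
    # length 0 raises an Exception; lengths 1-7 log a warning but still return.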

def gen_export_filename(
    table: str, suffix: str = None, postfix: str = None, ext: str = None
) -> str:
    """
    Generates a unique filename for exported fastsync data that avoids name collisions.
    Default pattern:
        pipelinewise_<table>_<timestamp_with_ms>_batch_<random_string>.csv.gz
    Args:
        table: Name of the table to export
        suffix: Filename suffix. Defaults to the current timestamp with microseconds
        postfix: Filename postfix. Defaults to a random 8-character string
        ext: Filename extension. Defaults to csv.gz
    Returns:
        Unique filename as a string
    """
    if not suffix:
        suffix = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')

    if not postfix:
        postfix = generate_random_string()

    if not ext:
        ext = 'csv.gz'

    return 'pipelinewise_{}_{}_batch_{}.{}'.format(table, suffix, postfix, ext)
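
An example call, to show the resulting pattern (the timestamp and random postfix vary per invocation):

    gen_export_filename(table="dbo-Orders")
    # -> 'pipelinewise_dbo-Orders_20230701-120000-000000_batch_K7QW2RZX.csv.gz'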
Review comment: Remove unused imports.