[AP-1772] Added size check option for sync_tables command (#1170)
* Added size check option for sync_tables command

* fix lint

* fix lint

* fix pep8

* fix unit test

* fix unit test

* fix unit test

* fix run_tap bug

* fix lint

* changed setting to MB

* changed setting to MB

* changed setting to MB

* updated requirements

* changed setting to MB

* changed setting to MB

* changed setting to MB

* changed setting to MB

* debug

* debug 2

* debug 3

* fixed schema

* fixed schema

* fixed schema
amofakhar authored Jul 19, 2024
1 parent 109ad82 commit b53688b
Showing 16 changed files with 456 additions and 14 deletions.
20 changes: 20 additions & 0 deletions docs/user_guide/resync.rst
@@ -40,6 +40,26 @@ add the ``--tables`` argument:
as partial synced.
Currently this option is available only for :ref:`tap-mysql` and :ref:`tap-postgres` to Snowflake.

.. attention::

    For :ref:`tap-mysql` and :ref:`tap-postgres` to :ref:`target-snowflake`, the main PipelineWise
    config file accepts an option that skips the resync of any table whose size in the tap is
    greater than a defined value. The setting is optional, and you can still force the resync
    with the ``--force`` argument:

        $ pipelinewise sync_tables --target <target_id> --tap <tap_id> --force

    To enable the table size check, add this setting to ``config.yml``:

.. code-block:: yaml

    allowed_resync_max_size:
        table_mb: <integer/float>
2. **Partial resync**

If you want to partially resync a table from a specific tap, use the ``partial_sync_table`` command
3 changes: 3 additions & 0 deletions pipelinewise/cli/__init__.py
@@ -250,6 +250,9 @@ def main():
parser.add_argument('--column', type=str, default='*', help='Name of the column to use as sync key in partial sync')
parser.add_argument('--start_value', type=str, default='*', help='start value of the column to partial sync')
parser.add_argument('--end_value', type=str, default=None, help='end value of the column to partial sync')
parser.add_argument('--force', default=False, required=False,
help='Force sync_tables for full sync', action='store_true'
)

args = parser.parse_args()

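For reference, a minimal standalone sketch of how the new flag parses (plain argparse, not the full PipelineWise parser):

    import argparse

    parser = argparse.ArgumentParser()
    # Same definition as above: a boolean flag, False unless supplied
    parser.add_argument('--force', default=False, required=False,
                        help='Force sync_tables for full sync', action='store_true')

    assert parser.parse_args([]).force is False          # flag omitted
    assert parser.parse_args(['--force']).force is True  # flag present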
2 changes: 2 additions & 0 deletions pipelinewise/cli/commands.py
@@ -411,6 +411,7 @@ def build_fastsync_command(
profiling_mode: bool = False,
profiling_dir: str = None,
drop_pg_slot: bool = False,
autoresync_size: int = None
) -> str:
"""
Builds a command that starts fastsync from a given tap to a
@@ -449,6 +450,7 @@
else '',
f'--tables {tables}' if tables else '',
'--drop_pg_slot' if drop_pg_slot else '',
f'--autoresync_size {autoresync_size}' if autoresync_size else ''
],
)
)
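One behavioural detail worth noting: the flag is appended with a truthiness check, so a configured limit of 0 would be dropped from the command just like None. A standalone illustration (values are made up):

    for autoresync_size in (None, 0, 1024):
        arg = f'--autoresync_size {autoresync_size}' if autoresync_size else ''
        print(repr(arg))  # '', '', '--autoresync_size 1024'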
11 changes: 10 additions & 1 deletion pipelinewise/cli/pipelinewise.py
@@ -110,6 +110,7 @@ def __init__(self, args, config_dir, venv_dir, profiling_dir=None):
self.TRANSFORM_FIELD_CONNECTOR_NAME
)
self.tap_run_log_file = None
self.force_fast_sync = True

# Catch SIGINT and SIGTERM to exit gracefully
for sig in [signal.SIGINT, signal.SIGTERM]:
@@ -1109,6 +1110,10 @@ def run_tap_fastsync(
Generating and running shell command to sync tables using the native fastsync components
"""
# Build the fastsync executable command
max_autoresync_table_size = None
if tap.type in ('tap-mysql', 'tap-postgres') and target.type == 'target-snowflake' and not self.force_fast_sync:
max_autoresync_table_size = self.config.get('allowed_resync_max_size', {}).get('table_mb')

command = commands.build_fastsync_command(
tap=tap,
target=target,
@@ -1119,6 +1124,7 @@
profiling_mode=self.profiling_mode,
profiling_dir=self.profiling_dir,
drop_pg_slot=self.drop_pg_slot,
autoresync_size=max_autoresync_table_size
)

# Fastsync is running in subprocess.
@@ -1167,6 +1173,8 @@ def run_tap(self):

not_partial_syned_tables = set()

self.force_fast_sync = True

self.logger.info('Running %s tap in %s target', tap_id, target_id)

# Run only if tap enabled
@@ -1372,6 +1380,7 @@ def sync_tables(self):
"""
This method calls do_sync_tables if sync_tables command is chosen
"""
self.force_fast_sync = self.args.force
try:
with pidfile.PIDFile(self.tap['files']['pidfile']):
self.do_sync_tables()
@@ -1399,7 +1408,7 @@ def do_sync_tables(self, fastsync_stream_ids=None):

if selected_tables['full_sync']:
fast_sync_process = Process(
-                target=self.sync_tables_fast_sync, args=(selected_tables['full_sync'],))
+                target=self.sync_tables_fast_sync, args=(selected_tables['full_sync'], ))
fast_sync_process.start()
processes_list.append(fast_sync_process)

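Taken together, the changes in this file gate the size check on three conditions: the tap type, the target type, and whether the run was forced. Note that run_tap unconditionally sets force_fast_sync = True, so the check only ever applies to sync_tables without --force. A condensed sketch of the gating logic (a simplified, hypothetical helper; the real code lives inline in run_tap_fastsync):

    def max_autoresync_table_size(config, tap_type, target_type, force_fast_sync):
        """Return the size limit in MB, or None when the check does not apply."""
        if (tap_type in ('tap-mysql', 'tap-postgres')
                and target_type == 'target-snowflake'
                and not force_fast_sync):
            return config.get('allowed_resync_max_size', {}).get('table_mb')
        return None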
9 changes: 9 additions & 0 deletions pipelinewise/cli/schemas/config.json
@@ -42,6 +42,15 @@
"victorops": { "$ref": "#/definitions/alert_victorops" }
},
"additionalProperties": false
},
"allowed_resync_max_size": {
"type": "object",
"properties": {
"table_mb": {
"type": "number"
}
},
"required": ["table_mb"]
}
},
"required": [],
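To see what the new schema rule accepts, here is a sketch exercising the fragment with the jsonschema package directly (this is not necessarily how PipelineWise itself invokes validation; the schema fragment is copied from above):

    from jsonschema import ValidationError, validate

    schema = {
        'type': 'object',
        'properties': {'table_mb': {'type': 'number'}},
        'required': ['table_mb'],
    }

    validate({'table_mb': 512.5}, schema)      # passes: "number" accepts int and float
    try:
        validate({'table_mb': '512'}, schema)  # a string is rejected
    except ValidationError as exc:
        print(exc.message)                     # '512' is not of type 'number'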
1 change: 1 addition & 0 deletions pipelinewise/fastsync/commons/utils.py
@@ -333,6 +333,7 @@ def parse_args(required_config_keys: Dict) -> argparse.Namespace:
help='Drop pg replication slot before starting resync',
action='store_true',
)
parser.add_argument('--autoresync_size', help='maximum value for table size to resync', )

args: argparse.Namespace = parser.parse_args()

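Note that the new argument declares neither a type nor a default, so when the flag is supplied, args.autoresync_size arrives as a plain string; this is why the fastsync scripts below coerce it with float(args.autoresync_size) before comparing table sizes against it.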
31 changes: 25 additions & 6 deletions pipelinewise/fastsync/mysql_to_snowflake.py
@@ -13,6 +13,9 @@
from .commons import utils
from .commons.tap_mysql import FastSyncTapMySql
from .commons.target_snowflake import FastSyncTargetSnowflake
from pipelinewise.utils import (get_tables_size,
filter_out_selected_tables,
get_maximum_value_from_list_of_dicts, get_schemas_of_tables_set)

LOGGER = Logger().get_logger(__name__)

@@ -204,14 +207,30 @@ def main_impl():
pool_size,
)

can_run_sync = True
if args.autoresync_size:
schemas = get_schemas_of_tables_set(args.tables)
tap_obj = FastSyncTapMySql(args.tap, tap_type_to_target_type)
for schema in schemas:
all_tables_in_this_schema = get_tables_size(schema, tap_obj)
only_selected_tables = filter_out_selected_tables(all_tables_in_this_schema, args.tables)
table_with_maximum_size = get_maximum_value_from_list_of_dicts(only_selected_tables, 'table_size')
if table_with_maximum_size.get('table_size') > float(args.autoresync_size):
can_run_sync = False
table_sync_excs.append(
f're-sync can not be done because size of table '
f'`{table_with_maximum_size["table_name"]}` is greater than `{args.autoresync_size}`!'
f' Use --force argument to force sync_tables!')

    # Start loading tables in parallel in spawning processes
-    with multiprocessing.Pool(pool_size) as proc:
-        table_sync_excs = list(
-            filter(
-                lambda x: not isinstance(x, bool),
-                proc.map(partial(sync_table, args=args), args.tables),
-            )
-        )
+    if can_run_sync:
+        with multiprocessing.Pool(pool_size) as proc:
+            table_sync_excs = list(
+                filter(
+                    lambda x: not isinstance(x, bool),
+                    proc.map(partial(sync_table, args=args), args.tables),
+                )
+            )

# Log summary
end_time = datetime.now()
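Presumably, appending the size-limit message to table_sync_excs makes the run finish with a non-zero exit status through the same path used for per-table sync failures; the end-to-end tests at the bottom of this diff assert exactly that (exit code 1 for sync_tables without --force).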
32 changes: 26 additions & 6 deletions pipelinewise/fastsync/postgres_to_snowflake.py
@@ -14,6 +14,10 @@
from .commons import utils
from .commons.tap_postgres import FastSyncTapPostgres
from .commons.target_snowflake import FastSyncTargetSnowflake
from pipelinewise.utils import (get_tables_size,
filter_out_selected_tables,
get_maximum_value_from_list_of_dicts, get_schemas_of_tables_set)


LOGGER = Logger().get_logger(__name__)

@@ -201,18 +205,34 @@ def main_impl():
pool_size,
)

can_run_sync = True
if args.autoresync_size:
schemas = get_schemas_of_tables_set(args.tables)
tap_obj = FastSyncTapPostgres(args.tap, tap_type_to_target_type)
for schema in schemas:
all_tables_in_this_schema = get_tables_size(schema, tap_obj)
only_selected_tables = filter_out_selected_tables(all_tables_in_this_schema, args.tables)
table_with_maximum_size = get_maximum_value_from_list_of_dicts(only_selected_tables, 'table_size')
if table_with_maximum_size.get('table_size') > float(args.autoresync_size):
can_run_sync = False
table_sync_excs.append(
f're-sync can not be done because size of table '
f'`{table_with_maximum_size["table_name"]}` is greater than `{args.autoresync_size}`!'
f' Use --force argument to force sync_tables!')

# if internal arg drop_pg_slot is set to True, then we drop the slot before starting resync
if args.drop_pg_slot:
FastSyncTapPostgres.drop_slot(args.tap)

    # Start loading tables in parallel in spawning processes
-    with multiprocessing.Pool(pool_size) as proc:
-        table_sync_excs = list(
-            filter(
-                lambda x: not isinstance(x, bool),
-                proc.map(partial(sync_table, args=args), args.tables),
-            )
-        )
+    if can_run_sync:
+        with multiprocessing.Pool(pool_size) as proc:
+            table_sync_excs = list(
+                filter(
+                    lambda x: not isinstance(x, bool),
+                    proc.map(partial(sync_table, args=args), args.tables),
+                )
+            )

# Log summary
end_time = datetime.now()
67 changes: 67 additions & 0 deletions pipelinewise/utils.py
@@ -21,3 +21,70 @@ def safe_column_name(
if name:
return f'{quote_character}{name.upper()}{quote_character}'
return name


def get_tables_size(schema: str, tap) -> dict:
"""get tables size"""
result = []
if 'FastSyncTapMySql' in str(type(tap)):
tap.open_connections()
result_list = tap.query(
'select TABLE_NAME as table_name,'
' TABLE_ROWS as table_rows,'
' (DATA_LENGTH + INDEX_LENGTH)/ 1024 / 1024 as table_size'
f' from information_schema.TABLES where TABLE_SCHEMA = \'{schema}\';')
tap.close_connections()
for res in result_list:
result.append({
'table_name': f'{schema}.{res["table_name"]}',
'table_rows': res['table_rows'],
'table_size': res['table_size']
})

if 'FastSyncTapPostgres' in str(type(tap)):
tap.open_connection()
result_list = tap.query(
'SELECT TABLE_NAME as table_name,'
' (xpath(\'/row/c/text()\','
' query_to_xml(format(\'select count(*) as c from %I.%I\', table_schema, TABLE_NAME), FALSE, TRUE, \'\'))'
')[1]::text::int AS table_rows,'
' pg_total_relation_size('
'\'"\'||table_schema||\'"."\'||table_name||\'"\')::NUMERIC/1024::NUMERIC/1024 as table_size '
'FROM (SELECT table_schema, TABLE_NAME FROM information_schema.tables '
f'WHERE TABLE_NAME not like \'pg_%\' AND table_schema in (\'{schema}\')) as tb'
)
for res in result_list:
result.append({
'table_name': f'{schema}.{res[0]}',
'table_rows': res[1],
'table_size': res[2]
})
tap.close_connection()
return result


def get_maximum_value_from_list_of_dicts(list_of_dicts: list, key: str) -> Optional[dict]:
"""get maximum value from list of dicts"""
try:
return max(list_of_dicts, key=lambda x: x.get(key))
except Exception:
return None


def filter_out_selected_tables(all_schema_tables: list, selected_tables: set) -> list:
"""filter out selected tables"""
filtered_tables = []
for table in selected_tables:
found_table = next((item for item in all_schema_tables if item['table_name'] == table), None)
if found_table:
filtered_tables.append(found_table)
return filtered_tables


def get_schemas_of_tables_set(set_of_tables: set) -> set:
"""get schema from set of tables"""
set_of_schemas = set()
for table in set_of_tables:
schema = table.split('.')[0]
set_of_schemas.add(schema)
return set_of_schemas
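
A worked example of how the fastsync scripts combine these helpers (sample data only; table names and sizes are made up, and no database connection is needed):

    from pipelinewise.utils import (filter_out_selected_tables,
                                    get_maximum_value_from_list_of_dicts,
                                    get_schemas_of_tables_set)

    tables = {'shop.orders', 'shop.users'}

    get_schemas_of_tables_set(tables)  # {'shop'}

    # Shape of the rows produced by get_tables_size (sizes in MB)
    all_tables = [
        {'table_name': 'shop.orders', 'table_rows': 1000, 'table_size': 42.5},
        {'table_name': 'shop.users', 'table_rows': 50, 'table_size': 1.2},
        {'table_name': 'shop.audit_log', 'table_rows': 9, 'table_size': 0.1},
    ]
    selected = filter_out_selected_tables(all_tables, tables)   # drops audit_log
    largest = get_maximum_value_from_list_of_dicts(selected, 'table_size')
    print(largest['table_name'], largest['table_size'])         # shop.orders 42.5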
1 change: 1 addition & 0 deletions setup.py
@@ -40,6 +40,7 @@
'dnspython==2.1.*',
'boto3>=1.21,<1.27',
'chardet==4.0.0',
'backports.tarfile==1.2.0'
],
extras_require={
'test': [
@@ -0,0 +1,75 @@
import os

from tests.end_to_end.target_snowflake.tap_mariadb import TapMariaDB
from tests.end_to_end.helpers import tasks
from tests.end_to_end.target_snowflake import TEST_PROJECTS_DIR_PATH


TAP_ID = 'mariadb_to_sf'
TARGET_ID = 'snowflake'


def _create_ppw_config_file(table_mb):
with open(f'{TEST_PROJECTS_DIR_PATH}/config.yml', 'w', encoding='utf-8') as config_file:
config_file.write('allowed_resync_max_size:\n')
config_file.write(f' table_mb: {table_mb}\n')

[return_code, _, _] = tasks.run_command(f'pipelinewise import_config --dir {TEST_PROJECTS_DIR_PATH}')
assert return_code == 0


class TestResyncMariaDBToSF(TapMariaDB):
"""Test Resync MariaDB to SF."""
def setUp(self, *args, **kwargs): # pylint: disable = unused-argument
super().setUp(tap_id=TAP_ID, target_id=TARGET_ID)

def tearDown(self):
try:
os.remove(f'{TEST_PROJECTS_DIR_PATH}/config.yml')
except OSError:
pass
super().tearDown()

def test_resync_mariadb_to_sf_if_table_size_greater_than_limit(self): # pylint: disable = no-self-use
"""test resync mariadb to SF returns error 1 if table size is greater than the limit"""

a_small_number = 0.001 # Mb
_create_ppw_config_file(table_mb=a_small_number)

command = f'pipelinewise sync_tables --tap {TAP_ID} --target {TARGET_ID}'

[return_code, _, _] = tasks.run_command(command)

assert return_code == 1

def test_resync_mariadb_to_sf_if_table_size_less_than_limit(self): # pylint: disable = no-self-use
"""test resync mariadb to SF returns error if table size is less than the limit"""
a_big_number = 10000 #Mb
_create_ppw_config_file(table_mb=a_big_number)

command = f'pipelinewise sync_tables --tap {TAP_ID} --target {TARGET_ID}'
[return_code, _, _] = tasks.run_command(command)

assert return_code == 0

def test_resync_mariadb_to_sf_if_table_size_greater_than_limit_and_force(self): # pylint: disable = no-self-use
"""test resync mariadb to SF returns error if table size is greater than the limit and --force is used"""
a_small_number = 0.001 # Mb
_create_ppw_config_file(table_mb=a_small_number)

command = f'pipelinewise sync_tables --tap {TAP_ID} --target {TARGET_ID} --force'

[return_code, _, _] = tasks.run_command(command)

assert return_code == 0

def test_run_tap_mariadb_to_sf_if_size_greater_than_limit(self): # pylint: disable = no-self-use
"""test run_tap mariadb to sf if table size is greater than the limit"""
a_small_number = 0.001 # Mb
_create_ppw_config_file(table_mb=a_small_number)

command = f'pipelinewise run_tap --tap {TAP_ID} --target {TARGET_ID}'

[return_code, _, _] = tasks.run_command(command)

assert return_code == 0