[AP-1772] Added size check option for sync_tables command #1170

Merged · 23 commits · Jul 19, 2024
20 changes: 20 additions & 0 deletions docs/user_guide/resync.rst
@@ -40,6 +40,26 @@ add the ``--tables`` argument:
as partial synced.
Currently this option is available only for :ref:`tap-mysql` and :ref:`tap-postgres` to Snowflake.

.. attention::

There is an option for :ref:`tap-mysql` and :ref:`tap-postgres` to :ref:`target-snowflake` in the main PipelineWise
config file to skip the resync of a table when its size in the tap is greater than a defined value.
This setting is optional, and you can still force the resync by using the ``--force`` argument:

.. code-block:: bash

    $ pipelinewise sync_tables --target <target_id> --tap <tap_id> --force

This setting can be added to ``config.yml`` to enable the table size check:

.. code-block:: yaml

allowed_resync_max_size:
table_mb: <integer/float>

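For example, with the following setting in ``config.yml`` (1024 MB is an arbitrary illustrative value), ``sync_tables`` will refuse to resync any selected table whose size in the tap exceeds the limit unless ``--force`` is passed:

.. code-block:: yaml

    allowed_resync_max_size:
      table_mb: 1024
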
2. **Partial resync**

If you want to partial resync a table from a specific tap then use the ``partial_sync_table`` command
3 changes: 3 additions & 0 deletions pipelinewise/cli/__init__.py
@@ -250,6 +250,9 @@ def main():
parser.add_argument('--column', type=str, default='*', help='Name of the column to use as sync key in partial sync')
parser.add_argument('--start_value', type=str, default='*', help='start value of the column to partial sync')
parser.add_argument('--end_value', type=str, default=None, help='end value of the column to partial sync')
parser.add_argument('--force', default=False, required=False,
help='Force sync_tables for full sync', action='store_true'
)

args = parser.parse_args()

2 changes: 2 additions & 0 deletions pipelinewise/cli/commands.py
@@ -411,6 +411,7 @@ def build_fastsync_command(
profiling_mode: bool = False,
profiling_dir: str = None,
drop_pg_slot: bool = False,
autoresync_size: int = None
) -> str:
"""
Builds a command that starts fastsync from a given tap to a
@@ -449,6 +450,7 @@
else '',
f'--tables {tables}' if tables else '',
'--drop_pg_slot' if drop_pg_slot else '',
f'--autoresync_size {autoresync_size}' if autoresync_size else ''
],
)
)
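A note on the flag-assembly pattern above: each fragment is appended only when its value is truthy, so a configured limit of ``0`` would be dropped silently. A standalone sketch of the same pattern with hypothetical values (not the full fastsync command line):

def optional_flags(tables=None, drop_pg_slot=False, autoresync_size=None):
    # Mirrors the conditional-fragment style of build_fastsync_command:
    # falsy values (None, 0, '') all suppress the corresponding flag.
    parts = [
        f'--tables {tables}' if tables else '',
        '--drop_pg_slot' if drop_pg_slot else '',
        f'--autoresync_size {autoresync_size}' if autoresync_size else '',
    ]
    return ' '.join(p for p in parts if p)

print(optional_flags(tables='public.orders', autoresync_size=1000))
# --tables public.orders --autoresync_size 1000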
11 changes: 10 additions & 1 deletion pipelinewise/cli/pipelinewise.py
@@ -110,6 +110,7 @@ def __init__(self, args, config_dir, venv_dir, profiling_dir=None):
self.TRANSFORM_FIELD_CONNECTOR_NAME
)
self.tap_run_log_file = None
self.force_fast_sync = True

# Catch SIGINT and SIGTERM to exit gracefully
for sig in [signal.SIGINT, signal.SIGTERM]:
@@ -1109,6 +1110,10 @@ def run_tap_fastsync(
Generating and running shell command to sync tables using the native fastsync components
"""
# Build the fastsync executable command
max_autoresync_table_size = None
if tap.type in ('tap-mysql', 'tap-postgres') and target.type == 'target-snowflake' and not self.force_fast_sync:
max_autoresync_table_size = self.config.get('allowed_resync_max_size', {}).get('table_mb')

command = commands.build_fastsync_command(
tap=tap,
target=target,
@@ -1119,6 +1124,7 @@
profiling_mode=self.profiling_mode,
profiling_dir=self.profiling_dir,
drop_pg_slot=self.drop_pg_slot,
autoresync_size=max_autoresync_table_size
)

# Fastsync is running in subprocess.
@@ -1167,6 +1173,8 @@ def run_tap(self):

not_partial_syned_tables = set()

self.force_fast_sync = True

self.logger.info('Running %s tap in %s target', tap_id, target_id)

# Run only if tap enabled
@@ -1372,6 +1380,7 @@ def sync_tables(self):
"""
This method calls do_sync_tables if sync_tables command is chosen
"""
self.force_fast_sync = self.args.force
try:
with pidfile.PIDFile(self.tap['files']['pidfile']):
self.do_sync_tables()
@@ -1399,7 +1408,7 @@ def do_sync_tables(self, fastsync_stream_ids=None):

if selected_tables['full_sync']:
fast_sync_process = Process(
target=self.sync_tables_fast_sync, args=(selected_tables['full_sync'],))
target=self.sync_tables_fast_sync, args=(selected_tables['full_sync'], ))
fast_sync_process.start()
processes_list.append(fast_sync_process)

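Taken together, the ``force_fast_sync`` changes give three behaviors: ``run_tap`` never applies the size limit, ``sync_tables`` applies it by default, and ``sync_tables --force`` bypasses it. A condensed sketch of the decision (simplified and hypothetical; the real logic is spread across ``__init__``, ``run_tap`` and ``sync_tables`` as shown above):

def should_apply_size_limit(command: str, force: bool, tap_type: str, target_type: str) -> bool:
    # __init__ defaults force_fast_sync to True and run_tap re-asserts it, so only
    # sync_tables (without --force) enables the check, and only for
    # MySQL/Postgres taps loading into Snowflake.
    force_fast_sync = force if command == 'sync_tables' else True
    return (not force_fast_sync
            and tap_type in ('tap-mysql', 'tap-postgres')
            and target_type == 'target-snowflake')

print(should_apply_size_limit('sync_tables', force=False,
                              tap_type='tap-mysql', target_type='target-snowflake'))  # True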
9 changes: 9 additions & 0 deletions pipelinewise/cli/schemas/config.json
@@ -42,6 +42,15 @@
"victorops": { "$ref": "#/definitions/alert_victorops" }
},
"additionalProperties": false
},
"allowed_resync_max_size": {
"type": "object",
"properties": {
"table_mb": {
"type": "number"
}
},
"required": ["table_mb"]
}
},
"required": [],
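As a quick sanity check of the new schema fragment, a standalone sketch using the ``jsonschema`` package with the fragment inlined:

import jsonschema

fragment = {
    'type': 'object',
    'properties': {'table_mb': {'type': 'number'}},
    'required': ['table_mb'],
}

jsonschema.validate({'table_mb': 500.5}, fragment)  # passes: ints and floats are both 'number'

try:
    jsonschema.validate({}, fragment)
except jsonschema.ValidationError as exc:
    print(exc.message)  # 'table_mb' is a required property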
1 change: 1 addition & 0 deletions pipelinewise/fastsync/commons/utils.py
@@ -333,6 +333,7 @@ def parse_args(required_config_keys: Dict) -> argparse.Namespace:
help='Drop pg replication slot before starting resync',
action='store_true',
)
parser.add_argument('--autoresync_size', help='Maximum table size (MB) allowed for resync')

args: argparse.Namespace = parser.parse_args()

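Since ``--autoresync_size`` is declared without a ``type=``, argparse delivers it as a string; the fastsync scripts compare against ``float(args.autoresync_size)`` at the point of use. A minimal illustration:

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--autoresync_size', help='Maximum table size (MB) allowed for resync')
args = parser.parse_args(['--autoresync_size', '500'])

assert isinstance(args.autoresync_size, str)  # argparse defaults to str
limit_mb = float(args.autoresync_size)        # conversion happens at comparison time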
31 changes: 25 additions & 6 deletions pipelinewise/fastsync/mysql_to_snowflake.py
@@ -13,6 +13,9 @@
from .commons import utils
from .commons.tap_mysql import FastSyncTapMySql
from .commons.target_snowflake import FastSyncTargetSnowflake
from pipelinewise.utils import (get_tables_size,
filter_out_selected_tables,
get_maximum_value_from_list_of_dicts, get_schemas_of_tables_set)

LOGGER = Logger().get_logger(__name__)

@@ -204,14 +207,30 @@ def main_impl():
pool_size,
)

can_run_sync = True
if args.autoresync_size:
schemas = get_schemas_of_tables_set(args.tables)
tap_obj = FastSyncTapMySql(args.tap, tap_type_to_target_type)
for schema in schemas:
all_tables_in_this_schema = get_tables_size(schema, tap_obj)
only_selected_tables = filter_out_selected_tables(all_tables_in_this_schema, args.tables)
table_with_maximum_size = get_maximum_value_from_list_of_dicts(only_selected_tables, 'table_size')
if table_with_maximum_size.get('table_size') > float(args.autoresync_size):
can_run_sync = False
table_sync_excs.append(
f're-sync cannot be done because the size of table '
f'`{table_with_maximum_size["table_name"]}` is greater than `{args.autoresync_size}`!'
f' Use --force argument to force sync_tables!')

# Start loading tables in parallel in spawning processes
with multiprocessing.Pool(pool_size) as proc:
table_sync_excs = list(
filter(
lambda x: not isinstance(x, bool),
proc.map(partial(sync_table, args=args), args.tables),
if can_run_sync:
with multiprocessing.Pool(pool_size) as proc:
table_sync_excs = list(
filter(
lambda x: not isinstance(x, bool),
proc.map(partial(sync_table, args=args), args.tables),
)
)
)

# Log summary
end_time = datetime.now()
32 changes: 26 additions & 6 deletions pipelinewise/fastsync/postgres_to_snowflake.py
@@ -14,6 +14,10 @@
from .commons import utils
from .commons.tap_postgres import FastSyncTapPostgres
from .commons.target_snowflake import FastSyncTargetSnowflake
from pipelinewise.utils import (get_tables_size,
filter_out_selected_tables,
get_maximum_value_from_list_of_dicts, get_schemas_of_tables_set)


LOGGER = Logger().get_logger(__name__)

@@ -201,18 +205,34 @@ def main_impl():
pool_size,
)

can_run_sync = True
if args.autoresync_size:
schemas = get_schemas_of_tables_set(args.tables)
tap_obj = FastSyncTapPostgres(args.tap, tap_type_to_target_type)
for schema in schemas:
all_tables_in_this_schema = get_tables_size(schema, tap_obj)
only_selected_tables = filter_out_selected_tables(all_tables_in_this_schema, args.tables)
table_with_maximum_size = get_maximum_value_from_list_of_dicts(only_selected_tables, 'table_size')
if table_with_maximum_size.get('table_size') > float(args.autoresync_size):
can_run_sync = False
table_sync_excs.append(
f're-sync cannot be done because the size of table '
f'`{table_with_maximum_size["table_name"]}` is greater than `{args.autoresync_size}`!'
f' Use --force argument to force sync_tables!')

# if internal arg drop_pg_slot is set to True, then we drop the slot before starting resync
if args.drop_pg_slot:
FastSyncTapPostgres.drop_slot(args.tap)

# Start loading tables in parallel in spawning processes
with multiprocessing.Pool(pool_size) as proc:
table_sync_excs = list(
filter(
lambda x: not isinstance(x, bool),
proc.map(partial(sync_table, args=args), args.tables),
if can_run_sync:
with multiprocessing.Pool(pool_size) as proc:
table_sync_excs = list(
filter(
lambda x: not isinstance(x, bool),
proc.map(partial(sync_table, args=args), args.tables),
)
)
)

# Log summary
end_time = datetime.now()
67 changes: 67 additions & 0 deletions pipelinewise/utils.py
@@ -21,3 +21,70 @@ def safe_column_name(
if name:
return f'{quote_character}{name.upper()}{quote_character}'
return name


def get_tables_size(schema: str, tap) -> list:
"""Return the name, row count and size (MB) of each table in the given schema"""
result = []
if 'FastSyncTapMySql' in str(type(tap)):
tap.open_connections()
result_list = tap.query(
'select TABLE_NAME as table_name,'
' TABLE_ROWS as table_rows,'
' (DATA_LENGTH + INDEX_LENGTH)/ 1024 / 1024 as table_size'
f' from information_schema.TABLES where TABLE_SCHEMA = \'{schema}\';')
tap.close_connections()
for res in result_list:
result.append({
'table_name': f'{schema}.{res["table_name"]}',
'table_rows': res['table_rows'],
'table_size': res['table_size']
})

if 'FastSyncTapPostgres' in str(type(tap)):
tap.open_connection()
result_list = tap.query(
'SELECT TABLE_NAME as table_name,'
' (xpath(\'/row/c/text()\','
' query_to_xml(format(\'select count(*) as c from %I.%I\', table_schema, TABLE_NAME), FALSE, TRUE, \'\'))'
')[1]::text::int AS table_rows,'
' pg_total_relation_size('
'\'"\'||table_schema||\'"."\'||table_name||\'"\')::NUMERIC/1024::NUMERIC/1024 as table_size '
'FROM (SELECT table_schema, TABLE_NAME FROM information_schema.tables '
f'WHERE TABLE_NAME not like \'pg_%\' AND table_schema in (\'{schema}\')) as tb'
)
for res in result_list:
result.append({
'table_name': f'{schema}.{res[0]}',
'table_rows': res[1],
'table_size': res[2]
})
tap.close_connection()
return result


def get_maximum_value_from_list_of_dicts(list_of_dicts: list, key: str) -> Optional[dict]:
"""Return the dict holding the maximum value for the given key, or None on failure"""
try:
return max(list_of_dicts, key=lambda x: x.get(key))
except Exception:
return None


def filter_out_selected_tables(all_schema_tables: list, selected_tables: set) -> list:
"""Keep only the entries of all_schema_tables that match the selected tables"""
filtered_tables = []
for table in selected_tables:
found_table = next((item for item in all_schema_tables if item['table_name'] == table), None)
if found_table:
filtered_tables.append(found_table)
return filtered_tables


def get_schemas_of_tables_set(set_of_tables: set) -> set:
"""get schema from set of tables"""
set_of_schemas = set()
for table in set_of_tables:
schema = table.split('.')[0]
set_of_schemas.add(schema)
return set_of_schemas
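A worked example of how the fastsync scripts compose these helpers, using in-memory data instead of a live tap (table names and sizes are hypothetical, sizes in MB):

from pipelinewise.utils import (filter_out_selected_tables,
                                get_maximum_value_from_list_of_dicts,
                                get_schemas_of_tables_set)

selected = {'public.orders', 'public.users'}
print(get_schemas_of_tables_set(selected))  # {'public'}

# What get_tables_size would return for schema 'public' (mocked here):
all_tables = [
    {'table_name': 'public.orders', 'table_rows': 900_000, 'table_size': 620.4},
    {'table_name': 'public.users', 'table_rows': 12_000, 'table_size': 12.1},
    {'table_name': 'public.audit_log', 'table_rows': 5_000_000, 'table_size': 2048.0},
]

candidates = filter_out_selected_tables(all_tables, selected)  # audit_log is not selected
biggest = get_maximum_value_from_list_of_dicts(candidates, 'table_size')
print(biggest['table_name'], biggest['table_size'])  # public.orders 620.4

# With allowed_resync_max_size.table_mb = 500, 620.4 > 500, so the resync is refused
# and the user is told to pass --force.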
1 change: 1 addition & 0 deletions setup.py
@@ -40,6 +40,7 @@
'dnspython==2.1.*',
'boto3>=1.21,<1.27',
'chardet==4.0.0',
'backports.tarfile==1.2.0'
],
extras_require={
'test': [
@@ -0,0 +1,75 @@
import os

from tests.end_to_end.target_snowflake.tap_mariadb import TapMariaDB
from tests.end_to_end.helpers import tasks
from tests.end_to_end.target_snowflake import TEST_PROJECTS_DIR_PATH


TAP_ID = 'mariadb_to_sf'
TARGET_ID = 'snowflake'


def _create_ppw_config_file(table_mb):
with open(f'{TEST_PROJECTS_DIR_PATH}/config.yml', 'w', encoding='utf-8') as config_file:
config_file.write('allowed_resync_max_size:\n')
config_file.write(f' table_mb: {table_mb}\n')

[return_code, _, _] = tasks.run_command(f'pipelinewise import_config --dir {TEST_PROJECTS_DIR_PATH}')
assert return_code == 0


class TestResyncMariaDBToSF(TapMariaDB):
"""Test Resync MariaDB to SF."""
def setUp(self, *args, **kwargs): # pylint: disable = unused-argument
super().setUp(tap_id=TAP_ID, target_id=TARGET_ID)

def tearDown(self):
try:
os.remove(f'{TEST_PROJECTS_DIR_PATH}/config.yml')
except OSError:
pass
super().tearDown()

def test_resync_mariadb_to_sf_if_table_size_greater_than_limit(self): # pylint: disable = no-self-use
"""test resync mariadb to SF returns error 1 if table size is greater than the limit"""

a_small_number = 0.001 # Mb
_create_ppw_config_file(table_mb=a_small_number)

command = f'pipelinewise sync_tables --tap {TAP_ID} --target {TARGET_ID}'

[return_code, _, _] = tasks.run_command(command)

assert return_code == 1

def test_resync_mariadb_to_sf_if_table_size_less_than_limit(self): # pylint: disable = no-self-use
"""test resync mariadb to SF returns error if table size is less than the limit"""
a_big_number = 10000 #Mb
_create_ppw_config_file(table_mb=a_big_number)

command = f'pipelinewise sync_tables --tap {TAP_ID} --target {TARGET_ID}'
[return_code, _, _] = tasks.run_command(command)

assert return_code == 0

def test_resync_mariadb_to_sf_if_table_size_greater_than_limit_and_force(self): # pylint: disable = no-self-use
"""test resync mariadb to SF returns error if table size is greater than the limit and --force is used"""
a_small_number = 0.001 # Mb
_create_ppw_config_file(table_mb=a_small_number)

command = f'pipelinewise sync_tables --tap {TAP_ID} --target {TARGET_ID} --force'

[return_code, _, _] = tasks.run_command(command)

assert return_code == 0

def test_run_tap_mariadb_to_sf_if_size_greater_than_limit(self): # pylint: disable = no-self-use
"""test run_tap mariadb to sf if table size is greater than the limit"""
a_small_number = 0.001 # Mb
_create_ppw_config_file(table_mb=a_small_number)

command = f'pipelinewise run_tap --tap {TAP_ID} --target {TARGET_ID}'

[return_code, _, _] = tasks.run_command(command)

assert return_code == 0