From e524c5c178a354d70e9ca441978658b566f5758f Mon Sep 17 00:00:00 2001
From: AlvaroMarquesAndrade <1a789766b1c4c8b679e80f11fa6d63d42fa4bcdf>
Date: Thu, 18 Feb 2021 17:22:25 -0300
Subject: [PATCH] create query for cassandra

---
Review note: the two query tests originally used `assert query, expected_query`,
which only checks that `query` is truthy (the second operand of `assert` is the
failure message). They now compare with `==` so the generated CQL is actually
verified. Blob ids on the `index` lines are those of the original submission.

 butterfree/migrations/cassandra_migration.py  | 53 +++++++++++++++++--
 butterfree/migrations/metastore_migration.py  |  6 +--
 butterfree/migrations/migration.py            |  8 +--
 tests/unit/butterfree/migrations/__init__.py  |  0
 tests/unit/butterfree/migrations/conftest.py  | 32 +++++++++++
 .../migrations/test_cassandra_migration.py    | 39 ++++++++++++++
 6 files changed, 127 insertions(+), 11 deletions(-)
 create mode 100644 tests/unit/butterfree/migrations/__init__.py
 create mode 100644 tests/unit/butterfree/migrations/conftest.py
 create mode 100644 tests/unit/butterfree/migrations/test_cassandra_migration.py

diff --git a/butterfree/migrations/cassandra_migration.py b/butterfree/migrations/cassandra_migration.py
index e9cecdc7..dd7378be 100644
--- a/butterfree/migrations/cassandra_migration.py
+++ b/butterfree/migrations/cassandra_migration.py
@@ -1,18 +1,56 @@
 """Cassandra Migration entity."""
 
+import warnings
 from typing import Any, Dict, List
 
-from butterfree.migrations import DatabaseMigration
+from butterfree.configs.db import CassandraConfig
+from butterfree.migrations.migration import DatabaseMigration
 
 
 class CassandraMigration(DatabaseMigration):
     """Cassandra class for Migrations."""
 
+    @staticmethod
+    def _get_alter_table_query(columns: List[Dict[str, Any]], table_name: str) -> str:
+        parsed_columns = []
+        for col in columns:
+            parsed_columns.append(f"{col['column_name']} {col['type']}")
+
+        parsed_columns = ", ".join(parsed_columns)  # type: ignore
+
+        return f"ALTER TABLE {table_name} " f"ADD ({parsed_columns});"
+
+    @staticmethod
+    def _get_create_table_query(columns: List[Dict[str, Any]], table_name: str,) -> str:
+        """Creates CQL statement to create a table."""
+        parsed_columns = []
+        primary_keys = []
+
+        for col in columns:
+            col_str = f"{col['column_name']} {col['type']}"
+            if col["primary_key"]:
+                primary_keys.append(col["column_name"])
+            parsed_columns.append(col_str)
+
+        joined_parsed_columns = ", ".join(parsed_columns)
+
+        if len(primary_keys) > 0:
+            joined_primary_keys = ", ".join(primary_keys)
+            columns_str = (
+                f"{joined_parsed_columns}, PRIMARY KEY ({joined_primary_keys})"
+            )
+        else:
+            columns_str = joined_parsed_columns
+
+        keyspace = CassandraConfig().keyspace
+
+        return f"CREATE TABLE {keyspace}.{table_name} " f"({columns_str});"
+
     def create_query(
         self,
-        fs_schema: List[Dict[str, Any]],
-        db_schema: List[Dict[str, Any]],
         table_name: str,
+        db_schema: List[Dict[str, Any]] = None,
+        schema_diff: List[Dict[str, Any]] = None,
     ) -> Any:
         """Create a query regarding Cassandra.
 
@@ -20,4 +58,11 @@ def create_query(
         Schema object.
 
         """
-        pass
+        if not schema_diff:
+            warnings.warn("No migration was performed", UserWarning, stacklevel=1)
+            return
+
+        if not db_schema:
+            return self._get_create_table_query(schema_diff, table_name)
+
+        return self._get_alter_table_query(schema_diff, table_name)
diff --git a/butterfree/migrations/metastore_migration.py b/butterfree/migrations/metastore_migration.py
index bb208f2a..9ccd7426 100644
--- a/butterfree/migrations/metastore_migration.py
+++ b/butterfree/migrations/metastore_migration.py
@@ -2,7 +2,7 @@
 
 from typing import Any, Dict, List
 
-from butterfree.migrations import DatabaseMigration
+from butterfree.migrations.migration import DatabaseMigration
 
 
 class MetastoreMigration(DatabaseMigration):
@@ -10,9 +10,9 @@ class MetastoreMigration(DatabaseMigration):
 
     def create_query(
         self,
-        fs_schema: List[Dict[str, Any]],
-        db_schema: List[Dict[str, Any]],
         table_name: str,
+        db_schema: List[Dict[str, Any]] = None,
+        schema_diff: List[Dict[str, Any]] = None,
     ) -> Any:
         """Create a query regarding Metastore.
 
diff --git a/butterfree/migrations/migration.py b/butterfree/migrations/migration.py
index c53945bf..a86afa28 100644
--- a/butterfree/migrations/migration.py
+++ b/butterfree/migrations/migration.py
@@ -12,9 +12,9 @@ class DatabaseMigration(ABC):
     @abstractmethod
     def create_query(
         self,
-        fs_schema: List[Dict[str, Any]],
-        db_schema: List[Dict[str, Any]],
         table_name: str,
+        db_schema: List[Dict[str, Any]] = None,
+        schema_diff: List[Dict[str, Any]] = None,
     ) -> Any:
         """Create a query regarding a data source.
 
@@ -46,10 +46,10 @@ def _get_schema(self, db_client: Callable, table_name: str) -> List[Dict[str, An
         pass
 
     def _apply_migration(self, query: str, db_client: Callable) -> None:
-        """Apply the migration in the respective database."""
+        """Apply the migrations in the respective database."""
 
     def _send_logs_to_s3(self) -> None:
-        """Send all migration logs to S3."""
+        """Send all migrations logs to S3."""
         pass
 
     def run(self, pipelines: List[FeatureSetPipeline]) -> None:
diff --git a/tests/unit/butterfree/migrations/__init__.py b/tests/unit/butterfree/migrations/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/unit/butterfree/migrations/conftest.py b/tests/unit/butterfree/migrations/conftest.py
new file mode 100644
index 00000000..8f25b424
--- /dev/null
+++ b/tests/unit/butterfree/migrations/conftest.py
@@ -0,0 +1,32 @@
+from pytest import fixture
+
+
+@fixture
+def dummy_db_schema():
+    dummy_db_schema = [
+        {"column_name": "id", "type": "int", "primary_key": True},
+        {"column_name": "platform", "type": "text", "primary_key": False},
+        {"column_name": "ts", "type": "bigint", "primary_key": False},
+    ]
+    return dummy_db_schema
+
+
+@fixture
+def dummy_schema_diff():
+    dummy_schema_diff = [
+        {"column_name": "kappa_column", "type": "int", "primary_key": False},
+        {"column_name": "pogchamp", "type": "uuid", "primary_key": False},
+    ]
+    return dummy_schema_diff
+
+
+@fixture
+def dummy_schema():
+    dummy_schema = [
+        {"column_name": "id", "type": "int", "primary_key": True},
+        {"column_name": "platform", "type": "text", "primary_key": False},
+        {"column_name": "ts", "type": "bigint", "primary_key": False},
+        {"column_name": "kappa_column", "type": "int", "primary_key": False},
+        {"column_name": "pogchamp", "type": "uuid", "primary_key": False},
+    ]
+    return dummy_schema
diff --git a/tests/unit/butterfree/migrations/test_cassandra_migration.py b/tests/unit/butterfree/migrations/test_cassandra_migration.py
new file mode 100644
index 00000000..7fe6b3ba
--- /dev/null
+++ b/tests/unit/butterfree/migrations/test_cassandra_migration.py
@@ -0,0 +1,39 @@
+from butterfree.migrations import CassandraMigration
+
+
+class TestCassandraMigration:
+    def test_alter_table_query(self, dummy_db_schema, dummy_schema_diff):
+        cassandra_migration = CassandraMigration()
+
+        expected_query = "ALTER TABLE test ADD (kappa_column int, pogchamp uuid);"
+        query = cassandra_migration.create_query(
+            "test", dummy_db_schema, dummy_schema_diff
+        )
+
+        assert isinstance(query, str)
+        assert query == expected_query
+
+    def test_create_table_query(self, dummy_db_schema):
+
+        cassandra_migration = CassandraMigration()
+
+        expected_query = (
+            "CREATE TABLE test.test_table "
+            "(id int, platform text, ts bigint, PRIMARY KEY (id));"
+        )
+        query = cassandra_migration.create_query(
+            table_name="test_table", schema_diff=dummy_db_schema
+        )
+
+        assert isinstance(query, str)
+        assert query == expected_query
+
+    def test_no_diff(self, dummy_db_schema):
+
+        cassandra_migration = CassandraMigration()
+
+        query = cassandra_migration.create_query(
+            table_name="test_table", db_schema=dummy_db_schema,
+        )
+
+        assert query is None