Skip to content

Commit

Permalink
create query for cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
AlvaroMarquesAndrade committed Feb 18, 2021
1 parent a59030d commit e524c5c
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 11 deletions.
53 changes: 49 additions & 4 deletions butterfree/migrations/cassandra_migration.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,68 @@
"""Cassandra Migration entity."""

import warnings
from typing import Any, Dict, List

from butterfree.migrations import DatabaseMigration
from butterfree.configs.db import CassandraConfig
from butterfree.migrations.migration import DatabaseMigration


class CassandraMigration(DatabaseMigration):
"""Cassandra class for Migrations."""

@staticmethod
def _get_alter_table_query(columns: List[Dict[str, Any]], table_name: str) -> str:
parsed_columns = []
for col in columns:
parsed_columns.append(f"{col['column_name']} {col['type']}")

parsed_columns = ", ".join(parsed_columns) # type: ignore

return f"ALTER TABLE {table_name} " f"ADD ({parsed_columns});"

@staticmethod
def _get_create_table_query(columns: List[Dict[str, Any]], table_name: str,) -> str:
"""Creates CQL statement to create a table."""
parsed_columns = []
primary_keys = []

for col in columns:
col_str = f"{col['column_name']} {col['type']}"
if col["primary_key"]:
primary_keys.append(col["column_name"])
parsed_columns.append(col_str)

joined_parsed_columns = ", ".join(parsed_columns)

if len(primary_keys) > 0:
joined_primary_keys = ", ".join(primary_keys)
columns_str = (
f"{joined_parsed_columns}, PRIMARY KEY ({joined_primary_keys})"
)
else:
columns_str = joined_parsed_columns

keyspace = CassandraConfig().keyspace

return f"CREATE TABLE {keyspace}.{table_name} " f"({columns_str});"

def create_query(
self,
fs_schema: List[Dict[str, Any]],
db_schema: List[Dict[str, Any]],
table_name: str,
db_schema: List[Dict[str, Any]] = None,
schema_diff: List[Dict[str, Any]] = None,
) -> Any:
"""Create a query regarding Cassandra.
Returns:
Schema object.
"""
pass
if not schema_diff:
warnings.warn("No migration was performed", UserWarning, stacklevel=1)
return

if not db_schema:
return self._get_create_table_query(schema_diff, table_name)

return self._get_alter_table_query(schema_diff, table_name)
6 changes: 3 additions & 3 deletions butterfree/migrations/metastore_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@

from typing import Any, Dict, List

from butterfree.migrations import DatabaseMigration
from butterfree.migrations.migration import DatabaseMigration


class MetastoreMigration(DatabaseMigration):
"""Metastore class for Migrations."""

def create_query(
self,
fs_schema: List[Dict[str, Any]],
db_schema: List[Dict[str, Any]],
table_name: str,
db_schema: List[Dict[str, Any]] = None,
schema_diff: List[Dict[str, Any]] = None,
) -> Any:
"""Create a query regarding Metastore.
Expand Down
8 changes: 4 additions & 4 deletions butterfree/migrations/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ class DatabaseMigration(ABC):
@abstractmethod
def create_query(
self,
fs_schema: List[Dict[str, Any]],
db_schema: List[Dict[str, Any]],
table_name: str,
db_schema: List[Dict[str, Any]] = None,
schema_diff: List[Dict[str, Any]] = None,
) -> Any:
"""Create a query regarding a data source.
Expand Down Expand Up @@ -46,10 +46,10 @@ def _get_schema(self, db_client: Callable, table_name: str) -> List[Dict[str, An
pass

def _apply_migration(self, query: str, db_client: Callable) -> None:
"""Apply the migration in the respective database."""
"""Apply the migrations in the respective database."""

def _send_logs_to_s3(self) -> None:
"""Send all migration logs to S3."""
"""Send all migrations logs to S3."""
pass

def run(self, pipelines: List[FeatureSetPipeline]) -> None:
Expand Down
Empty file.
32 changes: 32 additions & 0 deletions tests/unit/butterfree/migrations/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from pytest import fixture


@fixture
def dummy_db_schema():
dummy_db_schema = [
{"column_name": "id", "type": "int", "primary_key": True},
{"column_name": "platform", "type": "text", "primary_key": False},
{"column_name": "ts", "type": "bigint", "primary_key": False},
]
return dummy_db_schema


@fixture
def dummy_schema_diff():
dummy_schema_diff = [
{"column_name": "kappa_column", "type": "int", "primary_key": False},
{"column_name": "pogchamp", "type": "uuid", "primary_key": False},
]
return dummy_schema_diff


@fixture
def dummy_schema():
dummy_schema = [
{"column_name": "id", "type": "int", "primary_key": True},
{"column_name": "platform", "type": "text", "primary_key": False},
{"column_name": "ts", "type": "bigint", "primary_key": False},
{"column_name": "kappa_column", "type": "int", "primary_key": False},
{"column_name": "pogchamp", "type": "uuid", "primary_key": False},
]
return dummy_schema
39 changes: 39 additions & 0 deletions tests/unit/butterfree/migrations/test_cassandra_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from butterfree.migrations import CassandraMigration


class TestCassandraMigration:
def test_alter_table_query(self, dummy_db_schema, dummy_schema_diff):
cassandra_migration = CassandraMigration()

expected_query = "ALTER TABLE test ADD (kappa_column int, pogchamp uuid);"
query = cassandra_migration.create_query(
"test", dummy_db_schema, dummy_schema_diff
)

assert isinstance(query, str)
assert query, expected_query

def test_create_table_query(self, dummy_db_schema):

cassandra_migration = CassandraMigration()

expected_query = (
"CREATE TABLE test.test_table "
"(id int, platform text, ts bigint, PRIMARY KEY (id));"
)
query = cassandra_migration.create_query(
table_name="test_table", schema_diff=dummy_db_schema
)

assert isinstance(query, str)
assert query, expected_query

def test_no_diff(self, dummy_db_schema):

cassandra_migration = CassandraMigration()

query = cassandra_migration.create_query(
table_name="test_table", db_schema=dummy_db_schema,
)

assert query is None

0 comments on commit e524c5c

Please sign in to comment.