ScyllaDB Connector #4946

Merged
merged 17 commits into from
Jun 13, 2024
Changes from 15 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The types of changes are:
## [Unreleased](https://github.com/ethyca/fides/compare/2.38.0...main)

### Added
- Adds the start of the Scylla DB Integration [#4946](https://github.com/ethyca/fides/pull/4946)
- Added model and data migrations and CRUD-layer operations for property-specific messaging [#4901](https://github.com/ethyca/fides/pull/4901)
- Added option in FidesJS SDK to only disable notice-served API [#4965](https://github.com/ethyca/fides/pull/4965)
- External ID support for consent management [#4927](https://github.com/ethyca/fides/pull/4927)
1 change: 1 addition & 0 deletions clients/admin-ui/public/images/connector-logos/scylla.svg
Expand Up @@ -46,6 +46,7 @@ export const CONNECTION_TYPE_LOGO_MAP = new Map<ConnectionType, string>([
[ConnectionType.MYSQL, "mysql.svg"],
[ConnectionType.POSTGRES, "postgres.svg"],
[ConnectionType.REDSHIFT, "redshift.svg"],
[ConnectionType.SCYLLA, "scylla.svg"],
[ConnectionType.SNOWFLAKE, "snowflake.svg"],
[ConnectionType.SOVRN, "sovrn.svg"],
[ConnectionType.TIMESCALE, "timescaledb.svg"],
1 change: 1 addition & 0 deletions clients/admin-ui/src/types/api/models/ConnectionType.ts
Expand Up @@ -23,6 +23,7 @@ export enum ConnectionType {
MANUAL_WEBHOOK = "manual_webhook",
TIMESCALE = "timescale",
FIDES = "fides",
SCYLLA = "scylla",
GENERIC_ERASURE_EMAIL = "generic_erasure_email",
GENERIC_CONSENT_EMAIL = "generic_consent_email",
}
15 changes: 15 additions & 0 deletions docker/docker-compose.integration-scylladb.yml
@@ -0,0 +1,15 @@
services:
scylladb_example:
image: bitnami/scylladb:6.0.0
container_name: scylladb_example
environment:
- SCYLLADB_PASSWORD_SEEDER=yes
- SCYLLADB_USER=scylla_user
- SCYLLADB_PASSWORD=scylla_pass
- SCYLLADB_AUTHENTICATOR=PasswordAuthenticator
expose:
- 9042
volumes:
- ./docker/sample_data/scylla:/docker-entrypoint-initdb.d
ports:
- "9042:9042"
35 changes: 35 additions & 0 deletions docker/sample_data/scylla/scylla_example.cql
@@ -0,0 +1,35 @@
CREATE KEYSPACE IF NOT EXISTS app_keyspace WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3};
USE app_keyspace;

CREATE TABLE IF NOT EXISTS users (
    user_id INT PRIMARY KEY,
    name TEXT,
    age INT,
    email TEXT
);

INSERT INTO users (user_id, name, age, email) VALUES (1, 'John', 41, 'customer-1@example.com') IF NOT EXISTS;
INSERT INTO users (user_id, name, age, email) VALUES (2, 'Jane', 38, 'jane@example.com') IF NOT EXISTS;
INSERT INTO users (user_id, name, age, email) VALUES (3, 'Marguerite', 27, 'customer-2@example.com') IF NOT EXISTS;
INSERT INTO users (user_id, name, age, email) VALUES (4, 'Lafayette', 55, 'customer-3@example.com') IF NOT EXISTS;
INSERT INTO users (user_id, name, age, email) VALUES (5, 'Manuel', 23, 'customer-4@example.com') IF NOT EXISTS;


CREATE KEYSPACE IF NOT EXISTS vendors_keyspace WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 3};
USE vendors_keyspace;

CREATE TABLE IF NOT EXISTS vendors (
    vendor_id INT PRIMARY KEY,
    vendor_name TEXT,
    vendor_address TEXT,
    primary_contact_name TEXT,
    primary_contact_email TEXT,
    supplier_type TEXT
);

INSERT INTO vendors (vendor_id, vendor_name, vendor_address, primary_contact_name, primary_contact_email, supplier_type) VALUES (1, 'A+ Tile Supplies', '1810 Test Town, TX', 'Elliot Trace', 'employee-1@example.com', 'building supplies') IF NOT EXISTS;
INSERT INTO vendors (vendor_id, vendor_name, vendor_address, primary_contact_name, primary_contact_email, supplier_type) VALUES (2, 'Carpeting and More', '3421 Test City, TX', 'Chris Osbourne', 'employee-2@example.com', 'building supplies') IF NOT EXISTS;
INSERT INTO vendors (vendor_id, vendor_name, vendor_address, primary_contact_name, primary_contact_email, supplier_type) VALUES (3, 'Sunshine Bakery', '2394 Gottlieb Station, Romeoton, MS', 'Fatima Sultani', 'employee-3@example.com', 'coffee and tea') IF NOT EXISTS;
INSERT INTO vendors (vendor_id, vendor_name, vendor_address, primary_contact_name, primary_contact_email, supplier_type) VALUES (4, 'Sweetbrew', '609 Shanahan Points, Guillermotown, NC', 'Xavier Gutierrez', 'employee-4@example.com', 'coffee and tea') IF NOT EXISTS;
INSERT INTO vendors (vendor_id, vendor_name, vendor_address, primary_contact_name, primary_contact_email, supplier_type) VALUES (5, 'Artevista', '595 Blick Drive, Demetricebury, MO', 'Lana Anderson', 'employee-5@example.com', 'art supplies') IF NOT EXISTS;
INSERT INTO vendors (vendor_id, vendor_name, vendor_address, primary_contact_name, primary_contact_email, supplier_type) VALUES (6, 'CanvasCrafter', '1410 Parkway Street, San Diego, CA', 'Madelyn Houston', 'employee-6@example.com', 'art supplies') IF NOT EXISTS;
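Both keyspaces are created with `SimpleStrategy` and `replication_factor` 3, while the compose file in this PR starts a single `scylladb_example` node, and every seed insert uses `IF NOT EXISTS` (a lightweight transaction). The quorum arithmetic behind that combination can be sketched as follows. The helper names are illustrative and not part of this PR, and whether the seed inserts succeed on one node is an assumption worth verifying, not something this diff confirms.

```python
def quorum(replication_factor: int) -> int:
    """Replicas needed for a quorum-level operation: floor(RF / 2) + 1."""
    return replication_factor // 2 + 1


def quorum_reachable(replication_factor: int, live_nodes: int) -> bool:
    """True if the live nodes can hold enough replicas to form a quorum."""
    live_replicas = min(replication_factor, live_nodes)
    return live_replicas >= quorum(replication_factor)


if __name__ == "__main__":
    # RF 3 on one node versus RF 1 on one node.
    print(quorum(3), quorum_reachable(3, 1), quorum_reachable(1, 1))
```

If the seed data turns out not to load in the single-node container, lowering the replication factor for the sample keyspaces is one option to consider.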
3 changes: 3 additions & 0 deletions noxfiles/run_infrastructure.py
Expand Up @@ -19,6 +19,7 @@
"mongodb",
"mariadb",
"timescale",
"scylladb",
]
EXTERNAL_DATASTORE_CONFIG = {
"snowflake": [
Expand Down Expand Up @@ -84,6 +85,8 @@ def run_infrastructure(
if datastore in DOCKERFILE_DATASTORES
]

_run_cmd_or_err(f'echo "Docker datastores {docker_datastores}"')

# Configure docker compose path
path: str = get_path_for_datastores(datastores, remote_debug)
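Adding "scylladb" to the datastore list lets the infrastructure runner include `docker/docker-compose.integration-scylladb.yml`. A minimal sketch of that name-to-file mapping follows, assuming the `docker-compose.integration-<name>.yml` naming seen in this PR. `compose_files_for` and `DOCKER_DATASTORES` are hypothetical; this is not the repository's `get_path_for_datastores`.

```python
from pathlib import PurePosixPath
from typing import Iterable, List

# Assumed subset of the Docker-backed datastore names shown in the hunk above.
DOCKER_DATASTORES = ["mongodb", "mariadb", "timescale", "scylladb"]


def compose_files_for(datastores: Iterable[str], base_dir: str = "docker") -> List[str]:
    """Map known datastore names to compose files, skipping unknown names."""
    return [
        str(PurePosixPath(base_dir) / f"docker-compose.integration-{name}.yml")
        for name in datastores
        if name in DOCKER_DATASTORES
    ]
```

In this sketch, external datastores such as "snowflake" are simply skipped because they have no local container.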

4 changes: 3 additions & 1 deletion pyproject.toml
Expand Up @@ -38,6 +38,7 @@ module = [
"boto3.*",
"botocore.*",
"bson.*",
"cassandra.*",
"celery.*",
"citext.*",
"dask.*",
Expand Down Expand Up @@ -150,7 +151,7 @@ good-names="_,i,setUp,tearDown,maxDiff,default_app_config"
ignore="migrations,tests"

[tool.pylint.whitelist]
extension-pkg-whitelist = ["pydantic", "zlib"]
extension-pkg-whitelist = ["pydantic", "zlib", "cassandra"]

############
## Pytest ##
Expand Down Expand Up @@ -191,6 +192,7 @@ markers = [
"integration_dynamodb",
"integration_saas",
"integration_saas_override",
"integration_scylladb",
"unit_saas"
]
asyncio_mode = "auto"
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -48,6 +48,7 @@ pyyaml==6.0.1
redis==3.5.3
rich-click==1.6.1
sendgrid==6.9.7
scylla-driver==3.26.8
slowapi==0.1.8
snowflake-sqlalchemy==1.5.1
sqlalchemy[asyncio]==1.4.27
@@ -0,0 +1,46 @@
"""scylla_connection_type

Revision ID: 3304082a6cee
Revises: 5fe01e730171
Create Date: 2024-06-03 19:54:20.907724

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "3304082a6cee"
down_revision = "5fe01e730171"
branch_labels = None
depends_on = None


def upgrade():
# Add to ConnectionType enum
op.execute("alter type connectiontype rename to connectiontype_old")
op.execute(
"create type connectiontype as enum('mongodb', 'mysql', 'https', 'snowflake', 'redshift', 'mssql', 'mariadb', 'bigquery', 'saas', 'manual', 'manual_webhook', 'timescale', 'fides', 'sovrn', 'attentive', 'dynamodb', 'postgres', 'generic_consent_email', 'generic_erasure_email', 'scylla')"
)
op.execute(
(
"alter table connectionconfig alter column connection_type type connectiontype using "
"connection_type::text::connectiontype"
)
)
op.execute("drop type connectiontype_old")


def downgrade():
# Remove 'scylla' from ConnectionType
op.execute("alter type connectiontype rename to connectiontype_old")
op.execute(
"create type connectiontype as enum('mongodb', 'mysql', 'https', 'snowflake', 'redshift', 'mssql', 'mariadb', 'bigquery', 'saas', 'manual', 'manual_webhook', 'timescale', 'fides', 'sovrn', 'attentive', 'dynamodb', 'postgres', 'generic_consent_email', 'generic_erasure_email')"
)
op.execute(
(
"alter table connectionconfig alter column connection_type type connectiontype using "
"connection_type::text::connectiontype"
)
)
op.execute("drop type connectiontype_old")
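`upgrade` and `downgrade` run the same four statements and differ only in whether 'scylla' appears in the enum value list. The sketch below generates those statements from a list of values to make that symmetry explicit. `enum_swap_statements` is a hypothetical helper, not part of the migration, and the value list is shortened for illustration.

```python
from typing import List, Sequence


def enum_swap_statements(
    values: Sequence[str],
    type_name: str = "connectiontype",
    table: str = "connectionconfig",
    column: str = "connection_type",
) -> List[str]:
    """Return the rename/create/alter/drop statements that rebuild a Postgres enum."""
    quoted = ", ".join(f"'{value}'" for value in values)
    return [
        f"alter type {type_name} rename to {type_name}_old",
        f"create type {type_name} as enum({quoted})",
        f"alter table {table} alter column {column} type {type_name} "
        f"using {column}::text::{type_name}",
        f"drop type {type_name}_old",
    ]


BASE_VALUES = ["mongodb", "mysql", "postgres"]  # shortened for illustration
upgrade_sql = enum_swap_statements(BASE_VALUES + ["scylla"])
downgrade_sql = enum_swap_statements(BASE_VALUES)
```

One consequence of this pattern: the cast in the downgrade fails if any `connectionconfig` row still has `connection_type = 'scylla'`, so such rows would need to be removed or changed first.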
1 change: 1 addition & 0 deletions src/fides/api/api/v1/endpoints/generate.py
Expand Up @@ -47,6 +47,7 @@ class ValidTargets(str, Enum):
OKTA = "okta"
BIGQUERY = "bigquery"
DYNAMODB = "dynamodb"
SCYLLADB = "scylla"


class GenerateTypes(str, Enum):
2 changes: 2 additions & 0 deletions src/fides/api/models/connectionconfig.py
Expand Up @@ -53,6 +53,7 @@ class ConnectionType(enum.Enum):
fides = "fides"
generic_erasure_email = "generic_erasure_email" # Run after the traversal
generic_consent_email = "generic_consent_email" # Run after the traversal
scylla = "scylla"

@property
def human_readable(self) -> str:
Expand All @@ -76,6 +77,7 @@ def human_readable(self) -> str:
ConnectionType.postgres.value: "PostgreSQL",
ConnectionType.redshift.value: "Amazon Redshift",
ConnectionType.saas.value: "SaaS",
ConnectionType.scylla.value: "Scylla DB",
ConnectionType.snowflake.value: "Snowflake",
ConnectionType.sovrn.value: "Sovrn",
ConnectionType.timescale.value: "TimescaleDB",
5 changes: 5 additions & 0 deletions src/fides/api/schemas/connection_configuration/__init__.py
Expand Up @@ -86,6 +86,9 @@
from fides.api.schemas.connection_configuration.connection_secrets_saas import (
SaaSSchemaFactory as SaaSSchemaFactory,
)
from fides.api.schemas.connection_configuration.connection_secrets_scylla import (
ScyllaSchema, ScyllaDocsSchema,
)
from fides.api.schemas.connection_configuration.connection_secrets_snowflake import (
SnowflakeDocsSchema as SnowflakeDocsSchema,
)
Expand Down Expand Up @@ -123,6 +126,7 @@
ConnectionType.postgres.value: PostgreSQLSchema,
ConnectionType.redshift.value: RedshiftSchema,
ConnectionType.saas.value: SaaSSchema,
ConnectionType.scylla.value: ScyllaSchema,
ConnectionType.snowflake.value: SnowflakeSchema,
ConnectionType.sovrn.value: SovrnSchema,
ConnectionType.timescale.value: TimescaleSchema,
Expand Down Expand Up @@ -171,4 +175,5 @@ def get_connection_secrets_schema(
FidesDocsSchema,
SovrnDocsSchema,
DynamoDBDocsSchema,
ScyllaDocsSchema
]
@@ -0,0 +1,46 @@
from typing import List, Optional

from pydantic import Field

from fides.api.schemas.base_class import NoValidationSchema
from fides.api.schemas.connection_configuration.connection_secrets import (
ConnectionConfigSecretsSchema,
)


class ScyllaSchema(ConnectionConfigSecretsSchema):
"""Schema to validate the secrets needed to connect to a Scylla Database"""

host: str = Field(
title="Host",
description="The hostname or IP address of the server where the database is running.",
)
port: int = Field(
9042,
title="Port",
description="The network port number on which the server is listening for incoming connections (default: 9042).",
)
username: str = Field(
title="Username",
description="The user account used to authenticate and access the database.",
)
password: str = Field(
title="Password",
description="The password used to authenticate and access the database.",
sensitive=True,
)
keyspace: Optional[str] = Field(
None,
title="Keyspace",
description="The keyspace to connect to. Optional.",
sensitive=True,
)

_required_components: List[str] = [
"host",
"username",
"password",
]
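The schema above requires `host`, `username`, and `password`, defaults `port` to 9042, and leaves `keyspace` optional. The sketch below shows the same rules with a stdlib dataclass so it runs without pydantic. `ScyllaSecrets`, `missing_components`, and `load_secrets` are hypothetical stand-ins, not the fides `ScyllaSchema` or its validation logic.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

REQUIRED_COMPONENTS = ["host", "username", "password"]


@dataclass
class ScyllaSecrets:
    """Stdlib stand-in for the fields ScyllaSchema declares (not the pydantic model)."""

    host: str
    username: str
    password: str
    port: int = 9042
    keyspace: Optional[str] = None


def missing_components(secrets: Dict[str, Any]) -> List[str]:
    """List required keys that are absent or empty in the provided secrets."""
    return [name for name in REQUIRED_COMPONENTS if not secrets.get(name)]


def load_secrets(secrets: Dict[str, Any]) -> ScyllaSecrets:
    """Build the stand-in object, raising ValueError when required keys are missing."""
    missing = missing_components(secrets)
    if missing:
        raise ValueError(f"missing required secrets: {missing}")
    return ScyllaSecrets(**secrets)
```

For example, `load_secrets({"host": "localhost", "username": "scylla_user", "password": "scylla_pass"})` yields an object with `port` 9042 and `keyspace` set to `None`.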


class ScyllaDocsSchema(ScyllaSchema, NoValidationSchema):
"""Scylla Secrets Schema for API docs"""
2 changes: 2 additions & 0 deletions src/fides/api/service/connectors/__init__.py
Expand Up @@ -31,6 +31,7 @@
MongoDBConnector as MongoDBConnector,
)
from fides.api.service.connectors.saas_connector import SaaSConnector as SaaSConnector
from fides.api.service.connectors.scylla_connector import ScyllaConnector
from fides.api.service.connectors.sql_connector import (
BigQueryConnector as BigQueryConnector,
)
Expand Down Expand Up @@ -70,6 +71,7 @@
ConnectionType.postgres.value: PostgreSQLConnector,
ConnectionType.redshift.value: RedshiftConnector,
ConnectionType.saas.value: SaaSConnector,
ConnectionType.scylla.value: ScyllaConnector,
ConnectionType.snowflake.value: SnowflakeConnector,
ConnectionType.sovrn.value: SovrnConnector,
ConnectionType.timescale.value: TimescaleConnector,
97 changes: 97 additions & 0 deletions src/fides/api/service/connectors/scylla_connector.py
@@ -0,0 +1,97 @@
from typing import Any, Dict, List, Optional

import cassandra
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from loguru import logger

from fides.api.common_exceptions import ConnectionException
from fides.api.graph.execution import ExecutionNode
from fides.api.models.connectionconfig import ConnectionTestStatus
from fides.api.models.policy import Policy
from fides.api.models.privacy_request import PrivacyRequest, RequestTask
from fides.api.schemas.connection_configuration.connection_secrets_scylla import (
ScyllaSchema,
)
from fides.api.service.connectors.base_connector import BaseConnector
from fides.api.service.connectors.query_config import QueryConfig
from fides.api.util.collection_util import Row


class ScyllaConnector(BaseConnector):
"""Scylla Connector"""

def build_uri(self) -> str:
"""
Builds URI - not yet implemented
"""

def create_client(self) -> Cluster:
"""Returns a Scylla cluster"""

config = ScyllaSchema(**self.configuration.secrets or {})

auth_provider = PlainTextAuthProvider(
username=config.username, password=config.password
)
cluster = Cluster([config.host], port=config.port, auth_provider=auth_provider)
return cluster

def query_config(self, node: ExecutionNode) -> QueryConfig[Any]:
pass

Codecov / codecov/patch warning: added line #L41 in src/fides/api/service/connectors/scylla_connector.py was not covered by tests.

def test_connection(self) -> Optional[ConnectionTestStatus]:
"""
Connects to the scylla database and issues a trivial query
"""
logger.info("Starting test connection to {}", self.configuration.key)
config = ScyllaSchema(**self.configuration.secrets or {})
cluster = self.client()

try:
with cluster.connect(
config.keyspace if config.keyspace else None
) as client:
client.execute("select now() from system.local")
except cassandra.cluster.NoHostAvailable as exc:
if "Unable to connect to any servers using keyspace" in str(exc):
raise ConnectionException("Unknown keyspace.")
try:
error = list(exc.errors.values())[0]
except Exception:
raise ConnectionException("No host available.")

Codecov / codecov/patch warning: added lines #L61 - L62 in src/fides/api/service/connectors/scylla_connector.py were not covered by tests.

if isinstance(error, cassandra.AuthenticationFailed):
raise ConnectionException("Authentication failed.")

raise ConnectionException("No host available.")

except cassandra.protocol.SyntaxException:
raise ConnectionException("Syntax exception.")
except Exception:
raise ConnectionException("Connection Error connecting to Scylla DB.")

Codecov / codecov/patch warning: added lines #L70 - L72 in src/fides/api/service/connectors/scylla_connector.py were not covered by tests.

return ConnectionTestStatus.succeeded
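The exception handling in `test_connection` can be read as a mapping from driver errors to user-facing messages. The sketch below reproduces that branching with stand-in exception classes so it runs without the driver. `FakeNoHostAvailable`, `FakeAuthenticationFailed`, and `classify` are hypothetical names; the stand-in carries a per-host `errors` mapping because the code above reads `exc.errors`. The `SyntaxException` branch is omitted for brevity.

```python
from typing import Dict


class FakeAuthenticationFailed(Exception):
    """Stand-in for cassandra.AuthenticationFailed."""


class FakeNoHostAvailable(Exception):
    """Stand-in for cassandra.cluster.NoHostAvailable, carrying per-host errors."""

    def __init__(self, message: str, errors: Dict[str, Exception]) -> None:
        super().__init__(message)
        self.errors = errors


def classify(exc: Exception) -> str:
    """Return the message test_connection would attach to ConnectionException."""
    if isinstance(exc, FakeNoHostAvailable):
        if "Unable to connect to any servers using keyspace" in str(exc):
            return "Unknown keyspace."
        # Inspect the first per-host error, if any, to detect bad credentials.
        first_error = next(iter(exc.errors.values()), None)
        if isinstance(first_error, FakeAuthenticationFailed):
            return "Authentication failed."
        return "No host available."
    return "Connection Error connecting to Scylla DB."
```

Keeping the mapping in one small function like this would also make the branches flagged by Codecov straightforward to unit test, though the PR itself keeps the logic inline.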

def retrieve_data(
self,
node: ExecutionNode,
policy: Policy,
privacy_request: PrivacyRequest,
request_task: RequestTask,
input_data: Dict[str, List[Any]],
) -> List[Row]:
"""Retrieve scylla data - not yet implemented"""
Contributor:
nit: worth raising a NotImplementedError here? that always feels like the most proper thing, just to make it transparent to anyone who somehow hits the codepath at runtime without reading the codebase, but maybe i'm overthinking it :)

Contributor:
now i'm wondering whether raising the error may unexpectedly impact other DSR workflows and i shouldn't be doing that... 🤔

Contributor Author:
If there are datasets for the connection config this could get picked up in a DSR I believe!

Contributor Author:
yep confirmed a NotImplementedError will break DSR's if a dataset is also attached to the ConnectionConfig .
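The thread above concludes that raising `NotImplementedError` from a stub such as `retrieve_data` would break DSRs when a dataset is attached to the connection. The toy traversal below illustrates that trade-off. `run_collections`, `RaisingStub`, and `QuietStub` are hypothetical and far simpler than the real request runner. Returning an empty list is also an assumption: the stubs in this PR contain only a docstring, so they return `None`.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


class RaisingStub:
    """Connector stub that fails loudly when a request reaches it."""

    def retrieve_data(self) -> List[Row]:
        raise NotImplementedError("Scylla access requests are not implemented yet")


class QuietStub:
    """Connector stub that contributes no rows, so traversal can continue."""

    def retrieve_data(self) -> List[Row]:
        return []


def run_collections(connectors: List[Any]) -> List[Row]:
    """Collect rows from every connector; any exception aborts the whole run."""
    rows: List[Row] = []
    for connector in connectors:
        rows.extend(connector.retrieve_data())
    return rows
```

In this sketch a single `RaisingStub` anywhere in the list aborts the run for every other connector, which is the failure mode the author confirmed.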


def mask_data(
self,
node: ExecutionNode,
policy: Policy,
privacy_request: PrivacyRequest,
request_task: RequestTask,
rows: List[Row],
) -> int:
"""Execute a masking request - not yet implemented"""

def close(self) -> None:
"""Close any held resources"""
5 changes: 5 additions & 0 deletions tests/fixtures/application_fixtures.py
Expand Up @@ -182,6 +182,11 @@
integration_config, "dynamodb_example.aws_secret_access_key"
),
},
"scylla_example": {
"host": pydash.get(integration_config, "scylladb_example.server"),
"username": pydash.get(integration_config, "scylladb_example.username"),
"password": pydash.get(integration_config, "scylladb_example.password"),
},
}
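The fixture reads dotted paths such as `scylladb_example.server` from the integration config. Below is a stdlib sketch of that lookup, standing in for `pydash.get`. `get_path` and the sample config are illustrative assumptions: the username and password mirror the compose file in this PR, the `server` value is a guess, and the real config's structure is not shown in this diff.

```python
from typing import Any, Mapping


def get_path(data: Mapping[str, Any], dotted: str, default: Any = None) -> Any:
    """Follow a dotted key path through nested mappings, returning default if absent."""
    current: Any = data
    for key in dotted.split("."):
        if not isinstance(current, Mapping) or key not in current:
            return default
        current = current[key]
    return current


# Illustrative config; the server value is an assumption.
integration_config = {
    "scylladb_example": {
        "server": "scylladb_example",
        "username": "scylla_user",
        "password": "scylla_pass",
    }
}

scylla_secrets = {
    "host": get_path(integration_config, "scylladb_example.server"),
    "username": get_path(integration_config, "scylladb_example.username"),
    "password": get_path(integration_config, "scylladb_example.password"),
}
```

Note that the fixture passes no `port` or `keyspace`, so the schema's default port of 9042 applies and no keyspace is selected.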

