diff --git a/.github/workflows/unsafe_pr_checks.yml b/.github/workflows/unsafe_pr_checks.yml index 9e6b347ec7..2e6c58617e 100644 --- a/.github/workflows/unsafe_pr_checks.yml +++ b/.github/workflows/unsafe_pr_checks.yml @@ -17,5 +17,6 @@ jobs: - name: Integration Tests (External) env: REDSHIFT_TEST_URI: ${{ secrets.REDSHIFT_TEST_URI }} + REDSHIFT_TEST_DB_SCHEMA: ${{ secrets.REDSHIFT_TEST_DB_SCHEMA }} SNOWFLAKE_TEST_URI: ${{ secrets.SNOWFLAKE_TEST_URI }} run: make pytest-integration-external diff --git a/Makefile b/Makefile index 2e59c9177b..0d21b115bb 100644 --- a/Makefile +++ b/Makefile @@ -126,7 +126,7 @@ pytest-integration-erasure: compose-build # These tests connect to external third-party test databases pytest-integration-external: compose-build @echo "Running tests that connect to external third party test databases" - @docker-compose run -e REDSHIFT_TEST_URI -e SNOWFLAKE_TEST_URI $(IMAGE_NAME) \ + @docker-compose run -e REDSHIFT_TEST_URI -e SNOWFLAKE_TEST_URI -e REDSHIFT_TEST_DB_SCHEMA $(IMAGE_NAME) \ pytest $(pytestpath) -m "integration_external" diff --git a/data/dataset/redshift_example_test_dataset.yml b/data/dataset/redshift_example_test_dataset.yml new file mode 100644 index 0000000000..150fc18aac --- /dev/null +++ b/data/dataset/redshift_example_test_dataset.yml @@ -0,0 +1,225 @@ +dataset: + - fides_key: redshift_example_test_dataset + name: Redshift Example Test Dataset + description: Example of a Redshift dataset containing a variety of related tables like customers, products, addresses, etc. + collections: + - name: address + fields: + - name: city + data_categories: [user.provided.identifiable.contact.city] + - name: house + data_categories: [user.provided.identifiable.contact.street] + - name: id + data_categories: [system.operations] + fidesops_meta: + primary_key: True + - name: state + data_categories: [user.provided.identifiable.contact.state] + - name: street + data_categories: [user.provided.identifiable.contact.street] + - name: zip + data_categories: [user.provided.identifiable.contact.postal_code] + + - name: customer + fields: + - name: address_id + data_categories: [system.operations] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: address.id + direction: to + - name: created + data_categories: [system.operations] + - name: email + data_categories: [user.provided.identifiable.contact.email] + fidesops_meta: + identity: email + data_type: string + - name: id + data_categories: [user.derived.identifiable.unique_id] + fidesops_meta: + primary_key: True + - name: name + data_categories: [user.provided.identifiable.name] + fidesops_meta: + data_type: string + length: 40 + + - name: employee + fields: + - name: address_id + data_categories: [system.operations] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: address.id + direction: to + - name: email + data_categories: [user.provided.identifiable.contact.email] + fidesops_meta: + identity: email + data_type: string + - name: id + data_categories: [user.derived.identifiable.unique_id] + fidesops_meta: + primary_key: True + - name: name + data_categories: [user.provided.identifiable.name] + fidesops_meta: + data_type: string + + - name: login + fields: + - name: customer_id + data_categories: [user.derived.identifiable.unique_id] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: customer.id + direction: from + - name: id + data_categories: [system.operations] + fidesops_meta: + primary_key: True + - name: time + 
data_categories: [user.derived.nonidentifiable.sensor] + + - name: order + fields: + - name: customer_id + data_categories: [user.derived.identifiable.unique_id] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: customer.id + direction: from + - name: id + data_categories: [system.operations] + fidesops_meta: + primary_key: True + - name: shipping_address_id + data_categories: [system.operations] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: address.id + direction: to + + # order_item + - name: order_item + fields: + - name: order_id + data_categories: [system.operations] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: order.id + direction: from + - name: product_id + data_categories: [system.operations] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: product.id + direction: to + - name: quantity + data_categories: [system.operations] + + - name: payment_card + fields: + - name: billing_address_id + data_categories: [system.operations] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: address.id + direction: to + - name: ccn + data_categories: [user.provided.identifiable.financial.account_number] + - name: code + data_categories: [user.provided.identifiable.financial] + - name: customer_id + data_categories: [user.derived.identifiable.unique_id] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: customer.id + direction: from + - name: id + data_categories: [system.operations] + fidesops_meta: + primary_key: True + - name: name + data_categories: [user.provided.identifiable.financial] + - name: preferred + data_categories: [user.provided.nonidentifiable] + + - name: product + fields: + - name: id + data_categories: [system.operations] + fidesops_meta: + primary_key: True + - name: name + data_categories: [system.operations] + - name: price + data_categories: [system.operations] + + - name: report + fields: + - name: email + data_categories: [user.provided.identifiable.contact.email] + fidesops_meta: + identity: email + data_type: string + - name: id + data_categories: [system.operations] + fidesops_meta: + primary_key: True + - name: month + data_categories: [system.operations] + - name: name + data_categories: [system.operations] + - name: total_visits + data_categories: [system.operations] + - name: year + data_categories: [system.operations] + + - name: service_request + fields: + - name: alt_email + data_categories: [user.provided.identifiable.contact.email] + fidesops_meta: + identity: email + data_type: string + - name: closed + data_categories: [system.operations] + - name: email + data_categories: [system.operations] + fidesops_meta: + identity: email + data_type: string + - name: employee_id + data_categories: [user.derived.identifiable.unique_id] + fidesops_meta: + references: + - dataset: redshift_example_test_dataset + field: employee.id + direction: from + - name: id + data_categories: [system.operations] + fidesops_meta: + primary_key: True + - name: opened + data_categories: [system.operations] + + - name: visit + fields: + - name: email + data_categories: [user.provided.identifiable.contact.email] + fidesops_meta: + identity: email + data_type: string + - name: last_visit + data_categories: [system.operations] diff --git a/docs/fidesops/docs/guides/database_connectors.md b/docs/fidesops/docs/guides/database_connectors.md index cd892ecefc..b8ef5ecab4 100644 --- 
a/docs/fidesops/docs/guides/database_connectors.md
+++ b/docs/fidesops/docs/guides/database_connectors.md
@@ -131,6 +131,22 @@ PUT api/v1/connection/my_mongo_db/secret?verify=false`
 }
 ```
 
+#### Example 3: Amazon Redshift: Set URL and Schema
+
+This Amazon Redshift example sets the database secrets as a `url` property and a `db_schema` property. Redshift
+databases contain one or more schemas, and the default schema is named `public`. If your data lives in a different
+schema, specify `db_schema` and it will be applied as the `search_path` when querying.
+
+
+```
+PUT api/v1/connection/my_redshift_db/secret
+
+{
+  "url": "redshift+psycopg2://username@host.amazonaws.com:5439/database",
+  "db_schema": "my_test_schema"
+}
+```
+
 ### Testing your connection
 
 You can verify that a ConnectionConfig's secrets are valid at any time by calling the [Test a ConnectionConfig's Secrets](/fidesops/api#operations-Connections-test_connection_config_secrets_api_v1_connection__connection_key__test_get) operation:
diff --git a/src/fidesops/schemas/connection_configuration/connection_secrets_redshift.py b/src/fidesops/schemas/connection_configuration/connection_secrets_redshift.py
index 27f7595c3a..4d4d2aaf62 100644
--- a/src/fidesops/schemas/connection_configuration/connection_secrets_redshift.py
+++ b/src/fidesops/schemas/connection_configuration/connection_secrets_redshift.py
@@ -14,6 +14,7 @@ class RedshiftSchema(ConnectionConfigSecretsSchema):
     database: Optional[str] = None
     user: Optional[str] = None
     password: Optional[str] = None
+    db_schema: Optional[str] = None
 
     _required_components: List[str] = ["host", "user", "password"]
 
diff --git a/src/fidesops/service/connectors/query_config.py b/src/fidesops/service/connectors/query_config.py
index 2db1c896b7..1fe8abdc7a 100644
--- a/src/fidesops/service/connectors/query_config.py
+++ b/src/fidesops/service/connectors/query_config.py
@@ -405,6 +405,19 @@ def get_formatted_update_stmt(
         return f'UPDATE "{self.node.address.collection}" SET {",".join(update_clauses)} WHERE {" AND ".join(pk_clauses)}'
 
 
+class RedshiftQueryConfig(SQLQueryConfig):
+    """Generates SQL in Redshift's custom dialect."""
+
+    def get_formatted_query_string(
+        self,
+        field_list: str,
+        clauses: List[str],
+    ) -> str:
+        """Returns a query string with the table name wrapped in double quotes, so that collections whose names
+        collide with Redshift reserved words can still be queried."""
+        return f'SELECT {field_list} FROM "{self.node.node.collection.name}" WHERE {" OR ".join(clauses)}'
+
+
 MongoStatement = Tuple[Dict[str, Any], Dict[str, Any]]
 """A mongo query is expressed in the form of 2 dicts, the first of which represents the query object(s)
 and the second of which represents fields to return.
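For context, a minimal sketch (not part of the diff) of why the Redshift override double-quotes the table name. The field names come from the example dataset's `order` collection; the placeholder style and the unquoted variant are illustrative assumptions shown only for contrast with the quoted form the new `RedshiftQueryConfig` produces.

```
# Illustrative only: ORDER is a reserved word in Redshift, so an unquoted table
# name is rejected, while the double-quoted form used by RedshiftQueryConfig works.
field_list = "customer_id, id, shipping_address_id"
clauses = ["customer_id = %(customer_id)s"]  # placeholder parameter style

# Without quoting, Redshift rejects the statement (ORDER is reserved).
unquoted = f'SELECT {field_list} FROM order WHERE {" OR ".join(clauses)}'

# With the table name in double quotes, the same statement is valid.
quoted = f'SELECT {field_list} FROM "order" WHERE {" OR ".join(clauses)}'

print(quoted)
# SELECT customer_id, id, shipping_address_id FROM "order" WHERE customer_id = %(customer_id)s
```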
diff --git a/src/fidesops/service/connectors/sql_connector.py b/src/fidesops/service/connectors/sql_connector.py index b3059408ca..a0374ba089 100644 --- a/src/fidesops/service/connectors/sql_connector.py +++ b/src/fidesops/service/connectors/sql_connector.py @@ -2,8 +2,14 @@ from abc import abstractmethod from typing import Any, Dict, List, Optional -from sqlalchemy import Column -from sqlalchemy.engine import Engine, create_engine, CursorResult, LegacyCursorResult +from sqlalchemy import Column, text +from sqlalchemy.engine import ( + Engine, + create_engine, + CursorResult, + LegacyCursorResult, + Connection, +) from sqlalchemy.exc import OperationalError, InternalError from snowflake.sqlalchemy import URL @@ -24,6 +30,7 @@ from fidesops.service.connectors.query_config import ( SnowflakeQueryConfig, SQLQueryConfig, + RedshiftQueryConfig, ) logger = logging.getLogger(__name__) @@ -175,6 +182,7 @@ def create_client(self) -> Engine: class RedshiftConnector(SQLConnector): """Connector specific to Amazon Redshift""" + # Overrides BaseConnector.build_uri def build_uri(self) -> str: """Build URI of format redshift+psycopg2://user:password@[host][:port][/database]""" config = RedshiftSchema(**self.configuration.secrets or {}) @@ -184,6 +192,7 @@ def build_uri(self) -> str: url = f"redshift+psycopg2://{config.user}:{config.password}@{config.host}{port}{database}" return url + # Overrides SQLConnector.create_client def create_client(self) -> Engine: """Returns a SQLAlchemy Engine that can be used to interact with an Amazon Redshift cluster""" config = RedshiftSchema(**self.configuration.secrets or {}) @@ -194,6 +203,66 @@ def create_client(self) -> Engine: echo=not self.hide_parameters, ) + def set_schema(self, connection: Connection) -> None: + """Sets the search_path for the duration of the session""" + config = RedshiftSchema(**self.configuration.secrets or {}) + if config.db_schema: + logger.info("Setting Redshift search_path before retrieving data") + stmt = text("SET search_path to :search_path") + stmt = stmt.bindparams(search_path=config.db_schema) + connection.execute(stmt) + + # Overrides SQLConnector.retrieve_data + def retrieve_data( + self, node: TraversalNode, policy: Policy, input_data: Dict[str, List[Any]] + ) -> List[Row]: + """Retrieve data from Amazon Redshift + + For redshift, we also set the search_path to be the schema defined on the ConnectionConfig if + applicable - persists for the current session. + """ + query_config = self.query_config(node) + client = self.client() + stmt = query_config.generate_query(input_data, policy) + if stmt is None: + return [] + + logger.info(f"Starting data retrieval for {node.address}") + with client.connect() as connection: + self.set_schema(connection) + results = connection.execute(stmt) + return SQLConnector.cursor_result_to_rows(results) + + # Overrides SQLConnector.mask_data + def mask_data( + self, + node: TraversalNode, + policy: Policy, + request: PrivacyRequest, + rows: List[Row], + ) -> int: + """Execute a masking request. Returns the number of records masked + + For redshift, we also set the search_path to be the schema defined on the ConnectionConfig if + applicable - persists for the current session. 
+ """ + query_config = self.query_config(node) + update_ct = 0 + client = self.client() + for row in rows: + update_stmt = query_config.generate_update_stmt(row, policy, request) + if update_stmt is not None: + with client.connect() as connection: + self.set_schema(connection) + results: LegacyCursorResult = connection.execute(update_stmt) + update_ct = update_ct + results.rowcount + return update_ct + + # Overrides SQLConnector.query_config + def query_config(self, node: TraversalNode) -> RedshiftQueryConfig: + """Query wrapper corresponding to the input traversal_node.""" + return RedshiftQueryConfig(node) + class SnowflakeConnector(SQLConnector): """Connector specific to Snowflake""" diff --git a/src/fidesops/task/task_resources.py b/src/fidesops/task/task_resources.py index dbdeeaab8b..6dc3e7ed79 100644 --- a/src/fidesops/task/task_resources.py +++ b/src/fidesops/task/task_resources.py @@ -18,6 +18,7 @@ MySQLConnector, PostgreSQLConnector, SnowflakeConnector, + RedshiftConnector, ) from fidesops.util.cache import get_cache @@ -50,6 +51,8 @@ def build_connector(connection_config: ConnectionConfig) -> BaseConnector: return MySQLConnector(connection_config) if connection_config.connection_type == ConnectionType.snowflake: return SnowflakeConnector(connection_config) + if connection_config.connection_type == ConnectionType.redshift: + return RedshiftConnector(connection_config) raise NotImplementedError( f"No connector available for {connection_config.connection_type}" ) diff --git a/tests/api/v1/endpoints/test_connection_config_endpoints.py b/tests/api/v1/endpoints/test_connection_config_endpoints.py index 6bf612d3da..22bee70070 100644 --- a/tests/api/v1/endpoints/test_connection_config_endpoints.py +++ b/tests/api/v1/endpoints/test_connection_config_endpoints.py @@ -597,6 +597,7 @@ def test_put_connection_config_redshift_secrets( "database": "dev", "user": "awsuser", "password": "test_password", + "db_schema": "test" } resp = api_client.put( url + "?verify=False", @@ -615,6 +616,7 @@ def test_put_connection_config_redshift_secrets( "database": "dev", "user": "awsuser", "password": "test_password", + "db_schema": "test", "url": None, } assert redshift_connection_config.last_test_timestamp is None diff --git a/tests/api/v1/endpoints/test_dataset_endpoints.py b/tests/api/v1/endpoints/test_dataset_endpoints.py index a423ab0148..424b06a8bd 100644 --- a/tests/api/v1/endpoints/test_dataset_endpoints.py +++ b/tests/api/v1/endpoints/test_dataset_endpoints.py @@ -34,7 +34,6 @@ def _reject_key(dict: Dict, key: str) -> Dict: def test_example_datasets(example_datasets): """Ensure the test fixture loads the right sample data""" assert example_datasets - assert len(example_datasets) == 3 assert example_datasets[0]["fides_key"] == "postgres_example_test_dataset" assert len(example_datasets[0]["collections"]) == 11 assert example_datasets[1]["fides_key"] == "mongo_test" @@ -425,7 +424,7 @@ def test_patch_datasets_bulk_create( assert response.status_code == 200 response_body = json.loads(response.text) - assert len(response_body["succeeded"]) == 3 + assert len(response_body["succeeded"]) == 4 assert len(response_body["failed"]) == 0 # Confirm that the created dataset matches the values we provided @@ -488,7 +487,7 @@ def test_patch_datasets_bulk_update( assert response.status_code == 200 response_body = json.loads(response.text) - assert len(response_body["succeeded"]) == 3 + assert len(response_body["succeeded"]) == 4 assert len(response_body["failed"]) == 0 postgres_dataset = 
response_body["succeeded"][0] @@ -545,7 +544,7 @@ def test_patch_datasets_failed_response( assert response.status_code == 200 # Returns 200 regardless response_body = json.loads(response.text) assert len(response_body["succeeded"]) == 0 - assert len(response_body["failed"]) == 3 + assert len(response_body["failed"]) == 4 for failed_response in response_body["failed"]: assert "Dataset create/update failed" in failed_response["message"] diff --git a/tests/fixtures.py b/tests/fixtures.py index 32b736d8b3..76af5b800a 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -37,7 +37,7 @@ ) from fidesops.models.storage import StorageConfig, ResponseFormat from fidesops.schemas.connection_configuration import ( - SnowflakeSchema, + SnowflakeSchema, RedshiftSchema, ) from fidesops.schemas.storage.storage import ( FileNaming, @@ -240,6 +240,17 @@ def redshift_connection_config(db: Session) -> Generator: "access": AccessLevel.write, }, ) + uri = integration_config.get("redshift", {}).get("external_uri") or os.environ.get( + "REDSHIFT_TEST_URI" + ) + db_schema = integration_config.get("redshift", {}).get("db_schema") or os.environ.get( + "REDSHIFT_TEST_DB_SCHEMA" + ) + if uri and db_schema: + schema = RedshiftSchema(url=uri, db_schema=db_schema) + connection_config.secrets = schema.dict() + connection_config.save(db=db) + yield connection_config connection_config.delete(db) @@ -941,6 +952,7 @@ def example_datasets() -> List[Dict]: "data/dataset/postgres_example_test_dataset.yml", "data/dataset/mongo_example_test_dataset.yml", "data/dataset/snowflake_example_test_dataset.yml", + "data/dataset/redshift_example_test_dataset.yml", ] for filename in example_filenames: example_datasets += load_dataset(filename) @@ -967,6 +979,25 @@ def snowflake_example_test_dataset_config( dataset_config.delete(db=db) +@pytest.fixture +def redshift_example_test_dataset_config( + redshift_connection_config: ConnectionConfig, + db: Session, + example_datasets: List[Dict], +) -> Generator: + dataset = example_datasets[3] + fides_key = dataset["fides_key"] + dataset_config = DatasetConfig.create( + db=db, + data={ + "connection_config_id": redshift_connection_config.id, + "fides_key": fides_key, + "dataset": dataset, + }, + ) + yield dataset_config + dataset_config.delete(db=db) + @pytest.fixture def postgres_example_test_dataset_config( connection_config: ConnectionConfig, diff --git a/tests/integration_tests/test_external_database_connections.py b/tests/integration_tests/test_external_database_connections.py index c1d96467d3..41a243068a 100644 --- a/tests/integration_tests/test_external_database_connections.py +++ b/tests/integration_tests/test_external_database_connections.py @@ -22,17 +22,24 @@ @pytest.fixture(scope="session") def redshift_test_engine() -> Generator: """Return a connection to an Amazon Redshift Cluster""" - # Pulling from integration config file or GitHub secrets - uri = integration_config.get("redshift", {}).get("external_uri") or os.environ.get( - "REDSHIFT_TEST_URI" - ) - schema = RedshiftSchema(url=uri) + connection_config = ConnectionConfig( name="My Redshift Config", key="test_redshift_key", connection_type=ConnectionType.redshift, - secrets=schema.dict(), ) + + # Pulling from integration config file or GitHub secrets + uri = integration_config.get("redshift", {}).get("external_uri") or os.environ.get( + "REDSHIFT_TEST_URI" + ) + db_schema = integration_config.get("redshift", {}).get("db_schema") or os.environ.get( + "REDSHIFT_TEST_DB_SCHEMA" + ) + if uri and db_schema: + schema = 
RedshiftSchema(url=uri, db_schema=db_schema) + connection_config.secrets = schema.dict() + connector: RedshiftConnector = get_connector(connection_config) engine = connector.client() yield engine diff --git a/tests/service/privacy_request/request_runner_service_test.py b/tests/service/privacy_request/request_runner_service_test.py index 175099ed35..1c9b857f2a 100644 --- a/tests/service/privacy_request/request_runner_service_test.py +++ b/tests/service/privacy_request/request_runner_service_test.py @@ -26,7 +26,10 @@ from fidesops.schemas.masking.masking_secrets import MaskingSecretCache from fidesops.schemas.policy import Rule from fidesops.service.connectors import PostgreSQLConnector -from fidesops.service.connectors.sql_connector import SnowflakeConnector +from fidesops.service.connectors.sql_connector import ( + SnowflakeConnector, + RedshiftConnector, +) from fidesops.service.masking.strategy.masking_strategy_factory import get_strategy from fidesops.service.privacy_request.request_runner_service import PrivacyRequestRunner from fidesops.util.async_util import wait_for @@ -204,7 +207,7 @@ def test_create_and_process_erasure_request_specific_category( if customer_id in row: customer_found = True # Check that the `name` field is `None` - assert row[1] is None + assert row.name is None assert customer_found @@ -252,11 +255,11 @@ def test_create_and_process_erasure_request_generic_category( # Check that the `email` field is `None` and that its data category # ("user.provided.identifiable.contact.email") has been erased by the parent # category ("user.provided.identifiable.contact") - assert row[1] is None - assert row[2] is not None + assert row.email is None + assert row.name is not None else: # There are two rows other rows, and they should not have been erased - assert row[1] in ["customer-1@example.com", "jane@example.com"] + assert row.email in ["customer-1@example.com", "jane@example.com"] assert customer_found @@ -354,11 +357,11 @@ def test_create_and_process_erasure_request_with_table_joins( card_found = False for row in res: - if row[0] == customer_id: + if row.customer_id == customer_id: card_found = True - assert row[2] is None - assert row[3] is None - assert row[4] is None + assert row.ccn is None + assert row.code is None + assert row.name is None assert card_found is True @@ -405,7 +408,7 @@ def test_create_and_process_erasure_request_read_access( customer_found = True # Check that the `name` field is NOT `None`. 
We couldn't erase, because the ConnectionConfig only had # "read" access - assert row[1] is not None + assert row.name is not None assert customer_found @@ -495,8 +498,156 @@ def test_create_and_process_erasure_request_snowflake( stmt = f'select "name", "variant_eg" from "customer" where "email" = {formatted_customer_email};' res = snowflake_client.execute(stmt).all() for row in res: - assert row[0] == None - assert row[1] == None + assert row.name is None + assert row.variant_eg is None + + +@pytest.fixture(scope="function") +def redshift_resources( + redshift_example_test_dataset_config, +): + redshift_connection_config = redshift_example_test_dataset_config.connection_config + connector = RedshiftConnector(redshift_connection_config) + redshift_client = connector.client() + with redshift_client.connect() as connection: + connector.set_schema(connection) + uuid = str(uuid4()) + customer_email = f"customer-{uuid}@example.com" + customer_name = f"{uuid}" + + stmt = "select max(id) from customer;" + res = connection.execute(stmt) + customer_id = res.all()[0][0] + 1 + + stmt = "select max(id) from address;" + res = connection.execute(stmt) + address_id = res.all()[0][0] + 1 + + city = "Test City" + state = "TX" + stmt = f""" + insert into address (id, house, street, city, state, zip) + values ({address_id}, '{111}', 'Test Street', '{city}', '{state}', '55555'); + """ + connection.execute(stmt) + + stmt = f""" + insert into customer (id, email, name, address_id) + values ({customer_id}, '{customer_email}', '{customer_name}', '{address_id}'); + """ + connection.execute(stmt) + + yield { + "email": customer_email, + "name": customer_name, + "id": customer_id, + "client": redshift_client, + "address_id": address_id, + "city": city, + "state": state, + "connector": connector, + } + # Remove test data and close Redshift connection in teardown + stmt = f"delete from customer where email = '{customer_email}';" + connection.execute(stmt) + + stmt = f'delete from address where "id" = {address_id};' + connection.execute(stmt) + + +@pytest.mark.integration_external +def test_create_and_process_access_request_redshift( + redshift_resources, + db, + cache, + policy, +): + customer_email = redshift_resources["email"] + customer_name = redshift_resources["name"] + data = { + "requested_at": "2021-08-30T16:09:37.359Z", + "policy_key": policy.key, + "identity": {"email": customer_email}, + } + pr = get_privacy_request_results(db, policy, cache, data) + results = pr.get_results() + customer_table_key = ( + f"EN_{pr.id}__access_request__redshift_example_test_dataset:customer" + ) + assert len(results[customer_table_key]) == 1 + assert results[customer_table_key][0]["email"] == customer_email + assert results[customer_table_key][0]["name"] == customer_name + + address_table_key = ( + f"EN_{pr.id}__access_request__redshift_example_test_dataset:address" + ) + + city = redshift_resources["city"] + state = redshift_resources["state"] + assert len(results[address_table_key]) == 1 + assert results[address_table_key][0]["city"] == city + assert results[address_table_key][0]["state"] == state + + pr.delete(db=db) + + +@pytest.mark.integration_external +def test_create_and_process_erasure_request_redshift( + redshift_example_test_dataset_config, + redshift_resources, + integration_config: Dict[str, str], + db, + cache, + erasure_policy, +): + customer_email = redshift_resources["email"] + data = { + "requested_at": "2021-08-30T16:09:37.359Z", + "policy_key": erasure_policy.key, + "identity": {"email": customer_email}, + 
} + + # Should erase customer name + pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr.delete(db=db) + + connector = redshift_resources["connector"] + redshift_client = redshift_resources["client"] + with redshift_client.connect() as connection: + connector.set_schema(connection) + stmt = f"select name from customer where email = '{customer_email}';" + res = connection.execute(stmt).all() + for row in res: + assert row.name is None + + address_id = redshift_resources["address_id"] + stmt = f"select 'id', city, state from address where id = {address_id};" + res = connection.execute(stmt).all() + for row in res: + # Not yet masked because these fields aren't targeted by erasure policy + assert row.city == redshift_resources["city"] + assert row.state == redshift_resources["state"] + + target = erasure_policy.rules[0].targets[0] + target.data_category = "user.provided.identifiable.contact.state" + target.save(db=db) + + # Should erase state fields on address table + pr = get_privacy_request_results(db, erasure_policy, cache, data) + pr.delete(db=db) + + connector = redshift_resources["connector"] + redshift_client = redshift_resources["client"] + with redshift_client.connect() as connection: + connector.set_schema(connection) + + address_id = redshift_resources["address_id"] + stmt = f"select 'id', city, state from address where id = {address_id};" + res = connection.execute(stmt).all() + for row in res: + # State field was targeted by erasure policy but city was not + assert row.city is not None + assert row.state is None class TestPrivacyRequestRunnerRunWebhooks: @@ -550,7 +701,9 @@ def test_run_webhooks_client_error( privacy_request_runner, policy_pre_execution_webhooks, ): - mock_trigger_policy_webhook.side_effect = ClientUnsuccessfulException(status_code=500) + mock_trigger_policy_webhook.side_effect = ClientUnsuccessfulException( + status_code=500 + ) proceed = privacy_request_runner.run_webhooks_and_report_status(db, privacy_request, PolicyPreWebhook) assert not proceed
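
To round out the documentation change above, here is a minimal sketch, not part of this PR, of supplying the new `db_schema` secret through the connection secrets endpoint described in the docs. The host, port, connection key, token, and credentials are all placeholders under the assumption of a locally running fidesops instance; adjust them for your deployment.

```
import requests

FIDESOPS_URL = "http://localhost:8080"                # assumed local deployment
HEADERS = {"Authorization": "Bearer <access_token>"}  # placeholder OAuth token

# Matches the docs example: a SQLAlchemy-style URL plus the new db_schema field,
# which the Redshift connector applies as the session search_path.
secrets = {
    "url": "redshift+psycopg2://username@host.amazonaws.com:5439/database",
    "db_schema": "my_test_schema",
}

response = requests.put(
    f"{FIDESOPS_URL}/api/v1/connection/my_redshift_db/secret",
    params={"verify": "false"},  # skip the connection test, as in the docs examples
    headers=HEADERS,
    json=secrets,
)
print(response.status_code, response.json())
```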