diff --git a/.github/workflows/playwright-integration-tests-mysql.yml b/.github/workflows/playwright-integration-tests-mysql.yml index bac78430015a..ef56279d5562 100644 --- a/.github/workflows/playwright-integration-tests-mysql.yml +++ b/.github/workflows/playwright-integration-tests-mysql.yml @@ -116,7 +116,9 @@ jobs: E2E_REDSHIFT_HOST_PORT: ${{ secrets.E2E_REDSHIFT_HOST_PORT }} E2E_REDSHIFT_USERNAME: ${{ secrets.E2E_REDSHIFT_USERNAME }} E2E_REDSHIFT_PASSWORD: ${{ secrets.E2E_REDSHIFT_PASSWORD }} - E2E_REDSHIFT_DATABASE: ${{ secrets.E2E_REDSHIFT_DATABASE }} + E2E_REDSHIFT_DB: ${{ secrets.E2E_REDSHIFT_DB }} + E2E_DRUID_HOST_PORT: ${{ secrets.E2E_DRUID_HOST_PORT }} + E2E_HIVE_HOST_PORT: ${{ secrets.E2E_HIVE_HOST_PORT }} run: | source env/bin/activate make install_e2e_tests diff --git a/.github/workflows/playwright-integration-tests-postgres.yml b/.github/workflows/playwright-integration-tests-postgres.yml index 619b8548137a..e91b61b5cbb8 100644 --- a/.github/workflows/playwright-integration-tests-postgres.yml +++ b/.github/workflows/playwright-integration-tests-postgres.yml @@ -116,7 +116,9 @@ jobs: E2E_REDSHIFT_HOST_PORT: ${{ secrets.E2E_REDSHIFT_HOST_PORT }} E2E_REDSHIFT_USERNAME: ${{ secrets.E2E_REDSHIFT_USERNAME }} E2E_REDSHIFT_PASSWORD: ${{ secrets.E2E_REDSHIFT_PASSWORD }} - E2E_REDSHIFT_DATABASE: ${{ secrets.E2E_REDSHIFT_DATABASE }} + E2E_REDSHIFT_DB: ${{ secrets.E2E_REDSHIFT_DB }} + E2E_DRUID_HOST_PORT: ${{ secrets.E2E_DRUID_HOST_PORT }} + E2E_HIVE_HOST_PORT: ${{ secrets.E2E_HIVE_HOST_PORT }} run: | source env/bin/activate make install_e2e_tests diff --git a/Makefile b/Makefile index 5f9d85010848..3ba6ab348d75 100644 --- a/Makefile +++ b/Makefile @@ -75,7 +75,7 @@ unit_ingestion: ## Run Python unit tests .PHONY: run_e2e_tests run_e2e_tests: ## Run e2e tests - pytest --screenshot=only-on-failure --output="ingestion/tests/e2e/artifacts" $(ARGS) --junitxml=ingestion/junit/test-results-e2e.xml ingestion/tests/e2e + pytest --screenshot=only-on-failure 
--output="ingestion/tests/e2e/artifacts" $(ARGS) --slowmo 5 --junitxml=ingestion/junit/test-results-e2e.xml ingestion/tests/e2e .PHONY: run_python_tests run_python_tests: ## Run all Python tests with coverage diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py index feff51c9cb24..7a02eb806da0 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py @@ -23,7 +23,7 @@ from typing import Dict, List from sqlalchemy import Column, inspect -from sqlalchemy.exc import ProgrammingError +from sqlalchemy.exc import ProgrammingError, ResourceClosedError from sqlalchemy.orm import scoped_session from metadata.generated.schema.entity.data.table import TableData @@ -38,6 +38,7 @@ from metadata.profiler.orm.functions.table_metric_construct import ( table_metric_construct_factory, ) +from metadata.profiler.orm.registry import Dialects from metadata.profiler.processor.runner import QueryRunner from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_ from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor @@ -258,6 +259,15 @@ def _compute_query_metrics( row = runner.select_first_from_query(metric_query) return dict(row) + except ResourceClosedError as exc: + # if the query returns no results, we will get a ResourceClosedError from Druid + if ( + # pylint: disable=protected-access + runner._session.get_bind().dialect.name + != Dialects.Druid + ): + msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" + handle_query_exception(msg, exc, session) except Exception as exc: msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" handle_query_exception(msg, exc, session) diff --git a/ingestion/src/metadata/profiler/metrics/static/stddev.py 
b/ingestion/src/metadata/profiler/metrics/static/stddev.py index e95fe2185743..e55c843ec112 100644 --- a/ingestion/src/metadata/profiler/metrics/static/stddev.py +++ b/ingestion/src/metadata/profiler/metrics/static/stddev.py @@ -70,6 +70,15 @@ def _(element, compiler, **kw): return "if(isNaN(stddevPop(%s)), null, stddevPop(%s))" % ((proc,) * 2) +@compiles(StdDevFn, Dialects.Druid) +def _(element, compiler, **kw): # pylint: disable=unused-argument + """returns stdv for druid. Could not validate with our cluster + we might need to look into installing the druid-stats module + https://druid.apache.org/docs/latest/configuration/extensions/#loading-extensions + """ + return "NULL" + + class StdDev(StaticMetric): """ STD Metric diff --git a/ingestion/src/metadata/profiler/orm/functions/length.py b/ingestion/src/metadata/profiler/orm/functions/length.py index cbe7181cbbe2..ded2893ca6d9 100644 --- a/ingestion/src/metadata/profiler/orm/functions/length.py +++ b/ingestion/src/metadata/profiler/orm/functions/length.py @@ -49,6 +49,7 @@ def _(element, compiler, **kw): @compiles(LenFn, Dialects.IbmDbSa) @compiles(LenFn, Dialects.Db2) @compiles(LenFn, Dialects.Hana) +@compiles(LenFn, Dialects.Druid) def _(element, compiler, **kw): return "LENGTH(%s)" % compiler.process(element.clauses, **kw) diff --git a/ingestion/src/metadata/profiler/orm/functions/median.py b/ingestion/src/metadata/profiler/orm/functions/median.py index 455b9a71bd42..a02104477d1f 100644 --- a/ingestion/src/metadata/profiler/orm/functions/median.py +++ b/ingestion/src/metadata/profiler/orm/functions/median.py @@ -56,6 +56,14 @@ def _(elements, compiler, **kwargs): return f"if({null_check}({quantile_str}), null, {quantile_str})" +@compiles(MedianFn, Dialects.Druid) +def _(elements, compiler, **kwargs): + col, _, percentile = [ + compiler.process(element, **kwargs) for element in elements.clauses + ] + return f"APPROX_QUANTILE({col}, {percentile})" + + # pylint: disable=unused-argument @compiles(MedianFn, 
Dialects.Athena) @compiles(MedianFn, Dialects.Presto) diff --git a/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py index ac544e9878e9..925c4e2dbe24 100644 --- a/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py @@ -12,6 +12,7 @@ Helper module to handle data sampling for the profiler """ +import traceback from typing import List, Optional, Union, cast from sqlalchemy import Column, inspect, text @@ -30,6 +31,7 @@ from metadata.profiler.orm.registry import Dialects from metadata.profiler.processor.handle_partition import partition_filter_handler from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface +from metadata.utils.logger import profiler_interface_registry_logger from metadata.utils.sqa_utils import ( build_query_filter, dispatch_to_date_or_datetime, @@ -38,6 +40,8 @@ get_value_filter, ) +logger = profiler_interface_registry_logger() + RANDOM_LABEL = "random" @@ -105,7 +109,7 @@ def random_sample(self) -> Union[DeclarativeMeta, AliasedClass]: if self._profile_sample_query: return self._rdn_sample_from_user_query() - if not self.profile_sample: + if not self.profile_sample or int(self.profile_sample) == 100: if self._partition_details: return self._partitioned_table() @@ -143,12 +147,23 @@ def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData if col.name != RANDOM_LABEL and col.name in names ] - sqa_sample = ( - self.client.query(*sqa_columns) - .select_from(rnd) - .limit(self.sample_limit) - .all() - ) + try: + sqa_sample = ( + self.client.query(*sqa_columns) + .select_from(rnd) + .limit(self.sample_limit) + .all() + ) + except Exception: + logger.debug( + "Cannot fetch sample data with random sampling. Falling back to 100 rows." 
+ ) + logger.debug(traceback.format_exc()) + sqa_columns = list(inspect(self.table).c) + sqa_sample = ( + self.client.query(*sqa_columns).select_from(self.table).limit(100).all() + ) + return TableData( columns=[column.name for column in sqa_columns], rows=[list(row) for row in sqa_sample], diff --git a/ingestion/tests/e2e/README.md b/ingestion/tests/e2e/README.md index 117503fca028..42757a413f59 100644 --- a/ingestion/tests/e2e/README.md +++ b/ingestion/tests/e2e/README.md @@ -5,8 +5,73 @@ https://playwright.dev/python/docs/intro In the `e2e` folder you will find 2 folders and 1 file: - `conftest.py`: defines some module scope fixture (module here is the `e2e` folder). All tests will use `init_with_redshift` by default -- ingestin metadata from a redshift service. The ingestion will only happens on the first test execution. The `create_data_consumer_user` allows tests to login as a Data Consumer and perform some actions - `configs`: holds all the shared configuration. So far we have 2 main classes families (User and Connector) and common functions -- `entity`: holds entity related tests. It contains a subfolder per source. +- `entity`: holds entity related tests. It contains a subfolder per asset category. In the asset category folder you will find the `common_assertions.py`. This file contains all the common assertions to be ran for that specific asset. ## Install Dependencies and Run Tests run `make install_e2e_tests`. Run `make run_e2e_tests`, you can also pass arguments such as `make run_e2e_tests ARGS="--browser webkit"` to run tests against webkit browser or `make run_e2e_tests ARGS="--headed --slowmo 100"` to run the tests in slowmo mode and head full. +## Adding a new test +The first step is to define the connector config for your source. this happens in `configs/connectors/` folder. For a database connector, you will must ensure your class inherits from `DataBaseConnectorInterface`. 
You will then need to implement the `get_service()` and `set_connection()`. `get_service` specifies which service to choose from the `/add-service` page of the website and `set_connection` the different elements to configure on the connector connection config page. If you are unsure how an element can be accessed on the page you can run `playwright codegen http://localhost:8585/` -- more info [here](https://playwright.dev/python/docs/codegen). By default `DataBaseConnectorInterface` sets `self.supports_profiler_ingestion=True` which will result in the profiler ingestion running when the test class is executed. You can set `self.supports_profiler_ingestion=False` in your specific connector to override this behavior. + +e.g. + +```python +class DruidConnector(DataBaseConnectorInterface): +    """druid connector""" + +    def __init__(self, config): +        super().__init__(config) +        self.supports_profiler_ingestion=False + +    def set_connection(): +        ... + +    def get_service(): +        ... +``` + + +Once your connector config has been created you will need to add a new test. Simply create a new file in the asset category of your choice (e.g. `entity/database/test_druid.py`). In this file create a new test class and mark this class with `@pytest.mark.usefixtures("setUpClass")` and `@pytest.mark.parametrize("setUpClass", ...)`. The first mark will make sure `setUpClass` fixture is run before running your tests (this manages the ingestion of metadata and profiler as of Oct-25 2023) and `@pytest.mark.parametrize` will pass the right connector class to the `setUpClass` fixture. The second argument of `@pytest.mark.parametrize` should be as below +```python +[ + { + "connector_obj": ( + ConnectorTestConfig(...) + ) + } +] +``` + +`ConnectorTestConfig` defines the configuration to use for the test. It has 2 arguments: +- `ingestion`: This allows you to define the different filtering when performing the ingestion.
it expects a `ConnectorIngestionTestConfig` which will take 2 arguments: + - `metadata`: this allows you to define metadata ingestion filters. It takes an `IngestionTestConfig` which takes 3 arguments: + - `database`: it expects an `IngestionFilterConfig` class which takes 2 arguments: + - `includes`: a list of str + - `excludes`: a list of str + - `schema_`: see `database` + - `table`: see `database` + - `profiler`: see `metadata` +- `validation`: this config can be used when we need to validate expectations against specific entities. As of Oct-25 2023 it is only used in the `assert_profile_data`, `assert_sample_data_ingestion` and `assert_pii_column_auto_tagging` test functions of the profiler. + +Once you have set up your class you can create your test. There are currently (as of Oct-25 2023) 5 assertions that can be performed: +- assert pipeline statuses are `success`. You can refer to the implementation in the existing test +- `assert_change_database_owner`: assert the owner of a data asset can be changed +- `assert_profile_data`: assert table profile data summary is visible +- `assert_sample_data_ingestion`: assert sample data are ingested and visible +- `assert_pii_column_auto_tagging`: assert auto PII tagging from the profiler has been performed + +Note that in every test method you define the following class attributes are accessible: +- `connector_obj`: the connector class passed to `setUpClass` in the `@pytest.mark.parametrize` +- `service_name`: `str` the name of the service that was created for the test +- `metadata_ingestion_status`: `PipelineState` the ingestion status of the metadata pipeline +- `profiler_ingestion_status`: `PipelineState` the ingestion status of the profiler pipeline.
+ +## Test Coverage +| **tests** | redshift | druid | hive | +|-----------------------------|:--------:|:-----:|:----:| +| metadata ingestion | ✅ | ✅ | ✅ | +| profiler ingestion | ✅ | ✅ | ✅ | +| change DB owner | ✅ | ✅ | ✅ | +| Table Profiler Summary Data | ✅ | ✅ | ✅ | +| Sample data visible | ✅ | ✅ | ✅ | +| Profiler PII auto Tag | ✅ | ✅ | ❌ | \ No newline at end of file diff --git a/ingestion/tests/e2e/configs/common.py b/ingestion/tests/e2e/configs/common.py index 7befaf72e30b..ddf6e06a6e81 100644 --- a/ingestion/tests/e2e/configs/common.py +++ b/ingestion/tests/e2e/configs/common.py @@ -5,7 +5,7 @@ from playwright.sync_api import Page, expect -from ingestion.tests.e2e.configs.users.user import User +from .users.user import User BASE_URL = "http://localhost:8585" diff --git a/ingestion/tests/e2e/entity/redshift/__init__.py b/ingestion/tests/e2e/configs/connectors/database/__init__.py similarity index 100% rename from ingestion/tests/e2e/entity/redshift/__init__.py rename to ingestion/tests/e2e/configs/connectors/database/__init__.py diff --git a/ingestion/tests/e2e/configs/connectors/database/db2.py b/ingestion/tests/e2e/configs/connectors/database/db2.py new file mode 100644 index 000000000000..5841d767886e --- /dev/null +++ b/ingestion/tests/e2e/configs/connectors/database/db2.py @@ -0,0 +1,37 @@ +"""Redshift connector for e2e tests""" + +import os + +from playwright.sync_api import Page, expect + +from .interface import DataBaseConnectorInterface + + +class Db2Connector(DataBaseConnectorInterface): + """db2 connector""" + + def get_service(self, page: Page): + """get service from the service page""" + page.get_by_test_id("Db2").click() + + def set_connection(self, page): + """Set connection for redshift service""" + page.get_by_label("Username*").fill(os.environ["E2E_DB2_USERNAME"]) + expect(page.get_by_label("Username*")).to_have_value( + os.environ["E2E_DB2_USERNAME"] + ) + + page.get_by_label("Password").fill(os.environ["E2E_DB2_PASSWORD"]) + 
expect(page.get_by_label("Password")).to_have_value( + os.environ["E2E_DB2_PASSWORD"] + ) + + page.get_by_label("Host and Port*").fill(os.environ["E2E_DB2_HOST_PORT"]) + expect(page.get_by_label("Host and Port*")).to_have_value( + os.environ["E2E_DB2_HOST_PORT"] + ) + + page.get_by_label("database*").fill(os.environ["E2E_DB2_DATABASE"]) + expect(page.get_by_label("database*")).to_have_value( + os.environ["E2E_DB2_DATABASE"] + ) diff --git a/ingestion/tests/e2e/configs/connectors/database/druid.py b/ingestion/tests/e2e/configs/connectors/database/druid.py new file mode 100644 index 000000000000..4b862fd6c8e1 --- /dev/null +++ b/ingestion/tests/e2e/configs/connectors/database/druid.py @@ -0,0 +1,22 @@ +"""Redshift connector for e2e tests""" + +import os + +from playwright.sync_api import Page, expect + +from .interface import DataBaseConnectorInterface + + +class DruidConnector(DataBaseConnectorInterface): + """druid connector""" + + def get_service(self, page: Page): + """get service from the service page""" + page.get_by_test_id("Druid").click() + + def set_connection(self, page): + """Set connection for redshift service""" + page.get_by_label("Host and Port*").fill(os.environ["E2E_DRUID_HOST_PORT"]) + expect(page.get_by_label("Host and Port*")).to_have_value( + os.environ["E2E_DRUID_HOST_PORT"] + ) diff --git a/ingestion/tests/e2e/configs/connectors/database/hive.py b/ingestion/tests/e2e/configs/connectors/database/hive.py new file mode 100644 index 000000000000..f767d554faa6 --- /dev/null +++ b/ingestion/tests/e2e/configs/connectors/database/hive.py @@ -0,0 +1,24 @@ +"""MySQL connector for e2e tests""" + +import os + +from playwright.sync_api import Page, expect + +from .interface import DataBaseConnectorInterface + + +class HiveConnector(DataBaseConnectorInterface): + def get_service(self, page: Page): + """get service from the service page""" + page.get_by_test_id("Hive").click() + + def set_connection(self, page): + """Set connection for redshift service""" + 
page.locator('[id="root\\/hostPort"]').fill(os.environ["E2E_HIVE_HOST_PORT"]) + expect(page.locator('[id="root\\/hostPort"]')).to_have_value( + os.environ["E2E_HIVE_HOST_PORT"] + ) + + page.locator('[id="root\\/metastoreConnection__oneof_select"]').select_option( + "2" + ) diff --git a/ingestion/tests/e2e/configs/connectors/database/interface.py b/ingestion/tests/e2e/configs/connectors/database/interface.py new file mode 100644 index 000000000000..f7b233f43dc0 --- /dev/null +++ b/ingestion/tests/e2e/configs/connectors/database/interface.py @@ -0,0 +1,217 @@ +"""connectors interface""" + +import random +import string +import time +from abc import ABC, abstractmethod +from time import sleep + +from playwright.sync_api import Page, TimeoutError, expect + +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, +) +from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( + OpenMetadataJWTClientConfig, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.time_utils import ( + get_beginning_of_day_timestamp_mill, + get_end_of_day_timestamp_mill, +) + +from ...connectors.model import ConnectorTestConfig, IngestionFilterConfig + +BASE_URL = "http://localhost:8585" + + +class DataBaseConnectorInterface(ABC): + """Interface for connectors class for e2e tests""" + + def __init__(self, config: ConnectorTestConfig): + """Initialize the connector""" + self.supports_profiler_ingestion = True + self.profiler_summary_card_count = 4 + + self.ingestion_config = config.ingestion + self.validation_config = config.validation + + self.service_type = "Databases" + self.service_name = None + self.metadata_ingestion_pipeline_fqn = None + self.profiler_ingestion_pipeline_fqn = None + self.ometa = OpenMetadata( + OpenMetadataConnection( + 
hostPort=f"{BASE_URL}/api", + authProvider="openmetadata", + securityConfig=OpenMetadataJWTClientConfig( + jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + ), + ) + ) + + def _check_and_handle_workflow(self, page: Page, ingestion_pipeline_fqn: str): + pipeline_status = None + try_ = 0 + sleep(1) + # we'll iterate until we get a pipeline status + while not pipeline_status: + pipeline_status = self.ometa.get_pipeline_status_between_ts( + f"{self.service_name}.{ingestion_pipeline_fqn}", + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + ) + if not pipeline_status and try_ > 10: + # if we don't get a pipeline status after trying 10 times + # we need to deploy the workflow + try: + page.get_by_role( + "row", name=f"{ingestion_pipeline_fqn}" + ).get_by_test_id("re-deploy").click() + except TimeoutError: + page.get_by_role( + "row", name=f"{ingestion_pipeline_fqn}" + ).get_by_test_id("deploy").click() + if try_ > 20: + # if we've tried 20 times, we'll raise an exception + raise TimeoutError("Pipeline status not found") + try_ += 1 + + @abstractmethod + def get_service(self, page: Page): + """get service from the service page""" + raise NotImplementedError + + @abstractmethod + def set_connection(self, page: Page): + """Set connection for redshift service""" + raise NotImplementedError + + @staticmethod + def generate_service_name(): + """Generate a random service name""" + chinese_char = 
"".join([chr(random.randint(0x4E00, 0x9FBF)) for _ in range(3)]) + cyrillic_char = "".join([chr(random.randint(1072, 1104)) for _ in range(3)]) + return ( + "".join(random.choices(string.ascii_lowercase, k=10)) + + chinese_char + + cyrillic_char + + "_-1" + ) + + def _set_ingestion_filter(self, type_: str, page: Page): + """Set schema filter for redshift service""" + filter_config: IngestionFilterConfig = getattr(self.ingestion_config, type_) + if not filter_config: + return + + for container_type, value in filter_config: + if not value: + continue + if container_type == "schema_": + container_type = "schema" + for filter_type, filter_elements in value: + if not filter_elements: + continue + for element in filter_elements: + page.locator( + f'xpath=//*[@id="root/{container_type}FilterPattern/{filter_type}"]' + ).fill(element) + + def get_sorted_ingestion_pipeline_statues( + self, ingestion_pipeline_fqn: str, desc=True + ): + statuses = self.ometa.get_pipeline_status_between_ts( + ingestion_pipeline_fqn, + get_beginning_of_day_timestamp_mill(), + get_end_of_day_timestamp_mill(), + ) + return sorted( + statuses, + key=lambda x: x.startDate.__root__, + reverse=True if desc else False, + ) + + def get_pipeline_status(self, ingestion_pipeline_fqn: str): + # Not best practice. 
Should use `expect`, though playwright does not have a `wait_until` function + # we'll make a call to the API to get the pipeline status and check if it's success + status = None + timeout = time.time() + 60 * 5 # 5 minutes from now + + while not status or status == PipelineState.running: + if time.time() > timeout: + raise TimeoutError( + "Pipeline with status {status} has been running for more than 5 minutes" + ) + statuses = self.get_sorted_ingestion_pipeline_statues( + ingestion_pipeline_fqn, + ) + # we'll get the state of the most recent pipeline run + status = statuses[0].pipelineState + if status != PipelineState.running: + break + + return status + + def create_service_ingest_metadata(self, page: Page): + """Ingest redshift service data + + Args: + page (Page): playwright page. Should be logged in and pointing to the home page + e.g. page.goto(f"{BASE_URL}/") + """ + page.get_by_test_id("app-bar-item-settings").click() + page.get_by_text(self.service_type).click() + page.get_by_test_id("add-service-button").click() + self.get_service(page) + page.get_by_test_id("next-button").click() + self.service_name = self.generate_service_name() + page.get_by_test_id("service-name").fill(self.service_name) + expect(page.get_by_test_id("service-name")).to_have_value(self.service_name) + page.get_by_test_id("next-button").click() + self.set_connection(page) + page.get_by_test_id("submit-btn").click() + page.get_by_test_id("add-ingestion-button").click() + self._set_ingestion_filter("metadata", page) + self.metadata_ingestion_pipeline_fqn = page.get_by_label("name*").input_value() + page.get_by_test_id("submit-btn").click() + page.get_by_test_id("deploy-button").click() + page.get_by_test_id("view-service-button").click() + page.get_by_test_id("ingestions").click() + self._check_and_handle_workflow(page, self.metadata_ingestion_pipeline_fqn) + return self.service_name + + def create_profiler_workflow(self, page: Page): + """create profiler workflow""" + 
page.get_by_test_id("app-bar-item-settings").click() + page.get_by_text("Databases").click() + page.get_by_test_id(f"service-name-{self.service_name}").click() + page.get_by_text("Ingestions").click() + page.get_by_test_id("add-new-ingestion-button").click() + page.get_by_text("Add Profiler Ingestion").click() + page.locator( + "div:nth-child(5) > div > div:nth-child(2) > .form-group > .ant-row > div:nth-child(2) > .ant-select > .ant-select-selector > .ant-select-selection-overflow" + ).click() + self._set_ingestion_filter("profiler", page) + page.locator('[id="root\\/processPiiSensitive"]').click() + self.profiler_ingestion_pipeline_fqn = page.get_by_label("name*").input_value() + page.get_by_test_id("submit-btn").click() + page.get_by_test_id("deploy-button").click() + page.get_by_test_id("view-service-button").click() + page.get_by_test_id("ingestions").click() + self._check_and_handle_workflow(page, self.profiler_ingestion_pipeline_fqn) + + def delete_service(self, page: Page): + """Delete service""" + page.goto(f"{BASE_URL}/") + page.get_by_test_id("app-bar-item-settings").click() + page.get_by_text("Databases").click() + page.get_by_test_id(f"service-name-{self.service_name}").click() + page.get_by_test_id("manage-button").click() + page.get_by_test_id("delete-button-title").click() + page.get_by_test_id("confirmation-text-input").fill("DELETE") + expect(page.get_by_test_id("confirmation-text-input")).to_have_value("DELETE") + page.get_by_test_id("confirm-button").click() diff --git a/ingestion/tests/e2e/configs/connectors/redshift.py b/ingestion/tests/e2e/configs/connectors/database/redshift.py similarity index 94% rename from ingestion/tests/e2e/configs/connectors/redshift.py rename to ingestion/tests/e2e/configs/connectors/database/redshift.py index 746060d796ff..e70d144314bc 100644 --- a/ingestion/tests/e2e/configs/connectors/redshift.py +++ b/ingestion/tests/e2e/configs/connectors/database/redshift.py @@ -26,7 +26,7 @@ def set_connection(self, page): 
expect(page.get_by_label("Host and Port")).to_have_value( os.environ["E2E_REDSHIFT_HOST_PORT"] ) - page.get_by_label("Database*").fill(os.environ["E2E_REDSHIFT_DATABASE"]) + page.get_by_label("Database*").fill(os.environ["E2E_REDSHIFT_DB"]) expect(page.get_by_label("Database*")).to_have_value( - os.environ["E2E_REDSHIFT_DATABASE"] + os.environ["E2E_REDSHIFT_DB"] ) diff --git a/ingestion/tests/e2e/configs/connectors/interface.py b/ingestion/tests/e2e/configs/connectors/interface.py deleted file mode 100644 index e4ec2c50366f..000000000000 --- a/ingestion/tests/e2e/configs/connectors/interface.py +++ /dev/null @@ -1,133 +0,0 @@ -"""connectors interface""" - -import random -import re -import string -from abc import ABC, abstractmethod -from typing import List - -from playwright.sync_api import Page, expect - -from ingestion.tests.e2e.configs.users.admin import Admin - -BASE_URL = "http://localhost:8585" - - -class DataBaseConnectorInterface(ABC): - """Interface for connectors class for e2e tests""" - - def __init__(self, schema_filters: List[str] = [], table_filters: List[str] = []): - """Initialize the connector""" - self.schema_filters = list(schema_filters) - self.table_filters = list(table_filters) - self.service_type = "Databases" - self.service_name = None - - def _check_and_handle_workflow(self, page: Page, type_: str): - try: - expect( - page.get_by_role( - "row", name=re.compile(f"{self.service_name}_{type_}_.*") - ).get_by_test_id("re-deploy-btn") - ).to_be_visible(timeout=1000) - except (TimeoutError, AssertionError): - page.get_by_role( - "row", name=re.compile(f"{self.service_name}_{type_}_.*") - ).get_by_test_id("deploy").click() - finally: - expect( - page.get_by_role( - "row", name=re.compile(f"{self.service_name}_{type_}_.*") - ).get_by_test_id("re-deploy-btn") - ).to_be_visible() - - @abstractmethod - def get_service(self, page: Page): - """get service from the service page""" - raise NotImplementedError - - @abstractmethod - def set_connection(self, 
page: Page): - """Set connection for redshift service""" - raise NotImplementedError - - @staticmethod - def generate_service_name(): - """Generate a random service name""" - chinese_char = "".join([chr(random.randint(0x4E00, 0x9FBF)) for _ in range(3)]) - cyrillic_char = "".join([chr(random.randint(1072, 1104)) for _ in range(3)]) - return ( - "".join(random.choices(string.ascii_lowercase, k=10)) - + chinese_char - + cyrillic_char - + "_-1" - ) - - def _set_schema_filter(self, page: Page): - """Set schema filter for redshift service""" - for schema in self.schema_filters: - page.locator('xpath=//*[@id="root/schemaFilterPattern/includes"]').fill( - schema - ) - - def _set_table_filter(self, page: Page): - """Set schema filter for redshift service""" - for table in self.table_filters: - page.locator('[id="root\\/tableFilterPattern\\/includes"]').fill(table) - - def create_service_ingest_metadata(self, page: Page): - """Ingest redshift service data""" - page.goto(f"{BASE_URL}/") - Admin().login(page) - page.get_by_test_id("app-bar-item-settings").click() - page.get_by_text(self.service_type).click() - page.get_by_test_id("add-service-button").click() - self.get_service(page) - page.get_by_test_id("next-button").click() - self.service_name = self.generate_service_name() - page.get_by_test_id("service-name").fill(self.service_name) - expect(page.get_by_test_id("service-name")).to_have_value(self.service_name) - page.get_by_test_id("next-button").click() - self.set_connection(page) - page.get_by_test_id("submit-btn").click() - page.get_by_test_id("add-ingestion-button").click() - self._set_schema_filter(page) - page.get_by_test_id("submit-btn").click() - page.get_by_test_id("deploy-button").click() - page.get_by_test_id("view-service-button").click() - page.get_by_test_id("ingestions").click() - self._check_and_handle_workflow(page, "metadata") - return self.service_name - - def create_profiler_workflow(self, page: Page): - """create profiler workflow""" - 
page.goto(f"{BASE_URL}/") - Admin().login(page) - page.get_by_test_id("app-bar-item-settings").click() - page.get_by_text("Databases").click() - page.get_by_test_id(f"service-name-{self.service_name}").click() - page.get_by_text("Ingestions").click() - page.get_by_test_id("add-new-ingestion-button").click() - page.get_by_text("Add Profiler Ingestion").click() - page.locator( - "div:nth-child(5) > div > div:nth-child(2) > .form-group > .ant-row > div:nth-child(2) > .ant-select > .ant-select-selector > .ant-select-selection-overflow" - ).click() - self._set_table_filter(page) - page.locator('[id="root\\/processPiiSensitive"]').click() - page.get_by_test_id("submit-btn").click() - page.get_by_test_id("deploy-button").click() - page.get_by_test_id("view-service-button").click() - page.get_by_test_id("ingestions").click() - self._check_and_handle_workflow(page, "profiler") - - def delete_service(self, page: Page): - """Delete service""" - page.goto(f"{BASE_URL}/") - page.get_by_test_id("app-bar-item-settings").click() - page.get_by_text("Databases").click() - page.get_by_test_id(f"service-name-{self.service_name}").click() - page.get_by_test_id("manage-button").click() - page.get_by_test_id("delete-button-title").click() - page.get_by_test_id("confirmation-text-input").fill("DELETE") - expect(page.get_by_test_id("confirmation-text-input")).to_have_value("DELETE") - page.get_by_test_id("confirm-button").click() diff --git a/ingestion/tests/e2e/configs/connectors/model.py b/ingestion/tests/e2e/configs/connectors/model.py new file mode 100644 index 000000000000..02b567c4d6ce --- /dev/null +++ b/ingestion/tests/e2e/configs/connectors/model.py @@ -0,0 +1,38 @@ +"""Connector model config for testing.""" + +from typing import Optional + +from pydantic import BaseModel + + +class IngestionFilterConfig(BaseModel): + includes: Optional[list[str]] = [] + excludes: Optional[list[str]] = [] + + +class IngestionTestConfig(BaseModel): + database: Optional[IngestionFilterConfig] + 
schema_: Optional[IngestionFilterConfig] + table: Optional[IngestionFilterConfig] + + +class ConnectorIngestionTestConfig(BaseModel): + metadata: Optional[IngestionTestConfig] + profiler: Optional[IngestionTestConfig] + + +class ValidationTestConfig(BaseModel): + service: Optional[str] + database: Optional[str] + schema_: Optional[str] + table: Optional[str] + + +class ConnectorValidationTestConfig(BaseModel): + metadata: Optional[ValidationTestConfig] + profiler: Optional[ValidationTestConfig] + + +class ConnectorTestConfig(BaseModel): + ingestion: Optional[ConnectorIngestionTestConfig] + validation: Optional[ConnectorValidationTestConfig] diff --git a/ingestion/tests/e2e/configs/users/admin.py b/ingestion/tests/e2e/configs/users/admin.py index 6bc9dee27b40..1c88a53d83de 100644 --- a/ingestion/tests/e2e/configs/users/admin.py +++ b/ingestion/tests/e2e/configs/users/admin.py @@ -1,6 +1,6 @@ """Admin user configuration for e2e tests.""" -from ingestion.tests.e2e.configs.users.user import User +from ...configs.users.user import User class Admin(User): diff --git a/ingestion/tests/e2e/conftest.py b/ingestion/tests/e2e/conftest.py index 341bda4f0e4d..864e293a5763 100644 --- a/ingestion/tests/e2e/conftest.py +++ b/ingestion/tests/e2e/conftest.py @@ -2,10 +2,10 @@ import pytest -from playwright.sync_api import Browser, expect +from playwright.sync_api import Browser, Page, expect -from ingestion.tests.e2e.configs.common import create_user -from ingestion.tests.e2e.configs.users.admin import Admin +from .configs.common import go_to_service +from .configs.users.admin import Admin TIMEOUT = 60000 BASE_URL = "http://localhost:8585" @@ -29,19 +29,48 @@ def browser_context_args(browser_context_args): } -@pytest.fixture(scope="session") -def create_data_consumer_user(browser: Browser): - """Create a data consumer user""" - context_ = browser.new_context( - base_url=BASE_URL, - java_script_enabled=True, - ) - page = context_.new_page() +@pytest.fixture(scope="function") +def 
admin_page_context(page: Page): page.goto("/") Admin().login(page) - data_consumer = create_user( - page, "data-consumer@example.com", "Data Consumer User", "Data Consumer" + yield page + page.close() + + +@pytest.fixture(scope="class") +def setUpClass(browser: Browser, request): # pylint: disable=invalid-name + """set up class for ingestion pipelines""" + context_ = browser.new_context(base_url=BASE_URL) + page = context_.new_page() + page.goto(f"{BASE_URL}/") + Admin().login(page) + + connector_obj = request.param["connector_obj"] + request.cls.connector_obj = connector_obj + + # create service and ingest metadata + connector_obj.create_service_ingest_metadata(page) + request.cls.service_name = connector_obj.service_name + page.get_by_text("Ingestions").click() + # Not best practice. Should use `expect`, though playwright does not have a `wait_until` function + # we'll make a call to the API to get the pipeline status and check if it's success + request.cls.metadata_ingestion_status = connector_obj.get_pipeline_status( + f"{connector_obj.service_name}.{connector_obj.metadata_ingestion_pipeline_fqn}" ) - yield data_consumer - data_consumer.delete(page) + + if connector_obj.supports_profiler_ingestion: + connector_obj.create_profiler_workflow(page) + go_to_service("Databases", page, connector_obj.service_name) + page.get_by_text("Ingestions").click() + + # Not best practice. 
Should use `expect`, though playwright does not have a `wait_until` function + # we'll make a call to the API to get the pipeline status and check if it's success + request.cls.profiler_ingestion_status = connector_obj.get_pipeline_status( + f"{connector_obj.service_name}.{connector_obj.profiler_ingestion_pipeline_fqn}" + ) + else: + request.cls.profiler_ingestion_status = None + + yield + connector_obj.delete_service(page) context_.close() diff --git a/ingestion/tests/e2e/entity/redshift/metadata/__init__.py b/ingestion/tests/e2e/entity/database/__init__.py similarity index 100% rename from ingestion/tests/e2e/entity/redshift/metadata/__init__.py rename to ingestion/tests/e2e/entity/database/__init__.py diff --git a/ingestion/tests/e2e/entity/database/common_assertions.py b/ingestion/tests/e2e/entity/database/common_assertions.py new file mode 100644 index 000000000000..692d15124f78 --- /dev/null +++ b/ingestion/tests/e2e/entity/database/common_assertions.py @@ -0,0 +1,80 @@ +"""common database assertions""" + +import time + +from playwright.sync_api import Page, expect + +from ...configs.common import go_to_service + + +def assert_change_database_owner(page_context: Page, service_name: str): + """assert database owner can be changed as expected""" + go_to_service("Databases", page_context, service_name) + page_context.get_by_test_id("edit-owner").click() + page_context.get_by_test_id("owner-select-users-search-bar").click() + page_context.get_by_test_id("owner-select-users-search-bar").fill("Aaron Johnson") + page_context.get_by_text("Aaron Johnson").click() + expect( + page_context.get_by_test_id("owner-label").get_by_test_id("owner-link") + ).to_have_text("Aaron Johnson") + + +def assert_profile_data( + page_context: Page, + service_name: str, + database: str, + schema: str, + table: str, + connector_obj, +): + """Assert profile data have been computed correctly""" + go_to_service("Databases", page_context, service_name) + page_context.get_by_role("link", 
name=database).click() + page_context.get_by_role("link", name=schema).click() + page_context.get_by_role("link", name=table, exact=True).click() + page_context.get_by_text("Profiler & Data Quality").click() + time.sleep(0.05) + for card in range(connector_obj.profiler_summary_card_count): + summary_card = page_context.get_by_test_id("summary-card-container").nth(card) + description = summary_card.get_by_test_id( + "summary-card-description" + ).inner_text() + assert description not in {"0"} + + +def assert_sample_data_ingestion( + page_context: Page, + service_name: str, + database: str, + schema: str, + table: str, +): + """assert sample data are ingested as expected""" + go_to_service("Databases", page_context, service_name) + page_context.get_by_role("link", name=database).click() + page_context.get_by_role("link", name=schema).click() + page_context.get_by_role("link", name=table, exact=True).click() + page_context.get_by_text("Sample Data").click() + + expect(page_context.get_by_test_id("sample-data")).to_be_visible() + + +def assert_pii_column_auto_tagging( + page_context: Page, + service_name: str, + database: str, + schema: str, + table: str, + column: str, +): + """assert pii column auto tagging tagged as expected""" + go_to_service("Databases", page_context, service_name) + page_context.get_by_role("link", name=database).click() + page_context.get_by_role("link", name=schema).click() + page_context.get_by_role("link", name=table, exact=True).click() + + time.sleep(0.05) + table_row = page_context.locator(f'tr:has-text("{column}")') + tag = table_row.locator("td:nth-child(4)") + expect(tag).to_be_visible() + assert tag.text_content() in {"Sensitive", "NonSensitive"} diff --git a/ingestion/tests/e2e/entity/database/test_db2.py b/ingestion/tests/e2e/entity/database/test_db2.py new file mode 100644 index 000000000000..aaa0f5b0aa83 --- /dev/null +++ b/ingestion/tests/e2e/entity/database/test_db2.py @@ -0,0 +1,40 @@ +"""Test Db2 database ingestion.""" + 
+import pytest + +from ...configs.connectors.database.db2 import Db2Connector +from ...configs.connectors.model import ( + ConnectorIngestionTestConfig, + ConnectorTestConfig, + ConnectorValidationTestConfig, + IngestionFilterConfig, + IngestionTestConfig, + ValidationTestConfig, +) + + +@pytest.mark.parametrize( + "setUpClass", + [ + { + "connector_obj": Db2Connector( + ConnectorTestConfig( + ingestion=ConnectorIngestionTestConfig( + metadata=IngestionTestConfig( + database=IngestionFilterConfig(includes=["testdb"]), + ), # type: ignore + ), + validation=ConnectorValidationTestConfig( + profiler=ValidationTestConfig( + database="testdb", schema_="sampledata", table="customer" + ) # type: ignore + ), + ) + ) + } + ], + indirect=True, +) +@pytest.mark.usefixtures("setUpClass") +class TestDb2Connector: + """We need to validate dependency can be installed in the test env.""" diff --git a/ingestion/tests/e2e/entity/database/test_druid.py b/ingestion/tests/e2e/entity/database/test_druid.py new file mode 100644 index 000000000000..315dec9ff9c1 --- /dev/null +++ b/ingestion/tests/e2e/entity/database/test_druid.py @@ -0,0 +1,97 @@ +"""Test default database ingestion (Druid).""" + + +import pytest +from playwright.sync_api import Page + +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, +) + +from ...configs.connectors.database.druid import DruidConnector +from ...configs.connectors.model import ( + ConnectorIngestionTestConfig, + ConnectorTestConfig, + ConnectorValidationTestConfig, + IngestionFilterConfig, + IngestionTestConfig, + ValidationTestConfig, +) +from ...entity.database.common_assertions import ( + assert_change_database_owner, + assert_pii_column_auto_tagging, + assert_profile_data, + assert_sample_data_ingestion, +) + + +@pytest.mark.parametrize( + "setUpClass", + [ + { + "connector_obj": DruidConnector( + ConnectorTestConfig( + ingestion=ConnectorIngestionTestConfig( + metadata=IngestionTestConfig( + 
schema_=IngestionFilterConfig(includes=["druid"]), + ), # type: ignore + profiler=IngestionTestConfig( + schema_=IngestionFilterConfig(includes=["druid"]), + ), # type: ignore + ), + validation=ConnectorValidationTestConfig( + profiler=ValidationTestConfig( + database="default", schema_="druid", table="inline_data" + ) # type: ignore + ), + ) + ) + } + ], + indirect=True, +) +@pytest.mark.usefixtures("setUpClass") +class TestDruidConnector: + """Druid connector test case""" + + def test_pipelines_statuses(self): + """check ingestion pipelines ran successfully""" + assert self.metadata_ingestion_status == PipelineState.success + # if the connector does not support profiler ingestion return None as status + assert self.profiler_ingestion_status in {PipelineState.success, None} + + def test_change_database_owner(self, admin_page_context: Page): + """test change database owner""" + assert_change_database_owner(admin_page_context, self.service_name) + + def test_check_profile_data(self, admin_page_context: Page): + """check profile data are visible""" + assert_profile_data( + admin_page_context, + self.service_name, + self.connector_obj.validation_config.profiler.database, + self.connector_obj.validation_config.profiler.schema_, + self.connector_obj.validation_config.profiler.table, + self.connector_obj, + ) + + def test_sample_data_ingestion(self, admin_page_context: Page): + """test sample dta is ingested as expected for the table""" + assert_sample_data_ingestion( + admin_page_context, + self.service_name, + self.connector_obj.validation_config.profiler.database, + self.connector_obj.validation_config.profiler.schema_, + self.connector_obj.validation_config.profiler.table, + ) + + def test_pii_colum_auto_tagging(self, admin_page_context: Page): + """check pii column auto tagging tagged as expected""" + assert_pii_column_auto_tagging( + admin_page_context, + self.service_name, + self.connector_obj.validation_config.profiler.database, + 
self.connector_obj.validation_config.profiler.schema_, + self.connector_obj.validation_config.profiler.table, + "cityName", + ) diff --git a/ingestion/tests/e2e/entity/database/test_hive.py b/ingestion/tests/e2e/entity/database/test_hive.py new file mode 100644 index 000000000000..7bec8310708b --- /dev/null +++ b/ingestion/tests/e2e/entity/database/test_hive.py @@ -0,0 +1,81 @@ +"""Test Hive database ingestion.""" + +import pytest +from playwright.sync_api import Page + +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, +) + +from ...configs.connectors.database.hive import HiveConnector +from ...configs.connectors.model import ( + ConnectorIngestionTestConfig, + ConnectorTestConfig, + ConnectorValidationTestConfig, + IngestionFilterConfig, + IngestionTestConfig, + ValidationTestConfig, +) +from ...entity.database.common_assertions import ( + assert_change_database_owner, + assert_profile_data, + assert_sample_data_ingestion, +) + + +@pytest.mark.parametrize( + "setUpClass", + [ + { + "connector_obj": HiveConnector( + ConnectorTestConfig( + ingestion=ConnectorIngestionTestConfig( + metadata=IngestionTestConfig( + database=IngestionFilterConfig(includes=["default"]), + ), # type: ignore + ), + validation=ConnectorValidationTestConfig( + profiler=ValidationTestConfig( + database="default", schema_="default", table="t1" + ) # type: ignore + ), + ) + ) + } + ], + indirect=True, +) +@pytest.mark.usefixtures("setUpClass") +class TestHiveConnector: + """Hive connector test case""" + + def test_pipelines_statuses(self): + """check ingestion pipelines ran successfully""" + assert self.metadata_ingestion_status == PipelineState.success + # if the connector does not support profiler ingestion return None as status + assert self.profiler_ingestion_status in {PipelineState.success, None} + + def test_change_database_owner(self, admin_page_context: Page): + """test change database owner""" + 
assert_change_database_owner(admin_page_context, self.service_name) + + def test_check_profile_data(self, admin_page_context: Page): + """check profile data are visible""" + assert_profile_data( + admin_page_context, + self.service_name, + self.connector_obj.validation_config.profiler.database, + self.connector_obj.validation_config.profiler.schema_, + self.connector_obj.validation_config.profiler.table, + self.connector_obj, + ) + + def test_sample_data_ingestion(self, admin_page_context: Page): + """test sample dta is ingested as expected for the table""" + assert_sample_data_ingestion( + admin_page_context, + self.service_name, + self.connector_obj.validation_config.profiler.database, + self.connector_obj.validation_config.profiler.schema_, + self.connector_obj.validation_config.profiler.table, + ) diff --git a/ingestion/tests/e2e/entity/database/test_redshift.py b/ingestion/tests/e2e/entity/database/test_redshift.py new file mode 100644 index 000000000000..96583959b4c2 --- /dev/null +++ b/ingestion/tests/e2e/entity/database/test_redshift.py @@ -0,0 +1,99 @@ +"""Test default database ingestion (Redshift).""" + + +import pytest +from playwright.sync_api import Page + +from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( + PipelineState, +) + +from ...configs.connectors.database.redshift import RedshiftConnector +from ...configs.connectors.model import ( + ConnectorIngestionTestConfig, + ConnectorTestConfig, + ConnectorValidationTestConfig, + IngestionFilterConfig, + IngestionTestConfig, + ValidationTestConfig, +) +from ...entity.database.common_assertions import ( + assert_change_database_owner, + assert_pii_column_auto_tagging, + assert_profile_data, + assert_sample_data_ingestion, +) + + +@pytest.mark.parametrize( + "setUpClass", + [ + { + "connector_obj": RedshiftConnector( + ConnectorTestConfig( + ingestion=ConnectorIngestionTestConfig( + metadata=IngestionTestConfig( + 
schema_=IngestionFilterConfig(includes=["dbt_jaffle"]), + ), # type: ignore + profiler=IngestionTestConfig( + table=IngestionFilterConfig(includes=["customer"]), + ), # type: ignore + ), + validation=ConnectorValidationTestConfig( + profiler=ValidationTestConfig( + database="e2e_cli_tests", + schema_="dbt_jaffle", + table="customer", + ) # type: ignore + ), + ) + ) + } + ], + indirect=True, +) +@pytest.mark.usefixtures("setUpClass") +class TestRedshiftConnector: + """Redshift connector test case""" + + def test_pipelines_statuses(self): + """check ingestion pipelines ran successfully""" + assert self.metadata_ingestion_status == PipelineState.success + # if the connector does not support profiler ingestion return None as status + assert self.profiler_ingestion_status in {PipelineState.success, None} + + def test_change_database_owner(self, admin_page_context: Page): + """test change database owner""" + assert_change_database_owner(admin_page_context, self.service_name) + + def test_check_profile_data(self, admin_page_context: Page): + """check profile data are visible""" + assert_profile_data( + admin_page_context, + self.service_name, + self.connector_obj.validation_config.profiler.database, + self.connector_obj.validation_config.profiler.schema_, + self.connector_obj.validation_config.profiler.table, + self.connector_obj, + ) + + def test_sample_data_ingestion(self, admin_page_context: Page): + """test sample dta is ingested as expected for the table""" + assert_sample_data_ingestion( + admin_page_context, + self.service_name, + self.connector_obj.validation_config.profiler.database, + self.connector_obj.validation_config.profiler.schema_, + self.connector_obj.validation_config.profiler.table, + ) + + def test_pii_colum_auto_tagging(self, admin_page_context: Page): + """check pii column auto tagging tagged as expected""" + assert_pii_column_auto_tagging( + admin_page_context, + self.service_name, + self.connector_obj.validation_config.profiler.database, + 
self.connector_obj.validation_config.profiler.schema_, + self.connector_obj.validation_config.profiler.table, + "c_name", + ) diff --git a/ingestion/tests/e2e/entity/redshift/conftest.py b/ingestion/tests/e2e/entity/redshift/conftest.py deleted file mode 100644 index 12e292f8d3b0..000000000000 --- a/ingestion/tests/e2e/entity/redshift/conftest.py +++ /dev/null @@ -1,15 +0,0 @@ -"""Module fixture for data quality e2e tests""" - - -import pytest - -from ingestion.tests.e2e.configs.connectors.redshift import RedshiftConnector - -BASE_URL = "http://localhost:8585" - - -@pytest.fixture(scope="session") -def redshift_connector(): - """Create a redshift connector""" - redshift = RedshiftConnector(["dbt_jaffle"], ["customers"]) - yield redshift diff --git a/ingestion/tests/e2e/entity/redshift/metadata/test_entity_page.py b/ingestion/tests/e2e/entity/redshift/metadata/test_entity_page.py deleted file mode 100644 index eaa577029ea8..000000000000 --- a/ingestion/tests/e2e/entity/redshift/metadata/test_entity_page.py +++ /dev/null @@ -1,67 +0,0 @@ -""" -Entity metadata tests. Scenarios tested: - - -""" -import re - -import pytest -from playwright.sync_api import Page, expect - -from ingestion.tests.e2e.configs.common import go_to_service -from ingestion.tests.e2e.configs.users.admin import Admin - - -@pytest.mark.order(1) -def test_assert_metadata_ingestion_status_success(redshift_connector, page: Page): - """Assert that the ingestion status is success""" - - redshift_connector.create_service_ingest_metadata(page) - service_name = redshift_connector.service_name - go_to_service("Databases", page, service_name) - page.get_by_text("Ingestions").click() - - # Not best practice. 
Should use `expect`, though playwright does not have a `wait_until` function - status = ( - page.get_by_role("row", name=re.compile(f"^{service_name}_metadata_.*")) - .get_by_test_id("pipeline-status") - .text_content() - ) - while status in ("--", "Running"): - page.reload() - status = ( - page.get_by_role("row", name=re.compile(f"^{service_name}_metadata_.*")) - .get_by_test_id("pipeline-status") - .text_content() - ) - - assert status == "Success" - - -def test_change_database_owner(redshift_connector, page: Page): - """Test changing the database owner works as expected""" - - service_name = redshift_connector.service_name - page.goto("/") - Admin().login(page) - go_to_service("Databases", page, service_name) - page.get_by_test_id("edit-owner").click() - # page.get_by_role("tab", name="Users.*").click() - page.get_by_test_id("owner-select-users-search-bar").click() - page.get_by_test_id("owner-select-users-search-bar").fill("created-user") - page.get_by_text("created-user").click() - expect( - page.get_by_test_id("owner-label").get_by_test_id("owner-link") - ).to_have_text("created-user") - - -def test_data_consumer(redshift_connector, create_data_consumer_user, page: Page): - """...""" - - service_name = redshift_connector.service_name - user = create_data_consumer_user - page.goto("/") - user.login(page) - go_to_service("Databases", page, service_name) - expect(page.get_by_test_id("ingestions")).not_to_be_visible() - expect(page.get_by_test_id("data-testid")).not_to_be_visible() - expect(page.get_by_test_id("databases")).to_be_visible() diff --git a/ingestion/tests/e2e/entity/redshift/profiler/__init__.py b/ingestion/tests/e2e/entity/redshift/profiler/__init__.py deleted file mode 100644 index e69de29bb2d1..000000000000 diff --git a/ingestion/tests/e2e/entity/redshift/profiler/test_profiler_page.py b/ingestion/tests/e2e/entity/redshift/profiler/test_profiler_page.py deleted file mode 100644 index 88b124b3521b..000000000000 --- 
a/ingestion/tests/e2e/entity/redshift/profiler/test_profiler_page.py +++ /dev/null @@ -1,36 +0,0 @@ -""" -Entity profiler tests. Scenarios tested: - - -""" -import re -import time - -from playwright.sync_api import Page - -from ingestion.tests.e2e.configs.common import go_to_service - - -def test_assert_profiler_ingestion_status_success(redshift_connector, page: Page): - """test profiler ingestion status""" - - service_name = redshift_connector.service_name - redshift_connector.create_profiler_workflow(page) - go_to_service("Databases", page, service_name) - page.get_by_text("Ingestions").click() - - # Not best practice. Should use `expect`, though playwright does not have a `wait_until` function - status = ( - page.get_by_role("row", name=re.compile(f"^{service_name}_profiler_.*")) - .get_by_test_id("pipeline-status") - .text_content() - ) - while status in ("--", "Running"): - time.sleep(2) - page.reload() - status = ( - page.get_by_role("row", name=re.compile(f"{service_name}_profiler_.*")) - .get_by_test_id("pipeline-status") - .text_content() - ) - - assert status == "Success" diff --git a/openmetadata-docs/content/v1.2.x/connectors/ingestion/workflows/profiler/index.md b/openmetadata-docs/content/v1.2.x/connectors/ingestion/workflows/profiler/index.md index fa71d85f2ab9..932185d2c269 100644 --- a/openmetadata-docs/content/v1.2.x/connectors/ingestion/workflows/profiler/index.md +++ b/openmetadata-docs/content/v1.2.x/connectors/ingestion/workflows/profiler/index.md @@ -67,6 +67,8 @@ Set the sample to be use by the profiler for the specific table. - `Percentage`: Value must be between 0 and 100 exclusive (0 < percentage < 100). This will sample the table based on a percentage - `Row Count`: The table will be sampled based on a number of rows (i.e. `1,000`, `2,000`), etc. +⚠️ This option is currently not supported for Druid. Sampling leverages `RANDOM` functions in most databases (some have specific sampling functions) and Druid provides neither of these options.
We recommend using the partitioning or sample query option if you need to limit the amount of data scanned. + **Auto PII Tagging (Optional)** Configuration to automatically tag columns that might contain sensitive information. @@ -107,6 +109,8 @@ Set the sample to be use by the profiler for the specific table. - `Percentage`: Value must be between 0 and 100 exclusive (0 < percentage < 100). This will sample the table based on a percentage - `Row Count`: The table will be sampled based on a number of rows (i.e. `1,000`, `2,000`), etc. +⚠️ This option is currently not supported for Druid. Sampling leverages `RANDOM` functions in most databases (some have specific sampling functions) and Druid provides neither of these options. We recommend using the partitioning or sample query option if you need to limit the amount of data scanned. + **Profile Sample Query** Use a query to sample data for the profiler. This will overwrite any profle sample set.