Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes Druid Profiler failures #13700

Merged
merged 26 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
c283030
fix: updated playwrigth test structure
TeddyCr Oct 25, 2023
94a20e5
fix: druid profiler queries
TeddyCr Oct 25, 2023
56c582a
fix: python linting
TeddyCr Oct 25, 2023
e504582
fix: python linting
TeddyCr Oct 25, 2023
eb2ea16
fix: do not compute random sample if profile sample is 100
TeddyCr Oct 25, 2023
ecfe2d9
fix: updated workflow to test on push
TeddyCr Oct 25, 2023
bcf0cd6
fix: move connector config to category folder
TeddyCr Oct 25, 2023
d914405
fix: updated imports
TeddyCr Oct 25, 2023
1b075dd
fix: added pytest-dependency package
TeddyCr Oct 25, 2023
f7a3565
fix: updated readme.md
TeddyCr Oct 25, 2023
eaaeb27
fix: python linting
TeddyCr Oct 25, 2023
0271622
fix: updated profile doc for Druid sampling
TeddyCr Oct 25, 2023
10b9ca0
Merge remote-tracking branch 'upstream/main' into playwright-branch
TeddyCr Oct 25, 2023
9297ea1
fix: empty commit for CI
TeddyCr Oct 25, 2023
2c907da
fix: added workflow constrain back
TeddyCr Oct 25, 2023
444d170
fix: sonar code smell
TeddyCr Oct 25, 2023
d441400
fix: added secrets to container
TeddyCr Oct 25, 2023
686b1b2
Update openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/ingestion…
TeddyCr Oct 25, 2023
fb03810
Update openmetadata-docs/content/v1.2.x-SNAPSHOT/connectors/ingestion…
TeddyCr Oct 25, 2023
4109d60
Merge remote-tracking branch 'origin/druid-fixes' into druid-fixes
TeddyCr Oct 25, 2023
72870b6
Update ingestion/tests/e2e/entity/database/test_redshift.py
TeddyCr Oct 25, 2023
1824ad8
fix: ran pylint
TeddyCr Oct 25, 2023
cb5f6fa
Merge remote-tracking branch 'upstream/main' into druid-fixes
TeddyCr Oct 25, 2023
e525ffc
fix: updated redshift env var.
TeddyCr Oct 25, 2023
4854406
Merge remote-tracking branch 'upstream/main' into druid-fixes
TeddyCr Oct 25, 2023
b296d9d
fix: import linting
TeddyCr Oct 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/playwright-integration-tests-mysql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ jobs:
E2E_REDSHIFT_HOST_PORT: ${{ secrets.E2E_REDSHIFT_HOST_PORT }}
E2E_REDSHIFT_USERNAME: ${{ secrets.E2E_REDSHIFT_USERNAME }}
E2E_REDSHIFT_PASSWORD: ${{ secrets.E2E_REDSHIFT_PASSWORD }}
E2E_REDSHIFT_DATABASE: ${{ secrets.E2E_REDSHIFT_DATABASE }}
E2E_REDSHIFT_DB: ${{ secrets.E2E_REDSHIFT_DB }}
E2E_DRUID_HOST_PORT: ${{ secrets.E2E_DRUID_HOST_PORT }}
E2E_HIVE_HOST_PORT: ${{ secrets.E2E_HIVE_HOST_PORT }}
run: |
source env/bin/activate
make install_e2e_tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ jobs:
E2E_REDSHIFT_HOST_PORT: ${{ secrets.E2E_REDSHIFT_HOST_PORT }}
E2E_REDSHIFT_USERNAME: ${{ secrets.E2E_REDSHIFT_USERNAME }}
E2E_REDSHIFT_PASSWORD: ${{ secrets.E2E_REDSHIFT_PASSWORD }}
E2E_REDSHIFT_DATABASE: ${{ secrets.E2E_REDSHIFT_DATABASE }}
E2E_REDSHIFT_DB: ${{ secrets.E2E_REDSHIFT_DB }}
E2E_DRUID_HOST_PORT: ${{ secrets.E2E_DRUID_HOST_PORT }}
E2E_HIVE_HOST_PORT: ${{ secrets.E2E_HIVE_HOST_PORT }}
run: |
source env/bin/activate
make install_e2e_tests
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ unit_ingestion: ## Run Python unit tests

.PHONY: run_e2e_tests
run_e2e_tests: ## Run e2e tests
pytest --screenshot=only-on-failure --output="ingestion/tests/e2e/artifacts" $(ARGS) --junitxml=ingestion/junit/test-results-e2e.xml ingestion/tests/e2e
pytest --screenshot=only-on-failure --output="ingestion/tests/e2e/artifacts" $(ARGS) --slowmo 5 --junitxml=ingestion/junit/test-results-e2e.xml ingestion/tests/e2e

.PHONY: run_python_tests
run_python_tests: ## Run all Python tests with coverage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from typing import Dict, List

from sqlalchemy import Column, inspect
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.exc import ProgrammingError, ResourceClosedError
from sqlalchemy.orm import scoped_session

from metadata.generated.schema.entity.data.table import TableData
Expand All @@ -38,6 +38,7 @@
from metadata.profiler.orm.functions.table_metric_construct import (
table_metric_construct_factory,
)
from metadata.profiler.orm.registry import Dialects
from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor
Expand Down Expand Up @@ -258,6 +259,15 @@ def _compute_query_metrics(

row = runner.select_first_from_query(metric_query)
return dict(row)
except ResourceClosedError as exc:
# if the query returns no results, we will get a ResourceClosedError from Druid
if (
# pylint: disable=protected-access
runner._session.get_bind().dialect.name
!= Dialects.Druid
):
msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session)
except Exception as exc:
msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session)
Expand Down
9 changes: 9 additions & 0 deletions ingestion/src/metadata/profiler/metrics/static/stddev.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ def _(element, compiler, **kw):
return "if(isNaN(stddevPop(%s)), null, stddevPop(%s))" % ((proc,) * 2)


@compiles(StdDevFn, Dialects.Druid)
def _(element, compiler, **kw):  # pylint: disable=unused-argument
    """Druid compilation of the population-stddev function.

    Emits a bare NULL literal: stddev could not be validated against our
    Druid cluster and likely requires the druid-stats extension module —
    https://druid.apache.org/docs/latest/configuration/extensions/#loading-extensions
    """
    return "NULL"


class StdDev(StaticMetric):
"""
STD Metric
Expand Down
1 change: 1 addition & 0 deletions ingestion/src/metadata/profiler/orm/functions/length.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def _(element, compiler, **kw):
@compiles(LenFn, Dialects.IbmDbSa)
@compiles(LenFn, Dialects.Db2)
@compiles(LenFn, Dialects.Hana)
@compiles(LenFn, Dialects.Druid)
def _(element, compiler, **kw):
return "LENGTH(%s)" % compiler.process(element.clauses, **kw)

Expand Down
8 changes: 8 additions & 0 deletions ingestion/src/metadata/profiler/orm/functions/median.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ def _(elements, compiler, **kwargs):
return f"if({null_check}({quantile_str}), null, {quantile_str})"


@compiles(MedianFn, Dialects.Druid)
def _(elements, compiler, **kwargs):
    """Compile MedianFn into Druid's APPROX_QUANTILE call.

    The function element carries three clauses; only the first (the column
    expression) and the third (the percentile) are needed here — the middle
    clause is unused for Druid.
    """
    processed = tuple(
        compiler.process(clause, **kwargs) for clause in elements.clauses
    )
    column_expr = processed[0]
    percentile_expr = processed[2]
    return f"APPROX_QUANTILE({column_expr}, {percentile_expr})"


# pylint: disable=unused-argument
@compiles(MedianFn, Dialects.Athena)
@compiles(MedianFn, Dialects.Presto)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Helper module to handle data sampling
for the profiler
"""
import traceback
from typing import List, Optional, Union, cast

from sqlalchemy import Column, inspect, text
Expand All @@ -30,6 +31,7 @@
from metadata.profiler.orm.registry import Dialects
from metadata.profiler.processor.handle_partition import partition_filter_handler
from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface
from metadata.utils.logger import profiler_interface_registry_logger
from metadata.utils.sqa_utils import (
build_query_filter,
dispatch_to_date_or_datetime,
Expand All @@ -38,6 +40,8 @@
get_value_filter,
)

logger = profiler_interface_registry_logger()

RANDOM_LABEL = "random"


Expand Down Expand Up @@ -105,7 +109,7 @@ def random_sample(self) -> Union[DeclarativeMeta, AliasedClass]:
if self._profile_sample_query:
return self._rdn_sample_from_user_query()

if not self.profile_sample:
if not self.profile_sample or int(self.profile_sample) == 100:
if self._partition_details:
return self._partitioned_table()

Expand Down Expand Up @@ -143,12 +147,23 @@ def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData
if col.name != RANDOM_LABEL and col.name in names
]

sqa_sample = (
self.client.query(*sqa_columns)
.select_from(rnd)
.limit(self.sample_limit)
.all()
)
try:
sqa_sample = (
self.client.query(*sqa_columns)
.select_from(rnd)
.limit(self.sample_limit)
.all()
)
except Exception:
logger.debug(
"Cannot fetch sample data with random sampling. Falling back to 100 rows."
)
logger.debug(traceback.format_exc())
sqa_columns = list(inspect(self.table).c)
sqa_sample = (
self.client.query(*sqa_columns).select_from(self.table).limit(100).all()
)

return TableData(
columns=[column.name for column in sqa_columns],
rows=[list(row) for row in sqa_sample],
Expand Down
67 changes: 66 additions & 1 deletion ingestion/tests/e2e/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,73 @@ https://playwright.dev/python/docs/intro
In the `e2e` folder you will find 2 folders and 1 file:
- `conftest.py`: defines some module-scope fixtures (module here is the `e2e` folder). All tests will use `init_with_redshift` by default -- ingesting metadata from a redshift service. The ingestion will only happen on the first test execution. The `create_data_consumer_user` fixture allows tests to log in as a Data Consumer and perform some actions
- `configs`: holds all the shared configuration. So far we have 2 main class families (User and Connector) and common functions
- `entity`: holds entity related tests. It contains a subfolder per source.
- `entity`: holds entity related tests. It contains a subfolder per asset category. In the asset category folder you will find `common_assertions.py`. This file contains all the common assertions to be run for that specific asset.

## Install Dependencies and Run Tests
run `make install_e2e_tests`. Run `make run_e2e_tests`, you can also pass arguments such as `make run_e2e_tests ARGS="--browser webkit"` to run tests against webkit browser or `make run_e2e_tests ARGS="--headed --slowmo 100"` to run the tests in slowmo mode and head full.

## Adding a new test
The first step is to define the connector config for your source. This happens in the `configs/connectors/<asset category>` folder. For a database connector, you must ensure your class inherits from `DataBaseConnectorInterface`. You will then need to implement `get_service()` and `set_connection()`. `get_service` specifies which service to choose from the `<assetCategory>/add-service` page of the website and `set_connection` the different elements to configure on the connector connection config page. If you are unsure how an element can be accessed on the page you can run `playwright codegen http://localhost:8585/` -- more info [here](https://playwright.dev/python/docs/codegen). By default `DataBaseConnectorInterface` sets `self.supports_profiler_ingestion=True`, which will cause the profiler ingestion to run when the test class is executed. You can set `self.supports_profiler_ingestion=False` in your specific connector to override this behavior.

e.g.

```python
class DruidConnector(DataBaseConnectorInterface):
"""druid connector"""

def __init__(self, config):
super().__init__(config)
self.supports_profiler_ingestion=False

def set_connection():
...

def get_service():
...
```


Once your connector config has been created you will need to add a new test. Simply create a new file in the asset category of your choice (e.g. `entity/database/test_druid.py`). In this file create a new test class and mark this class with `@pytest.mark.usefixtures("setUpClass")` and `@pytest.mark.parametrize("setUpClass", ...)`. The first mark will make sure the `setUpClass` fixture is run before running your tests (this manages the ingestion of metadata and profiler as of Oct-25 2023) and `@pytest.mark.parametrize` will pass the right connector class to the `setUpClass` fixture. The second argument of `@pytest.mark.parametrize` should be as below
```python
[
{
"connector_obj": <connectorClassConfig>(
ConnectorTestConfig(...)
)
}
]
```

`ConnectorTestConfig` defines the configuration to use for the test. It has 2 arguments:
- `ingestion`: This allows you to define the different filtering when performing the ingestion. it expects a `ConnectorIngestionTestConfig` which will take 2 arguments:
- `metadata`: this allows you to define metadata ingestion filters. It takes an `IngestionTestConfig` which takes 3 arguments:
- `database`: it expects an `IngestionFilterConfig` class which takes 2 arguments:
- `includes`: a list of str
- `excludes`: a list of str
- `schema_`: see `database`
- `table`: see `database`
- `profiler`: see `metadata`
- `validation`: this config can be used when we need to validate expectations against specific entities. As of Oct-25 2023 it is only used in the `assert_profile_data`, `assert_sample_data_ingestion` and `assert_pii_column_auto_tagging` test functions of the profiler.

Once you have set up your class you can create your test. There are currently (as of Oct-25 2023) 5 assertions that can be performed:
- assert pipeline status are `success`. You can refer to the implementation in the existing test
- `assert_change_database_owner`: assert the owner of a data can be changed
- `assert_profile_data`: assert table profile data summary are visible
- `assert_sample_data_ingestion`: assert sample data are ingested and visible
- `assert_pii_column_auto_tagging`: assert auto PII tagging from the profiler has been performed

Note that in every test method you define the following class attributes are accessible:
- `connector_obj`: `<connectorClassConfig>` the connector class passed to `setUpClass` in the `@pytest.mark.parametrize`
- `service_name`: `str` the name of the service that was created for the test
- `metadata_ingestion_status`: `PipelineState` the ingestion status of the metadata pipeline
- `profiler_ingestion_status`: `PipelineState` the ingestion status of the profiler pipeline.

## Test Coverage
| **tests** | redshift | druid | hive |
|-----------------------------|:--------:|:-----:|:----:|
| metadata ingestion | ✅ | ✅ | ✅ |
| profiler ingestion | ✅ | ✅ | ✅ |
| change DB owner | ✅ | ✅ | ✅ |
| Table Profiler Summary Data | ✅ | ✅ | ✅ |
| Sample data visible | ✅ | ✅ | ✅ |
| Profiler PII auto Tag | ✅ | ✅ | ❌ |
2 changes: 1 addition & 1 deletion ingestion/tests/e2e/configs/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from playwright.sync_api import Page, expect

from ingestion.tests.e2e.configs.users.user import User
from .users.user import User

BASE_URL = "http://localhost:8585"

Expand Down
37 changes: 37 additions & 0 deletions ingestion/tests/e2e/configs/connectors/database/db2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Redshift connector for e2e tests"""

import os

from playwright.sync_api import Page, expect

from .interface import DataBaseConnectorInterface


class Db2Connector(DataBaseConnectorInterface):
    """DB2 connector configuration for the e2e playwright tests."""

    def get_service(self, page: Page):
        """Select the Db2 service tile on the add-service page."""
        page.get_by_test_id("Db2").click()

    def set_connection(self, page: Page):
        """Fill the connection form for the DB2 service.

        Credentials and host details come from the E2E_DB2_* environment
        variables; each filled field is asserted back to guard against
        flaky form input.
        """
        page.get_by_label("Username*").fill(os.environ["E2E_DB2_USERNAME"])
        expect(page.get_by_label("Username*")).to_have_value(
            os.environ["E2E_DB2_USERNAME"]
        )

        page.get_by_label("Password").fill(os.environ["E2E_DB2_PASSWORD"])
        expect(page.get_by_label("Password")).to_have_value(
            os.environ["E2E_DB2_PASSWORD"]
        )

        page.get_by_label("Host and Port*").fill(os.environ["E2E_DB2_HOST_PORT"])
        expect(page.get_by_label("Host and Port*")).to_have_value(
            os.environ["E2E_DB2_HOST_PORT"]
        )

        page.get_by_label("database*").fill(os.environ["E2E_DB2_DATABASE"])
        expect(page.get_by_label("database*")).to_have_value(
            os.environ["E2E_DB2_DATABASE"]
        )
22 changes: 22 additions & 0 deletions ingestion/tests/e2e/configs/connectors/database/druid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Redshift connector for e2e tests"""

import os

from playwright.sync_api import Page, expect

from .interface import DataBaseConnectorInterface


class DruidConnector(DataBaseConnectorInterface):
    """Druid connector configuration for the e2e playwright tests."""

    def get_service(self, page: Page):
        """Select the Druid service tile on the add-service page."""
        page.get_by_test_id("Druid").click()

    def set_connection(self, page: Page):
        """Fill the connection form for the Druid service.

        Only the host/port is required; it comes from the
        E2E_DRUID_HOST_PORT environment variable and is asserted back
        to guard against flaky form input.
        """
        page.get_by_label("Host and Port*").fill(os.environ["E2E_DRUID_HOST_PORT"])
        expect(page.get_by_label("Host and Port*")).to_have_value(
            os.environ["E2E_DRUID_HOST_PORT"]
        )
24 changes: 24 additions & 0 deletions ingestion/tests/e2e/configs/connectors/database/hive.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""MySQL connector for e2e tests"""

import os

from playwright.sync_api import Page, expect

from .interface import DataBaseConnectorInterface


class HiveConnector(DataBaseConnectorInterface):
    """Hive connector configuration for the e2e playwright tests."""

    def get_service(self, page: Page):
        """Select the Hive service tile on the add-service page."""
        page.get_by_test_id("Hive").click()

    def set_connection(self, page: Page):
        """Fill the connection form for the Hive service.

        The host/port comes from the E2E_HIVE_HOST_PORT environment
        variable and is asserted back to guard against flaky form input.
        The metastore connection type is chosen by option index ("2") —
        presumably the "no metastore" / direct option; confirm against
        the form if the options change.
        """
        page.locator('[id="root\\/hostPort"]').fill(os.environ["E2E_HIVE_HOST_PORT"])
        expect(page.locator('[id="root\\/hostPort"]')).to_have_value(
            os.environ["E2E_HIVE_HOST_PORT"]
        )

        page.locator('[id="root\\/metastoreConnection__oneof_select"]').select_option(
            "2"
        )
Loading
Loading