From 49e922be84a4047def1ef61795bb1ec51f06a96b Mon Sep 17 00:00:00 2001 From: sushi30 Date: Wed, 25 Sep 2024 11:24:44 +0200 Subject: [PATCH 1/6] fix snowflake system metrics --- .../src/metadata/ingestion/source/database/snowflake/models.py | 3 ++- ingestion/src/metadata/utils/dict.py | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 ingestion/src/metadata/utils/dict.py diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py index a69b24fca59d..ffca4cd6fd4f 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py @@ -25,6 +25,7 @@ SNOWFLAKE_QUERY_LOG_QUERY, ) from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations +from metadata.utils.dict import CustomDict from metadata.utils.logger import ingestion_logger from metadata.utils.profiler_utils import QueryResult @@ -133,7 +134,7 @@ def get_for_table(session: Session, tablename: str): ) ) return TypeAdapter(List[SnowflakeQueryLogEntry]).validate_python( - map(dict, rows) + [CustomDict(r).lower_case_keys() for r in rows] ) diff --git a/ingestion/src/metadata/utils/dict.py b/ingestion/src/metadata/utils/dict.py new file mode 100644 index 000000000000..4879b9267f44 --- /dev/null +++ b/ingestion/src/metadata/utils/dict.py @@ -0,0 +1,3 @@ +class CustomDict(dict): + def lower_case_keys(self): + return {k.lower(): v for k, v in self.items()} From 71bcc77ec1423e2bf247bcca3956d5e26b856be9 Mon Sep 17 00:00:00 2001 From: sushi30 Date: Wed, 25 Sep 2024 11:26:56 +0200 Subject: [PATCH 2/6] format --- .../tests/integration/datalake/test_datalake_profiler_e2e.py | 2 +- ingestion/tests/integration/datalake/test_ingestion.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py b/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py index 1a8244348a43..f9816c7deb4d 100644 --- a/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py +++ b/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py @@ -17,8 +17,8 @@ No sample data is required beforehand """ import pytest - from ingestion.tests.integration.datalake.conftest import BUCKET_NAME + from metadata.generated.schema.entity.data.table import ColumnProfile, Table from metadata.utils.time_utils import ( get_beginning_of_day_timestamp_mill, diff --git a/ingestion/tests/integration/datalake/test_ingestion.py b/ingestion/tests/integration/datalake/test_ingestion.py index 1bfff44f1c7c..005a651b639b 100644 --- a/ingestion/tests/integration/datalake/test_ingestion.py +++ b/ingestion/tests/integration/datalake/test_ingestion.py @@ -12,8 +12,8 @@ """Datalake ingestion integration tests""" import pytest - from ingestion.tests.integration.datalake.conftest import BUCKET_NAME + from metadata.generated.schema.entity.data.table import DataType, Table from metadata.ingestion.ometa.models import EntityList from metadata.ingestion.ometa.ometa_api import OpenMetadata From 214d5f3fa9b86ab417ca13f130818d109b964996 Mon Sep 17 00:00:00 2001 From: sushi30 Date: Wed, 25 Sep 2024 12:38:36 +0200 Subject: [PATCH 3/6] add link to logs and commit fixed the dq cli test --- .github/workflows/py-cli-e2e-tests.yml | 4 +- ingestion/src/metadata/utils/dict.py | 7 ++- ingestion/tests/cli_e2e/base/test_cli_db.py | 49 +++++++++++-------- ingestion/tests/cli_e2e/test_cli_snowflake.py | 4 +- 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/.github/workflows/py-cli-e2e-tests.yml b/.github/workflows/py-cli-e2e-tests.yml index 0367a29948bb..9a92c6923bd3 100644 --- a/.github/workflows/py-cli-e2e-tests.yml +++ b/.github/workflows/py-cli-e2e-tests.yml @@ -195,8 +195,8 @@ jobs: with: payload: | { - "text": "🔥 Failed E2E Test for: ${{ matrix.e2e-test }} 🔥" - } + "text": "🔥 Failed E2E Test for: ${{ matrix.e2e-test }} 🔥\nLogs: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}\nCommit: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.sha }}" + } env: SLACK_WEBHOOK_URL: ${{ secrets.E2E_SLACK_WEBHOOK }} SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK diff --git a/ingestion/src/metadata/utils/dict.py b/ingestion/src/metadata/utils/dict.py index 4879b9267f44..aaa41d2780c8 100644 --- a/ingestion/src/metadata/utils/dict.py +++ b/ingestion/src/metadata/utils/dict.py @@ -1,3 +1,8 @@ -class CustomDict(dict): +""" +A custom dictionary class that extends functionality. +""" + + +class ExtendedDict(dict): def lower_case_keys(self): return {k.lower(): v for k, v in self.items()} diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 0bc5eb23f0fd..332bc852e0be 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -220,11 +220,11 @@ def test_data_quality(self) -> None: return self.delete_table_and_view() self.create_table_and_view() + self.build_config_file() + self.run_command() table: Table = self.openmetadata.get_by_name( Table, self.get_data_quality_table(), nullable=False ) - self.build_config_file() - self.run_command() test_case_definitions = self.get_test_case_definitions() self.build_config_file( E2EType.DATA_QUALITY, @@ -236,26 +236,35 @@ def test_data_quality(self) -> None: }, ) result = self.run_command("test") - sink_status, source_status = self.retrieve_statuses(result) - self.assert_status_for_data_quality(source_status, sink_status) - test_case_entities = [ - self.openmetadata.get_by_name( - OMTestCase, - ".".join([table.fullyQualifiedName.root, tcd.name]), - fields=["*"], - nullable=False, - ) - for tcd in test_case_definitions - ] - expected = self.get_expected_test_case_results() try: - for test_case, expected in zip(test_case_entities, expected): - assert_equal_pydantic_objects(expected, test_case.testCaseResult) - finally: - for tc in test_case_entities: - self.openmetadata.delete( - OMTestCase, tc.id, recursive=True, hard_delete=True + sink_status, source_status = self.retrieve_statuses(result) + self.assert_status_for_data_quality(source_status, sink_status) + test_case_entities = [ + self.openmetadata.get_by_name( + OMTestCase, + ".".join([table.fullyQualifiedName.root, tcd.name]), + fields=["*"], + nullable=False, ) + for tcd in test_case_definitions + ] + expected = self.get_expected_test_case_results() + try: + for test_case, expected in zip(test_case_entities, expected): + assert_equal_pydantic_objects( + expected.model_copy( + update={"timestamp": test_case.testCaseResult.timestamp} + ), + test_case.testCaseResult, + ) + finally: + for tc in test_case_entities: + self.openmetadata.delete( + OMTestCase, tc.id, recursive=True, hard_delete=True + ) + except AssertionError: + print(result) + raise def retrieve_table(self, table_name_fqn: str) -> Table: return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn) diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index f4467d9d8de6..70eb1547dccf 100644 --- a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -321,7 +321,7 @@ def wait_for_query_log(cls, timeout=600): raise TimeoutError(f"Query log not updated for {timeout} seconds") def get_data_quality_table(self): - return "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS" + return self.fqn_created_table() def get_test_case_definitions(self) -> List[TestCaseDefinition]: return [ @@ -343,4 +343,4 @@ def get_test_case_definitions(self) -> List[TestCaseDefinition]: ] def get_expected_test_case_results(self): - return [TestCaseResult(testCaseStatus=TestCaseStatus.Success)] + return [TestCaseResult(testCaseStatus=TestCaseStatus.Success, timestamp=0)] From 2bb96ceafb6f811c0b529a377a7c65f4fcc57636 Mon Sep 17 00:00:00 2001 From: sushi30 Date: Wed, 25 Sep 2024 16:24:31 +0200 Subject: [PATCH 4/6] reverted bad formatting --- .../tests/integration/datalake/test_datalake_profiler_e2e.py | 2 +- ingestion/tests/integration/datalake/test_ingestion.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py b/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py index f9816c7deb4d..1a8244348a43 100644 --- a/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py +++ b/ingestion/tests/integration/datalake/test_datalake_profiler_e2e.py @@ -17,8 +17,8 @@ No sample data is required beforehand """ import pytest -from ingestion.tests.integration.datalake.conftest import BUCKET_NAME +from ingestion.tests.integration.datalake.conftest import BUCKET_NAME from metadata.generated.schema.entity.data.table import ColumnProfile, Table from metadata.utils.time_utils import ( get_beginning_of_day_timestamp_mill, diff --git a/ingestion/tests/integration/datalake/test_ingestion.py b/ingestion/tests/integration/datalake/test_ingestion.py index 005a651b639b..1bfff44f1c7c 100644 --- a/ingestion/tests/integration/datalake/test_ingestion.py +++ b/ingestion/tests/integration/datalake/test_ingestion.py @@ -12,8 +12,8 @@ """Datalake ingestion integration tests""" import pytest -from ingestion.tests.integration.datalake.conftest import BUCKET_NAME +from ingestion.tests.integration.datalake.conftest import BUCKET_NAME from metadata.generated.schema.entity.data.table import DataType, Table from metadata.ingestion.ometa.models import EntityList from metadata.ingestion.ometa.ometa_api import OpenMetadata From d26c776d15eb9c9cd3f03532c40ea34a34477625 Mon Sep 17 00:00:00 2001 From: sushi30 Date: Thu, 26 Sep 2024 07:52:44 +0200 Subject: [PATCH 5/6] fixed models.py --- .../metadata/ingestion/source/database/snowflake/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py index ffca4cd6fd4f..91a4bc3da8d9 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py @@ -25,7 +25,7 @@ SNOWFLAKE_QUERY_LOG_QUERY, ) from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations -from metadata.utils.dict import CustomDict +from metadata.utils.dict import ExtendedDict from metadata.utils.logger import ingestion_logger from metadata.utils.profiler_utils import QueryResult @@ -134,7 +134,7 @@ def get_for_table(session: Session, tablename: str): ) ) return TypeAdapter(List[SnowflakeQueryLogEntry]).validate_python( - [CustomDict(r).lower_case_keys() for r in rows] + [ExtendedDict(r).lower_case_keys() for r in rows] ) From 5a2f8e1d86ec0016356d5a38959cc896318fa038 Mon Sep 17 00:00:00 2001 From: sushi30 Date: Thu, 26 Sep 2024 11:41:39 +0200 Subject: [PATCH 6/6] removed version pinning for data diff in tests --- ingestion/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingestion/setup.py b/ingestion/setup.py index 82e6f477ccb2..d1ee38c9de01 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -378,7 +378,7 @@ "kafka-python==2.0.2", *plugins["pii-processor"], "requests==2.31.0", - f"{DATA_DIFF['mysql']}==0.11.2", + f"{DATA_DIFF['mysql']}", *plugins["deltalake"], *plugins["datalake-gcs"], *plugins["pgspider"],