diff --git a/.github/workflows/py-cli-e2e-tests.yml b/.github/workflows/py-cli-e2e-tests.yml index 0367a29948bb..9a92c6923bd3 100644 --- a/.github/workflows/py-cli-e2e-tests.yml +++ b/.github/workflows/py-cli-e2e-tests.yml @@ -195,8 +195,8 @@ jobs: with: payload: | { - "text": "🔥 Failed E2E Test for: ${{ matrix.e2e-test }} 🔥" - } + "text": "🔥 Failed E2E Test for: ${{ matrix.e2e-test }} 🔥\nLogs: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}\nCommit: ${{ github.server_url }}/${{ github.repository }}/commit/${{ github.sha }}" + } env: SLACK_WEBHOOK_URL: ${{ secrets.E2E_SLACK_WEBHOOK }} SLACK_WEBHOOK_TYPE: INCOMING_WEBHOOK diff --git a/ingestion/setup.py b/ingestion/setup.py index 82e6f477ccb2..d1ee38c9de01 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -378,7 +378,7 @@ "kafka-python==2.0.2", *plugins["pii-processor"], "requests==2.31.0", - f"{DATA_DIFF['mysql']}==0.11.2", + f"{DATA_DIFF['mysql']}", *plugins["deltalake"], *plugins["datalake-gcs"], *plugins["pgspider"], diff --git a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py index a69b24fca59d..91a4bc3da8d9 100644 --- a/ingestion/src/metadata/ingestion/source/database/snowflake/models.py +++ b/ingestion/src/metadata/ingestion/source/database/snowflake/models.py @@ -25,6 +25,7 @@ SNOWFLAKE_QUERY_LOG_QUERY, ) from metadata.profiler.metrics.system.dml_operation import DatabaseDMLOperations +from metadata.utils.dict import ExtendedDict from metadata.utils.logger import ingestion_logger from metadata.utils.profiler_utils import QueryResult @@ -133,7 +134,7 @@ def get_for_table(session: Session, tablename: str): ) ) return TypeAdapter(List[SnowflakeQueryLogEntry]).validate_python( - map(dict, rows) + [ExtendedDict(r).lower_case_keys() for r in rows] ) diff --git a/ingestion/src/metadata/utils/dict.py b/ingestion/src/metadata/utils/dict.py new file mode 100644 index 000000000000..aaa41d2780c8 --- /dev/null +++ b/ingestion/src/metadata/utils/dict.py @@ -0,0 +1,8 @@ +""" +A custom dictionary class that extends functionality. +""" + + +class ExtendedDict(dict): + def lower_case_keys(self): + return {k.lower(): v for k, v in self.items()} diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 0bc5eb23f0fd..332bc852e0be 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -220,11 +220,11 @@ def test_data_quality(self) -> None: return self.delete_table_and_view() self.create_table_and_view() + self.build_config_file() + self.run_command() table: Table = self.openmetadata.get_by_name( Table, self.get_data_quality_table(), nullable=False ) - self.build_config_file() - self.run_command() test_case_definitions = self.get_test_case_definitions() self.build_config_file( E2EType.DATA_QUALITY, @@ -236,26 +236,35 @@ def test_data_quality(self) -> None: }, ) result = self.run_command("test") - sink_status, source_status = self.retrieve_statuses(result) - self.assert_status_for_data_quality(source_status, sink_status) - test_case_entities = [ - self.openmetadata.get_by_name( - OMTestCase, - ".".join([table.fullyQualifiedName.root, tcd.name]), - fields=["*"], - nullable=False, - ) - for tcd in test_case_definitions - ] - expected = self.get_expected_test_case_results() try: - for test_case, expected in zip(test_case_entities, expected): - assert_equal_pydantic_objects(expected, test_case.testCaseResult) - finally: - for tc in test_case_entities: - self.openmetadata.delete( - OMTestCase, tc.id, recursive=True, hard_delete=True + sink_status, source_status = self.retrieve_statuses(result) + self.assert_status_for_data_quality(source_status, sink_status) + test_case_entities = [ + self.openmetadata.get_by_name( + OMTestCase, + ".".join([table.fullyQualifiedName.root, tcd.name]), + fields=["*"], + nullable=False, ) + for tcd in test_case_definitions + ] + expected = self.get_expected_test_case_results() + try: + for test_case, expected in zip(test_case_entities, expected): + assert_equal_pydantic_objects( + expected.model_copy( + update={"timestamp": test_case.testCaseResult.timestamp} + ), + test_case.testCaseResult, + ) + finally: + for tc in test_case_entities: + self.openmetadata.delete( + OMTestCase, tc.id, recursive=True, hard_delete=True + ) + except AssertionError: + print(result) + raise def retrieve_table(self, table_name_fqn: str) -> Table: return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn) diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index f4467d9d8de6..70eb1547dccf 100644 --- a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -321,7 +321,7 @@ def wait_for_query_log(cls, timeout=600): raise TimeoutError(f"Query log not updated for {timeout} seconds") def get_data_quality_table(self): - return "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS" + return self.fqn_created_table() def get_test_case_definitions(self) -> List[TestCaseDefinition]: return [ @@ -343,4 +343,4 @@ def get_test_case_definitions(self) -> List[TestCaseDefinition]: ] def get_expected_test_case_results(self): - return [TestCaseResult(testCaseStatus=TestCaseStatus.Success)] + return [TestCaseResult(testCaseStatus=TestCaseStatus.Success, timestamp=0)]