From 63bff7aafa05608b793e29e0022c42d4547d98cc Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Mon, 14 Oct 2024 13:07:22 +0530 Subject: [PATCH] Fixed dbt Manifest and Run results parsing (#18234) --- .../source/database/dbt/constants.py | 55 ++++++++++++++ .../source/database/dbt/dbt_service.py | 74 ++++++++----------- 2 files changed, 84 insertions(+), 45 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py index 63731473b72d..83c49c0724a4 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/constants.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/constants.py @@ -22,6 +22,61 @@ # Based on https://schemas.getdbt.com/dbt/catalog/v1.json REQUIRED_CATALOG_KEYS = ["name", "type", "index"] +REQUIRED_RESULTS_KEYS = { + "status", + "timing", + "thread_id", + "execution_time", + "message", + "adapter_response", + "unique_id", +} + +REQUIRED_NODE_KEYS = { + "schema_", + "schema", + "freshness", + "name", + "resource_type", + "path", + "unique_id", + "source_name", + "source_description", + "source_meta", + "loader", + "identifier", + "relation_name", + "fqn", + "alias", + "checksum", + "config", + "column_name", + "test_metadata", + "original_file_path", + "root_path", + "database", + "tags", + "description", + "columns", + "meta", + "owner", + "created_at", + "group", + "sources", + "compiled", + "docs", + "version", + "latest_version", + "package_name", + "depends_on", + "compiled_code", + "compiled_sql", + "raw_code", + "raw_sql", + "language", +} + + NONE_KEYWORDS_LIST = ["none", "null"] DBT_CATALOG_FILE_NAME = "catalog.json" diff --git a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py index 44307b6df755..aa2d65f4e2cf 100644 --- a/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py +++ b/ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py @@ -13,7 +13,7 @@ """ from abc import ABC, abstractmethod -from typing import Iterable +from typing import Iterable, List from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results from pydantic import Field @@ -37,6 +37,10 @@ TopologyNode, ) from metadata.ingestion.source.database.database_service import DataModelLink +from metadata.ingestion.source.database.dbt.constants import ( + REQUIRED_NODE_KEYS, + REQUIRED_RESULTS_KEYS, +) from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details from metadata.ingestion.source.database.dbt.models import ( DbtFiles, @@ -169,51 +173,27 @@ def remove_manifest_non_required_keys(self, manifest_dict: dict): } ) - required_nodes_keys = { - "schema_", - "schema", - "name", - "resource_type", - "path", - "unique_id", - "fqn", - "alias", - "checksum", - "config", - "column_name", - "test_metadata", - "original_file_path", - "root_path", - "database", - "tags", - "description", - "columns", - "meta", - "owner", - "created_at", - "group", - "sources", - "compiled", - "docs", - "version", - "latest_version", - "package_name", - "depends_on", - "compiled_code", - "compiled_sql", - "raw_code", - "raw_sql", - "language", - } + for field in ["nodes", "sources"]: + for node, value in manifest_dict.get( # pylint: disable=unused-variable + field + ).items(): + keys_to_delete = [ + key for key in value if key.lower() not in REQUIRED_NODE_KEYS + ] + for key in keys_to_delete: + del value[key] - for node, value in manifest_dict.get( # pylint: disable=unused-variable - "nodes" - ).items(): - keys_to_delete = [ - key for key in value if key.lower() not in required_nodes_keys - ] - for key in keys_to_delete: - del value[key] + def remove_run_result_non_required_keys(self, run_results: List[dict]): + """ + Method to remove the non required keys from run results file + """ + for run_result in run_results: + for result in run_result.get("results"): + keys_to_delete = [ + key for key in result if key.lower() not in REQUIRED_RESULTS_KEYS + ] + for key in keys_to_delete: + del result[key] def get_dbt_files(self) -> Iterable[DbtFiles]: dbt_files = get_dbt_details(self.source_config.dbtConfigSource) @@ -225,6 +205,10 @@ def get_dbt_objects(self) -> Iterable[DbtObjects]: self.remove_manifest_non_required_keys( manifest_dict=self.context.get().dbt_file.dbt_manifest ) + if self.context.get().dbt_file.dbt_run_results: + self.remove_run_result_non_required_keys( + run_results=self.context.get().dbt_file.dbt_run_results + ) dbt_objects = DbtObjects( dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog) if self.context.get().dbt_file.dbt_catalog