Skip to content

Commit

Permalink
Fixed dbt Manifest and Run results parsing (#18234)
Browse files Browse the repository at this point in the history
  • Loading branch information
SumanMaharana authored and OnkarVO7 committed Oct 14, 2024
1 parent 8650715 commit 63bff7a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 45 deletions.
55 changes: 55 additions & 0 deletions ingestion/src/metadata/ingestion/source/database/dbt/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,61 @@
# Based on https://schemas.getdbt.com/dbt/catalog/v1.json
REQUIRED_CATALOG_KEYS = ["name", "type", "index"]

REQUIRED_RESULTS_KEYS = {
"status",
"timing",
"thread_id",
"execution_time",
"message",
"adapter_response",
"unique_id",
}

REQUIRED_NODE_KEYS = {
"schema_",
"schema",
"freshness",
"name",
"resource_type",
"path",
"unique_id",
"source_name",
"source_description",
"source_meta",
"loader",
"identifier",
"relation_name",
"fqn",
"alias",
"checksum",
"config",
"column_name",
"test_metadata",
"original_file_path",
"root_path",
"database",
"tags",
"description",
"columns",
"meta",
"owner",
"created_at",
"group",
"sources",
"compiled",
"docs",
"version",
"latest_version",
"package_name",
"depends_on",
"compiled_code",
"compiled_sql",
"raw_code",
"raw_sql",
"language",
}


NONE_KEYWORDS_LIST = ["none", "null"]

DBT_CATALOG_FILE_NAME = "catalog.json"
Expand Down
74 changes: 29 additions & 45 deletions ingestion/src/metadata/ingestion/source/database/dbt/dbt_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""

from abc import ABC, abstractmethod
from typing import Iterable
from typing import Iterable, List

from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results
from pydantic import Field
Expand All @@ -37,6 +37,10 @@
TopologyNode,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.constants import (
REQUIRED_NODE_KEYS,
REQUIRED_RESULTS_KEYS,
)
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
from metadata.ingestion.source.database.dbt.models import (
DbtFiles,
Expand Down Expand Up @@ -169,51 +173,27 @@ def remove_manifest_non_required_keys(self, manifest_dict: dict):
}
)

required_nodes_keys = {
"schema_",
"schema",
"name",
"resource_type",
"path",
"unique_id",
"fqn",
"alias",
"checksum",
"config",
"column_name",
"test_metadata",
"original_file_path",
"root_path",
"database",
"tags",
"description",
"columns",
"meta",
"owner",
"created_at",
"group",
"sources",
"compiled",
"docs",
"version",
"latest_version",
"package_name",
"depends_on",
"compiled_code",
"compiled_sql",
"raw_code",
"raw_sql",
"language",
}
for field in ["nodes", "sources"]:
for node, value in manifest_dict.get( # pylint: disable=unused-variable
field
).items():
keys_to_delete = [
key for key in value if key.lower() not in REQUIRED_NODE_KEYS
]
for key in keys_to_delete:
del value[key]

for node, value in manifest_dict.get( # pylint: disable=unused-variable
"nodes"
).items():
keys_to_delete = [
key for key in value if key.lower() not in required_nodes_keys
]
for key in keys_to_delete:
del value[key]
def remove_run_result_non_required_keys(self, run_results: List[dict]):
"""
Method to remove the non required keys from run results file
"""
for run_result in run_results:
for result in run_result.get("results"):
keys_to_delete = [
key for key in result if key.lower() not in REQUIRED_RESULTS_KEYS
]
for key in keys_to_delete:
del result[key]

def get_dbt_files(self) -> Iterable[DbtFiles]:
dbt_files = get_dbt_details(self.source_config.dbtConfigSource)
Expand All @@ -225,6 +205,10 @@ def get_dbt_objects(self) -> Iterable[DbtObjects]:
self.remove_manifest_non_required_keys(
manifest_dict=self.context.get().dbt_file.dbt_manifest
)
if self.context.get().dbt_file.dbt_run_results:
self.remove_run_result_non_required_keys(
run_results=self.context.get().dbt_file.dbt_run_results
)
dbt_objects = DbtObjects(
dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog)
if self.context.get().dbt_file.dbt_catalog
Expand Down

0 comments on commit 63bff7a

Please sign in to comment.