Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed dbt Manifest and Run results parsing #18234

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,61 @@
# Based on https://schemas.getdbt.com/dbt/catalog/v1.json
REQUIRED_CATALOG_KEYS = ["name", "type", "index"]

# Keys kept on every entry of a run results "results" list; any other key is
# removed before further processing. Lookups are done with the lower-cased
# key name, so every entry here must be lowercase.
REQUIRED_RESULTS_KEYS = {
    "status",
    "timing",
    "thread_id",
    "execution_time",
    "message",
    "adapter_response",
    "unique_id",
}

# Keys kept on every entry under the manifest's "nodes" and "sources"
# sections; any other key is removed before further processing. Lookups are
# done with the lower-cased key name, so every entry here must be lowercase.
REQUIRED_NODE_KEYS = {
    "schema_",
    "schema",
    "freshness",
    "name",
    "resource_type",
    "path",
    "unique_id",
    "source_name",
    "source_description",
    "source_meta",
    "loader",
    "identifier",
    "relation_name",
    "fqn",
    "alias",
    "checksum",
    "config",
    "column_name",
    "test_metadata",
    "original_file_path",
    "root_path",
    "database",
    "tags",
    "description",
    "columns",
    "meta",
    "owner",
    "created_at",
    "group",
    "sources",
    "compiled",
    "docs",
    "version",
    "latest_version",
    "package_name",
    "depends_on",
    "compiled_code",
    "compiled_sql",
    "raw_code",
    "raw_sql",
    "language",
}


NONE_KEYWORDS_LIST = ["none", "null"]

DBT_CATALOG_FILE_NAME = "catalog.json"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"""

from abc import ABC, abstractmethod
from typing import Iterable
from typing import Iterable, List

from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results
from pydantic import Field
Expand All @@ -37,6 +37,10 @@
TopologyNode,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.constants import (
REQUIRED_NODE_KEYS,
REQUIRED_RESULTS_KEYS,
)
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
from metadata.ingestion.source.database.dbt.models import (
DbtFiles,
Expand Down Expand Up @@ -169,51 +173,27 @@ def remove_manifest_non_required_keys(self, manifest_dict: dict):
}
)

required_nodes_keys = {
"schema_",
"schema",
"name",
"resource_type",
"path",
"unique_id",
"fqn",
"alias",
"checksum",
"config",
"column_name",
"test_metadata",
"original_file_path",
"root_path",
"database",
"tags",
"description",
"columns",
"meta",
"owner",
"created_at",
"group",
"sources",
"compiled",
"docs",
"version",
"latest_version",
"package_name",
"depends_on",
"compiled_code",
"compiled_sql",
"raw_code",
"raw_sql",
"language",
}
for field in ["nodes", "sources"]:
for node, value in manifest_dict.get( # pylint: disable=unused-variable
field
).items():
Comment on lines +177 to +179
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SumanMaharana here we should probably check that manifest_dict.get(...) returns a dict and is not None -- we can also update the method signature def remove_manifest_non_required_keys(self, manifest_dict: dict): if we are indeed expecting a Dict[str, dict].

Otherwise we'll get

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'str' object has no attribute 'items'

or

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'NoneType' object has no attribute 'items'

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@TeddyCr manifest_dict.get(...) will always return a dict, as it's defined that way in dbt.

keys_to_delete = [
key for key in value if key.lower() not in REQUIRED_NODE_KEYS
]
for key in keys_to_delete:
del value[key]

for node, value in manifest_dict.get( # pylint: disable=unused-variable
"nodes"
).items():
keys_to_delete = [
key for key in value if key.lower() not in required_nodes_keys
]
for key in keys_to_delete:
del value[key]
def remove_run_result_non_required_keys(self, run_results: List[dict]):
    """
    Method to remove the non required keys from run results file

    Each entry of every file's "results" list is pruned in place: a key is
    kept only when its lower-cased name is in REQUIRED_RESULTS_KEYS.

    Args:
        run_results: run results files as dicts; they are mutated in place
    """
    for run_result in run_results:
        # "results" can be missing or null; iterating None would raise
        # TypeError, so treat that case as having nothing to prune.
        for result in run_result.get("results") or []:
            # Collect first: a dict cannot be resized while it is iterated.
            keys_to_delete = [
                key for key in result if key.lower() not in REQUIRED_RESULTS_KEYS
            ]
            for key in keys_to_delete:
                del result[key]

def get_dbt_files(self) -> Iterable[DbtFiles]:
dbt_files = get_dbt_details(self.source_config.dbtConfigSource)
Expand All @@ -225,6 +205,10 @@ def get_dbt_objects(self) -> Iterable[DbtObjects]:
self.remove_manifest_non_required_keys(
manifest_dict=self.context.get().dbt_file.dbt_manifest
)
if self.context.get().dbt_file.dbt_run_results:
self.remove_run_result_non_required_keys(
run_results=self.context.get().dbt_file.dbt_run_results
)
dbt_objects = DbtObjects(
dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog)
if self.context.get().dbt_file.dbt_catalog
Expand Down
Loading