Skip to content

Commit

Permalink
Merge branch 'main' into feat-#15380
Browse files Browse the repository at this point in the history
  • Loading branch information
Sachin-chaurasiya authored Dec 18, 2024
2 parents cbf85ee + 6dc7e5c commit fbdc623
Showing 1 changed file with 23 additions and 16 deletions.
39 changes: 23 additions & 16 deletions ingestion/src/metadata/ingestion/source/database/dbt/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,11 +326,14 @@ def add_dbt_tests(
None,
)

def _add_dbt_freshness_test_from_sources(
def add_dbt_sources(
self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
):
# in dbt manifest sources node name is table/view name (not test name like with test nodes)
# so in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy
) -> None:
"""
Method to append dbt test cases based on sources file for later processing
In dbt manifest sources node name is table/view name (not test name like with test nodes)
So in order for the test creation to be named precisely I am amending manifest node name within it's deepcopy
"""
manifest_node_new = deepcopy(manifest_node)
manifest_node_new.name = manifest_node_new.name + "_freshness"

Expand All @@ -350,16 +353,6 @@ def _add_dbt_freshness_test_from_sources(
DbtCommonEnum.RESULTS.value
] = freshness_test_result

def add_dbt_sources(
self, key: str, manifest_node, manifest_entities, dbt_objects: DbtObjects
) -> None:
"""
Method to append dbt test cases based on sources file for later processing
"""
self._add_dbt_freshness_test_from_sources(
key, manifest_node, manifest_entities, dbt_objects
)

# pylint: disable=too-many-locals, too-many-branches, too-many-statements
def yield_data_models(
self, dbt_objects: DbtObjects
Expand Down Expand Up @@ -481,18 +474,28 @@ def yield_data_models(
table_name=model_name,
)

table_entity: Optional[
table_entities: Optional[
Union[Table, List[Table]]
] = get_entity_from_es_result(
entity_list=self.metadata.es_search_from_fqn(
entity_type=Table,
fqn_search_string=table_fqn,
fields="sourceHash",
),
fetch_multiple_entities=False,
fetch_multiple_entities=True,
)
logger.debug(
f"Found table entities from {table_fqn}: {table_entities}"
)

table_entity = next(
iter(filter(lambda x: x is not None, table_entities)), None
)

if table_entity:
logger.debug(
f"Using Table Entity for datamodel: {table_entity}"
)
data_model_link = DataModelLink(
table_entity=table_entity,
datamodel=DataModel(
Expand Down Expand Up @@ -948,6 +951,7 @@ def create_dbt_test_case(
self.metadata, entity_link_str
)
table_fqn = get_table_fqn(entity_link_str)
logger.debug(f"Table fqn found: {table_fqn}")
source_elements = table_fqn.split(fqn.FQN_SEPARATOR)
test_case_fqn = fqn.build(
self.metadata,
Expand Down Expand Up @@ -983,6 +987,7 @@ def create_dbt_test_case(
owners=None,
)
)
logger.debug(f"Test case Already Exists: {test_case_fqn}")
except Exception as err: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
Expand Down Expand Up @@ -1072,6 +1077,8 @@ def add_dbt_test_result(self, dbt_test: dict):
else None,
test_case_name=manifest_node.name,
)

logger.debug(f"Adding test case results to {test_case_fqn} ")
try:
self.metadata.add_test_case_results(
test_results=test_case_result,
Expand Down

0 comments on commit fbdc623

Please sign in to comment.