Skip to content

Commit

Permalink
feat(ingest): dbt, looker, sql_common, kafka - moving sources to produce display names and subtypes more consistently (#4496)
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka authored Mar 27, 2022
1 parent ab36ac0 commit a69eac8
Show file tree
Hide file tree
Showing 29 changed files with 9,212 additions and 7,516 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/source/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,7 @@ def _create_dataset_properties_aspect(
description=description,
customProperties=custom_props,
tags=node.tags,
name=node.name,
)
return dbt_properties

Expand Down
16 changes: 15 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
ChangeTypeClass,
DataPlatformInstanceClass,
JobStatusClass,
SubTypesClass,
)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -463,9 +464,22 @@ def _extract_record(self, topic: str) -> Iterable[MetadataWorkUnit]: # noqa: C9
self.report.report_workunit(wu)
yield wu

# 5. Add the subtype aspect marking this as a "topic"
subtype_wu = MetadataWorkUnit(
id=f"{topic}-subtype",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="subTypes",
aspect=SubTypesClass(typeNames=["topic"]),
),
)
yield subtype_wu

domain_urn: Optional[str] = None

# 5. Emit domains aspect MCPW
# 6. Emit domains aspect MCPW
for domain, pattern in self.source_config.domain.items():
if pattern.allowed(dataset_name):
domain_urn = make_domain_urn(domain)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ def _to_metadata_events( # noqa: C901
if self.source_file is not None:
custom_properties["looker.explore.file"] = str(self.source_file)
dataset_props = DatasetPropertiesClass(
name=self.name,
description=self.description,
customProperties=custom_properties,
)
Expand Down
4 changes: 3 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/lookml.py
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,9 @@ def _get_custom_properties(self, looker_view: LookerView) -> DatasetPropertiesCl
], # grab a limited slice of characters from the file
"looker.file.path": file_path,
}
dataset_props = DatasetPropertiesClass(customProperties=custom_properties)
dataset_props = DatasetPropertiesClass(
name=looker_view.id.view_name, customProperties=custom_properties
)

if self.source_config.github_info is not None:
github_file_url = self.source_config.github_info.get_url_for_file_path(
Expand Down
66 changes: 53 additions & 13 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
DataPlatformInstanceClass,
DatasetPropertiesClass,
JobStatusClass,
SubTypesClass,
ViewPropertiesClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport
Expand Down Expand Up @@ -845,12 +847,12 @@ def _process_table(
)
checkpoint_state.add_table_urn(dataset_urn)
description, properties = self.get_table_properties(inspector, schema, table)
if description is not None or properties:
dataset_properties = DatasetPropertiesClass(
description=description,
customProperties=properties,
)
dataset_snapshot.aspects.append(dataset_properties)
dataset_properties = DatasetPropertiesClass(
name=table,
description=description,
customProperties=properties,
)
dataset_snapshot.aspects.append(dataset_properties)
pk_constraints: dict = inspector.get_pk_constraint(table, schema)
foreign_keys = self._get_foreign_keys(dataset_urn, inspector, schema, table)
schema_fields = self.get_schema_fields(dataset_name, columns, pk_constraints)
Expand All @@ -873,6 +875,18 @@ def _process_table(
dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
if dpi_aspect:
yield dpi_aspect
subtypes_aspect = MetadataWorkUnit(
id=f"{dataset_name}-subtypes",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="subTypes",
aspect=SubTypesClass(typeNames=["table"]),
),
)
yield subtypes_aspect

yield from self._get_domain_wu(
dataset_name=dataset_name,
entity_urn=dataset_urn,
Expand Down Expand Up @@ -1100,13 +1114,12 @@ def _process_view(
BaseSQLAlchemyCheckpointState, cur_checkpoint.state
)
checkpoint_state.add_view_urn(dataset_urn)
if description is not None or properties:
dataset_properties = DatasetPropertiesClass(
description=description,
customProperties=properties,
# uri=dataset_name,
)
dataset_snapshot.aspects.append(dataset_properties)
dataset_properties = DatasetPropertiesClass(
name=view,
description=description,
customProperties=properties,
)
dataset_snapshot.aspects.append(dataset_properties)
if schema_metadata:
dataset_snapshot.aspects.append(schema_metadata)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
Expand All @@ -1116,6 +1129,33 @@ def _process_view(
dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
if dpi_aspect:
yield dpi_aspect
subtypes_aspect = MetadataWorkUnit(
id=f"{view}-subtypes",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="subTypes",
aspect=SubTypesClass(typeNames=["view"]),
),
)
yield subtypes_aspect
if "view_definition" in properties:
view_definition_string = properties["view_definition"]
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
)
yield MetadataWorkUnit(
id=f"{view}-viewProperties",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="viewProperties",
aspect=view_properties_aspect,
),
)

yield from self._get_domain_wu(
dataset_name=dataset_name,
entity_urn=dataset_urn,
Expand Down
Loading

0 comments on commit a69eac8

Please sign in to comment.