Skip to content

Commit

Permalink
added snapshots to the report
Browse files Browse the repository at this point in the history
  • Loading branch information
MikaKerman committed Dec 18, 2024
1 parent 24c46a1 commit de9269f
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 5 deletions.
2 changes: 2 additions & 0 deletions elementary/monitor/api/groups/groups.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.fetchers.tests.schema import NormalizedTestSchema
Expand All @@ -31,6 +32,7 @@
NormalizedExposureSchema,
NormalizedTestSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
]


Expand Down
30 changes: 28 additions & 2 deletions elementary/monitor/api/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
TotalsModelRunsSchema,
)
Expand All @@ -26,6 +27,7 @@
from elementary.monitor.fetchers.models.schema import (
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand All @@ -39,6 +41,7 @@ class ModelsAPI(APIClient):
SourceSchema: "sources",
ModelSchema: "models",
ExposureSchema: "exposures",
SnapshotSchema: "snapshots",
}

def __init__(self, dbt_runner: BaseDbtRunner):
Expand Down Expand Up @@ -162,6 +165,16 @@ def get_sources(self) -> Dict[str, NormalizedSourceSchema]:
sources[source_unique_id] = normalized_source
return sources

def get_snapshots(self) -> Dict[str, NormalizedSnapshotSchema]:
    """Return the dbt snapshots, normalized and keyed by unique id.

    The raw snapshots come from ``self.models_fetcher.get_snapshots()`` and
    each one is passed through ``self._normalize_dbt_artifact_dict``. When the
    fetcher returns nothing (any falsy value) the result is an empty dict.
    """
    fetched_snapshots = self.models_fetcher.get_snapshots()
    normalized_by_id = {}
    # `or []` folds the "nothing fetched" case into the same loop.
    for fetched_snapshot in fetched_snapshots or []:
        normalized = self._normalize_dbt_artifact_dict(fetched_snapshot)
        normalized_by_id[cast(str, normalized.unique_id)] = normalized
    return normalized_by_id

def get_exposures(
self,
upstream_node_ids: Optional[List[str]] = None,
Expand Down Expand Up @@ -257,19 +270,30 @@ def _normalize_dbt_artifact_dict(
) -> NormalizedSourceSchema:
...

# Typing-only overload: a SnapshotSchema argument yields a NormalizedSnapshotSchema.
@overload
def _normalize_dbt_artifact_dict(
self, artifact: SnapshotSchema
) -> NormalizedSnapshotSchema:
...

def _normalize_dbt_artifact_dict(
self, artifact: Union[SeedSchema, ModelSchema, ExposureSchema, SourceSchema]
self,
artifact: Union[
SeedSchema, ModelSchema, ExposureSchema, SourceSchema, SnapshotSchema
],
) -> Union[
NormalizedSeedSchema,
NormalizedModelSchema,
NormalizedExposureSchema,
NormalizedSourceSchema,
NormalizedSnapshotSchema,
]:
schema_to_normalized_schema_map = {
SeedSchema: NormalizedSeedSchema,
ExposureSchema: NormalizedExposureSchema,
ModelSchema: NormalizedModelSchema,
SourceSchema: NormalizedSourceSchema,
SnapshotSchema: NormalizedSnapshotSchema,
}
artifact_name = artifact.name
normalized_artifact = json.loads(artifact.json())
Expand Down Expand Up @@ -308,7 +332,9 @@ def _normalize_artifact_path(cls, artifact: ArtifactSchemaType, fqn: str) -> str
@classmethod
def _fqn(
cls,
artifact: Union[ModelSchema, ExposureSchema, SourceSchema, SeedSchema],
artifact: Union[
ModelSchema, ExposureSchema, SourceSchema, SeedSchema, SnapshotSchema
],
) -> str:
if isinstance(artifact, ExposureSchema):
path = (artifact.meta or {}).get("path")
Expand Down
6 changes: 6 additions & 0 deletions elementary/monitor/api/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ExposureSchema,
ModelSchema,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.pydantic_shim import BaseModel, Field, validator
Expand Down Expand Up @@ -51,6 +52,11 @@ class NormalizedSourceSchema(NormalizedArtifactSchema, SourceSchema):
artifact_type: str = Field("source", const=True) # type: ignore # noqa


# Normalized form of SnapshotSchema; `artifact_type` is pinned to "snapshot"
# (declared with const=True).
# NormalizedArtifactSchema must be first in the inheritance order
# NOTE(review): presumably so that its fields/validators win in the MRO — confirm.
class NormalizedSnapshotSchema(NormalizedArtifactSchema, SnapshotSchema):
artifact_type: str = Field("snapshot", const=True) # type: ignore # noqa


# NormalizedArtifactSchema must be first in the inheritance order
class NormalizedExposureSchema(NormalizedArtifactSchema, ExposureSchema):
artifact_type: str = Field("exposure", const=True) # type: ignore # noqa
Expand Down
19 changes: 16 additions & 3 deletions elementary/monitor/api/report/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NormalizedExposureSchema,
NormalizedModelSchema,
NormalizedSeedSchema,
NormalizedSnapshotSchema,
NormalizedSourceSchema,
)
from elementary.monitor.api.report.schema import ReportDataEnvSchema, ReportDataSchema
Expand Down Expand Up @@ -47,11 +48,19 @@ def _get_groups(
sources: Iterable[NormalizedSourceSchema],
exposures: Iterable[NormalizedExposureSchema],
seeds: Iterable[NormalizedSeedSchema],
snapshots: Iterable[NormalizedSnapshotSchema],
singular_tests: Iterable[NormalizedTestSchema],
) -> GroupsSchema:
groups_api = GroupsAPI(self.dbt_runner)
return groups_api.get_groups(
artifacts=[*models, *sources, *exposures, *seeds, *singular_tests]
artifacts=[
*models,
*sources,
*exposures,
*seeds,
*snapshots,
*singular_tests,
]
)

def get_report_data(
Expand Down Expand Up @@ -90,6 +99,8 @@ def get_report_data(
lineage_node_ids.extend(models.keys())
sources = models_api.get_sources()
lineage_node_ids.extend(sources.keys())
snapshots = models_api.get_snapshots()
lineage_node_ids.extend(snapshots.keys())
exposures = models_api.get_exposures(upstream_node_ids=lineage_node_ids)
lineage_node_ids.extend(exposures.keys())
singular_tests = tests_api.get_singular_tests()
Expand All @@ -99,6 +110,7 @@ def get_report_data(
sources.values(),
exposures.values(),
seeds.values(),
snapshots.values(),
singular_tests,
)

Expand Down Expand Up @@ -147,7 +159,7 @@ def get_report_data(

serializable_groups = groups.dict()
serializable_models = self._serialize_models(
models, sources, exposures, seeds
models, sources, snapshots, exposures, seeds
)
serializable_model_runs = self._serialize_models_runs(models_runs.runs)
serializable_model_runs_totals = models_runs.dict(include={"totals"})[
Expand Down Expand Up @@ -207,10 +219,11 @@ def _serialize_models(
self,
models: Dict[str, NormalizedModelSchema],
sources: Dict[str, NormalizedSourceSchema],
snapshots: Dict[str, NormalizedSnapshotSchema],
exposures: Dict[str, NormalizedExposureSchema],
seeds: Dict[str, NormalizedSeedSchema],
) -> Dict[str, dict]:
nodes = dict(**models, **sources, **exposures, **seeds)
nodes = dict(**models, **sources, **snapshots, **exposures, **seeds)
serializable_nodes = dict()
for key in nodes.keys():
serializable_nodes[key] = dict(nodes[key])
Expand Down
33 changes: 33 additions & 0 deletions elementary/monitor/dbt_project/macros/get_snapshots.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{#
    get_snapshots: reads the snapshot rows from the elementary `dbt_snapshots`
    relation and returns them through `elementary.agate_to_dicts`.

    - The query only runs when `elementary.relation_exists` reports that the
      relation exists; otherwise the macro returns no value.
    - `original_path` is exposed to the caller under the name `full_path`.
#}
{% macro get_snapshots() %}
{% set dbt_snapshots_relation = ref('elementary', 'dbt_snapshots') %}
{%- if elementary.relation_exists(dbt_snapshots_relation) -%}
{% set get_snapshots_query %}
with dbt_artifacts_snapshots as (
select
name,
unique_id,
owner,
tags,
package_name,
description,
meta,
materialization,
database_name,
schema_name,
depends_on_macros,
depends_on_nodes,
original_path as full_path,
path,
patch_path,
generated_at,
unique_key,
incremental_strategy
from {{ dbt_snapshots_relation }}
)
select * from dbt_artifacts_snapshots
{% endset %}

{% set snapshots_agate = run_query(get_snapshots_query) %}
{% do return(elementary.agate_to_dicts(snapshots_agate)) %}
{%- endif -%}
{% endmacro %}
11 changes: 11 additions & 0 deletions elementary/monitor/fetchers/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
ModelSchema,
ModelTestCoverage,
SeedSchema,
SnapshotSchema,
SourceSchema,
)
from elementary.utils.log import get_logger
Expand Down Expand Up @@ -62,6 +63,16 @@ def get_sources(self) -> List[SourceSchema]:
sources = [SourceSchema(**source) for source in sources]
return sources

def get_snapshots(self) -> List[SnapshotSchema]:
    """Fetch the dbt snapshots by running the ``get_snapshots`` macro.

    The first element of the run-operation response is parsed as JSON and
    every decoded entry is turned into a ``SnapshotSchema``. An empty (falsy)
    response produces an empty list.
    """
    response = self.dbt_runner.run_operation(
        macro_name="elementary_cli.get_snapshots"
    )
    if not response:
        return []
    return [SnapshotSchema(**raw_snapshot) for raw_snapshot in json.loads(response[0])]

def get_exposures(self) -> List[ExposureSchema]:
run_operation_response = self.dbt_runner.run_operation(
macro_name="elementary_cli.get_exposures"
Expand Down
18 changes: 18 additions & 0 deletions elementary/monitor/fetchers/models/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,24 @@ def ref(self):
return f"source('{self.source_name}', '{self.table_name}')"


class SnapshotSchema(ArtifactSchema):
    """Schema of a single dbt snapshot artifact.

    Adds the snapshot-specific columns on top of ``ArtifactSchema``.
    ``table_name`` is always derived by a validator rather than taken from
    the input.
    """

    database_name: str
    schema_name: str
    # NOTE(review): typed as plain strings; presumably serialized lists — confirm.
    depends_on_macros: str
    depends_on_nodes: str
    path: str
    patch_path: Optional[str]
    generated_at: str
    unique_key: str
    incremental_strategy: Optional[str]

    # Whatever is passed for this field is replaced by the validator below.
    table_name: Optional[str] = None

    @validator("table_name", always=True)
    def set_table_name(cls, _table_name, values):
        """Return the ``name`` entry of the previously validated values.

        The incoming value is ignored; ``always=True`` makes this run even
        when ``table_name`` was not provided.
        NOTE(review): assumes ``name`` is declared on ``ArtifactSchema`` before
        this field; a snapshot with an alias would presumably use a different
        table name — confirm.
        """
        return values.get("name")


class OwnerSchema(ExtendedBaseModel):
name: Optional[str] = None
email: Optional[str] = None
Expand Down

0 comments on commit de9269f

Please sign in to comment.