Move the contents of dbt.contracts.results to a new dbt.artifacts directory #9350

Merged 6 commits on Jan 11, 2024
6 changes: 6 additions & 0 deletions .changes/unreleased/Under the Hood-20240108-160140.yaml
@@ -0,0 +1,6 @@
kind: Under the Hood
body: Move result objects to dbt.artifacts
time: 2024-01-08T16:01:40.20348-05:00
custom:
Author: gshank
Issue: "9193"
180 changes: 180 additions & 0 deletions core/dbt/artifacts/base.py
@@ -0,0 +1,180 @@
import dataclasses
from datetime import datetime
from typing import ClassVar, Type, TypeVar, Dict, Any, Optional

from dbt.common.clients.system import write_json, read_json
from dbt.exceptions import (
    DbtInternalError,
    DbtRuntimeError,
    IncompatibleSchemaError,
)
from dbt.version import __version__

from dbt.common.events.functions import get_metadata_vars
from dbt.common.invocation import get_invocation_id
from dbt.common.dataclass_schema import dbtClassMixin

from mashumaro.jsonschema import build_json_schema
from mashumaro.jsonschema.dialects import DRAFT_2020_12
import functools


BASE_SCHEMAS_URL = "https://schemas.getdbt.com/"
SCHEMA_PATH = "dbt/{name}/v{version}.json"


@dataclasses.dataclass
class SchemaVersion:
    name: str
    version: int

    @property
    def path(self) -> str:
        return SCHEMA_PATH.format(name=self.name, version=self.version)

    def __str__(self) -> str:
        return BASE_SCHEMAS_URL + self.path


class Writable:
    def write(self, path: str):
        write_json(path, self.to_dict(omit_none=False))  # type: ignore


class Readable:
    @classmethod
    def read(cls, path: str):
        try:
            data = read_json(path)
        except (EnvironmentError, ValueError) as exc:
            raise DbtRuntimeError(
                f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'
            ) from exc

        return cls.from_dict(data)  # type: ignore


# This is used in the ManifestMetadata, RunResultsMetadata, RunOperationResultMetadata,
# FreshnessMetadata, and CatalogMetadata classes
@dataclasses.dataclass
class BaseArtifactMetadata(dbtClassMixin):
    dbt_schema_version: str
    dbt_version: str = __version__
    generated_at: datetime = dataclasses.field(default_factory=datetime.utcnow)
    invocation_id: Optional[str] = dataclasses.field(default_factory=get_invocation_id)
    env: Dict[str, str] = dataclasses.field(default_factory=get_metadata_vars)

    def __post_serialize__(self, dct):
        dct = super().__post_serialize__(dct)
        if dct["generated_at"] and dct["generated_at"].endswith("+00:00"):
            dct["generated_at"] = dct["generated_at"].replace("+00:00", "") + "Z"
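            # Illustrative example: "2024-01-11T12:34:56.789+00:00" becomes "2024-01-11T12:34:56.789Z"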

        return dct


# This is used as a class decorator to set the schema_version in the
# 'dbt_schema_version' class attribute. (It's copied into the metadata objects.)
# Name attributes of SchemaVersion in classes with the 'schema_version' decorator:
# manifest
# run-results
# run-operation-result
# sources
# catalog
# remote-compile-result
# remote-execution-result
# remote-run-result
def schema_version(name: str, version: int):
    def inner(cls: Type[VersionedSchema]):
        cls.dbt_schema_version = SchemaVersion(
            name=name,
            version=version,
        )
        return cls

    return inner
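
# Illustrative usage of the decorator (hypothetical class name, example values):
#
#     @dataclasses.dataclass
#     @schema_version("run-results", 5)
#     class ExampleArtifact(ArtifactMixin):
#         ...
#
# str(ExampleArtifact.dbt_schema_version) then yields
# "https://schemas.getdbt.com/dbt/run-results/v5.json".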


# This is used in the ArtifactMixin and RemoteResult classes
@dataclasses.dataclass
class VersionedSchema(dbtClassMixin):
    dbt_schema_version: ClassVar[SchemaVersion]

    @classmethod
    @functools.lru_cache
    def json_schema(cls) -> Dict[str, Any]:
        json_schema_obj = build_json_schema(cls, dialect=DRAFT_2020_12, with_dialect_uri=True)
        json_schema = json_schema_obj.to_dict()
        json_schema["$id"] = str(cls.dbt_schema_version)
        return json_schema

    @classmethod
    def is_compatible_version(cls, schema_version):
        compatible_versions = [str(cls.dbt_schema_version)]
        if hasattr(cls, "compatible_previous_versions"):
            for name, version in cls.compatible_previous_versions():
                compatible_versions.append(str(SchemaVersion(name, version)))
        return str(schema_version) in compatible_versions
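
    # Illustrative: a subclass could define compatible_previous_versions() returning,
    # e.g., [("manifest", 10), ("manifest", 11)] (example values) so that artifacts
    # written under those older schema URLs are still accepted.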

    @classmethod
    def read_and_check_versions(cls, path: str):
        try:
            data = read_json(path)
        except (EnvironmentError, ValueError) as exc:
            raise DbtRuntimeError(
                f'Could not read {cls.__name__} at "{path}" as JSON: {exc}'
            ) from exc

        # Check metadata version. There is a class variable 'dbt_schema_version', but
        # that doesn't show up in artifacts, where it only exists in the 'metadata'
        # dictionary.
        if hasattr(cls, "dbt_schema_version"):
            if "metadata" in data and "dbt_schema_version" in data["metadata"]:
                previous_schema_version = data["metadata"]["dbt_schema_version"]
                # cls.dbt_schema_version is a SchemaVersion object
                if not cls.is_compatible_version(previous_schema_version):
                    raise IncompatibleSchemaError(
                        expected=str(cls.dbt_schema_version),
                        found=previous_schema_version,
                    )

        return cls.upgrade_schema_version(data)

    @classmethod
    def upgrade_schema_version(cls, data):
        """This will modify the data (dictionary) passed in to match the current
        artifact schema code, if necessary. This is the default method, which
        just returns the instantiated object via from_dict."""
        return cls.from_dict(data)


T = TypeVar("T", bound="ArtifactMixin")


# metadata should really be a Generic[T_M] where T_M is a TypeVar bound to
# BaseArtifactMetadata. Unfortunately this isn't possible due to a mypy issue:
# https://github.com/python/mypy/issues/7520
# This is used in the WritableManifest, RunResultsArtifact, RunOperationResultsArtifact,
# and CatalogArtifact
@dataclasses.dataclass(init=False)
class ArtifactMixin(VersionedSchema, Writable, Readable):
    metadata: BaseArtifactMetadata

    @classmethod
    def validate(cls, data):
        super().validate(data)
        if cls.dbt_schema_version is None:
            raise DbtInternalError("Cannot call from_dict with no schema version!")


def get_artifact_schema_version(dct: dict) -> int:
    schema_version = dct.get("metadata", {}).get("dbt_schema_version", None)
    if not schema_version:
        raise ValueError("Artifact is missing schema version")

    # schema_version is in this format: https://schemas.getdbt.com/dbt/manifest/v10.json
    # What the code below is doing:
    # 1. Split on "/" - v10.json
    # 2. Split on "." - v10
    # 3. Skip first character - 10
    # 4. Convert to int
    # TODO: If this gets more complicated, turn into a regex
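    # Illustrative walk-through: "https://schemas.getdbt.com/dbt/manifest/v10.json"
    #   -> "v10.json" -> "v10" -> 10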
    return int(schema_version.split("/")[-1].split(".")[0][1:])
109 changes: 109 additions & 0 deletions core/dbt/artifacts/catalog.py
@@ -0,0 +1,109 @@
from typing import Dict, Union, Optional, NamedTuple, Any, List
from dataclasses import dataclass, field
from datetime import datetime

from dbt.common.dataclass_schema import dbtClassMixin
from dbt.common.utils.formatting import lowercase
from dbt.common.contracts.util import Replaceable
from dbt.artifacts.base import ArtifactMixin, BaseArtifactMetadata, schema_version

Primitive = Union[bool, str, float, None]
PrimitiveDict = Dict[str, Primitive]

CatalogKey = NamedTuple(
    "CatalogKey", [("database", Optional[str]), ("schema", str), ("name", str)]
)


@dataclass
class StatsItem(dbtClassMixin):
    id: str
    label: str
    value: Primitive
    include: bool
    description: Optional[str] = None


StatsDict = Dict[str, StatsItem]


@dataclass
class ColumnMetadata(dbtClassMixin):
    type: str
    index: int
    name: str
    comment: Optional[str] = None


ColumnMap = Dict[str, ColumnMetadata]


@dataclass
class TableMetadata(dbtClassMixin):
    type: str
    schema: str
    name: str
    database: Optional[str] = None
    comment: Optional[str] = None
    owner: Optional[str] = None


@dataclass
class CatalogTable(dbtClassMixin, Replaceable):
    metadata: TableMetadata
    columns: ColumnMap
    stats: StatsDict
    # the same table with two unique IDs will just be listed two times
    unique_id: Optional[str] = None

    def key(self) -> CatalogKey:
        return CatalogKey(
            lowercase(self.metadata.database),
            self.metadata.schema.lower(),
            self.metadata.name.lower(),
        )


@dataclass
class CatalogMetadata(BaseArtifactMetadata):
    dbt_schema_version: str = field(
        default_factory=lambda: str(CatalogArtifact.dbt_schema_version)
    )


@dataclass
class CatalogResults(dbtClassMixin):
    nodes: Dict[str, CatalogTable]
    sources: Dict[str, CatalogTable]
    errors: Optional[List[str]] = None
    _compile_results: Optional[Any] = None

    def __post_serialize__(self, dct):
        dct = super().__post_serialize__(dct)
        if "_compile_results" in dct:
            del dct["_compile_results"]
        return dct


@dataclass
@schema_version("catalog", 1)
class CatalogArtifact(CatalogResults, ArtifactMixin):
    metadata: CatalogMetadata

    @classmethod
    def from_results(
        cls,
        generated_at: datetime,
        nodes: Dict[str, CatalogTable],
        sources: Dict[str, CatalogTable],
        compile_results: Optional[Any],
        errors: Optional[List[str]],
    ) -> "CatalogArtifact":
        meta = CatalogMetadata(generated_at=generated_at)
        return cls(
            metadata=meta,
            nodes=nodes,
            sources=sources,
            errors=errors,
            _compile_results=compile_results,
        )
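
A minimal usage sketch of the relocated classes (illustrative only; the empty node/source
dicts and the output path are made-up example values, not taken from this diff):

    from datetime import datetime
    from dbt.artifacts.catalog import CatalogArtifact

    artifact = CatalogArtifact.from_results(
        generated_at=datetime.utcnow(),
        nodes={},  # Dict[str, CatalogTable]
        sources={},  # Dict[str, CatalogTable]
        compile_results=None,
        errors=None,
    )
    artifact.write("target/catalog.json")  # Writable.write -> write_json
    reloaded = CatalogArtifact.read_and_check_versions("target/catalog.json")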