From b0f4ab38b34b1a6b746cc161c6c7827755d6e10e Mon Sep 17 00:00:00 2001 From: Christophe Blefari Date: Wed, 30 Aug 2023 13:43:27 +0200 Subject: [PATCH] =?UTF-8?q?9.7=20=E2=80=94=20Improve=20dbt=20logs=20errors?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- poetry.lock | 4 ++-- tdf/assets/dbt.py | 6 +++--- tdf/assets/extract.py | 2 +- tdf/helpers/dbt.py | 8 +++++--- tdf/resources.py | 47 ++++++++++++++++++++++++++++++------------- 5 files changed, 44 insertions(+), 23 deletions(-) diff --git a/poetry.lock b/poetry.lock index 97127d6..ef7738a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -135,7 +135,7 @@ python-versions = ">=3.7,<4.0" [[package]] name = "bdp-contracts" -version = "0.1.0" +version = "0.1.2" description = "blef data platform data contracts manager" category = "main" optional = false @@ -1997,7 +1997,7 @@ backoff = [ {file = "backoff-2.2.1.tar.gz", hash = "sha256:03f829f5bb1923180821643f8753b0502c3b682293992485b0eef2807afa5cba"}, ] bdp-contracts = [ - {file = "bdp-contracts-0.1.0.tar.gz", hash = "sha256:e8f0f86fd9f3574a1858501b1c70de241d2422efc3bc3dc42ebd56bc8e142286"}, + {file = "bdp-contracts-0.1.2.tar.gz", hash = "sha256:1da20289579f9b30dace5e675c35d4e7f493f0ba67944fde6c22febdffa774a2"}, ] black = [ {file = "black-23.7.0-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:5c4bc552ab52f6c1c506ccae05681fab58c3f72d59ae6e6639e8885e94fe2587"}, diff --git a/tdf/assets/dbt.py b/tdf/assets/dbt.py index 496b653..9fa9785 100644 --- a/tdf/assets/dbt.py +++ b/tdf/assets/dbt.py @@ -3,7 +3,7 @@ from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets from dagster import OpExecutionContext -from tdf.resources import dbt_manifest_path +from tdf.resources import DbtResource, dbt_manifest_path class CustomDagsterDbtTranslator(DagsterDbtTranslator): @@ -15,5 +15,5 @@ def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str] manifest=dbt_manifest_path, dagster_dbt_translator=CustomDagsterDbtTranslator(), ) -def analytics_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): - yield from dbt.cli(["build"], context=context).stream() +def analytics_dbt_assets(context: OpExecutionContext, dbt: DbtResource): + yield from dbt.build(context) diff --git a/tdf/assets/extract.py b/tdf/assets/extract.py index b84053c..a6efb17 100644 --- a/tdf/assets/extract.py +++ b/tdf/assets/extract.py @@ -1,4 +1,5 @@ import pandas as pd +from bdp_contracts import get_contract from dagster import ( AssetKey, @@ -7,7 +8,6 @@ StaticPartitionsDefinition, asset, ) -from bdp_contracts import get_contract from tdf.resources import GoogleSheetResource, PostgresResource public_race = SourceAsset( diff --git a/tdf/helpers/dbt.py b/tdf/helpers/dbt.py index 2233e5f..7ec3f05 100644 --- a/tdf/helpers/dbt.py +++ b/tdf/helpers/dbt.py @@ -1,11 +1,10 @@ import click import yaml - from bdp_contracts import Dataset, get_contracts @click.command() -@click.argument('sources_file') +@click.argument("sources_file") def generate_dbt_sources(sources_file): sources = {} contract: Dataset @@ -23,7 +22,10 @@ def generate_dbt_sources(sources_file): "name": contract.name, "description": contract.description, "columns": contract.get_dbt_serialization(), - "meta": {"dagster": {"asset_key": contract.name}, "external_location": external_location} + "meta": { + "dagster": {"asset_key": contract.name}, + "external_location": external_location, + }, } sources["tables"].append(source) diff --git a/tdf/resources.py b/tdf/resources.py index 4c5eca1..ab3ad1f 100644 --- a/tdf/resources.py +++ b/tdf/resources.py @@ -1,18 +1,17 @@ import base64 import json import os -from pathlib import Path from typing import List, Optional import pandas as pd -from dagster_dbt import DbtCliResource, DagsterDbtCliRuntimeError +from dagster_dbt import DagsterDbtCliRuntimeError, DbtCliResource from dagster_gcp import GCSResource as DagsterGCSResource from google.cloud import storage from google.oauth2.service_account import Credentials from pydantic import Field from sqlalchemy import create_engine -from dagster import ConfigurableResource +from dagster import ConfigurableResource, Failure, OpExecutionContext class PostgresResource(ConfigurableResource): @@ -59,24 +58,44 @@ def get_client(self) -> storage.Client: return storage.client.Client(project=self.project, credentials=self.credentials) -class DbtParseError(Exception): - def __init__(self, events): - message = " ".join(events) - super().__init__(message) +class DbtCliError(Failure): + def __init__(self, action: str, events: List[any]): + super().__init__( + description=f"dbt {action} failed", + metadata={ + "DBT LOG": "\n".join([event["info"]["msg"] for event in events]), + }, + ) class DbtResource(DbtCliResource): - def parse(self): + def do(self, action, *args, **kwargs): + events = [] try: - dbt = DbtCliResource(project_dir="analytics", target=os.getenv("DBT_TARGET")) - cli = dbt.cli(["parse"], manifest={}) - events = [] + cli = self.cli([action], **kwargs) for event in cli.stream_raw_events(): - events.append(json.dumps(event.raw_event)) + events.append(event.raw_event) + + if action == "build": + yield from event.to_default_asset_events( + manifest=cli.manifest, + dagster_dbt_translator=cli.dagster_dbt_translator, + ) + + if action == "parse": + yield cli.target_path.joinpath("manifest.json") - return cli.target_path.joinpath("manifest.json") except DagsterDbtCliRuntimeError as er: - raise DbtParseError(events) + raise DbtCliError( + action=action, + events=events, + ) + + def parse(self): + return self.do("parse", manifest={}).__next__() + + def build(self, context: OpExecutionContext): + yield from self.do("build", context=context) dbt = DbtResource(project_dir="analytics", target=os.getenv("DBT_TARGET"))