Skip to content

Commit

Permalink
9.7 — Improve dbt logs errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Bl3f committed Aug 30, 2023
1 parent 96ba7cd commit b0f4ab3
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 23 deletions.
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions tdf/assets/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dagster_dbt import DagsterDbtTranslator, DbtCliResource, dbt_assets

from dagster import OpExecutionContext
from tdf.resources import dbt_manifest_path
from tdf.resources import DbtResource, dbt_manifest_path


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
Expand All @@ -15,5 +15,5 @@ def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]
manifest=dbt_manifest_path,
dagster_dbt_translator=CustomDagsterDbtTranslator(),
)
def analytics_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
def analytics_dbt_assets(context: OpExecutionContext, dbt: DbtResource):
yield from dbt.build(context)
2 changes: 1 addition & 1 deletion tdf/assets/extract.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pandas as pd
from bdp_contracts import get_contract

from dagster import (
AssetKey,
Expand All @@ -7,7 +8,6 @@
StaticPartitionsDefinition,
asset,
)
from bdp_contracts import get_contract
from tdf.resources import GoogleSheetResource, PostgresResource

public_race = SourceAsset(
Expand Down
8 changes: 5 additions & 3 deletions tdf/helpers/dbt.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import click
import yaml

from bdp_contracts import Dataset, get_contracts


@click.command()
@click.argument('sources_file')
@click.argument("sources_file")
def generate_dbt_sources(sources_file):
sources = {}
contract: Dataset
Expand All @@ -23,7 +22,10 @@ def generate_dbt_sources(sources_file):
"name": contract.name,
"description": contract.description,
"columns": contract.get_dbt_serialization(),
"meta": {"dagster": {"asset_key": contract.name}, "external_location": external_location}
"meta": {
"dagster": {"asset_key": contract.name},
"external_location": external_location,
},
}
sources["tables"].append(source)

Expand Down
47 changes: 33 additions & 14 deletions tdf/resources.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import base64
import json
import os
from pathlib import Path
from typing import List, Optional

import pandas as pd
from dagster_dbt import DbtCliResource, DagsterDbtCliRuntimeError
from dagster_dbt import DagsterDbtCliRuntimeError, DbtCliResource
from dagster_gcp import GCSResource as DagsterGCSResource
from google.cloud import storage
from google.oauth2.service_account import Credentials
from pydantic import Field
from sqlalchemy import create_engine

from dagster import ConfigurableResource
from dagster import ConfigurableResource, Failure, OpExecutionContext


class PostgresResource(ConfigurableResource):
Expand Down Expand Up @@ -59,24 +58,44 @@ def get_client(self) -> storage.Client:
return storage.client.Client(project=self.project, credentials=self.credentials)


class DbtParseError(Exception):
def __init__(self, events):
message = " ".join(events)
super().__init__(message)
class DbtCliError(Failure):
def __init__(self, action: str, events: List[any]):
super().__init__(
description=f"dbt {action} failed",
metadata={
"DBT LOG": "\n".join([event["info"]["msg"] for event in events]),
},
)


class DbtResource(DbtCliResource):
def parse(self):
def do(self, action, *args, **kwargs):
events = []
try:
dbt = DbtCliResource(project_dir="analytics", target=os.getenv("DBT_TARGET"))
cli = dbt.cli(["parse"], manifest={})
events = []
cli = self.cli([action], **kwargs)
for event in cli.stream_raw_events():
events.append(json.dumps(event.raw_event))
events.append(event.raw_event)

if action == "build":
yield from event.to_default_asset_events(
manifest=cli.manifest,
dagster_dbt_translator=cli.dagster_dbt_translator,
)

if action == "parse":
yield cli.target_path.joinpath("manifest.json")

return cli.target_path.joinpath("manifest.json")
except DagsterDbtCliRuntimeError as er:
raise DbtParseError(events)
raise DbtCliError(
action=action,
events=events,
)

def parse(self):
return self.do("parse", manifest={}).__next__()

def build(self, context: OpExecutionContext):
yield from self.do("build", context=context)


dbt = DbtResource(project_dir="analytics", target=os.getenv("DBT_TARGET"))
Expand Down

0 comments on commit b0f4ab3

Please sign in to comment.