Integrate dlt and dbt. Use dbt to run dlt pipelines #39

Merged: 2 commits, Dec 9, 2024
9 changes: 6 additions & 3 deletions README.md
@@ -11,21 +11,24 @@ sometimes limit the inclusion of community features in the open-source version.
opendbt offers a fully open-source package to address these concerns. **OpenDBT builds upon dbt-core, adding valuable
features without changing dbt-core code.**

`opendbt` unlocks many customizations that are not in dbt-core, allowing end users to tailor dbt to their specific
needs and data workflows.

With `opendbt` you can go beyond dbt's core functionality, for example by seamlessly integrating your own customized
adapter and extending the Jinja context with additional adapter/Python methods.
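
A minimal sketch of that idea (class and method names here are illustrative; the real implementation is
`DuckDBAdapterV2Custom` in `opendbt/examples.py`, shown later in this diff): a custom adapter subclasses an existing
one and exposes a Python method to Jinja with dbt's `@available` decorator.

```python
from dbt.adapters.base import available
from dbt.adapters.duckdb import DuckDBAdapter


class MyCustomAdapter(DuckDBAdapter):
    """Illustrative custom adapter that inherits from the stock DuckDB adapter."""

    @available  # makes the method callable from Jinja, e.g. {{ adapter.greet('dbt') }}
    def greet(self, name: str) -> str:
        return f"Hello, {name}!"
```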

# Features

- Customize Existing Adapters: Leverage OOP inheritance to create custom adapters that extend existing ones, and
expose additional Python methods to Jinja from within your adapter.
- Execute Local Python Code: Use dbt Python models to run local Python code. For example, you could import data from web
APIs directly within your dbt model.
- Granular Model-Level Orchestration with Airflow: Integrate Airflow for fine-grained control over model execution.
- Serve dbt Docs in Airflow UI: Create a custom page on the Airflow server that displays dbt documentation as an Airflow
UI page.
- Customized dbt Docs: Replace the default dbt documentation page with your own custom index.html file.
- Run end-to-end ETL pipelines with dbt
using the [dlt](https://dlthub.com/) [integration](https://github.com/memiiso/opendbt/issues/40), as sketched below.

For detailed examples, see: [examples](docs/EXAMPLES.md).
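
As a quick illustration, a project can be run programmatically through `OpenDbtProject`; this minimal sketch mirrors
the tests later in this diff, with an illustrative project path:

```python
from pathlib import Path

from opendbt import OpenDbtProject

# Illustrative location; point these at your own dbt project and profiles.
project_dir = Path("./my_dbt_project")
dp = OpenDbtProject(project_dir=project_dir, profiles_dir=project_dir,
                    args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom'])
# Runs the dlt-powered model like any other dbt model.
dp.run(command="run", args=['--select', 'my_executedlt_model'])
```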

43 changes: 41 additions & 2 deletions opendbt/examples.py
@@ -1,4 +1,5 @@
import importlib
import sys
import tempfile
from multiprocessing.context import SpawnContext
from typing import Dict
@@ -11,6 +12,10 @@ class DuckDBAdapterV2Custom(DuckDBAdapter):

@available
def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
connection = self.connections.get_if_exists()
if not connection:
connection = self.connections.get_thread_connection()

with tempfile.NamedTemporaryFile(suffix='.py', delete=True) as model_file:
model_file.write(compiled_code.lstrip().encode('utf-8'))
model_file.flush()
@@ -22,12 +27,46 @@ def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
# Create a module object
module = importlib.util.module_from_spec(spec)
# Load the module
sys.modules[model_name] = module
spec.loader.exec_module(module)
dbt = module.dbtObj(None)
# Access and call the `model` function of the model!
# IMPORTANT: pass the DuckDB session from the adapter down to the model
module.model(dbt=dbt, session=connection.handle)
model_file.close()
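
A model executed through `submit_local_python_job` only needs to accept the session in its signature. A minimal
sketch, assuming the `executepython` materialization (the full test models appear later in this diff):

```python
def model(dbt, session):
    # `session` is the live DuckDB connection handle passed down by the adapter
    dbt.config(materialized="executepython")
    print(f"Running inside dbt, session type: {type(session)}")
    return None
```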

@available
def submit_local_dlt_job(self, parsed_model: Dict, compiled_code: str):
connection = self.connections.get_if_exists()
if not connection:
connection = self.connections.get_thread_connection()

import dlt
# IMPORTANT: pre-configure and prepare the dlt.pipeline for the model!
_pipeline = dlt.pipeline(
pipeline_name=str(parsed_model['unique_id']).replace(".", "-"),
destination=dlt.destinations.duckdb(connection.handle._env.conn),
dataset_name=parsed_model['schema'],
dev_mode=False,
)

with tempfile.NamedTemporaryFile(suffix='.py', delete=True) as model_file:
model_file.write(compiled_code.lstrip().encode('utf-8'))
model_file.flush()
print(f"Created temp py file {model_file.name}")
# load and execute python code!
model_name = parsed_model['name']
# Load the module spec
spec = importlib.util.spec_from_file_location(model_name, model_file.name)
# Create a module object
module = importlib.util.module_from_spec(spec)
# Load the module
sys.modules[model_name] = module
spec.loader.exec_module(module)
dbt = module.dbtObj(None)
# IMPORTANT: pass the pre-configured dlt pipeline from the adapter down to the model
module.model(dbt=dbt, pipeline=_pipeline)
model_file.close()
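
Correspondingly, a model run through `submit_local_dlt_job` receives the pre-built pipeline instead of a session. A
minimal sketch with illustrative resource data (the full example model appears later in this diff):

```python
import dlt


@dlt.resource(primary_key="id")
def numbers():
    yield [{"id": 1}, {"id": 2}]  # illustrative rows


def model(dbt, pipeline):
    # `pipeline` arrives pre-configured: its destination and dataset already point at the dbt target
    dbt.config(materialized="executedlt")
    print(pipeline.run(numbers()))
    return None
```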

# NOTE: used for testing only
class DuckDBAdapterTestingOnlyDbt17(DuckDBAdapter):
26 changes: 26 additions & 0 deletions opendbt/macros/executedlt.sql
@@ -0,0 +1,26 @@
{% materialization executedlt, supported_languages=['python'] %}

{%- set identifier = model['alias'] -%}
{%- set language = model['language'] -%}

{% set grant_config = config.get('grants') %}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{{ run_hooks(pre_hooks) }}

{% call noop_statement(name='main', message='Executed DLT pipeline', code=compiled_code, rows_affected=-1, res=None) %}
{%- set res = adapter.submit_local_dlt_job(model, compiled_code) -%}
{% endcall %}
{{ run_hooks(post_hooks) }}

{% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -24,7 +24,7 @@ dependencies = [
]
[project.optional-dependencies]
airflow = ["apache-airflow"]
test = ["testcontainers>=3.7,<4.9", "apache-airflow", "pylint", "dlt[duckdb]"]

[tool.setuptools]
include-package-data = true
36 changes: 36 additions & 0 deletions tests/resources/dbttest/models/my_executedlt_model.py.py
@@ -0,0 +1,36 @@
import dlt
from dlt.pipeline import TPipeline


@dlt.resource(
columns={"event_tstamp": {"data_type": "timestamp", "precision": 3}},
primary_key="event_id",
)
def events():
yield [{"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123"},
{"event_id": 2, "event_tstamp": "2025-02-30T10:00:00.321"}]


def model(dbt, pipeline: TPipeline):
"""

:param dbt:
:param pipeline: Pre-configured dlt pipeline. dlt target connection and dataset is pre-set using the model config!
:return:
"""
dbt.config(materialized="executedlt")
print("========================================================")
print(f"INFO: DLT Pipeline pipeline_name:{pipeline.pipeline_name}")
print(f"INFO: DLT Pipeline dataset_name:{pipeline.dataset_name}")
print(f"INFO: DLT Pipeline staging:{pipeline.staging}")
print(f"INFO: DLT Pipeline destination:{pipeline.destination}")
print(f"INFO: DLT Pipeline _pipeline_storage:{pipeline._pipeline_storage}")
print(f"INFO: DLT Pipeline _schema_storage:{pipeline._schema_storage}")
print(f"INFO: DLT Pipeline state:{pipeline.state}")
print("========================================================")
load_info = pipeline.run(events())
print(load_info)
row_counts = pipeline.last_trace.last_normalize_info
print(row_counts)
print("========================================================")
return None
31 changes: 31 additions & 0 deletions tests/resources/dbttest/models/my_executepython_dlt_model.py
@@ -0,0 +1,31 @@
import dlt


@dlt.resource(
columns={"event_tstamp": {"data_type": "timestamp", "precision": 3}},
primary_key="event_id",
)
def events():
yield [{"event_id": 1, "event_tstamp": "2024-07-30T10:00:00.123"},
{"event_id": 2, "event_tstamp": "2025-02-30T10:00:00.321"}]


def model(dbt, session):
dbt.config(materialized="executepython")
print("========================================================")
print(f"INFO: DLT Version:{dlt.version.__version__}")
print(f"INFO: DBT Duckdb Session:{type(session)}")
print(f"INFO: DBT Duckdb Connection:{type(session._env.conn)}")
print("========================================================")
p = dlt.pipeline(
pipeline_name="dbt_dlt",
destination=dlt.destinations.duckdb(session._env.conn),
dataset_name=dbt.this.schema,
dev_mode=False,
)
load_info = p.run(events())
print(load_info)
row_counts = p.last_trace.last_normalize_info
print(row_counts)
print("========================================================")
return None
14 changes: 14 additions & 0 deletions tests/test_executedlt_materialization.py
@@ -0,0 +1,14 @@
from pathlib import Path
from unittest import TestCase

from opendbt import OpenDbtProject


class TestOpenDbtProject(TestCase):
RESOURCES_DIR = Path(__file__).parent.joinpath("resources")
DBTTEST_DIR = RESOURCES_DIR.joinpath("dbttest")

def test_run_executedlt_materialization(self):
dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR,
args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom'])
dp.run(command="run", args=['--select', 'my_executedlt_model'])
9 changes: 7 additions & 2 deletions tests/test_executepython_materialization.py
@@ -11,9 +11,14 @@ class TestOpenDbtProject(TestCase):
def test_run_executepython_materialization(self):
dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR,
args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom'])
dp.run(command="run", args=['--select', 'my_executepython_model'])

def test_run_executepython_dlt_pipeline(self):
dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR,
args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom'])
dp.run(command="run", args=['--select', 'my_executepython_dlt_model'])

def test_run_executepython_materialization_subprocess(self):
dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR,
args=['--vars', 'dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom'])
dp.run(command="run", args=['--select', 'my_executepython_model'], use_subprocess=True)