Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: delta plugin support write #284

9 changes: 9 additions & 0 deletions Questions.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Something what i don't understand?

- For example we want to push something with a sql plugin to a database. Why we first write it down to disk and then read in memory with pandas and then push it further?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Primarily an artifact of the historical architecture here-- we built the plugins on top of the existing external materialization type. There isn't a reason we could not also support plugins on non-external materializations (tables/views/etc.), it's just work that hasn't come up yet.




Future:

- If we make an external materialization we want that upstream models can refer to it? How to do it we have to register an df and create an view to it?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current external materialization does this already; dbt-duckdb creates a VIEW in the DuckDB database that points at the externally materialized file so that it can be used/queried like normal by any other models that we run.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit tricky because the underlying delta table can't be just created but it has to be registered first as df. We do that currently when the delta table is defined in the source but we have to invoke this process here too. Let me try some stuff here if not we can make it a limitation and provide as the last layer that can't be referenced

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As of my understanding, if you use SQL/excel write plugin, you cant reference it afterward?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you can reference any external materialization after it gets created in other, downstream models (sorry for the lag I've been traveling in Asia and will be here for the next few weeks)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting that I would support a custom materialization type for delta tables if that is what we really need to make the write-side work well here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think that we can also put it in external but i have to refactor it a bit, but i am still not so far away, currently i just play around and try to make it work anyway

12 changes: 10 additions & 2 deletions dbt/adapters/duckdb/environments/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from .. import utils
from dbt.contracts.connection import AdapterResponse
from dbt.exceptions import DbtRuntimeError

import duckdb

class DuckDBCursorWrapper:
def __init__(self, cursor):
Expand Down Expand Up @@ -146,7 +146,15 @@ def store_relation(self, plugin_name: str, target_config: utils.TargetConfig) ->
+ ",".join(self._plugins.keys())
)
plugin = self._plugins[plugin_name]
plugin.store(target_config)

handle = self.handle()
cursor = handle.cursor()

df = cursor.sql(target_config.config.model.compiled_code).arrow()
milicevica23 marked this conversation as resolved.
Show resolved Hide resolved
plugin.store(target_config,df)

cursor.close()
handle.close()

def close(self):
if self.conn:
Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/duckdb/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def load(self, source_config: SourceConfig):
"""
raise NotImplementedError(f"load method not implemented for {self.name}")

def store(self, target_config: TargetConfig):
def store(self, target_config: TargetConfig, df = None):
raise NotImplementedError(f"store method not implemented for {self.name}")

def configure_cursor(self, cursor):
Expand Down
121 changes: 113 additions & 8 deletions dbt/adapters/duckdb/plugins/delta.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import os
import pyarrow.compute as pc
from typing import Any
from typing import Dict

from deltalake import DeltaTable
from deltalake import DeltaTable, write_deltalake

from . import BasePlugin
from ..utils import SourceConfig
from ..utils import SourceConfig, TargetConfig


class Plugin(BasePlugin):
Expand All @@ -16,15 +18,14 @@ def configure_cursor(self, cursor):

def load(self, source_config: SourceConfig):
if "delta_table_path" not in source_config:
raise Exception("'delta_table_path' is a required argument for the delta table!")
raise Exception(
"'delta_table_path' is a required argument for the delta table!"
)

table_path = source_config["delta_table_path"]
storage_options = source_config.get("storage_options", None)
storage_options = source_config.get("storage_options", {})

if storage_options:
dt = DeltaTable(table_path, storage_options=storage_options)
else:
dt = DeltaTable(table_path)
dt = DeltaTable(table_path, storage_options=storage_options)

# delta attributes
as_of_version = source_config.get("as_of_version", None)
Expand All @@ -43,6 +44,110 @@ def load(self, source_config: SourceConfig):
def default_materialization(self):
return "view"

def store(self, target_config: TargetConfig, df=None):
Copy link

@ZergRocks ZergRocks Jan 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part will be updated as integration with current profiles.yml's filesystem option after this PR finished delta-io/delta-rs#570 right?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delta-rs's filesystem seems to will use pyarrow's fs and that can be incorporated into this method with if-else or match clause

def initialize_plugins(cls, creds: DuckDBCredentials) -> Dict[str, BasePlugin]:

also we can instantiate pyarrow.fs in current existing pattern. but I'm not familar with this repository's convention

just leaving a comment to want to be of little help..

I've never contributed to open source, is there any way I can apply if I want to contribute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ZergRocks,
I am not sure if i follow here. As i understand, you ask if we support custom fs which can be used in the delta-rs package
https://delta-io.github.io/delta-rs/usage/loading-table/#custom-storage-backends?
So it is currently not implemented but maybe it can be at least for begining in the read part which is done by my side, so if you want to do it i would be happy to support you. You can write me in the slack dbt chanell and we can comment on that together

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pyarrow.fs won't be implemented, it was a placeholder for a possible implementation but we are not continuing with that anymore.

mode = target_config.config.get("mode", "overwrite")
table_path = target_config.location.path
storage_options = target_config.config.get("storage_options", {})

if mode == "overwrite_partition":
partition_key = target_config.config.get("partition_key", None)
if not partition_key:
raise Exception(
"'partition_key' has to be defined when mode 'overwrite_partition'!"
)

if isinstance(partition_key, str):
partition_key = [partition_key]

partition_dict = []
for each_key in partition_key:
unique_key_array = pc.unique(df[each_key])

if len(unique_key_array) == 1:
partition_dict.append((each_key, str(unique_key_array[0])))
else:
raise Exception(
f"'{each_key}' column has not one unique value, values are: {str(unique_key_array)}"
)
create_insert_partition(table_path, df, partition_dict, storage_options)
elif mode == "merge":
# very slow -> https://github.com/delta-io/delta-rs/issues/1846

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not the case anymore :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @ion-elgreco, thank you very much for all the comments; I am happy that there is somebody from the delta-rs side with some insights. I very much appreciate your help.
I will recheck this when I finish general plugin refactoring on the dbt side and start again to test with the deltalake integration.

unique_key = target_config.config.get("unique_key", None)
if not unique_key:
raise Exception("'unique_key' has to be defined when mode 'merge'!")
if isinstance(unique_key, str):
unique_key = [unique_key]

predicate_stm = " and ".join(
[
f'source."{each_unique_key}" = target."{each_unique_key}"'
for each_unique_key in unique_key
]
)

try:
target_dt = DeltaTable(table_path, storage_options=storage_options)
except Exception:
# TODO handle this better
write_deltalake(
table_or_uri=table_path, data=df, storage_options=storage_options
)

target_dt = DeltaTable(table_path, storage_options=storage_options)
# TODO there is a problem if the column name is uppercase
target_dt.merge(
source=df,
predicate=predicate_stm,
source_alias="source",
target_alias="target",
).when_not_matched_insert_all().execute()
else:
write_deltalake(
table_or_uri=table_path,
data=df,
mode=mode,
storage_options=storage_options,
)


def table_exists(table_path, storage_options):
# this is bad, i have to find the way to see if there is table behind path
try:
DeltaTable(table_path, storage_options=storage_options)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To do this operation a bit cheaper, set also without_files=True.

except Exception:
return False
return True


## TODO
# add partition writing
# add optimization, vacumm options to automatically run before each run ?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The plan is to make these things configurable on the delta table itself which then should handle at which interval to vacuum or optimize, similar to spark-delta.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am courious about that one is there some docu?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not yet :) Was just sharing what I was planning to add

# can deltars optimize if the data is bigger then memory?


def create_insert_partition(table_path, data, partitions, storage_options):
"""create a new delta table on the path or overwrite existing partition"""

if table_exists(table_path, storage_options):
partition_expr = [
(partition_name, "=", partition_value)
for (partition_name, partition_value) in partitions
]
print(
f"Overwriting delta table under: {table_path} \nwith partition expr: {partition_expr}"
)
write_deltalake(
table_path, data, partition_filters=partition_expr, mode="overwrite"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Partition_filters are going to be a thing of the past. We are moving to the rust engine slowly and soon there will be a predicate overwrite (in the next python 0.15.2) that is more flexible than the partition_filter overwrite that used by the pyarrow writer.

)
else:
partitions = [
partition_name for (partition_name, partition_value) in partitions
]
print(
f"Creating delta table under: {table_path} \nwith partitions: {partitions}"
)
write_deltalake(table_path, data, partition_by=partitions)


# Future
# TODO add databricks catalog
9 changes: 5 additions & 4 deletions dbt/include/duckdb/macros/materializations/external.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
{% materialization external, adapter="duckdb", supported_languages=['sql', 'python'] %}

{%- set location = render(config.get('location', default=external_location(this, config))) -%})
{%- set location = render(config.get('location', default=external_location(this, config))) -%}
{%- set location_tmp = location ~ "_tmp.parquet" -%}
{%- set rendered_options = render_write_options(config) -%}
{%- set format = config.get('format', 'parquet') -%}
{%- set write_options = adapter.external_write_options(location, rendered_options) -%}
{%- set read_location = adapter.external_read_location(location, rendered_options) -%}
{%- set write_options = adapter.external_write_options(location_tmp, rendered_options) -%}
{%- set read_location = adapter.external_read_location(location_tmp, rendered_options) -%}

-- set language - python or sql
{%- set language = model['language'] -%}
Expand Down Expand Up @@ -46,7 +47,7 @@
{%- endcall %}

-- write an temp relation into file
{{ write_to_file(temp_relation, location, write_options) }}
{{write_to_file(temp_relation, location_tmp, write_options)}}
-- create a view on top of the location
{% call statement('main', language='sql') -%}
create or replace view {{ intermediate_relation }} as (
Expand Down
19 changes: 19 additions & 0 deletions dbt/include/duckdb/macros/materializations/external_table.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{% materialization external_table, adapter="duckdb", supported_languages=['sql', 'python'] %}
{{ log("External macro") }}

{%- set target_relation = this.incorporate(type='view') %}

{%- set plugin_name = config.get('plugin') -%}
{%- set location = render(config.get('location', default=external_location(this, config))) -%})
{%- set format = config.get('format', 'parquet') -%}

{% do store_relation(plugin_name, target_relation, location, format, config) %}

{% call statement('main', language='sql') -%}

{%- endcall %}

-- we have to load this table as df and create target_relation view

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
94 changes: 94 additions & 0 deletions tests/functional/plugins/test_delta_write.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import pytest
from pathlib import Path
import pandas as pd
import tempfile

from dbt.tests.util import (
check_relations_equal,
run_dbt,
)
from deltalake.writer import write_deltalake

delta_schema_yml = """

"""

ref1 = """
select 2 as a, 'test' as b
"""

delta1_sql = """
{{ config(
materialized='external_table',
plugin = 'delta',
location = '/workspaces/dbt-duckdb/delta_test',
storage_options = {
'test' : 'test'
}

) }}
select * from {{ref('ref1')}}
"""

delta2_sql = """
{{ config(
materialized='external_table',
plugin = 'delta',
location = '/workspaces/dbt-duckdb/delta2_test',
mode = 'merge',
unique_key = 'a'

) }}
select * from {{ref('ref1')}}
"""


@pytest.mark.skip_profile("buenavista", "md")
class TestPlugins:
@pytest.fixture(scope="class")
def delta_test_table1(self):
td = tempfile.TemporaryDirectory()
path = Path(td.name)
table_path = path / "test_delta_table1"

yield table_path

td.cleanup()

@pytest.fixture(scope="class")
def profiles_config_update(self, dbt_profile_target):
plugins = [{"module": "delta"}]
return {
"test": {
"outputs": {
"dev": {
"type": "duckdb",
"path": dbt_profile_target.get("path", ":memory:"),
"plugins": plugins,
}
},
"target": "dev",
}
}

@pytest.fixture(scope="class")
def models(self, delta_test_table1):
return {

"delta_table2.sql": delta2_sql,
"ref1.sql": ref1
}

def test_plugins(self, project):
results = run_dbt()
#assert len(results) == 4

# check_relations_equal(
# project.adapter,
# [
# "delta_table3",
# "delta_table3_expected",
# ],
# )
# res = project.run_sql("SELECT count(1) FROM 'delta_table3'", fetch="one")
# assert res[0] == 2
Loading
Loading