Skip to content

Commit

Permalink
move backend creation and skip tests for unsupported backend
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-rp committed Nov 7, 2024
1 parent b4e2dfd commit 500b5ff
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 48 deletions.
53 changes: 52 additions & 1 deletion dlt/common/libs/ibis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from typing import Any
from typing import cast

from dlt.common.exceptions import MissingDependencyException
from dlt.common.schema import Schema

from dlt.common.destination.reference import TDestinationReferenceArg, Destination, JobClientBase

try:
import ibis # type: ignore[import-untyped]
Expand All @@ -8,3 +12,50 @@
from ibis import BaseBackend
except ModuleNotFoundError:
raise MissingDependencyException("dlt ibis Helpers", ["ibis"])


SUPPORTED_DESTINATIONS = [
"dlt.destinations.postgres",
"dlt.destinations.duckdb",
"dlt.destinations.filesystem",
]


def create_ibis_backend(
destination: TDestinationReferenceArg, dataset_name: str, client: JobClientBase
) -> BaseBackend:
"""Create a given ibis backend for a destination client and dataset"""
import duckdb
from dlt.destinations.impl.duckdb.factory import DuckDbCredentials

# check if destination is supported
destination_type = Destination.from_reference(destination).destination_type
if destination_type not in SUPPORTED_DESTINATIONS:
raise NotImplementedError(f"Destination of type {destination_type} not supported")

if destination_type in ["dlt.destinations.postgres", "dlt.destinations.duckdb"]:
credentials = client.config.credentials.to_native_representation()
con = ibis.connect(credentials)
elif destination_type == "dlt.destinations.filesystem":
from dlt.destinations.impl.filesystem.sql_client import (
FilesystemClient,
FilesystemSqlClient,
)

# we create an in memory duckdb and create all tables on there
duck = duckdb.connect(":memory:")
fs_client = cast(FilesystemClient, client)
creds = DuckDbCredentials(duck)
sql_client = FilesystemSqlClient(
fs_client, dataset_name=fs_client.dataset_name, credentials=creds
)

# NOTE: we should probably have the option for the user to only select a subset of tables here
with sql_client as _:
sql_client.create_views_for_all_tables()
con = ibis.duckdb.from_connection(duck)

# NOTE: there seems to be no standardized way to set the current dataset / schema in ibis
con.raw_sql(f"SET search_path TO {dataset_name};")

return con
49 changes: 4 additions & 45 deletions dlt/destinations/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,13 @@ def __init__(

def ibis(self) -> IbisBackend:
"""return a connected ibis backend"""
from dlt.common.libs.ibis import create_ibis_backend

self._ensure_client_and_schema()
return create_ibis_backend(
self._destination,
self._dataset_name,
schema=self.schema,
self._destination_client(self.schema),
)

@property
Expand Down Expand Up @@ -327,50 +330,6 @@ def dataset(
raise NotImplementedError(f"Dataset of type {dataset_type} not implemented")


from dlt.common.destination.reference import JobClientBase


def create_ibis_backend(
destination: TDestinationReferenceArg, dataset_name: str, schema: Schema
) -> IbisBackend:
from dlt.common.libs.ibis import ibis
import duckdb
from dlt.destinations.impl.duckdb.factory import DuckDbCredentials

destination = Destination.from_reference(destination)
client = _get_client_for_destination(destination, schema, dataset_name)

if destination.destination_type in ["dlt.destinations.postgres", "dlt.destinations.duckdb"]:
credentials = client.config.credentials.to_native_representation()
ibis = ibis.connect(credentials)
elif destination.destination_type == "dlt.destinations.filesystem":
from dlt.destinations.impl.filesystem.sql_client import (
FilesystemClient,
FilesystemSqlClient,
)

# we create an in memory duckdb and create all tables on there
duck = duckdb.connect(":memory:")
fs_client = cast(FilesystemClient, client)
creds = DuckDbCredentials(duck)
sql_client = FilesystemSqlClient(
fs_client, dataset_name=fs_client.dataset_name, credentials=creds
)

# NOTE: we should probably have the option for the user to only select a subset of tables here
with sql_client as _:
sql_client.create_views_for_all_tables()
ibis = ibis.duckdb.from_connection(duck)

else:
raise NotImplementedError()

# NOTE: there seems to be no standardized way to set the current dataset / schema in ibis
ibis.raw_sql(f"SET search_path TO {dataset_name};")

return ibis


# helpers
def _get_client_for_destination(
destination: TDestinationReferenceArg, schema: Schema, dataset_name: str
Expand Down
2 changes: 1 addition & 1 deletion dlt/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
from dlt.destinations.fs_client import FSClientBase
from dlt.destinations.job_client_impl import SqlJobClientBase
from dlt.destinations.dataset import dataset, create_ibis_backend
from dlt.destinations.dataset import dataset
from dlt.load.configuration import LoaderConfiguration
from dlt.load import Load

Expand Down
10 changes: 9 additions & 1 deletion tests/load/test_read_interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,19 @@ def test_db_cursor_access(populated_pipeline: Pipeline) -> None:
ids=lambda x: x.name,
)
def test_ibis_dataset_access(populated_pipeline: Pipeline) -> None:
# NOTE: we should generalize this with a context for certain deps
# NOTE: we could generalize this with a context for certain deps
import subprocess

subprocess.check_call(["pip", "install", "ibis-framework[duckdb,postgres,bigquery]"])

from dlt.common.libs.ibis import SUPPORTED_DESTINATIONS

# check correct error if not supported
if populated_pipeline.destination.destination_type not in SUPPORTED_DESTINATIONS:
with pytest.raises(NotImplementedError):
populated_pipeline._dataset().ibis()
return

total_records = _total_records(populated_pipeline)
ibis_connection = populated_pipeline._dataset().ibis()

Expand Down

0 comments on commit 500b5ff

Please sign in to comment.