ibis support - hand over credentials to ibis backend for a number of destinations #2004

Merged: 30 commits, Nov 23, 2024

Commits (30)
98b374d
add PoC for ibis table support on readabledbapidataset
sh-rp Oct 30, 2024
df7f63f
add PoC for exposing an ibis backend for a destination
sh-rp Oct 30, 2024
767d6de
install ibis dependency for tests
sh-rp Oct 31, 2024
854b855
add support for filesystem
sh-rp Oct 31, 2024
b7bd1e3
remove print statements
sh-rp Nov 3, 2024
ef536e1
remove ibis tables from dbapirelation
sh-rp Nov 7, 2024
da78768
clean up interfaces
sh-rp Nov 7, 2024
902bf92
move backend creation and skip tests for unsupported backend
sh-rp Nov 7, 2024
cc4d6ff
fix dependencies and typing
sh-rp Nov 7, 2024
d73ece3
mark import not found, can't be linted on 3.8 and 3.9
sh-rp Nov 7, 2024
a7a9001
add snowflake and bigquery support
sh-rp Nov 11, 2024
cf0a8eb
add redshift and maybe fix linter
sh-rp Nov 11, 2024
371a557
fix linter
sh-rp Nov 11, 2024
7f2328f
remove unneeded dependency
sh-rp Nov 11, 2024
c818140
add in missing pipeline drop
sh-rp Nov 11, 2024
2682c2b
fix snowflake table access test
sh-rp Nov 12, 2024
fff06d7
add mssql support
sh-rp Nov 12, 2024
7534106
enable synapse
sh-rp Nov 12, 2024
c78e282
add clickhouse support
sh-rp Nov 12, 2024
23951d0
enable motherduck
sh-rp Nov 12, 2024
49eae5f
post rebase lock file update
sh-rp Nov 12, 2024
c3eaf80
enable motherduck
sh-rp Nov 12, 2024
b01c950
add missing ibis framework extras
sh-rp Nov 12, 2024
e450b58
remove argument of create ibis backend
sh-rp Nov 12, 2024
313caef
extract destination client factories into dataset file
sh-rp Nov 12, 2024
f93a515
fix partial loading example
sh-rp Nov 12, 2024
71a404c
fix setting of default schema name in destination config
sh-rp Nov 12, 2024
20b4dae
fix default dataset for staging destination
sh-rp Nov 13, 2024
cf954ca
post rebase lockfile update
sh-rp Nov 18, 2024
7f767b8
always set azure transport connection
sh-rp Nov 19, 2024
10 changes: 9 additions & 1 deletion dlt/common/destination/reference.py
@@ -76,12 +76,16 @@
try:
from dlt.common.libs.pandas import DataFrame
from dlt.common.libs.pyarrow import Table as ArrowTable
from dlt.common.libs.ibis import BaseBackend as IbisBackend
except MissingDependencyException:
DataFrame = Any
ArrowTable = Any
IbisBackend = Any

else:
DataFrame = Any
ArrowTable = Any
IbisBackend = Any


class StorageSchemaInfo(NamedTuple):
@@ -291,7 +295,6 @@ def _make_dataset_name(self, schema_name: str) -> str:
# if default schema is None then suffix is not added
if self.default_schema_name is not None and schema_name != self.default_schema_name:
return (self.dataset_name or "") + "_" + schema_name

return self.dataset_name


@@ -574,12 +577,17 @@ def close(self) -> None: ...
class SupportsReadableDataset(Protocol):
"""A readable dataset retrieved from a destination, has support for creating readable relations for a query or table"""

@property
def schema(self) -> Schema: ...

def __call__(self, query: Any) -> SupportsReadableRelation: ...

def __getitem__(self, table: str) -> SupportsReadableRelation: ...

def __getattr__(self, table: str) -> SupportsReadableRelation: ...

def ibis(self) -> IbisBackend: ...


class JobClientBase(ABC):
def __init__(
121 changes: 121 additions & 0 deletions dlt/common/libs/ibis.py
@@ -0,0 +1,121 @@
from typing import cast

from dlt.common.exceptions import MissingDependencyException

from dlt.common.destination.reference import TDestinationReferenceArg, Destination, JobClientBase

try:
import ibis # type: ignore
from ibis import BaseBackend
except ModuleNotFoundError:
raise MissingDependencyException("dlt ibis Helpers", ["ibis"])


SUPPORTED_DESTINATIONS = [
"dlt.destinations.postgres",
"dlt.destinations.duckdb",
"dlt.destinations.motherduck",
"dlt.destinations.filesystem",
"dlt.destinations.bigquery",
"dlt.destinations.snowflake",
"dlt.destinations.redshift",
"dlt.destinations.mssql",
"dlt.destinations.synapse",
"dlt.destinations.clickhouse",
# NOTE: Athena could theoretically work with trino backend, but according to
# https://github.com/ibis-project/ibis/issues/7682 connecting with aws credentials
# does not work yet.
# "dlt.destinations.athena",
]


def create_ibis_backend(
destination: TDestinationReferenceArg, client: JobClientBase
) -> BaseBackend:
"""Create a given ibis backend for a destination client and dataset"""

# check if destination is supported
destination_type = Destination.from_reference(destination).destination_type
if destination_type not in SUPPORTED_DESTINATIONS:
raise NotImplementedError(f"Destination of type {destination_type} not supported by ibis.")

if destination_type in ["dlt.destinations.motherduck", "dlt.destinations.duckdb"]:
import duckdb
from dlt.destinations.impl.duckdb.duck import DuckDbClient

duck_client = cast(DuckDbClient, client)
duck = duckdb.connect(
database=duck_client.config.credentials._conn_str(),
read_only=duck_client.config.credentials.read_only,
config=duck_client.config.credentials._get_conn_config(),
)
con = ibis.duckdb.from_connection(duck)
elif destination_type in [
"dlt.destinations.postgres",
"dlt.destinations.redshift",
]:
credentials = client.config.credentials.to_native_representation()
con = ibis.connect(credentials)
elif destination_type == "dlt.destinations.snowflake":
from dlt.destinations.impl.snowflake.snowflake import SnowflakeClient

sf_client = cast(SnowflakeClient, client)
credentials = sf_client.config.credentials.to_connector_params()
con = ibis.snowflake.connect(**credentials)
elif destination_type in ["dlt.destinations.mssql", "dlt.destinations.synapse"]:
from dlt.destinations.impl.mssql.mssql import MsSqlJobClient

mssql_client = cast(MsSqlJobClient, client)
con = ibis.mssql.connect(
host=mssql_client.config.credentials.host,
port=mssql_client.config.credentials.port,
database=mssql_client.config.credentials.database,
user=mssql_client.config.credentials.username,
password=mssql_client.config.credentials.password,
driver=mssql_client.config.credentials.driver,
)
elif destination_type == "dlt.destinations.bigquery":
from dlt.destinations.impl.bigquery.bigquery import BigQueryClient

bq_client = cast(BigQueryClient, client)
credentials = bq_client.config.credentials.to_native_credentials()
con = ibis.bigquery.connect(
credentials=credentials,
project_id=bq_client.sql_client.project_id,
location=bq_client.sql_client.location,
)
elif destination_type == "dlt.destinations.clickhouse":
from dlt.destinations.impl.clickhouse.clickhouse import ClickHouseClient

ch_client = cast(ClickHouseClient, client)
con = ibis.clickhouse.connect(
host=ch_client.config.credentials.host,
port=ch_client.config.credentials.http_port,
database=ch_client.config.credentials.database,
user=ch_client.config.credentials.username,
password=ch_client.config.credentials.password,
secure=bool(ch_client.config.credentials.secure),
# compression=True,
)
elif destination_type == "dlt.destinations.filesystem":
import duckdb
from dlt.destinations.impl.filesystem.sql_client import (
FilesystemClient,
FilesystemSqlClient,
)
from dlt.destinations.impl.duckdb.factory import DuckDbCredentials

# we create an in memory duckdb and create all tables on there
duck = duckdb.connect(":memory:")
fs_client = cast(FilesystemClient, client)
creds = DuckDbCredentials(duck)
sql_client = FilesystemSqlClient(
fs_client, dataset_name=fs_client.dataset_name, credentials=creds
)

# NOTE: we should probably have the option for the user to only select a subset of tables here
with sql_client as _:
sql_client.create_views_for_all_tables()
con = ibis.duckdb.from_connection(duck)

return con
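
To illustrate how this backend handover is meant to be consumed, here is a minimal usage sketch. It assumes a pipeline that has already loaded data into duckdb and calls the `ibis()` accessor added to the dataset in this PR; the pipeline, dataset, and table names are illustrative, and the accessor for obtaining the dataset (`pipeline._dataset()`, as referenced in the review thread further down) may differ from the final public API.

```python
# Minimal usage sketch (assumed names): obtain a connected ibis backend
# from a dlt dataset and run an ibis expression against a loaded table.
import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",   # illustrative
    destination="duckdb",
    dataset_name="my_data",
)
pipeline.run([{"id": 1}, {"id": 2}], table_name="items")

# the dataset implements SupportsReadableDataset and now exposes .ibis()
dataset = pipeline._dataset()      # accessor name per the review thread below
con = dataset.ibis()               # create_ibis_backend() hands over credentials

table = con.table("items")         # ibis BaseBackend API
print(table.limit(10).execute())
```
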
119 changes: 110 additions & 9 deletions dlt/destinations/dataset.py
@@ -1,10 +1,15 @@
from typing import Any, Generator, Optional, Sequence, Union, List
from typing import Any, Generator, Sequence, Union, TYPE_CHECKING, Tuple

from contextlib import contextmanager

from dlt import version

from dlt.common.json import json
from copy import deepcopy

from dlt.common.normalizers.naming.naming import NamingConvention
from dlt.common.exceptions import MissingDependencyException

from contextlib import contextmanager
from dlt.common.destination import AnyDestination
from dlt.common.destination.reference import (
SupportsReadableRelation,
SupportsReadableDataset,
@@ -14,13 +19,24 @@
JobClientBase,
WithStateSync,
DestinationClientDwhConfiguration,
DestinationClientStagingConfiguration,
DestinationClientConfiguration,
DestinationClientDwhWithStagingConfiguration,
)

from dlt.common.schema.typing import TTableSchemaColumns
from dlt.destinations.sql_client import SqlClientBase, WithSqlClient
from dlt.common.schema import Schema
from dlt.common.exceptions import DltException

if TYPE_CHECKING:
try:
from dlt.common.libs.ibis import BaseBackend as IbisBackend
except MissingDependencyException:
IbisBackend = Any
else:
IbisBackend = Any


class DatasetException(DltException):
pass
@@ -228,6 +244,16 @@ def __init__(
self._sql_client: SqlClientBase[Any] = None
self._schema: Schema = None

def ibis(self) -> IbisBackend:
"""return a connected ibis backend"""
[Review thread]
Collaborator: I think this is OK. When we have full ibis support we can return a Dataset with an ibis implementation.

Collaborator: Btw, maybe you should overload
    def _dataset(self, dataset_type: TDatasetType = "dbapi") -> SupportsReadableDataset:
to return a different dataset implementation per engine, or just return the DBAPI one for now. Otherwise people won't see ibis.

Collaborator (Author): I'm not quite sure what you mean. But I was thinking we should have dataset_type be "dbapi", "ibis" and "auto". If it is "auto", it will select dbapi if there are no ibis expressions available, and ibis if there are. That said, dbapi should have a different name, since the ibis-expression-based one also uses dbapi.

from dlt.common.libs.ibis import create_ibis_backend

self._ensure_client_and_schema()
return create_ibis_backend(
self._destination,
self._destination_client(self.schema),
)

@property
def schema(self) -> Schema:
self._ensure_client_and_schema()
@@ -239,12 +265,9 @@ def sql_client(self) -> SqlClientBase[Any]:
return self._sql_client

def _destination_client(self, schema: Schema) -> JobClientBase:
client_spec = self._destination.spec()
if isinstance(client_spec, DestinationClientDwhConfiguration):
client_spec._bind_dataset_name(
dataset_name=self._dataset_name, default_schema_name=schema.name
)
return self._destination.client(schema, client_spec)
return get_destination_clients(
schema, destination=self._destination, destination_dataset_name=self._dataset_name
)[0]

def _ensure_client_and_schema(self) -> None:
"""Lazy load schema and client"""
@@ -310,3 +333,81 @@ def dataset(
if dataset_type == "dbapi":
return ReadableDBAPIDataset(destination, dataset_name, schema)
raise NotImplementedError(f"Dataset of type {dataset_type} not implemented")


# helpers
def get_destination_client_initial_config(
destination: AnyDestination,
default_schema_name: str,
dataset_name: str,
as_staging: bool = False,
) -> DestinationClientConfiguration:
client_spec = destination.spec

# this client supports many schemas and datasets
if issubclass(client_spec, DestinationClientDwhConfiguration):
if issubclass(client_spec, DestinationClientStagingConfiguration):
spec: DestinationClientDwhConfiguration = client_spec(as_staging_destination=as_staging)
else:
spec = client_spec()

spec._bind_dataset_name(dataset_name, default_schema_name)
return spec

return client_spec()


def get_destination_clients(
schema: Schema,
destination: AnyDestination = None,
destination_dataset_name: str = None,
destination_initial_config: DestinationClientConfiguration = None,
staging: AnyDestination = None,
staging_dataset_name: str = None,
staging_initial_config: DestinationClientConfiguration = None,
# pipeline specific settings
default_schema_name: str = None,
) -> Tuple[JobClientBase, JobClientBase]:
destination = Destination.from_reference(destination) if destination else None
staging = Destination.from_reference(staging) if staging else None

try:
# resolve staging config in order to pass it to destination client config
staging_client = None
if staging:
if not staging_initial_config:
# this is just initial config - without user configuration injected
staging_initial_config = get_destination_client_initial_config(
staging,
dataset_name=staging_dataset_name,
default_schema_name=default_schema_name,
as_staging=True,
)
# create the client - that will also resolve the config
staging_client = staging.client(schema, staging_initial_config)

if not destination_initial_config:
# config is not provided then get it with injected credentials
initial_config = get_destination_client_initial_config(
destination,
dataset_name=destination_dataset_name,
default_schema_name=default_schema_name,
)

# attach the staging client config to destination client config - if its type supports it
if (
staging_client
and isinstance(initial_config, DestinationClientDwhWithStagingConfiguration)
and isinstance(staging_client.config, DestinationClientStagingConfiguration)
):
initial_config.staging_config = staging_client.config
# create instance with initial_config properly set
client = destination.client(schema, initial_config)
return client, staging_client
except ModuleNotFoundError:
client_spec = destination.spec()
raise MissingDependencyException(
f"{client_spec.destination_type} destination",
[f"{version.DLT_PKG_NAME}[{client_spec.destination_type}]"],
"Dependencies for specific destinations are available as extras of dlt",
)
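
A short sketch of how the helpers extracted above are intended to be called. This is an illustration only: the string destination reference, dataset name, and schema name are assumptions, and without a staging destination the second element of the returned tuple is None.

```python
# Sketch (assumed names): resolve the destination job client via the helpers
# extracted into dataset.py above.
from dlt.common.schema import Schema
from dlt.destinations.dataset import get_destination_clients

schema = Schema("my_schema")

client, staging_client = get_destination_clients(
    schema,
    destination="duckdb",                # resolved via Destination.from_reference
    destination_dataset_name="my_data",  # bound onto the DWH client config
    default_schema_name="my_schema",
)
# client is a JobClientBase bound to dataset "my_data"; staging_client is None here
```
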
6 changes: 2 additions & 4 deletions dlt/destinations/impl/duckdb/sql_client.py
@@ -52,10 +52,8 @@ def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]:
yield self.native_cursor.fetch_arrow_table()
return
# iterate
try:
yield from self.native_cursor.fetch_record_batch(chunk_size)
except StopIteration:
pass
for item in self.native_cursor.fetch_record_batch(chunk_size):
yield ArrowTable.from_batches([item])


class DuckDbSqlClient(SqlClientBase[duckdb.DuckDBPyConnection], DBTransaction):
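For context on the change above: duckdb's fetch_record_batch() returns a pyarrow RecordBatchReader that yields RecordBatch objects, so each batch is now wrapped in a pyarrow Table to keep iter_arrow() yielding ArrowTable chunks. A standalone sketch of that pattern, outside dlt and with illustrative names:

```python
# Standalone sketch of the record-batch-to-table pattern used above.
import duckdb
import pyarrow as pa

con = duckdb.connect()
con.execute("SELECT i FROM range(10000) AS t(i)")
reader = con.fetch_record_batch(2048)        # pyarrow.RecordBatchReader

for batch in reader:                         # each item is a pyarrow.RecordBatch
    table = pa.Table.from_batches([batch])   # wrap to yield consistent Table chunks
    print(table.num_rows)
```
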
13 changes: 8 additions & 5 deletions dlt/destinations/impl/filesystem/sql_client.py
@@ -178,11 +178,6 @@ def create_authentication(self, persistent: bool = False, secret_name: str = Non
elif self.fs_client.config.protocol == "memory":
self._conn.register_filesystem(self.fs_client.fs_client)

# the line below solves problems with certificate path lookup on linux
# see duckdb docs
if self.fs_client.config.protocol in ["az", "abfss"]:
self._conn.sql("SET azure_transport_option_type = 'curl';")

def open_connection(self) -> duckdb.DuckDBPyConnection:
# we keep the in memory instance around, so if this prop is set, return it
first_connection = self.credentials.has_open_connection
@@ -195,8 +190,16 @@ def open_connection(self) -> duckdb.DuckDBPyConnection:
self._conn.sql(f"USE {self.fully_qualified_dataset_name()}")
self.create_authentication()

# the line below solves problems with certificate path lookup on linux
# see duckdb docs
if self.fs_client.config.protocol in ["az", "abfss"]:
self._conn.sql("SET azure_transport_option_type = 'curl';")

return self._conn

def create_views_for_all_tables(self) -> None:
self.create_views_for_tables({v: v for v in self.fs_client.schema.tables.keys()})

@raise_database_error
def create_views_for_tables(self, tables: Dict[str, str]) -> None:
"""Add the required tables as views to the duckdb in memory instance"""