Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[experiment] ibis integration with connection sharing #1491

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions dlt/helpers/ibis_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import ibis
from typing import cast
from typing import Iterator
from dlt import Pipeline
from contextlib import contextmanager
from ibis import BaseBackend
from importlib import import_module

# Maps a dlt destination type to the ibis backend module name (under
# `ibis.backends`) that should be imported for it instead of its own name.
IBIS_DESTINATION_MAP = {
    "synapse": "mssql",
    "redshift": "postgres",
}


@contextmanager
def ibis_helper(p: Pipeline) -> Iterator[BaseBackend]:
    """Wrap a pipeline and expose an ibis backend that shares its SQL client.

    The destination type is read from the pipeline's destination client
    config, translated through ``IBIS_DESTINATION_MAP`` (falling back to the
    destination type itself), and used to import ``ibis.backends.<name>``.
    The pipeline's SQL client is opened for the duration of the ``with``
    block and handed to the backend's ``from_connection``.

    Args:
        p: The pipeline whose destination should be exposed through ibis.

    Yields:
        The result of calling ``from_connection`` on the ibis backend with
        the pipeline's open SQL client.
    """
    client_config = p.destination_client().config
    raw_type = client_config.destination_type

    # some destinations are served by a differently named ibis backend
    backend_name = IBIS_DESTINATION_MAP.get(raw_type, raw_type)

    # resolve the backend module dynamically and instantiate its Backend class
    backend_module = import_module(f"ibis.backends.{backend_name}")
    backend = cast(BaseBackend, backend_module.Backend())

    # the SQL client stays open only while the caller is inside the context
    with p.sql_client() as sql_client:
        connected_backend = backend.from_connection(sql_client)
        yield connected_backend
64 changes: 64 additions & 0 deletions tests/load/test_ibis_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import pytest
import ibis
import dlt

from tests.load.pipeline.utils import destinations_configs, DestinationTestConfiguration
from dlt.helpers.ibis_helper import ibis_helper


@pytest.mark.essential
@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(
        default_sql_configs=True, exclude=["athena", "dremio", "redshift", "databricks", "synapse"]
    ),
    ids=lambda x: x.name,
)
def test_ibis_helper(destination_config: DestinationTestConfiguration) -> None:
    """Load a parent table with a child table and read both back through ibis.

    Checks that the backend yielded by ``ibis_helper`` can list the dlt and
    data tables and can return the loaded ``id`` values via raw SQL.
    """
    # we load a table with child table and check whether ibis works
    pipeline = destination_config.setup_pipeline(
        "ibis_pipeline", dataset_name="ibis_test", dev_mode=True
    )
    pipeline.run(
        [{"id": i + 10, "children": [{"id": i + 100}, {"id": i + 1000}]} for i in range(5)],
        table_name="ibis_items",
    )

    with ibis_helper(pipeline) as ibis_backend:
        # check we can read table names
        assert {tname.lower() for tname in ibis_backend.list_tables()} >= {
            "_dlt_loads",
            "_dlt_pipeline_state",
            "_dlt_version",
            "ibis_items",
            "ibis_items__children",
        }

        # the result column is looked up in upper case on snowflake
        id_identifier = "id"
        if destination_config.destination == "snowflake":
            id_identifier = id_identifier.upper()

        # check we can read data; the queries carry no ORDER BY, so the row
        # order is not guaranteed and the values are sorted before comparing
        parent_ids = (
            ibis_backend.sql("SELECT id FROM ibis_items").to_pandas()[id_identifier].tolist()
        )
        assert sorted(parent_ids) == [10, 11, 12, 13, 14]

        child_ids = (
            ibis_backend.sql("SELECT id FROM ibis_items__children")
            .to_pandas()[id_identifier]
            .tolist()
        )
        assert sorted(child_ids) == [
            100,
            101,
            102,
            103,
            104,
            1000,
            1001,
            1002,
            1003,
            1004,
        ]
Loading