Skip to content

Commit

Permalink
Add engine_adapter_callback
Browse files Browse the repository at this point in the history
  • Loading branch information
umar-ahmed committed Oct 3, 2024
1 parent 543044e commit b263900
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
13 changes: 12 additions & 1 deletion dlt/sources/sql_database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
SqlTableResourceConfiguration,
_detect_precision_hints_deprecated,
TQueryAdapter,
TEngineAdapter,
)
from .schema_types import (
default_table_adapter,
Expand All @@ -45,6 +46,7 @@ def sql_database(
include_views: bool = False,
type_adapter_callback: Optional[TTypeAdapter] = None,
query_adapter_callback: Optional[TQueryAdapter] = None,
engine_adapter_callback: Optional[TEngineAdapter] = None,
) -> Iterable[DltResource]:
"""
A dlt source which loads data from an SQL database using SQLAlchemy.
Expand Down Expand Up @@ -90,8 +92,12 @@ def sql_database(
# set up alchemy engine
engine = engine_from_credentials(credentials)
engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size)
metadata = metadata or MetaData(schema=schema)

if engine_adapter_callback:
engine_adapter_callback(engine)

metadata = metadata or MetaData(schema=schema)

# use provided tables or all tables
if table_names:
tables = [
Expand Down Expand Up @@ -138,6 +144,7 @@ def sql_table(
type_adapter_callback: Optional[TTypeAdapter] = None,
included_columns: Optional[List[str]] = None,
query_adapter_callback: Optional[TQueryAdapter] = None,
engine_adapter_callback: Optional[TEngineAdapter] = None,
) -> DltResource:
"""
A dlt resource which loads data from an SQL database table using SQLAlchemy.
Expand Down Expand Up @@ -182,6 +189,10 @@ def sql_table(

engine = engine_from_credentials(credentials, may_dispose_after_use=True)
engine.execution_options(stream_results=True, max_row_buffer=2 * chunk_size)

if engine_adapter_callback:
engine_adapter_callback(engine)

metadata = metadata or MetaData(schema=schema)

table_obj = metadata.tables.get("table") or Table(
Expand Down
1 change: 1 addition & 0 deletions dlt/sources/sql_database/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

TableBackend = Literal["sqlalchemy", "pyarrow", "pandas", "connectorx"]
TQueryAdapter = Callable[[SelectAny, Table], SelectAny]
TEngineAdapter = Callable[[Engine], Engine]


class TableLoader:
Expand Down

0 comments on commit b263900

Please sign in to comment.