Skip to content

Commit

Permalink
Sqlalchemy merge support (#1842)
Browse files Browse the repository at this point in the history
* Sqlalchemy merge job (insert replace)

* Fix

* Sqlalchemy scd2

* Update test for mysql

* Fix sqlalchemy 2.0
  • Loading branch information
steinitzu authored Oct 7, 2024
1 parent b97a45c commit 4e90278
Show file tree
Hide file tree
Showing 9 changed files with 519 additions and 36 deletions.
10 changes: 5 additions & 5 deletions dlt/destinations/impl/sqlalchemy/db_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,16 @@ def _sqlite_create_dataset(self, dataset_name: str) -> None:
"""Mimic multiple schemas in sqlite using ATTACH DATABASE to
attach a new database file to the current connection.
"""
if dataset_name == "main":
# main always exists
return
if self._sqlite_is_memory_db():
new_db_fn = ":memory:"
else:
new_db_fn = self._sqlite_dataset_filename(dataset_name)

statement = "ATTACH DATABASE :fn AS :name"
self.execute_sql(statement, fn=new_db_fn, name=dataset_name)
if dataset_name != "main": # main is the current file, it is always attached
statement = "ATTACH DATABASE :fn AS :name"
self.execute_sql(statement, fn=new_db_fn, name=dataset_name)
# WAL mode is applied to all currently attached databases
self.execute_sql("PRAGMA journal_mode=WAL")
self._sqlite_attached_datasets.add(dataset_name)

def _sqlite_drop_dataset(self, dataset_name: str) -> None:
Expand Down
12 changes: 12 additions & 0 deletions dlt/destinations/impl/sqlalchemy/factory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import typing as t

from dlt.common import pendulum
from dlt.common.destination import Destination, DestinationCapabilitiesContext
from dlt.common.destination.capabilities import DataTypeMapper
from dlt.common.arithmetics import DEFAULT_NUMERIC_PRECISION, DEFAULT_NUMERIC_SCALE
Expand All @@ -9,6 +10,7 @@
SqlalchemyCredentials,
SqlalchemyClientConfiguration,
)
from dlt.common.data_writers.escape import format_datetime_literal

SqlalchemyTypeMapper: t.Type[DataTypeMapper]

Expand All @@ -24,6 +26,13 @@
from sqlalchemy.engine import Engine


def _format_mysql_datetime_literal(
v: pendulum.DateTime, precision: int = 6, no_tz: bool = False
) -> str:
# Format without timezone to prevent tz conversion in SELECT
return format_datetime_literal(v, precision, no_tz=True)


class sqlalchemy(Destination[SqlalchemyClientConfiguration, "SqlalchemyJobClient"]):
spec = SqlalchemyClientConfiguration

Expand All @@ -50,6 +59,7 @@ def _raw_capabilities(self) -> DestinationCapabilitiesContext:
caps.supports_multiple_statements = False
caps.type_mapper = SqlalchemyTypeMapper
caps.supported_replace_strategies = ["truncate-and-insert", "insert-from-staging"]
caps.supported_merge_strategies = ["delete-insert", "scd2"]

return caps

Expand All @@ -67,6 +77,8 @@ def adjust_capabilities(
caps.max_identifier_length = dialect.max_identifier_length
caps.max_column_identifier_length = dialect.max_identifier_length
caps.supports_native_boolean = dialect.supports_native_boolean
if dialect.name == "mysql":
caps.format_datetime_literal = _format_mysql_datetime_literal

return caps

Expand Down
9 changes: 9 additions & 0 deletions dlt/destinations/impl/sqlalchemy/load_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from dlt.destinations.sql_jobs import SqlFollowupJob, SqlJobParams

from dlt.destinations.impl.sqlalchemy.db_api_client import SqlalchemyClient
from dlt.destinations.impl.sqlalchemy.merge_job import SqlalchemyMergeFollowupJob

if TYPE_CHECKING:
from dlt.destinations.impl.sqlalchemy.sqlalchemy_job_client import SqlalchemyJobClient
Expand Down Expand Up @@ -134,3 +135,11 @@ def generate_sql(
statements.append(stmt)

return statements


__all__ = [
"SqlalchemyJsonLInsertJob",
"SqlalchemyParquetInsertJob",
"SqlalchemyStagingCopyJob",
"SqlalchemyMergeFollowupJob",
]
Loading

0 comments on commit 4e90278

Please sign in to comment.