Skip to content

Commit

Permalink
[MotherDuck connector] fix: Pass config to connect_args in create_eng…
Browse files Browse the repository at this point in the history
…ine (#48562)
  • Loading branch information
guenp authored Dec 6, 2024
1 parent bb17590 commit 464cf6b
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,25 @@
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Sequence
from urllib.parse import parse_qsl, urlparse

import pyarrow as pa
from airbyte_cdk import DestinationSyncMode
from airbyte_cdk.sql import exceptions as exc
from airbyte_cdk.sql.constants import AB_EXTRACTED_AT_COLUMN
from airbyte_cdk.sql.constants import AB_EXTRACTED_AT_COLUMN, DEBUG_MODE
from airbyte_cdk.sql.secrets import SecretString
from airbyte_cdk.sql.shared.sql_processor import SqlConfig, SqlProcessorBase, SQLRuntimeError
from duckdb_engine import DuckDBEngineWarning
from overrides import overrides
from pydantic import Field
from sqlalchemy import Executable, TextClause, text
from sqlalchemy import Executable, TextClause, create_engine, text
from sqlalchemy.exc import ProgrammingError, SQLAlchemyError

if TYPE_CHECKING:
from sqlalchemy.engine import Connection, Engine

BUFFER_TABLE_NAME = "_airbyte_temp_buffer_data"
MOTHERDUCK_SCHEME = "md"

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,7 +54,16 @@ def get_sql_alchemy_url(self) -> SecretString:
message="duckdb-engine doesn't yet support reflection on indices",
category=DuckDBEngineWarning,
)
return SecretString(f"duckdb:///{self.db_path!s}")
parsed_db_path = urlparse(self.db_path)
if parsed_db_path.scheme == MOTHERDUCK_SCHEME:
path = f"{MOTHERDUCK_SCHEME}:{parsed_db_path.path}"
else:
path = parsed_db_path.path
return SecretString(f"duckdb:///{path!s}")

def get_duckdb_config(self) -> Dict[str, Any]:
    """Return the DuckDB config options encoded in the query string of ``db_path``.

    Everything after the ``?`` in ``db_path`` is parsed as URL query parameters,
    e.g. ``md:my_db?motherduck_token=abc`` yields ``{"motherduck_token": "abc"}``.
    A ``db_path`` without a query string yields an empty dict.

    Returns:
        A dict mapping each query parameter name to its (string) value. If a
        name is repeated, the last occurrence wins.
    """
    # Coerce to str first: urlparse() only accepts str/bytes, so a
    # pathlib.Path value for db_path would otherwise raise AttributeError.
    query_string = urlparse(str(self.db_path)).query
    return dict(parse_qsl(query_string))

@overrides
def get_database_name(self) -> str:
Expand All @@ -75,21 +86,30 @@ def _is_file_based_db(self) -> bool:
return (
("/" in db_path_str or "\\" in db_path_str)
and db_path_str != ":memory:"
and "md:" not in db_path_str
and f"{MOTHERDUCK_SCHEME}:" not in db_path_str
and "motherduck:" not in db_path_str
)

@overrides
def get_sql_engine(self) -> Engine:
"""Return the SQL Alchemy engine.
"""
Return a new SQL engine to use.
This method is overridden to ensure that the database parent directory is created if it
doesn't exist.
This method is overridden to:
- ensure that the database parent directory is created if it doesn't exist.
- pass the DuckDB query parameters (such as motherduck_token) via the config
"""
if self._is_file_based_db():
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

return super().get_sql_engine()
return create_engine(
url=self.get_sql_alchemy_url(),
echo=DEBUG_MODE,
execution_options={
"schema_translate_map": {None: self.schema_name},
},
future=True,
)


class DuckDBSqlProcessor(SqlProcessorBase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import warnings

from airbyte_cdk.sql.constants import DEBUG_MODE
from airbyte_cdk.sql.secrets import SecretString
from destination_motherduck.processors.duckdb import DuckDBConfig, DuckDBSqlProcessor
from duckdb_engine import DuckDBEngineWarning
from overrides import overrides
from pydantic import Field
from sqlalchemy import Engine, create_engine

# Suppress warnings from DuckDB about reflection on indices.
# https://github.com/Mause/duckdb_engine/issues/905
Expand Down Expand Up @@ -52,18 +54,37 @@ def get_sql_alchemy_url(self) -> SecretString:
category=DuckDBEngineWarning,
)

return SecretString(
f"duckdb:///md:{self.database}?motherduck_token={self.api_key}"
f"&custom_user_agent={self.custom_user_agent}"
# Not sure why this doesn't work. We have to override later in the flow.
# f"&schema={self.schema_name}"
)
# We defer adding schema name and API token until `create_engine()` call.
return SecretString(f"duckdb:///md:{self.database}?custom_user_agent={self.custom_user_agent}")

@overrides
def get_database_name(self) -> str:
    """Report which database this configuration targets.

    Returns:
        The value of the ``database`` attribute, unchanged.
    """
    database_name = self.database
    return database_name

@overrides
def get_sql_engine(self) -> Engine:
    """Return a new SQLAlchemy engine for this MotherDuck configuration.

    This method is overridden to hand the MotherDuck API token to the driver
    through ``connect_args["config"]`` rather than embedding it in the
    connection URL returned by ``get_sql_alchemy_url()``.

    Returns:
        A newly created engine; every call builds a fresh one.
    """
    return create_engine(
        url=self.get_sql_alchemy_url(),
        echo=DEBUG_MODE,
        execution_options={
            # Tables with no explicit schema (key ``None``) are translated to
            # the configured schema name.
            "schema_translate_map": {None: self.schema_name},
        },
        future=True,
        connect_args={
            # Supplied here instead of as a URL query parameter.
            "config": {
                "motherduck_token": self.api_key,
            },
        },
    )


class MotherDuckSqlProcessor(DuckDBSqlProcessor):
"""A cache implementation for MotherDuck."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
SyncMode,
Type,
)
from airbyte_cdk.sql.secrets import SecretString
from destination_motherduck import DestinationMotherDuck
from destination_motherduck.destination import CONFIG_MOTHERDUCK_API_KEY
from faker import Faker
Expand Down Expand Up @@ -69,6 +70,9 @@ def config(request, test_schema_name: str) -> Generator[Any, Any, Any]:
elif request.param == "motherduck_config":
config_dict = json.loads(Path(SECRETS_CONFIG_PATH).read_text())
config_dict["schema"] = test_schema_name
if CONFIG_MOTHERDUCK_API_KEY in config_dict:
# Prevent accidentally printing API Key if `config_dict` is printed.
config_dict[CONFIG_MOTHERDUCK_API_KEY] = SecretString(config_dict[CONFIG_MOTHERDUCK_API_KEY])
yield config_dict

else:
Expand Down Expand Up @@ -296,7 +300,7 @@ def test_check_succeeds(
):
destination = DestinationMotherDuck()
status = destination.check(logger=MagicMock(), config=config)
assert status.status == Status.SUCCEEDED
assert status.status == Status.SUCCEEDED, status.message


def _state(data: Dict[str, Any]) -> AirbyteMessage:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 042ee9b5-eb98-4e99-a4e5-3f0d573bee66
dockerImageTag: 0.1.15
dockerImageTag: 0.1.16
dockerRepository: airbyte/destination-motherduck
githubIssueLabel: destination-motherduck
icon: duckdb.svg
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "airbyte-destination-motherduck"
version = "0.1.15"
version = "0.1.16"
description = "Destination implementation for MotherDuck."
authors = ["Guen Prawiroatmodjo, Simon Späti, Airbyte"]
license = "MIT"
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/motherduck.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ This connector is primarily designed to work with MotherDuck and local DuckDB fi

| Version | Date | Pull Request | Subject |
|:--------| :--------- | :-------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| 0.1.16 | 2024-12-06 | [48562](https://github.com/airbytehq/airbyte/pull/48562) | Improved handling of config parameters during SQL engine creation. |
| 0.1.15 | 2024-11-07 | [48405](https://github.com/airbytehq/airbyte/pull/48405) | Updated docs and hovertext for schema, api key, and database name. |
| 0.1.14 | 2024-10-30 | [48006](https://github.com/airbytehq/airbyte/pull/48006) | Fix bug in _flush_buffer, explicitly register dataframe before inserting |
| 0.1.13 | 2024-10-30 | [47969](https://github.com/airbytehq/airbyte/pull/47969) | Preserve Platform-generated id in state messages. |
Expand Down

0 comments on commit 464cf6b

Please sign in to comment.