Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Sqlalchemy connector #123

Merged
merged 11 commits into from
Feb 15, 2023
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `DbtCloudJob` block and `run_dbt_cloud_job` flow - [#101](https://github.com/PrefectHQ/prefect-dbt/pull/101)

- `DbtCoreOperation` block - [#119](https://github.com/PrefectHQ/prefect-dbt/pull/119)
- `SqlAlchemyConnector` support for use for `PostgresTargetConfigs` - [#123](https://github.com/PrefectHQ/prefect-dbt/pull/123)

### Changed
- The minimum version of `prefect-snowflake` - [#112](https://github.com/PrefectHQ/prefect-dbt/pull/112)
Expand All @@ -21,6 +22,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Deprecated

- `DatabaseCredentials` used in `PostgresTargetConfigs` in favor of `SqlAlchemyConnector` - [#123](https://github.com/PrefectHQ/prefect-dbt/pull/123)

### Removed

### Fixed
Expand Down
57 changes: 37 additions & 20 deletions prefect_dbt/cli/configs/postgres.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
"""Module containing models for Postgres configs"""
from typing import Any, Dict

try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
import warnings
from typing import Any, Dict, Union

from pydantic import Field
from typing_extensions import Literal

from prefect_dbt.cli.configs.base import BaseTargetConfigs, MissingExtrasRequireError

try:
from prefect_sqlalchemy.database import DatabaseCredentials
from prefect_sqlalchemy import DatabaseCredentials, SqlAlchemyConnector
except ModuleNotFoundError as e:
raise MissingExtrasRequireError("Postgres") from e

Expand Down Expand Up @@ -62,7 +59,7 @@ class PostgresTargetConfigs(BaseTargetConfigs):
type: Literal["postgres"] = Field(
default="postgres", description="The type of the target."
)
credentials: DatabaseCredentials = Field(
credentials: Union[SqlAlchemyConnector, DatabaseCredentials] = Field(
default=...,
description=(
"The credentials to use to authenticate; if there are duplicate keys "
Expand All @@ -78,21 +75,41 @@ def get_configs(self) -> Dict[str, Any]:
Returns:
A configs JSON.
"""
configs_json = super().get_configs()
invalid_keys = ["driver", "query", "url", "connect_args", "_async_supported"]
if isinstance(self.credentials, DatabaseCredentials):
warnings.warn(
"Using DatabaseCredentials is deprecated and will be removed "
"on May 7th, 2023, use SqlAlchemyConnector instead.",
DeprecationWarning,
)
all_configs_json = super().get_configs()

rename_keys = {
"database": "dbname",
# dbt
"type": "type",
"schema": "schema",
"threads": "threads",
# general
"host": "host",
"username": "user",
"password": "password",
"host": "host",
"port": "port",
"database": "dbname",
# optional
"keepalives_idle": "keepalives_idle",
"connect_timeout": "connect_timeout",
"retries": "retries",
"search_path": "search_path",
"role": "role",
"sslmode": "sslmode",
}
# get the keys from rendered url
for invalid_key in invalid_keys + list(rename_keys):
configs_json.pop(invalid_key, None)

rendered_url = self.credentials.rendered_url
for key in rename_keys:
renamed_key = rename_keys[key]
configs_json[renamed_key] = getattr(rendered_url, key)

configs_json = {}
extras = self.extras or {}
for key in all_configs_json.keys():
if key not in rename_keys and key not in extras:
# skip invalid keys, like fetch_size + poll_frequency_s
continue
# rename key to something dbt profile expects
dbt_key = rename_keys.get(key) or key
configs_json[dbt_key] = all_configs_json[key]
return configs_json
48 changes: 46 additions & 2 deletions tests/cli/configs/test_postgres.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
from prefect_sqlalchemy.credentials import DatabaseCredentials, SyncDriver
from prefect_sqlalchemy import (
ConnectionComponents,
DatabaseCredentials,
SqlAlchemyConnector,
SyncDriver,
)
from pydantic import SecretStr

from prefect_dbt.cli.configs import PostgresTargetConfigs
Expand All @@ -23,9 +28,48 @@ def test_postgres_target_configs_get_configs():
"user": "prefect",
"password": "prefect_password",
"host": "host",
"port": 8080,
"port": "8080",
}
for k, v in actual.items():
actual_v = v.get_secret_value() if isinstance(v, SecretStr) else v
expected_v = expected[k]
assert actual_v == expected_v


def test_postgres_target_configs_get_configs_for_sqlalchemy_connector():
configs = PostgresTargetConfigs(
credentials=SqlAlchemyConnector(
connection_info=ConnectionComponents(
driver="postgresql+psycopg2",
database="postgres",
username="prefect",
password="prefect_password",
host="host",
port=8080,
query={"a": "query"},
),
kwarg_that_shouldnt_show_up=True,
),
schema="schema",
extras={"retries": 1},
)
actual = configs.get_configs()
expected = {
"type": "postgres",
"schema": "schema",
"threads": 4,
"dbname": "postgres",
"user": "prefect",
"password": "prefect_password",
"host": "host",
"port": "8080",
"retries": 1,
}
for k, v in actual.items():
actual_v = v.get_secret_value() if isinstance(v, SecretStr) else v
expected_v = expected[k]
assert actual_v == expected_v
assert hasattr(configs.credentials, "kwarg_that_shouldnt_show_up")
assert "kwarg_that_shouldnt_show_up" not in actual.keys()
assert hasattr(configs.credentials.connection_info, "query")
assert "query" not in actual.keys()