Skip to content

Commit

Permalink
Build dsn from components (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
aamalev authored Aug 1, 2023
1 parent cdcc2bb commit 1f96fcd
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 9 deletions.
38 changes: 38 additions & 0 deletions aioworkers_pg/abc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Optional

from aioworkers.core.base import AbstractConnector
from aioworkers.core.config import ValueExtractor
from aioworkers.net.uri import URI


class AbstractPGConnector(AbstractConnector):
_dsn: Optional[URI] = None
_default_dsn: URI = URI("postgresql:///")

def set_config(self, config: ValueExtractor) -> None:
cfg: ValueExtractor = config.new_parent(logger=__package__)
super().set_config(cfg)

dsn = self.config.get_uri("dsn", null=True)

host = self.config.get("host")
if host:
dsn = (dsn or self._default_dsn).with_host(host)

port = self.config.get_int("port", null=True)
if port:
dsn = (dsn or self._default_dsn).with_port(port)

username = self.config.get("username")
if username:
dsn = (dsn or self._default_dsn).with_username(username)

password = self.config.get("password")
if password:
dsn = (dsn or self._default_dsn).with_password(password)

database = self.config.get("database")
if database:
dsn = (dsn or self._default_dsn).with_path(database)

self._dsn = dsn
35 changes: 26 additions & 9 deletions aioworkers_pg/sa.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,46 @@
from typing import Union
from typing import Optional, Union

from aioworkers.core.base import AbstractConnector
from aioworkers.net.uri import URI
from sqlalchemy import CursorResult, Executable, text
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from aioworkers_pg.abc import AbstractPGConnector

class Connector(AbstractConnector):
engine: AsyncEngine

class Connector(AbstractPGConnector):
_default_scheme: str = "postgresql+asyncpg"
_default_dsn = URI(f"{_default_scheme}:///")
_engine: Optional[AsyncEngine] = None

async def connect(self):
dsn = self.config.get_uri("dsn")
dsn = dsn.with_scheme("postgresql+asyncpg")
assert self._engine is None, "Engine already created"

dsn = self._dsn
if dsn:
dsn = dsn.with_scheme(self._default_scheme)
else:
dsn = self._default_dsn

self.engine = create_async_engine(
self._engine = create_async_engine(
dsn,
echo=True,
)

self.logger.debug("Create engine with dsn %s", dsn.with_password("***"))

async def disconnect(self):
await self.engine.dispose()
assert self._engine is not None, "Engine not created"
await self._engine.dispose()
self._engine = None

@property
def engine(self) -> AsyncEngine:
assert self._engine
return self._engine

async def execute(self, statement: Union[str, Executable], *args, **kwargs) -> CursorResult:
async with self.engine.connect() as conn:
assert self._engine
async with self._engine.connect() as conn:
if isinstance(statement, str):
statement = text(statement)
return await conn.execute(statement, *args, **kwargs)

0 comments on commit 1f96fcd

Please sign in to comment.