diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ea8959..88ceb97 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ ### 0.3.0 +- Update the `Connection` and `Pool` classes API. By @stankudrow in #130: + - remove the deprecated `connected` property from the `Connection` class + - fix type hinting for `Cursor` class as incoming parameter for the connection `cursor` method + - make the connection `close` async method more consistent + - turn the `async/connection.py:connect` function into the async context manager + - get rid of inheritance from the `asyncio.AbstractServer` for the `Pool` class (mypy got satisfied) + - check the freshness of a connection before giving it from a pool. Inspired by the issue #127 from @nils-borrmann-tacto. + - mutate the `create_pool` function into the async context manager - Add `mypy` dependency into the `lint` section. By @stankudrow in #128 - Gracefully handle connections terminated by the server. By @nils-borrmann-tacto in #129. - Remove the deprecated API from `cursor.py` module. By @stankudrow in #125. diff --git a/README.md b/README.md index a29b59b..f36d1cb 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,19 @@ If you want to install [`clickhouse-cityhash`](https://pypi.org/project/clickhou > pip install asynch[compression] ``` +## Release v0.3.0 announcement + +The version 0.2.5 should have been named v0.3.0 due to compatibility-breaking changes. +Since the release v0.2.5 is already yanked (of sorts), a smooth upgrade seems inexpedient. + +Before upgrading to the v0.3.0, please pay attention to the incompatible changes: + +1. the `connect` function from the `async/connection.py` module is now asynchronous context manager. +2. the same kind of changes touches the `create_pool` function from the `async/pool.py` module. +3. all deprecated methods from `Connection`, `Cursor` and `Pool` classes are removed. + +For more details, please refer to the project [CHANGELOG.md](./CHANGELOG.md) file. + ## Usage Basically, a connection to a ClickHouse server can be established in two ways: @@ -32,9 +45,24 @@ Basically, a connection to a ClickHouse server can be established in two ways: # connecting with a DSN string async def connect_database(): - conn = await connect( + async with connect( + dsn = "clickhouse://ch_user:P@55w0rD:@127.0.0.1:9000/chdb", + ) as conn: + pass + ``` + + or + + ```python + from asynch import Connection + + # connecting with a DSN string + async def connect_database(): + conn = Connection( dsn = "clickhouse://ch_user:P@55w0rD:@127.0.0.1:9000/chdb", ) + async with conn: + pass ``` 2. with separately given connection/DSN parameters: `user` (optional), `password` (optional), `host`, `port`, `database`. @@ -44,13 +72,31 @@ Basically, a connection to a ClickHouse server can be established in two ways: # connecting with DSN parameters async def connect_database(): - conn = await connect( + async with connect( user = "ch_user", password = "P@55w0rD", host = "127.0.0.1", port = 9000, database = "chdb", - ) + ) as conn: + pass + ``` + + or + + ```python + from asynch import Connection + + # connecting with DSN parameters + async def connect_database(): + async with Connection( + user = "ch_user", + password = "P@55w0rD", + host = "127.0.0.1", + port = 9000, + database = "chdb", + ) as conn: + pass ``` If a DSN string is given, it takes priority over any specified connection parameter. @@ -181,7 +227,7 @@ Since the v0.2.5: ```python async def use_pool(): - # init a Pool and fill it with `minsize` opened connections + # init a Pool and fill it with the `minsize` opened connections async with Pool(minsize=1, maxsize=2) as pool: # acquire a connection from the pool async with pool.connection() as conn: @@ -191,6 +237,28 @@ async def use_pool(): assert ret == (1,) ``` +which decomposes into + +```python +async def use_pool(): + pool = Pool(minsize=1, maxsize=2) + await pool.startup() + + # some logic + + await pool.shutdown() +``` + +Since the v0.3.0 the `create_pool` is asynchronous context manager: + +```python +from asynch.pool import create_pool + +async def use_pool(): + async with create_pool(minsize=1, maxsize=2) as pool: + # some logic +``` + ## ThanksTo - [clickhouse-driver](https://github.com/mymarilyn/clickhouse-driver), ClickHouse Python Driver with native interface support. diff --git a/asynch/connection.py b/asynch/connection.py index 846a43c..ca11a3f 100644 --- a/asynch/connection.py +++ b/asynch/connection.py @@ -1,5 +1,6 @@ +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager from typing import Optional, Type -from warnings import warn from asynch.cursors import Cursor from asynch.errors import NotSupportedError @@ -67,30 +68,6 @@ def __repr__(self) -> str: status = self.status return f"<{cls_name} object at 0x{id(self):x}; status: {status}>" - @property - def connected(self) -> Optional[bool]: - """Returns the connection open status. - - If the return value is None, - the connection was only created, - but neither opened or closed. - - The attribute is deprecated in favour of `opened` one. - The reason is about tautology on `connection.connected` case. - - :returns: the connection open status - :rtype: None | bool - """ - - warn( - ( - "Please consider using the `opened` property. " - "The `connected` property may be removed in the version 0.2.6 or later." - ), - DeprecationWarning, - ) - return self._opened - @property def opened(self) -> Optional[bool]: """Returns the connection open status. @@ -137,11 +114,12 @@ def status(self) -> str: :rtype: str (ConnectionStatus StrEnum) """ - if self._opened is None and self._closed is None: + opened, closed = self._opened, self._closed + if opened is None and closed is None: return ConnectionStatus.created - if self._opened: + if opened: return ConnectionStatus.opened - if self._closed: + if closed: return ConnectionStatus.closed raise ConnectionError(f"{self} is in an unknown state") @@ -172,17 +150,16 @@ def echo(self) -> bool: async def close(self) -> None: """Close the connection.""" + if self._closed: + return if self._opened: await self._connection.disconnect() - self._opened = False - self._closed = True + self._opened = False + self._closed = True async def commit(self): raise NotSupportedError - async def rollback(self): - raise NotSupportedError - async def connect(self) -> None: if not self._opened: await self._connection.connect() @@ -199,8 +176,8 @@ def cursor(self, cursor: Optional[Type[Cursor]] = None, *, echo: bool = False) - of a default `Cursor` class will be created with echoing set to True even if the `self.echo` property returns False. - :param cursor Optional[Cursor]: Cursor factory class - :param echo bool: to override the `Connection.echo` parametre for a cursor + :param cursor Optional[Type[Cursor]]: Cursor factory class + :param echo bool: to override the `Connection.echo` parameter for a cursor :return: the cursor object of a connection :rtype: Cursor @@ -220,7 +197,38 @@ async def ping(self) -> None: msg = f"Ping has failed for {self}" raise ConnectionError(msg) + async def _refresh(self) -> None: + """Refresh the connection. + + It does ping and if it fails, + attempts to connect again. + If reconnecting fails, + an exception is raised and + the connection cannot be refreshed + + :raises ConnectionError: refreshing already closed connection + + :return: None + """ + + status = self.status + if status == ConnectionStatus.created: + msg = f"the {self} is not opened to be refreshed" + raise ConnectionError(msg) + if status == ConnectionStatus.closed: + msg = f"the {self} is already closed" + raise ConnectionError(msg) + + try: + await self.ping() + except ConnectionError: + await self.connect() + + async def rollback(self): + raise NotSupportedError + +@asynccontextmanager async def connect( dsn: Optional[str] = None, user: str = constants.DEFAULT_USER, @@ -231,16 +239,16 @@ async def connect( cursor_cls=Cursor, echo: bool = False, **kwargs, -) -> Connection: +) -> AsyncIterator[Connection]: """Return an opened connection to a ClickHouse server. - Equivalent to the following steps: + Before the v0.3.0, was equivalent to: 1. conn = Connection(...) # init a Connection instance 2. conn.connect() # connect to a ClickHouse server 3. return conn - When the connection is no longer needed, - consider `await`ing the `conn.close()` method. + Since the v0.3.0 is an asynchronous context manager + that handles resource clean-up. :param dsn str: DSN/connection string (if None -> constructed from default dsn parts) :param user str: user string ("default" by default) @@ -252,8 +260,8 @@ async def connect( :param echo bool: connection echo mode (False by default) :param kwargs dict: connection settings - :return: an opened Connection object - :rtype: Connection + :return: an async Connection context manager + :rtype: AsyncIterator[Connection] """ conn = Connection( @@ -267,5 +275,8 @@ async def connect( echo=echo, **kwargs, ) - await conn.connect() - return conn + try: + await conn.connect() + yield conn + finally: + await conn.close() diff --git a/asynch/pool.py b/asynch/pool.py index 06a76cf..8d62229 100644 --- a/asynch/pool.py +++ b/asynch/pool.py @@ -4,7 +4,7 @@ from contextlib import asynccontextmanager from typing import AsyncIterator, Optional -from asynch.connection import Connection, connect +from asynch.connection import Connection from asynch.errors import AsynchPoolError from asynch.proto import constants from asynch.proto.models.enums import PoolStatus @@ -90,26 +90,13 @@ def closed(self) -> Optional[bool]: return self._closed - @property - def connections(self) -> int: - """Returns the number of connections in the pool. - - This number represents the current size of the pool - which is the sum of the acquired and free connections. - - :return: the number of connections in the pool - :rtype: int - """ - - return self.acquired_connections + self.free_connections - @property def acquired_connections(self) -> int: """Returns the number of connections acquired from the pool. A connection is acquired when `pool.connection()` is invoked. - :return: the number of connections requested the pool + :return: the number of connections requested from the pool :rtype: int """ @@ -119,12 +106,25 @@ def acquired_connections(self) -> int: def free_connections(self) -> int: """Returns the number of free connections in the pool. - :return: the number of connections in the pool + :return: the number of free connections in the pool :rtype: int """ return len(self._free_connections) + @property + def connections(self) -> int: + """Returns the number of connections associated with the pool. + + This number represents the current size of the pool, + which is the sum of the acquired and free connections. + + :return: the number of connections related to the pool + :rtype: int + """ + + return self.acquired_connections + self.free_connections + @property def maxsize(self) -> int: return self._maxsize @@ -140,14 +140,38 @@ async def _create_connection(self) -> None: if pool_size > maxsize: raise AsynchPoolError(f"{self} is overburden") - conn = await connect(**self._connection_kwargs) + conn = Connection(**self._connection_kwargs) + await conn.connect() self._free_connections.append(conn) - async def _acquire_connection(self) -> Connection: + def _pop_connection(self) -> Connection: if not self._free_connections: raise AsynchPoolError(f"no free connection in {self}") - conn = self._free_connections.popleft() + return self._free_connections.popleft() + + async def _get_fresh_connection(self) -> Optional[Connection]: + while self._free_connections: + conn = self._pop_connection() + try: + await conn._refresh() + return conn + except ConnectionError as e: + msg = f"the {conn} is invalidated: {e}" + logger.warning(msg) + + async def _acquire_connection(self) -> Connection: + if conn := await self._get_fresh_connection(): + self._acquired_connections.append(conn) + return conn + + avail = self.maxsize - self.connections + if avail < 0: + msg = f"no fresh connection to acquire from {self}" + raise AsynchPoolError(msg) + + await self._create_connection() + conn = self._pop_connection() self._acquired_connections.append(conn) return conn @@ -155,24 +179,38 @@ async def _release_connection(self, conn: Connection) -> None: if conn not in self._acquired_connections: raise AsynchPoolError(f"the connection {conn} does not belong to {self}") + try: + await conn._refresh() + self._free_connections.append(conn) + except ConnectionError: + pass # the invalidated connection is lost + self._acquired_connections.remove(conn) - self._free_connections.append(conn) async def _init_connections(self, n: Optional[int] = None) -> None: to_create = n if n is not None else self.minsize + if to_create < 0: - msg = f"cannot create ({to_create}) negative connections for {self}" + msg = f"cannot create negative number ({to_create}) of connections for {self}" raise ValueError(msg) - if to_create == 0: - return if (self.connections + to_create) > self.maxsize: - msg = f"cannot create {to_create} connections to exceed the size of {self}" + msg = f"cannot create {to_create} connections that will exceed the size of {self}" raise AsynchPoolError(msg) + + if not to_create: + return tasks: list[asyncio.Task] = [ asyncio.create_task(self._create_connection()) for _ in range(to_create) ] await asyncio.wait(fs=tasks) + async def _ensure_minsize_connections(self) -> None: + connections = self.connections + minsize = self.minsize + if connections < minsize: + gap = minsize - connections + await self._init_connections(min(minsize, gap)) + @asynccontextmanager async def connection(self) -> AsyncIterator[Connection]: """Get a connection from the pool. @@ -188,23 +226,17 @@ async def connection(self) -> AsyncIterator[Connection]: async with self._sem: async with self._lock: - if not self._free_connections: - conns, maxsize = self.connections, self.maxsize - avail = maxsize - conns - if (maxsize - conns) < 0: - msg = ( - f"the number of pool connections ({conns}) " - f"exceeds the pool maxsize ({maxsize}) for {self}" - ) - raise AsynchPoolError(msg) - to_create = min(self.minsize, avail) - await self._init_connections(to_create) + await self._ensure_minsize_connections() conn = await self._acquire_connection() + # due to possible connection exhaustion, + # ensuring minsize connection number + await self._ensure_minsize_connections() try: yield conn finally: async with self._lock: await self._release_connection(conn) + await self._ensure_minsize_connections() async def startup(self) -> "Pool": """Initialise the pool. @@ -212,6 +244,9 @@ async def startup(self) -> "Pool": When entering the context, the pool get filled with connections up to the pool `minsize` value. + + :return: a pool object with `minsize` opened connections + :rtype: Pool """ async with self._lock: @@ -242,22 +277,24 @@ async def shutdown(self) -> None: self._closed = True +@asynccontextmanager async def create_pool( minsize: int = constants.POOL_MIN_SIZE, maxsize: int = constants.POOL_MAX_SIZE, loop: Optional[asyncio.AbstractEventLoop] = None, **kwargs, -) -> Pool: +) -> AsyncIterator[Pool]: """Returns an initiated connection pool. The initiated pool means it is filled with `minsize` connections. - Equivalent to: + Before the v0.3.0, was equivalent to: 1. pool = Pool(...) 2. await pool.startup() 3. return pool - Do not forget to `await pool.shutdown()` for resource clean-up. + Since the v0.3.0 is an asynchronous context manager + that handles resource clean-up. :param minsize int: the minimum number of connections in the pool :param maxsize int: the maximum number of connections in the pool @@ -265,7 +302,7 @@ async def create_pool( :param kwargs dict: connection settings :return: a connection pool object - :rtype: Pool + :rtype: AsyncIterator[Pool] """ pool = Pool( @@ -274,5 +311,8 @@ async def create_pool( loop=loop, **kwargs, ) - await pool.startup() - return pool + try: + await pool.startup() + yield pool + finally: + await pool.shutdown() diff --git a/tests/conftest.py b/tests/conftest.py index 5f643ce..6bfbac1 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -65,40 +65,38 @@ async def column_options(): @pytest.fixture(scope="session", autouse=True) async def initialize_tests(): - conn = await connect(dsn=CONNECTION_DSN) - async with conn.cursor(cursor=DictCursor) as cursor: - await cursor.execute("create database if not exists test") - await cursor.execute("drop table if exists test.asynch") - await cursor.execute( - """ - CREATE TABLE if not exists test.asynch - ( - `id` Int32, - `decimal` Decimal(10, 2), - `date` Date, - `datetime` DateTime, - `float` Float32, - `uuid` UUID, - `string` String, - `ipv4` IPv4, - `ipv6` IPv6, - `bool` Bool + async with connect(dsn=CONNECTION_DSN) as conn: + async with conn.cursor(cursor=DictCursor) as cursor: + await cursor.execute("create database if not exists test") + await cursor.execute("drop table if exists test.asynch") + await cursor.execute( + """ + CREATE TABLE if not exists test.asynch + ( + `id` Int32, + `decimal` Decimal(10, 2), + `date` Date, + `datetime` DateTime, + `float` Float32, + `uuid` UUID, + `string` String, + `ipv4` IPv4, + `ipv6` IPv6, + `bool` Bool + ) + ENGINE = MergeTree + ORDER BY id + """ ) - ENGINE = MergeTree - ORDER BY id - """ - ) - yield - await conn.close() + yield @pytest.fixture(scope="function", autouse=True) async def truncate_table(): - conn = await connect(dsn=CONNECTION_DSN) - async with conn.cursor(cursor=DictCursor) as cursor: - await cursor.execute("truncate table test.asynch") - yield - await conn.close() + async with connect(dsn=CONNECTION_DSN) as conn: + async with conn.cursor(cursor=DictCursor) as cursor: + await cursor.execute("truncate table test.asynch") + yield @pytest.fixture(scope="function") diff --git a/tests/test_connection.py b/tests/test_connection.py index b9a5498..15e0787 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -228,3 +228,19 @@ async def test_connection_cleanup(get_tcp_connections): final_tcps = await get_tcp_connections(cn) assert final_tcps == init_tcps + + +@pytest.mark.asyncio +async def test_connection_close(): + conn = Connection() + + await conn.close() + + assert not conn.opened + assert conn.closed + + async with Connection() as conn: + await conn.close() + + assert not conn.opened + assert conn.closed diff --git a/tests/test_pool.py b/tests/test_pool.py index d490487..1ac173d 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -4,7 +4,7 @@ import pytest from asynch.connection import Connection -from asynch.pool import Pool +from asynch.pool import Pool, create_pool from asynch.proto import constants from asynch.proto.models.enums import PoolStatus @@ -184,3 +184,35 @@ async def _test_pool_connection(pool: Pool, *, selectee: Any = 42): assert noc == init_tcps assert selectees == answers + + +@pytest.mark.asyncio +async def test_pool_broken_connection_handling(): + min_size, max_size = 1, 1 + async with Pool(minsize=min_size, maxsize=max_size) as pool: + async with pool.connection() as conn: + await conn.ping() + + assert pool.free_connections == 0 + assert pool.acquired_connections == 1 + + # things go wrong here -> the connection gets broken + await conn.close() + with pytest.raises(ConnectionError): + await conn.ping() + + # when leaving the connection context, + # the pool should ensure its consistency + + assert pool.free_connections == 1 + assert pool.acquired_connections == 0 + + +@pytest.mark.asyncio +async def test_create_pool_async_context_manager(): + async with create_pool() as pool: + async with pool.connection() as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT 42") + ret = await cursor.fetchone() + assert ret == (42,)