Skip to content

Commit

Permalink
Merge pull request #111 from stankudrow/refine-connection-cursor-pool…
Browse files Browse the repository at this point in the history
…-apis

Reconsider the `Connection`, `Cursor` and `Pool` classes in terms of unification and marking some public API deprecated.
  • Loading branch information
long2ice authored Sep 19, 2024
2 parents 49764c5 + 1b18bb7 commit 52a2336
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 155 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### 0.2.5

- Reconsider the API of the `Connection`, `Cursor` and `Pool` classes and deprecate outdated methods or properties. Define the DB-API v2.0 compliant exception hierarchy. Update project dependencies and metadata. By @stankudrow in #111.
- Fix infinite iteration case when a cursor object is put in the `async for` loop. By @stankudrow in #112.
- Fix pool connection management (the discussion #108 by @DFilyushin) by @stankudrow in #109:

Expand Down
115 changes: 74 additions & 41 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,74 +7,91 @@

## Introduction

`asynch` is an asyncio ClickHouse Python Driver with native (TCP) interface support, which reuse most of [clickhouse-driver](https://github.com/mymarilyn/clickhouse-driver) and comply with [PEP249](https://www.python.org/dev/peps/pep-0249/).
`asynch` is an asynchronous ClickHouse Python driver with native TCP interface support, which reuses most of [clickhouse-driver](https://github.com/mymarilyn/clickhouse-driver) features and complies with [PEP249](https://www.python.org/dev/peps/pep-0249/).

## Install
## Installation

```shell
> pip install asynch
```

or if you want to install [`clickhouse-cityhash`](https://pypi.org/project/clickhouse-cityhash/) to enable
transport compression
If you want to install [`clickhouse-cityhash`](https://pypi.org/project/clickhouse-cityhash/) to enable transport compression

```shell
> pip install asynch[compression]
```

## Usage

Connect to ClickHouse
Basically, a connection to a ClickHouse server can be established in two ways:

```python
from asynch import connect

async def connect_database():
conn = await connect(
host = "127.0.0.1",
port = 9000,
database = "default",
user = "default",
password = "",
)
```
1. with a DSN string, e.g., `clickhouse://[user:password]@host:port/database`;

```python
from asynch import connect

# connecting with a DSN string
async def connect_database():
conn = await connect(
dsn = "clickhouse://ch_user:P@55w0rD:@127.0.0.1:9000/chdb",
)
```

2. with separately given connection/DSN parameters: `user` (optional), `password` (optional), `host`, `port`, `database`.

```python
from asynch import connect

# connecting with DSN parameters
async def connect_database():
conn = await connect(
user = "ch_user",
password = "P@55w0rD",
host = "127.0.0.1",
port = 9000,
database = "chdb",
)
```

Create table by sql
If a DSN string is given, it takes priority over any specified connection parameter.

Create a database and a table by executing SQL statements via an instance of the `Cursor` class (here its child `DictCursor` class) acquired from an instance of the `Connection` class.

```python
async def create_table():
async def create_table(conn: Connection):
async with conn.cursor(cursor=DictCursor) as cursor:
await cursor.execute('create database if not exists test')
await cursor.execute("CREATE DATABASE IF NOT EXISTS test")
await cursor.execute("""
CREATE TABLE if not exists test.asynch
(
`id` Int32,
`decimal` Decimal(10, 2),
`date` Date,
`datetime` DateTime,
`float` Float32,
`uuid` UUID,
`string` String,
`ipv4` IPv4,
`ipv6` IPv6
)
ENGINE = MergeTree
ORDER BY id"""
(
`id` Int32,
`decimal` Decimal(10, 2),
`date` Date,
`datetime` DateTime,
`float` Float32,
`uuid` UUID,
`string` String,
`ipv4` IPv4,
`ipv6` IPv6
)
ENGINE = MergeTree
ORDER BY id
"""
)
```

Use `fetchone`
Fetching one row from an executed SQL statement:

```python
async def fetchone():
async def fetchone(conn: Connection):
# by default, an instance of the `Cursor` class
async with conn.cursor() as cursor:
await cursor.execute("SELECT 1")
ret = await cursor.fetchone()
assert ret == (1,)
```

Use `fetchmany`
Fetching all the rows from an executed SQL statement:

```python
async def fetchall():
Expand All @@ -84,7 +101,7 @@ async def fetchall():
assert ret == [(1,)]
```

Use `DictCursor` to get result with dict
Using an instance of the `DictCursor` class to get results as a sequence of `dict`ionaries representing the rows of an executed SQL query:

```python
async def dict_cursor():
Expand All @@ -94,7 +111,7 @@ async def dict_cursor():
assert ret == [{"1": 1}]
```

Insert data with dict
Inserting data with `dict`s via a `DictCursor` instance:

```python
from asynch.cursors import DictCursor
Expand All @@ -120,7 +137,7 @@ async def insert_dict():
assert ret == 1
```

Insert data with tuple
Inserting data with `tuple`s:

```python
async def insert_tuple():
Expand All @@ -144,7 +161,9 @@ async def insert_tuple():
assert ret == 1
```

Use connection pool
### Connection Pool

Before the v0.2.4:

```python
async def use_pool():
Expand All @@ -158,6 +177,20 @@ async def use_pool():
await pool.wait_closed()
```

Since the v0.2.5:

```python
async def use_pool():
# init a Pool and fill it with `minsize` opened connections
async with Pool(minsize=1, maxsize=2) as pool:
# acquire a connection from the pool
async with pool.connection() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT 1")
ret = await cursor.fetchone()
assert ret == (1,)
```

## ThanksTo

- [clickhouse-driver](https://github.com/mymarilyn/clickhouse-driver), ClickHouse Python Driver with native interface support.
Expand Down
63 changes: 47 additions & 16 deletions asynch/connection.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import Optional
from warnings import warn

from asynch import errors
from asynch.cursors import Cursor
from asynch.errors import NotSupportedError
from asynch.proto import constants
from asynch.proto.connection import Connection as ProtoConnection
from asynch.proto.models.enums import ConnectionStatuses
Expand Down Expand Up @@ -51,9 +51,17 @@ def __init__(
# connection additional settings
self._opened: Optional[bool] = None
self._closed: Optional[bool] = None
self._echo = echo
self._cursor_cls = cursor_cls
self._connection_kwargs = kwargs
warn(
(
"The `echo` flag in the constructor is deprecated since the v0.2.5. "
"This flag is specifiable in the `cursor` method of the Connection class. "
"The `echo` parameter may be removed in the version 0.2.6 or later."
),
DeprecationWarning,
)
self._echo = echo

async def __aenter__(self) -> "Connection":
await self.connect()
Expand All @@ -62,7 +70,7 @@ async def __aenter__(self) -> "Connection":
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()

def __repr__(self):
def __repr__(self) -> str:
cls_name = self.__class__.__name__
status = self.status
return f"<{cls_name} object at 0x{id(self):x}; status: {status}>"
Expand All @@ -84,8 +92,8 @@ def connected(self) -> Optional[bool]:

warn(
(
"Please consider using the `connection.opened` property. "
"This property may be removed in the version 0.2.6 or a later release."
"Please consider using the `opened` property. "
"The `connected` property may be removed in the version 0.2.6 or later."
),
DeprecationWarning,
)
Expand Down Expand Up @@ -132,8 +140,8 @@ def status(self) -> str:
When leaving the context, the `conn.closed` is True
and the `conn.opened` is False.
:raise ConnectionError: unknown connection state
:return: the connection status
:raise ConnectionError: an unresolved connection state
:return: the Connection object status
:rtype: str (ConnectionStatuses StrEnum)
"""

Expand Down Expand Up @@ -167,6 +175,13 @@ def database(self) -> str:

@property
def echo(self) -> bool:
warn(
(
"The `echo` parameter should be specified in the `cursor` method."
"The property may be removed in the version 0.2.6 or later."
),
DeprecationWarning,
)
return self._echo

async def close(self) -> None:
Expand All @@ -178,10 +193,10 @@ async def close(self) -> None:
self._closed = True

async def commit(self):
raise errors.NotSupportedError
raise NotSupportedError

async def rollback(self):
raise errors.NotSupportedError
raise NotSupportedError

async def connect(self) -> None:
if not self._opened:
Expand All @@ -199,14 +214,23 @@ def cursor(self, cursor: Optional[Cursor] = None, *, echo: bool = False) -> Curs
of a default `Cursor` class will be created with echoing
set to True even if the `self.echo` property returns False.
:param cursor None | Cursor: a Cursor factory class
:param cursor Optional[Cursor]: Cursor factory class
:param echo bool:
:return: the cursor from a connection
:return: the cursor object of a connection
:rtype: Cursor
"""

cursor_cls = cursor or self._cursor_cls
return cursor_cls(self, echo or self._echo)
warn(
(
"When `echo` parameter is set to False (by default), "
"the deprecated `self.echo` property is in effect ."
"This behaviour may be removed in the version 0.2.6 or later."
),
UserWarning,
)
return cursor_cls(self, echo or self.echo)

async def ping(self) -> None:
"""Check the connection liveliness.
Expand All @@ -231,11 +255,18 @@ async def connect(
echo: bool = False,
**kwargs,
) -> Connection:
"""Open the connection to a ClickHouse server.
"""Return an opened connection to a ClickHouse server.
Equivalent to the following steps:
1. conn = Connection(...) # init a Connection instance
2. conn.connect() # connect to a ClickHouse instance
2. conn.connect() # connect to a ClickHouse server
3. return conn
When the connection is no longer needed,
consider `await`ing the `conn.close()` method.
The `echo` parameter is deprecated since the version 0.2.5.
It may be removed in the version 0.2.6 or later.
:param dsn str: DSN/connection string (if None -> constructed from default dsn parts)
:param user str: user string ("default" by default)
Expand All @@ -244,10 +275,10 @@ async def connect(
:param port int: port integer (9000 by default)
:param database str: database string ("default" by default)
:param cursor_cls Cursor: Cursor class (asynch.Cursor by default)
:param echo bool: echo mode flag (False by default)
:param echo bool: connection echo mode (False by default)
:param kwargs dict: connection settings
:return: an opened connection
:return: an opened Connection object
:rtype: Connection
"""

Expand Down
Loading

0 comments on commit 52a2336

Please sign in to comment.