Skip to content

Refactor connection classes #107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Aug 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
Expand Down
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

### 0.2.5

- Make Python3.9 the minimum supported version. Update project dependencies, metadata, tests. By @stankudrow in #106.
- Add the asynchronous context manager support to the `asynch.Connection` class. By @stankudrow in #107.
- Make Python3.9 the minimum supported version. Update the project dependencies, metadata, tests. By @stankudrow in #106.

### 0.2.4

Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
export

checkfiles = asynch/ tests/ benchmark/
py_warn = PYTHONDEVMODE=1
py_debug_envvars = PYTHONDEVMODE=1 PYTHONTRACEMALLOC=1

up:
@poetry update
Expand All @@ -23,7 +23,7 @@ check: deps
@ruff check $(checkfiles)

test: deps
$(py_warn) pytest
$(py_debug_envvars) pytest

build: deps clean
@poetry build
Expand Down
220 changes: 105 additions & 115 deletions asynch/connection.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,35 @@
import ssl
from typing import Optional
from urllib.parse import parse_qs, unquote, urlparse

from asynch import errors
from asynch.cursors import Cursor
from asynch.proto import constants
from asynch.proto.connection import Connection as ProtoConnection
from asynch.proto.utils.compat import asbool
from asynch.proto.models.enums import ConnectionStatuses
from asynch.proto.utils.dsn import parse_dsn


class Connection:
def __init__(
self,
dsn: Optional[str] = None,
user: str = constants.DEFAULT_USER,
password: str = constants.DEFAULT_PASSWORD,
host: str = constants.DEFAULT_HOST,
port: int = constants.DEFAULT_PORT,
database: str = constants.DEFAULT_DATABASE,
user: str = constants.DEFAULT_USER,
password: str = constants.DEFAULT_PASSWORD,
cursor_cls=Cursor,
echo: bool = False,
stack_track: bool = False,
**kwargs,
):
self._dsn = dsn
self._user = user
self._password = password
self._host = host
self._port = port
self._database = database
self._connection_kwargs = kwargs
self._is_closed = False
self._echo = echo
self._cursor_cls = cursor_cls
if dsn:
self._connection = ProtoConnection(
**self._parse_dsn(dsn), stack_track=stack_track, **kwargs
)
config = parse_dsn(dsn)
self._connection = ProtoConnection(**config, stack_track=stack_track, **kwargs)
user = config.get("user", None) or user
password = config.get("password", None) or password
host = config.get("host", None) or host
port = config.get("port", None) or port
database = config.get("database", None) or database
else:
self._connection = ProtoConnection(
host=host,
Expand All @@ -47,16 +40,64 @@ def __init__(
stack_track=stack_track,
**kwargs,
)
self._dsn = dsn
# dsn parts
self._user = user
self._password = password
self._host = host
self._port = port
self._database = database
# connection additional settings
self._is_connected: Optional[bool] = None
self._is_closed: Optional[bool] = None
self._echo = echo
self._cursor_cls = cursor_cls
self._connection_kwargs = kwargs

async def __aenter__(self) -> "Connection":
await self.connect()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()

def __repr__(self):
return "<connection object at 0x{0:x}; closed: {1:}>".format(id(self), self._is_closed)
cls_name = self.__class__.__name__
prefix = f"<{cls_name} object at 0x{id(self):x}; status: "
if self.connected:
prefix += ConnectionStatuses.opened
elif self.closed:
prefix += ConnectionStatuses.closed
else:
prefix += ConnectionStatuses.created
return f"{prefix}>"

@property
def connected(self) -> bool:
return self._connection.connected
def connected(self) -> Optional[bool]:
"""Returns the connection open status.

If the return value is None,
the connection was only created,
but neither opened or closed.

:returns: the connection open status
:rtype: None | bool
"""

return self._is_connected

@property
def closed(self) -> bool:
def closed(self) -> Optional[bool]:
"""Returns the connection close status.

If the return value is None,
the connection was only created,
but neither opened or closed.

:returns: the connection close status
:rtype: None | bool
"""

return self._is_closed

@property
Expand Down Expand Up @@ -84,123 +125,72 @@ def echo(self) -> bool:
return self._echo

async def close(self) -> None:
if self._is_closed:
return
await self._connection.disconnect()
self._is_closed = True
if self._is_connected:
await self._connection.disconnect()
self._is_connected = False
self._is_closed = True

async def commit(self):
raise errors.NotSupportedError

async def rollback(self):
raise errors.NotSupportedError

async def connect(self):
if self.connected:
return
await self._connection.connect()
async def connect(self) -> None:
if not self._is_connected:
await self._connection.connect()
self._is_connected = True
if self._is_closed is True:
self._is_closed = False

def cursor(self, cursor: Optional[Cursor] = None) -> Cursor:
def cursor(self, cursor: Optional[Cursor] = None, *, echo: bool = False) -> Cursor:
cursor_cls = cursor or self._cursor_cls
return cursor_cls(self, self._echo)

def _parse_dsn(self, url):
"""
Return a client configured from the given URL.
return cursor_cls(self, self._echo or echo)

For example::
async def ping(self) -> None:
"""Check the connection liveliness.

clickhouse://[user:password]@localhost:9000/default
clickhouses://[user:password]@localhost:9440/default
:raises ConnectionError: if ping() has failed

Three URL schemes are supported:
clickhouse:// creates a normal TCP socket connection
clickhouses:// creates a SSL wrapped TCP socket connection

Any additional querystring arguments will be passed along to
the Connection class's initializer.
:return: None
"""
url = urlparse(url)

settings = {}
kwargs = {}

if url.hostname is not None:
self._host = kwargs["host"] = unquote(url.hostname)

if url.port is not None:
self._port = kwargs["port"] = url.port

path = url.path.replace("/", "", 1)
if path:
self._database = kwargs["database"] = path

if url.username is not None:
self._user = kwargs["user"] = unquote(url.username)

if url.password is not None:
self._password = kwargs["password"] = unquote(url.password)

if url.scheme == "clickhouses":
kwargs["secure"] = True

compression_algs = {"lz4", "lz4hc", "zstd"}
timeouts = {"connect_timeout", "send_receive_timeout", "sync_request_timeout"}

for name, value in parse_qs(url.query).items():
if not value or not len(value):
continue

value = value[0]

if name == "compression":
value = value.lower()
if value in compression_algs:
kwargs[name] = value
else:
kwargs[name] = asbool(value)

elif name == "secure":
kwargs[name] = asbool(value)

elif name == "client_name":
kwargs[name] = value

elif name in timeouts:
kwargs[name] = float(value)

elif name == "compress_block_size":
kwargs[name] = int(value)

# ssl
elif name == "verify":
kwargs[name] = asbool(value)
elif name == "ssl_version":
kwargs[name] = getattr(ssl, value)
elif name in ["ca_certs", "ciphers"]:
kwargs[name] = value
elif name == "alt_hosts":
kwargs["alt_hosts"] = value
else:
settings[name] = value

if settings:
kwargs["settings"] = settings

return kwargs
if not await self._connection.ping():
msg = f"Ping has failed for {self}"
raise ConnectionError(msg)


async def connect(
dsn: Optional[str] = None,
user: str = constants.DEFAULT_USER,
password: str = constants.DEFAULT_PASSWORD,
host: str = constants.DEFAULT_HOST,
port: int = constants.DEFAULT_PORT,
database: str = constants.DEFAULT_DATABASE,
user: str = constants.DEFAULT_USER,
password: str = constants.DEFAULT_PASSWORD,
cursor_cls=Cursor,
echo: bool = False,
**kwargs,
) -> Connection:
conn = Connection(dsn, host, port, database, user, password, cursor_cls, echo=echo, **kwargs)
"""Open the connection to a ClickHouse server.

Equivalent to the following steps:
1. conn = Connection(...) # init a Connection instance
2. conn.connect() # connect to a ClickHouse instance

:return: the open connection
:rtype: Connection
"""

conn = Connection(
dsn=dsn,
user=user,
password=password,
host=host,
port=port,
database=database,
cursor_cls=cursor_cls,
echo=echo,
**kwargs,
)
await conn.connect()
return conn
2 changes: 1 addition & 1 deletion asynch/proto/columns/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
("Point", "Tuple(Float64, Float64)"),
("Ring", "Array(Point)"),
("Polygon", "Array(Ring)"),
("MultiPolygon", "Array(Polygon)")
("MultiPolygon", "Array(Polygon)"),
# End Geo types
]

Expand Down
Loading
Loading