Skip to content

Commit

Permalink
finish API reconsideration and marking redundant parts deprecated
Browse files Browse the repository at this point in the history
  • Loading branch information
Stanley Kudrow committed Aug 26, 2024
1 parent ae858f4 commit 0058f78
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 45 deletions.
8 changes: 4 additions & 4 deletions asynch/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async def __aenter__(self) -> "Connection":
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()

def __repr__(self):
def __repr__(self) -> str:
cls_name = self.__class__.__name__
status = self.status
return f"<{cls_name} object at 0x{id(self):x}; status: {status}>"
Expand Down Expand Up @@ -140,8 +140,8 @@ def status(self) -> str:
When leaving the context, the `conn.closed` is True
and the `conn.opened` is False.
:raise ConnectionError: unknown connection state
:return: connection status
:raise ConnectionError: an unresolved connection state
:return: the Connection object status
:rtype: str (ConnectionStatuses StrEnum)
"""

Expand Down Expand Up @@ -177,7 +177,7 @@ def database(self) -> str:
def echo(self) -> bool:
warn(
(
"The echo parametre should be specified in the `connection.cursor(...)` method."
"The `echo` parameter should be specified in the `connection.cursor(...)` method."
"The property may be removed in the version 0.2.6 or later."
),
DeprecationWarning,
Expand Down
144 changes: 106 additions & 38 deletions asynch/cursors.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,50 @@
import logging
from collections import namedtuple
from typing import Optional
from warnings import warn

from asynch.errors import InterfaceError, ProgrammingError
from asynch.proto.models.enums import CursorStatuses

Column = namedtuple("Column", "name type_code display_size internal_size precision scale null_ok")

logger = logging.getLogger(__name__)


class CursorError(Exception):
pass


class States:
warn(
(
"Should not be used in the version 0.2.6 or later."
"Should be replaced with the reconsidered `CursorStatuses` enum "
"from the `asynch.proto.models.enums` module."
),
DeprecationWarning,
)
(NONE, RUNNING, FINISHED, CURSOR_CLOSED) = range(4)


class Cursor:
_states = States()
_columns_with_types = None

def __init__(self, connection=None, echo=False):
def __init__(self, connection=None, echo: bool = False):
self._connection = connection
self._reset_state()
self._rows = []
self._echo = echo
self._arraysize = 1

def __repr__(self) -> str:
cls_name = self.__class__.__name__
status = self.status
return (
f"<{cls_name}(connection={self._connection}, echo={self._echo})"
f" object at 0x{id(self):x}; status: {status}>"
)

@property
def connection(self):
"""This read-only attribute return a reference to the Connection
Expand All @@ -36,14 +58,36 @@ def rowcount(self):
"""
return self._rowcount

@property
def status(self) -> str:
"""Return the status of the cursor.
If conn.connected is None and conn.closed is None,
then the connection is in the "created" state.
It was neither opened nor closed.
When executing `async with conn: ...`,
the `conn.opened` is True and `conn.closed` is None.
When leaving the context, the `conn.closed` is True
and the `conn.opened` is False.
:raise CursorError: an unresolved cursor state
:return: the Cursor object status
:rtype: str (CursorStatuses StrEnum)
"""

if not self._state:
raise CursorError(f"{self} is in an unknown state")
return self._state

def setinputsizes(self, *args):
"""Does nothing, required by DB API."""

def setoutputsizes(self, *args):
"""Does nothing, required by DB API."""

async def close(self):
self._state = self._states.CURSOR_CLOSED
self._state = CursorStatuses.closed

async def execute(
self,
Expand Down Expand Up @@ -116,30 +160,24 @@ async def fetchone(self):
except: # noqa: E722
return None

else:
if not self._rows:
return None

return self._rows.pop(0)
if not self._rows:
return None
return self._rows.pop(0)

async def fetchmany(self, size: int):
async def fetchmany(self, size: Optional[int]):
self._check_query_started()

if size == 0:
return []

if size is None:
size = self._arraysize
if size == 0:
return []

if self._stream_results:
rv = []

async for i in self._rows:
rv.append(i)

if size > 0 and len(rv) >= size:
break

return rv

if size < 0:
Expand All @@ -148,7 +186,6 @@ async def fetchmany(self, size: int):
else:
rv = self._rows[:size]
self._rows = self._rows[size:]

return rv

async def fetchall(self):
Expand All @@ -162,10 +199,12 @@ async def fetchall(self):
return rv

def _reset_state(self):
"""Reset the state of the cursor.
Prepares a cursor object to handle another query.
"""
Resets query state and get ready for another query.
"""
self._state = self._states.NONE

self._state = CursorStatuses.ready

self._columns = None
self._types = None
Expand Down Expand Up @@ -258,7 +297,7 @@ async def __aenter__(self):

@property
def description(self):
if self._state == self._states.NONE:
if self._state == CursorStatuses.ready:
return None

columns = self._columns or []
Expand All @@ -270,8 +309,8 @@ def description(self):
]

def _check_query_started(self):
if self._state == self._states.NONE:
raise ProgrammingError("no results to fetch")
if self._state == CursorStatuses.ready:
raise ProgrammingError(f"no results to fetch from the {self}")

def _check_query_executing(self):
if self._connection._connection.is_query_executing:
Expand All @@ -280,14 +319,14 @@ def _check_query_executing(self):
)

def _check_cursor_closed(self):
if self._state == self._states.CURSOR_CLOSED:
raise InterfaceError("cursor is already closed")
if self._state == CursorStatuses.closed:
raise InterfaceError(f"the {self} is already closed")

def _begin_query(self):
self._state = self._states.RUNNING
self._state = CursorStatuses.running

def _end_query(self):
self._state = self._states.FINISHED
self._state = CursorStatuses.finished

def set_stream_results(self, stream_results, max_row_buffer):
"""
Expand Down Expand Up @@ -349,23 +388,52 @@ def set_query_id(self, query_id=""):


class DictCursor(Cursor):
async def fetchone(self):
row = await super(DictCursor, self).fetchone()
def __repr__(self) -> str:
cls_name = self.__class__.__name__
status = self.status
return (
f"<{cls_name}(connection={self._connection}, echo={self._echo})"
f" object at 0x{id(self):x}; status: {status}>"
)

async def fetchone(self) -> dict:
"""Fetch exactly one row from the last executed query.
:raises AttributeError: columns mismatch
:return: one row from the query
:rtype: dict
"""

row = await super().fetchone()
if self._columns:
return dict(zip(self._columns, row)) if row else {}
else:
raise AttributeError("Invalid columns.")
raise AttributeError("Invalid columns.")

async def fetchmany(self, size: int):
rows = await super(DictCursor, self).fetchmany(size)
async def fetchmany(self, size: int) -> list[dict]:
"""Fetch no more than `size` rows from the last executed query.
:raises AttributeError: columns mismatch
:return: the list of rows from the query
:rtype: list[dict]
"""

rows = await super().fetchmany(size)
if self._columns:
return [dict(zip(self._columns, item)) for item in rows] if rows else []
else:
raise AttributeError("Invalid columns.")
raise AttributeError("Invalid columns.")

async def fetchall(self):
rows = await super(DictCursor, self).fetchall()
async def fetchall(self) -> list[dict]:
"""Fetch all resulting rows from the last executed query.
:raises AttributeError: columns mismatch
:return: the list of all possible rows from the query
:rtype: list[dict]
"""

rows = await super().fetchall()
if self._columns:
return [dict(zip(self._columns, item)) for item in rows] if rows else []
else:
raise AttributeError("Invalid columns.")
raise AttributeError("Invalid columns.")
6 changes: 3 additions & 3 deletions asynch/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async def __aenter__(self) -> "Pool":
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.shutdown()

def __repr__(self):
def __repr__(self) -> str:
cls_name = self.__class__.__name__
status = self.status
return (
Expand All @@ -161,8 +161,8 @@ def status(self) -> str:
When leaving the context, the `pool.closed` is True
and the `pool.opened` is False.
:raise PoolError: unresolved pool state.
:return: pool status
:raise PoolError: an unresolved pool state.
:return: the Pool object status
:rtype: str (PoolStatuses StrEnum)
"""

Expand Down
7 changes: 7 additions & 0 deletions asynch/proto/models/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ class ConnectionStatuses(str, Enum):
closed = "closed"


class CursorStatuses(str, Enum):
ready = "ready"
running = "running"
finished = "finished"
closed = "closed"


class PoolStatuses(str, Enum):
created = "created"
opened = "opened"
Expand Down
19 changes: 19 additions & 0 deletions tests/test_cursors.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,25 @@
from asynch.proto import constants


async def test_dict_cursor_repr(conn):
repstr = "<DictCursor(connection={conn}, echo={echo}) object at 0x{cid:x};"

echo = True
async with conn.cursor(cursor=DictCursor, echo=echo) as cursor:
repstr = repstr.format(conn=conn, echo=echo, cid=id(cursor))
repstr = repstr + " status: {status}>"

assert repr(cursor) == repstr.format(status="ready")

await cursor.execute("SELECT 1")
assert repr(cursor) == repstr.format(status="finished")

ret = await cursor.fetchone()
assert ret == {"1": 1}

assert repr(cursor) == repstr.format(status="closed")


@pytest.mark.asyncio
async def test_fetchone(conn):
async with conn.cursor() as cursor:
Expand Down

0 comments on commit 0058f78

Please sign in to comment.