Migration to async DB drivers #60

Merged
merged 55 commits from refactor/async_db_drivers_migration on Feb 13, 2023

Commits

e8f8bb7
async and sync conflict
pseusys Dec 27, 2022
67bd37d
github docs building reverted
pseusys Dec 27, 2022
93c04b9
lint updated
pseusys Dec 27, 2022
ab5d288
ydb transferred to sync driver
pseusys Jan 12, 2023
8b31209
refactor/async_db_drivers_migration: fix doc
kudep Jan 15, 2023
cf15f4a
refactor/async_db_drivers_migration: upd type hints in database
kudep Jan 15, 2023
61befdc
refactor/async_db_drivers_migration: merge
kudep Jan 15, 2023
ef750f3
context storages method signatures and type hints updated
pseusys Jan 15, 2023
21457d7
Merge remote-tracking branch 'origin/refactor/async_db_drivers_migrat…
pseusys Jan 15, 2023
3003208
validator updated, linted
pseusys Jan 15, 2023
b4de818
test bugs fixed
pseusys Jan 15, 2023
3952e58
postgres deprecation warning fix
pseusys Jan 15, 2023
86cdd83
DBAbstractContextStorage merged with DBContextStorage
pseusys Jan 15, 2023
04f0baa
tests passing
pseusys Jan 15, 2023
b87bb57
docs building fixed
pseusys Jan 15, 2023
17a3fc6
deleted obsolete, skipped tests info added
pseusys Jan 16, 2023
1b3757d
pytest full added skipped info
pseusys Jan 16, 2023
0a90d92
Merge branch 'dev' into refactor/async_db_drivers_migration
pseusys Jan 16, 2023
5957b97
attempt to fix ydb skipping
pseusys Jan 16, 2023
49ddef0
Merge remote-tracking branch 'origin/refactor/async_db_drivers_migrat…
pseusys Jan 16, 2023
2351209
venv blocking returned
pseusys Jan 16, 2023
f1a0406
installed libraries check added
pseusys Jan 16, 2023
3d859bf
installed libraries further check added
pseusys Jan 16, 2023
cca6e33
ydb six installed
pseusys Jan 16, 2023
68d8c0f
ydb async
pseusys Jan 16, 2023
937db36
async ydb error fixed
pseusys Jan 17, 2023
d7a9d46
table creation fixed
pseusys Jan 27, 2023
cd1374e
methods renamed + deprecation warning fixed
pseusys Jan 27, 2023
060facd
table-dropping script added
pseusys Jan 28, 2023
bcfa7d5
shelve explicit synchronization added
pseusys Jan 28, 2023
e46347c
deleting db files if they exist only
pseusys Jan 28, 2023
8a884ff
docstring warning fixed
pseusys Feb 2, 2023
873873b
`DBAbstractContextStorage` documentation added
pseusys Feb 2, 2023
b1b8c2f
JSON and Pickle dependencies management updated
pseusys Feb 2, 2023
7776df6
sqlalchemy extracted from dependencies
pseusys Feb 2, 2023
c8b85ad
attempt to fix coverage
pseusys Feb 2, 2023
a07dff7
... linted
pseusys Feb 2, 2023
d3a0ab3
test coverage lowered
pseusys Feb 4, 2023
65e18ae
merged with dev
pseusys Feb 4, 2023
e5e3c89
reviewed changes revisited
pseusys Feb 9, 2023
f98b6ef
test coverage magic✨✨
pseusys Feb 9, 2023
2704ee1
sqlalchemy version updated, one of the warnings fixed
pseusys Feb 9, 2023
0026244
`__init__.py`'s reformatted + coverage raised (by fixing concurrency …
pseusys Feb 10, 2023
9564254
casts removed
pseusys Feb 10, 2023
f2ead44
indentation issue fixed
pseusys Feb 10, 2023
ff9db29
database fields docs moved to `__init__()`
pseusys Feb 10, 2023
90144c3
pragma reversed
pseusys Feb 10, 2023
142e008
attempt to fix windows 8 stacking issue
pseusys Feb 10, 2023
9b4628d
attempt to fix windows python8 stacking issue
pseusys Feb 10, 2023
f4bd682
attempt to fix windows python8 hanging issue (correct now)
pseusys Feb 10, 2023
46634e5
refactor/async_db_drivers_migration: upd cover threshhold
kudep Feb 13, 2023
0e42724
pytest plugins versions updated
pseusys Feb 13, 2023
8fa79be
.coveragerc added to special excluded files list
pseusys Feb 13, 2023
2914a10
dependency and workflow changes reverted
pseusys Feb 13, 2023
4af1868
Update setup.py
kudep Feb 13, 2023
2 changes: 1 addition & 1 deletion .github/workflows/test_full.yml
@@ -44,5 +44,5 @@ jobs:
if [ "$RUNNER_OS" == "Linux" ]; then
source <(cat .env_file | sed 's/=/=/' | sed 's/^/export /')
fi
- pytest --tb=long -vv --cache-clear --cov=dff tests/
+ pytest -rs --tb=long -vv --cache-clear --cov=dff tests/
shell: bash
2 changes: 1 addition & 1 deletion README.md
@@ -113,7 +113,7 @@ from dff.script import Context, Actor
from dff.context_storages import SQLContextStorage
from .script import some_df_script

db = SQLContextStorage("postgresql://user:password@host:port/dbname")
db = SQLContextStorage("postgresql+asyncpg://user:password@host:port/dbname")

actor = Actor(some_df_script, start_label=("root", "start"), fallback_label=("root", "fallback"))

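To illustrate what the async migration means at the call site, here is a minimal usage sketch based on the README snippet above. The `postgresql+asyncpg` URI and the `SQLContextStorage` import are taken from the diff, and the `*_async` method names come from `dff/context_storages/database.py` further down; the no-argument `Context()` construction is an assumption made purely for the example.

import asyncio

from dff.script import Context
from dff.context_storages import SQLContextStorage

# Async SQLAlchemy driver URI, as in the updated README example.
db = SQLContextStorage("postgresql+asyncpg://user:password@host:port/dbname")

# Synchronous, dict-style access still works: the base class wraps the
# asynchronous implementation with asyncio.run() (see database.py below).
db["user_1"] = Context()

async def main():
    # Inside a coroutine the *_async methods can be awaited directly.
    stored = await db.get_item_async("user_1")
    await db.set_item_async("user_1", stored)

asyncio.run(main())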
11 changes: 8 additions & 3 deletions dff/context_storages/__init__.py
@@ -1,9 +1,14 @@
# -*- coding: utf-8 -*-
# flake8: noqa: F401
# fmt: off

from .database import DBAbstractContextStorage, DBContextStorage, threadsafe_method, context_storage_factory
from .json import JSONContextStorage
from .pickle import PickleContextStorage
import nest_asyncio

nest_asyncio.apply()

from .database import DBContextStorage, threadsafe_method, context_storage_factory
from .json import JSONContextStorage, json_available
from .pickle import PickleContextStorage, pickle_available
from .sql import SQLContextStorage, postgres_available, mysql_available, sqlite_available, sqlalchemy_available
from .ydb import YDBContextStorage, ydb_available
from .redis import RedisContextStorage, redis_available
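A note on the `nest_asyncio.apply()` call introduced above: the base class below exposes synchronous dunder methods (`__getitem__`, `__setitem__`, ...) that internally call `asyncio.run()`, which normally fails when an event loop is already running, for example in a notebook or inside another async application. The following self-contained sketch is not part of the PR; it only demonstrates the pattern being patched.

import asyncio

import nest_asyncio


async def inner():
    return 42


def sync_wrapper():
    # The pattern DBContextStorage uses: a synchronous method that
    # drives an asynchronous one with asyncio.run().
    return asyncio.run(inner())


async def outer():
    # Without the patch, calling sync_wrapper() here raises
    # "RuntimeError: asyncio.run() cannot be called from a running event loop".
    return sync_wrapper()


nest_asyncio.apply()  # patch asyncio so the nested asyncio.run() call succeeds
print(asyncio.run(outer()))  # prints 42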
202 changes: 146 additions & 56 deletions dff/context_storages/database.py
@@ -2,85 +2,176 @@
Database
--------
Base module. Provided classes:
- Abstract context storage interface :py:class:`.DBAbstractContextStorage`.
- An intermediate class to inherit from: :py:class:`.DBContextStorage`
- Abstract context storage interface :py:class:`.DBContextStorage`.
"""
import asyncio
import importlib
import threading
from functools import wraps
from abc import ABC, abstractmethod
from typing import Any, Callable
from typing import Callable, Hashable, Optional

from .protocol import PROTOCOLS
from ..script import Context


class DBAbstractContextStorage(ABC):
class DBContextStorage(ABC):
"""
An abstract interface for `dff` DB context storages.
It includes the most essential methods of the Python `dict` class.
Cannot be instantiated.

Parameters
----------
:param path:
| Parameter `path` should be set with the URI of the database.
| It includes a prefix and the required connection credentials.
| Example: postgresql+asyncpg://user:password@host:port/database
| In the case of classes that save data to the hard drive instead of an external database,
| you need to specify the location of the file, as you do for sqlite.
| Keep in mind that on Windows you will have to use double backslashes '\\'
| instead of forward slashes '/' when defining the file path.
:type path: str
:param full_path: Full path to access the context storage, as it was provided by user.
:type full_path: str
:param _lock: Threading lock for methods that require single-thread access.

"""

def __init__(self) -> None:
pass
def __init__(self, path: str):
_, _, file_path = path.partition("://")
self.full_path = path
self.path = file_path
self._lock = threading.Lock()

@abstractmethod
def __getitem__(self, key: str) -> Any:
raise NotImplementedError
def __getitem__(self, key: Hashable) -> Context:
"""
Synchronous method for accessing stored Context.
:param key: Hashable key used to store Context instance.
:type key: Hashable
:returns: The stored context, associated with the given key.
"""
return asyncio.run(self.get_item_async(key))

@abstractmethod
def __setitem__(self, key: str, value: dict) -> None:
async def get_item_async(self, key: Hashable) -> Context:
"""
Asynchronous method for accessing stored Context.
:param key: Hashable key used to store Context instance.
:type key: Hashable
:returns: The stored context, associated with the given key.
"""
raise NotImplementedError

@abstractmethod
def __delitem__(self, key: str) -> None:
raise NotImplementedError
def __setitem__(self, key: Hashable, value: Context):
"""
Synchronous method for storing Context.
:param key: Hashable key used to store Context instance.
:type key: Hashable
:param value: Context to store.
:type value: Context
"""
return asyncio.run(self.set_item_async(key, value))

@abstractmethod
def __contains__(self, key: str) -> bool:
async def set_item_async(self, key: Hashable, value: Context):
"""
Asynchronous method for storing Context.
:param key: Hashable key used to store Context instance.
:type key: Hashable
:param value: Context to store.
:type value: Context
"""
raise NotImplementedError

@abstractmethod
def __len__(self) -> int:
raise NotImplementedError
def __delitem__(self, key: Hashable):
"""
Synchronous method for removing stored Context.
:param key: Hashable key used to identify Context instance for deletion.
:type key: Hashable
"""
return asyncio.run(self.del_item_async(key))

@abstractmethod
def get(self, item) -> Any:
async def del_item_async(self, key: Hashable):
"""
Asynchronous method for removing stored Context.
:param key: Hashable key used to identify Context instance for deletion.
:type key: Hashable
"""
raise NotImplementedError

def __contains__(self, key: Hashable) -> bool:
"""
Synchronous method for finding whether any Context is stored with given key.
:param key: Hashable key used to check if Context instance is stored.
:type key: Hashable
:returns: True if there is Context accessible by given key, False otherwise.
"""
return asyncio.run(self.contains_async(key))

@abstractmethod
def clear(self) -> None:
async def contains_async(self, key: Hashable) -> bool:
"""
Asynchronous method for finding whether any Context is stored with given key.
:param key: Hashable key used to check if Context instance is stored.
:type key: Hashable
:returns: True if there is Context accessible by given key, False otherwise.
"""
raise NotImplementedError

def __len__(self) -> int:
"""
Synchronous method for retrieving number of stored Contexts.
:returns: The number of stored Contexts.
"""
return asyncio.run(self.len_async())

class DBContextStorage(DBAbstractContextStorage):
"""
An intermediate class between the abstract context storage interface,
:py:class:`.DBAbstractContextStorage`, and concrete implementations.

:param path: Parameter `path` should be set with the URI of the database.
It includes a prefix and the required connection credentials.
Example: postgresql://user:password@host:port/database
In the case of classes that save data to hard drive instead of external databases
you need to specify the location of the file, like you do in sqlite.
Keep in mind that in Windows you will have to use double backslashes '\\'
instead of forward slashes '/' when defining the file path.
:type path: str

"""

def __init__(self, path: str):
_, _, file_path = path.partition("://")
self.full_path = path
self.path = file_path
self._lock = threading.Lock()
@abstractmethod
async def len_async(self) -> int:
"""
Asynchronous method for retrieving number of stored Contexts.
:returns: The number of stored Contexts.
"""
raise NotImplementedError

def get(self, key: str, default=None) -> Any:
def get(self, key: Hashable, default: Optional[Context] = None) -> Context:
"""
Synchronous method for accessing stored Context, returning default if no Context is stored with the given key.
:param key: Hashable key used to store Context instance.
:type key: Hashable
:param default: Optional default value to be returned if no Context is found.
:type default: Optional[Context]
:returns: The stored context, associated with the given key or default value.
"""
return asyncio.run(self.get_async(key, default))

async def get_async(self, key: Hashable, default: Optional[Context] = None) -> Context:
"""
Asynchronous method for accessing stored Context, returning default if no Context is stored with the given key.
:param key: Hashable key used to store Context instance.
:type key: Hashable
:param default: Optional default value to be returned if no Context is found.
:type default: Optional[Context]
:returns: The stored context, associated with the given key or default value.
"""
try:
value = self.__getitem__(key)
return await self.get_item_async(str(key))
except KeyError:
value = default
return value
return default

def clear(self):
"""
Synchronous method for clearing context storage, removing all the stored Contexts.
"""
return asyncio.run(self.clear_async())

@abstractmethod
async def clear_async(self):
"""
Asynchronous method for clearing context storage, removing all the stored Contexts.
"""
raise NotImplementedError


def threadsafe_method(func: Callable):
Expand All @@ -96,24 +187,23 @@ def _synchronized(self, *args, **kwargs):
return _synchronized


def context_storage_factory(path: str, **kwargs):
def context_storage_factory(path: str, **kwargs) -> DBContextStorage:
"""
Use context_storage_factory to lazily import context storage types and instantiate them.
The function takes a database connection URI or its equivalent. It should be prefixed with the database name,
followed by the symbol triplet '://'.
Then, you should list the connection parameters like this: user:password@host:port/database
The whole URI will then look like this:

- shelve://path_to_the_file/file_name
- json://path_to_the_file/file_name
- pickle://path_to_the_file/file_name
- sqlite://path_to_the_file/file_name
- redis://:pass@localhost:6379/0
- mongodb://admin:pass@localhost:27017/admin
- mysql+pymysql://root:pass@localhost:3307/test
- postgresql://postgres:pass@localhost:5432/test
- grpc://localhost:2136/local
- grpcs://localhost:2135/local
- shelve://path_to_the_file/file_name
- json://path_to_the_file/file_name
- pickle://path_to_the_file/file_name
- sqlite+aiosqlite://path_to_the_file/file_name
- redis://:pass@localhost:6378/0
- mongodb://admin:pass@localhost:27016/admin
- mysql+asyncmy://root:pass@localhost:3306/test
- postgresql+asyncpg://postgres:pass@localhost:5430/test
- grpc://localhost:2134/local
- grpcs://localhost:2134/local

For context storages that write to local files, the function expects a file path instead of connection params:
json://file.json
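To make the reworked interface concrete: after this PR, a storage backend only has to implement the abstract `*_async` methods of `DBContextStorage`, while the synchronous `dict`-style methods are inherited wrappers around them. The in-memory storage below is a hypothetical sketch for illustration only and is not one of the backends shipped in this PR.

from typing import Dict, Hashable

from dff.context_storages import DBContextStorage
from dff.script import Context


class InMemoryContextStorage(DBContextStorage):
    """Illustrative storage that keeps contexts in a plain dict."""

    def __init__(self, path: str = "memory://"):
        super().__init__(path)
        self._storage: Dict[str, Context] = {}

    async def get_item_async(self, key: Hashable) -> Context:
        return self._storage[str(key)]

    async def set_item_async(self, key: Hashable, value: Context):
        self._storage[str(key)] = value

    async def del_item_async(self, key: Hashable):
        del self._storage[str(key)]

    async def contains_async(self, key: Hashable) -> bool:
        return str(key) in self._storage

    async def len_async(self) -> int:
        return len(self._storage)

    async def clear_async(self):
        self._storage.clear()


# Both access styles work on the same instance:
# storage = InMemoryContextStorage()
# storage["user_1"] = Context()           # sync, wraps set_item_async via asyncio.run
# assert "user_1" in storage              # sync __contains__ wraps contains_async
# await storage.get_item_async("user_1")  # async, inside a coroutine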