Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration to async DB drivers #60

Merged
merged 55 commits into dev from refactor/async_db_drivers_migration
Feb 13, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
e8f8bb7
async and sync conflict
pseusys Dec 27, 2022
67bd37d
github docs building reverted
pseusys Dec 27, 2022
93c04b9
lint updated
pseusys Dec 27, 2022
ab5d288
ydb transferred to sync driver
pseusys Jan 12, 2023
8b31209
refactor/async_db_drivers_migration: fix doc
kudep Jan 15, 2023
cf15f4a
refactor/async_db_drivers_migration: upd type hints in database
kudep Jan 15, 2023
61befdc
refactor/async_db_drivers_migration: merge
kudep Jan 15, 2023
ef750f3
context storages method signatures and type hints updated
pseusys Jan 15, 2023
21457d7
Merge remote-tracking branch 'origin/refactor/async_db_drivers_migrat…
pseusys Jan 15, 2023
3003208
validator updated, linted
pseusys Jan 15, 2023
b4de818
test bugs fixed
pseusys Jan 15, 2023
3952e58
postgres deprecation warning fix
pseusys Jan 15, 2023
86cdd83
DBAbstractContextStorage merged with DBContextStorage
pseusys Jan 15, 2023
04f0baa
tests passing
pseusys Jan 15, 2023
b87bb57
docs building fixed
pseusys Jan 15, 2023
17a3fc6
deleted obsolete, skipped tests info added
pseusys Jan 16, 2023
1b3757d
pytest full added skipped info
pseusys Jan 16, 2023
0a90d92
Merge branch 'dev' into refactor/async_db_drivers_migration
pseusys Jan 16, 2023
5957b97
attempt to fix ydb skipping
pseusys Jan 16, 2023
49ddef0
Merge remote-tracking branch 'origin/refactor/async_db_drivers_migrat…
pseusys Jan 16, 2023
2351209
venv blocking returned
pseusys Jan 16, 2023
f1a0406
installed libraries check added
pseusys Jan 16, 2023
3d859bf
installed libraries further check added
pseusys Jan 16, 2023
cca6e33
ydb six installed
pseusys Jan 16, 2023
68d8c0f
ydb async
pseusys Jan 16, 2023
937db36
async ydb error fixed
pseusys Jan 17, 2023
d7a9d46
table creation fixed
pseusys Jan 27, 2023
cd1374e
methods renamed + deprecation warning fixed
pseusys Jan 27, 2023
060facd
table-dropping script added
pseusys Jan 28, 2023
bcfa7d5
shelve explicit synchronization added
pseusys Jan 28, 2023
e46347c
deleting db files if they exist only
pseusys Jan 28, 2023
8a884ff
docstring warning fixed
pseusys Feb 2, 2023
873873b
`DBAbstractContextStorage` documentation added
pseusys Feb 2, 2023
b1b8c2f
JSON and Pickle dependencies management updated
pseusys Feb 2, 2023
7776df6
sqlalchemy extracted from dependencies
pseusys Feb 2, 2023
c8b85ad
attempt to fix coverage
pseusys Feb 2, 2023
a07dff7
... linted
pseusys Feb 2, 2023
d3a0ab3
test coverage lowered
pseusys Feb 4, 2023
65e18ae
merged with dev
pseusys Feb 4, 2023
e5e3c89
reviewed changes revisited
pseusys Feb 9, 2023
f98b6ef
test coverage magic✨✨
pseusys Feb 9, 2023
2704ee1
sqlalchemy version updated, one of the warnings fixed
pseusys Feb 9, 2023
0026244
`__init__.py`'s reformatted + coverage raised (by fixing concurrency …
pseusys Feb 10, 2023
9564254
casts removed
pseusys Feb 10, 2023
f2ead44
indentation issue fixed
pseusys Feb 10, 2023
ff9db29
database fields docs moved to `__init__()`
pseusys Feb 10, 2023
90144c3
pragma reversed
pseusys Feb 10, 2023
142e008
attempt to fix windows 8 stacking issue
pseusys Feb 10, 2023
9b4628d
attempt to fix windows python8 stacking issue
pseusys Feb 10, 2023
f4bd682
attempt to fix windows python8 hanging issue (correct now)
pseusys Feb 10, 2023
46634e5
refactor/async_db_drivers_migration: upd cover threshhold
kudep Feb 13, 2023
0e42724
pytest plugins versions updated
pseusys Feb 13, 2023
8fa79be
.coveragerc added to special excluded files list
pseusys Feb 13, 2023
2914a10
dependency and workflow changes reverted
pseusys Feb 13, 2023
4af1868
Update setup.py
kudep Feb 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ from dff.script import Context, Actor
from dff.context_storages import SQLContextStorage
from .script import some_df_script

db = SQLContextStorage("postgresql://user:password@host:port/dbname")
db = SQLContextStorage("postgresql+asyncpg://user:password@host:port/dbname")

actor = Actor(some_df_script, start_label=("root", "start"), fallback_label=("root", "fallback"))

Expand Down
5 changes: 5 additions & 0 deletions dff/context_storages/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# -*- coding: utf-8 -*-
# flake8: noqa: F401
# fmt: off
pseusys marked this conversation as resolved.
Show resolved Hide resolved

import nest_asyncio

nest_asyncio.apply()

from .database import DBAbstractContextStorage, DBContextStorage, threadsafe_method, context_storage_factory
from .json import JSONContextStorage
Expand Down
81 changes: 53 additions & 28 deletions dff/context_storages/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
- Abstract context storage interface :py:class:`.DBAbstractContextStorage`.
- An intermediate class to inherit from: :py:class:`.DBContextStorage`
"""
import asyncio
import importlib
import threading
from functools import wraps
from abc import ABC, abstractmethod
from typing import Any, Callable
from typing import Any, Callable, Optional

from .protocol import PROTOCOLS
from ..script import Context


class DBAbstractContextStorage(ABC):
pseusys marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -24,32 +26,53 @@ class DBAbstractContextStorage(ABC):
def __init__(self) -> None:
pass

def __getitem__(self, key: Any) -> Context:
return asyncio.run(self.getitem(key))

@abstractmethod
def __getitem__(self, key: str) -> Any:
async def getitem(self, key: Any) -> Context:
raise NotImplementedError

def __setitem__(self, key: Any, value: Context):
return asyncio.run(self.setitem(key, value))

@abstractmethod
def __setitem__(self, key: str, value: dict) -> None:
async def setitem(self, key: Any, value: Context):
pseusys marked this conversation as resolved.
Show resolved Hide resolved
raise NotImplementedError

def __delitem__(self, key: str):
return asyncio.run(self.delitem(key))

@abstractmethod
def __delitem__(self, key: str) -> None:
async def delitem(self, key: str):
raise NotImplementedError

@abstractmethod
def __contains__(self, key: str) -> bool:
raise NotImplementedError
return asyncio.run(self.contains(key))

@abstractmethod
async def contains(self, key: str) -> bool:
raise NotImplementedError

def __len__(self) -> int:
return asyncio.run(self.len())

@abstractmethod
async def len(self) -> int:
pseusys marked this conversation as resolved.
Show resolved Hide resolved
raise NotImplementedError

def get(self, key: Any, default: Optional[Context] = None) -> Optional[Context]:
return asyncio.run(self.get_async(key, default))

@abstractmethod
def get(self, item) -> Any:
async def get_async(self, key: Any, default: Optional[Context] = None) -> Optional[Context]:
raise NotImplementedError

def clear(self):
return asyncio.run(self.clear_async())

@abstractmethod
def clear(self) -> None:
async def clear_async(self):
raise NotImplementedError


Expand All @@ -58,13 +81,16 @@ class DBContextStorage(DBAbstractContextStorage):
An intermediate class between the abstract context storage interface,
:py:class:`.DBAbstractContextStorage`, and concrete implementations.

:param path: Parameter `path` should be set with the URI of the database.
It includes a prefix and the required connection credentials.
Example: postgresql://user:password@host:port/database
In the case of classes that save data to hard drive instead of external databases
you need to specify the location of the file, like you do in sqlite.
Keep in mind that in Windows you will have to use double backslashes '\\'
instead of forward slashes '/' when defining the file path.
Parameters
----------
:param path:
| Parameter `path` should be set with the URI of the database.
| It includes a prefix and the required connection credentials.
| Example: postgresql+asyncpg://user:password@host:port/database
| In the case of classes that save data to hard drive instead of external databases
| you need to specify the location of the file, like you do in sqlite.
| Keep in mind that in Windows you will have to use double backslashes '\\'
| instead of forward slashes '/' when defining the file path.
:type path: str

"""
Expand All @@ -75,12 +101,12 @@ def __init__(self, path: str):
self.path = file_path
self._lock = threading.Lock()

def get(self, key: str, default=None) -> Any:
async def get_async(self, key: Any, default=None):
key = str(key)
try:
value = self.__getitem__(key)
return await self.getitem(key)
except KeyError:
value = default
return value
return default


def threadsafe_method(func: Callable):
Expand All @@ -96,24 +122,23 @@ def _synchronized(self, *args, **kwargs):
return _synchronized


def context_storage_factory(path: str, **kwargs):
def context_storage_factory(path: str, **kwargs) -> DBAbstractContextStorage:
"""
Use context_storage_factory to lazy import context storage types and instantiate them.
The function takes a database connection URI or its equivalent. The URI should be prefixed with the database name,
followed by the symbol triplet '://'.
Then, you should list the connection parameters like this: user:password@host:port/database
The whole URI will then look like this:

- shelve://path_to_the_file/file_name
- json://path_to_the_file/file_name
- pickle://path_to_the_file/file_name
- sqlite://path_to_the_file/file_name
- redis://:pass@localhost:6379/0
- mongodb://admin:pass@localhost:27017/admin
- mysql+pymysql://root:pass@localhost:3307/test
- postgresql://postgres:pass@localhost:5432/test
- grpc://localhost:2136/local
- grpcs://localhost:2135/local
- sqlite+aiosqlite://path_to_the_file/file_name
- redis://:pass@localhost:6378/0
- mongodb://admin:pass@localhost:27016/admin
- mysql+asyncmy://root:pass@localhost:3306/test
- postgresql+asyncpg://postgres:pass@localhost:5430/test
- grpc://localhost:2134/local
- grpcs://localhost:2134/local

For context storages that write to local files, the function expects a file path instead of connection params:
json://file.json
Expand Down
52 changes: 24 additions & 28 deletions dff/context_storages/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
----
Provides the json-based version of the :py:class:`.DBContextStorage`.
"""
import os
import asyncio
from typing import Any

import aiofiles
import aiofiles.os
from pydantic import BaseModel, Extra, root_validator

from .database import DBContextStorage, threadsafe_method
Expand All @@ -30,57 +33,50 @@ class JSONContextStorage(DBContextStorage):
def __init__(self, path: str):
DBContextStorage.__init__(self, path)

self._load()

def get(self, key: str, default=None):
key = str(key)
try:
return self.__getitem__(key)
except KeyError:
return default
asyncio.run(self._load())

@threadsafe_method
def __len__(self):
async def len(self):
return len(self.storage.__dict__)

@threadsafe_method
def __setitem__(self, key: str, item: Context) -> None:
async def setitem(self, key: Any, item: Context):
key = str(key)
self.storage.__dict__.__setitem__(key, item)
self._save()
await self._save()
RLKRo marked this conversation as resolved.
Show resolved Hide resolved

@threadsafe_method
def __getitem__(self, key: str) -> Context:
async def getitem(self, key: Any) -> Context:
key = str(key)
self._load()
await self._load()
value = self.storage.__dict__.__getitem__(key)
return Context.cast(value)

@threadsafe_method
def __delitem__(self, key: str) -> None:
async def delitem(self, key: str) -> None:
key = str(key)
self.storage.__dict__.__delitem__(key)
self._save()
await self._save()

@threadsafe_method
def __contains__(self, key: str) -> bool:
async def contains(self, key: str) -> bool:
key = str(key)
self._load()
await self._load()
return self.storage.__dict__.__contains__(key)

@threadsafe_method
def clear(self) -> None:
async def clear_async(self) -> None:
self.storage.__dict__.clear()
self._save()
await self._save()

def _save(self) -> None:
with open(self.path, "w+", encoding="utf-8") as file_stream:
file_stream.write(self.storage.json())
async def _save(self) -> None:
async with aiofiles.open(self.path, "w+", encoding="utf-8") as file_stream:
await file_stream.write(self.storage.json())

def _load(self) -> None:
if not os.path.isfile(self.path) or os.stat(self.path).st_size == 0:
async def _load(self) -> None:
if not await aiofiles.os.path.isfile(self.path) or (await aiofiles.os.stat(self.path)).st_size == 0:
self.storage = SerializeableStorage()
self._save()
await self._save()
else:
with open(self.path, "r", encoding="utf-8") as file_stream:
self.storage = SerializeableStorage.parse_raw(file_stream.read())
async with aiofiles.open(self.path, "r", encoding="utf-8") as file_stream:
self.storage = SerializeableStorage.parse_raw(await file_stream.read())
29 changes: 15 additions & 14 deletions dff/context_storages/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
-----
Provides the mongo-based version of the :py:class:`.DBContextStorage`.
"""
from typing import Any

try:
from motor.motor_asyncio import AsyncIOMotorClient
from bson.objectid import ObjectId
from pymongo import MongoClient

mongo_available = True
except ImportError:
Expand Down Expand Up @@ -35,7 +36,7 @@ def __init__(self, path: str, collection: str = "context_collection"):
if not mongo_available:
install_suggestion = get_protocol_install_suggestion("mongodb")
raise ImportError("`mongodb` package is missing.\n" + install_suggestion)
self._mongo = MongoClient(self.full_path)
self._mongo = AsyncIOMotorClient(self.full_path)
db = self._mongo.get_default_database()
self.collection = db[collection]

Expand All @@ -48,38 +49,38 @@ def _adjust_key(key: str):
return {"_id": ObjectId(new_key)}

@threadsafe_method
def __setitem__(self, key: str, value: Context) -> None:
async def setitem(self, key: Any, value: Context) -> None:
new_key = self._adjust_key(key)
value = value if isinstance(value, Context) else Context.cast(value)
document = json.loads(value.json())

document.update(new_key)
self.collection.replace_one(new_key, document, upsert=True)
await self.collection.replace_one(new_key, document, upsert=True)

@threadsafe_method
def __getitem__(self, key: str) -> Context:
async def getitem(self, key: Any) -> Context:
adjust_key = self._adjust_key(key)
document = self.collection.find_one(adjust_key)
document = await self.collection.find_one(adjust_key)
if document:
document.pop("_id")
ctx = Context.cast(document)
return ctx
raise KeyError

@threadsafe_method
def __delitem__(self, key: str) -> None:
async def delitem(self, key: str) -> None:
adjust_key = self._adjust_key(key)
self.collection.delete_one(adjust_key)
await self.collection.delete_one(adjust_key)

@threadsafe_method
def __contains__(self, key: str) -> bool:
async def contains(self, key: str) -> bool:
adjust_key = self._adjust_key(key)
return bool(self.collection.find_one(adjust_key))
return bool(await self.collection.find_one(adjust_key))

@threadsafe_method
def __len__(self) -> int:
return self.collection.estimated_document_count()
async def len(self) -> int:
return await self.collection.estimated_document_count()

@threadsafe_method
def clear(self) -> None:
self.collection.delete_many(dict())
async def clear_async(self) -> None:
await self.collection.delete_many(dict())
Loading