From a2c732b3b58afe2df240e7469d6748e099c4f922 Mon Sep 17 00:00:00 2001 From: "K.Y" Date: Mon, 4 Dec 2023 12:52:56 +0800 Subject: [PATCH 1/3] :sparkles: Client-Server Architecture support --- .gitignore | 3 +- flaxkv/__init__.py | 22 +++++++--- flaxkv/__main__.py | 41 ++++++++++++++++++ flaxkv/base.py | 30 +++++++++----- flaxkv/manager.py | 2 +- flaxkv/pack.py | 14 +++++++ flaxkv/serve/__init__.py | 0 flaxkv/serve/app.py | 87 +++++++++++++++++++++++++++++++++++++++ flaxkv/serve/client.py | 65 +++++++++++++++++++++++++++++ flaxkv/serve/interface.py | 23 +++++++++++ flaxkv/serve/manager.py | 30 ++++++++++++++ pyproject.toml | 10 +++-- tests/test_db.py | 19 +++++++++ 13 files changed, 325 insertions(+), 21 deletions(-) create mode 100644 flaxkv/__main__.py create mode 100644 flaxkv/serve/__init__.py create mode 100644 flaxkv/serve/app.py create mode 100644 flaxkv/serve/client.py create mode 100644 flaxkv/serve/interface.py create mode 100644 flaxkv/serve/manager.py diff --git a/.gitignore b/.gitignore index 29532a3..7d4a666 100644 --- a/.gitignore +++ b/.gitignore @@ -3,8 +3,7 @@ CACHE_LMDB CHACHE_SQLITE3.db CACHE_LEVELDB Log - -flaxkv/server +FLAXKV_DB .idea .ipynb_checkpoints diff --git a/flaxkv/__init__.py b/flaxkv/__init__.py index f6ca363..0af9e32 100644 --- a/flaxkv/__init__.py +++ b/flaxkv/__init__.py @@ -13,7 +13,7 @@ # limitations under the License. from .base import LevelDBDict, LMDBDict -from .log import enable_logging +from .serve.client import RemoteDictDB __version__ = "0.1.6" @@ -22,15 +22,27 @@ "dbdict", "LMDBDict", "LevelDBDict", - "enable_logging", + "RemoteDictDB", ] -def dictdb(path, backend='lmdb', rebuild=False, raw=False, **kwargs): +def dictdb( + path_or_url: str, + backend='lmdb', + remote=False, + db_name=None, + rebuild=False, + raw=False, + **kwargs, +): + if remote: + return RemoteDictDB( + path_or_url, db_name=db_name, rebuild=rebuild, backend=backend + ) if backend == 'lmdb': - return LMDBDict(path, rebuild=rebuild, raw=raw, **kwargs) + return LMDBDict(path_or_url, rebuild=rebuild, raw=raw, **kwargs) elif backend == 'leveldb': - return LevelDBDict(path, rebuild=rebuild, raw=raw, **kwargs) + return LevelDBDict(path_or_url, rebuild=rebuild, raw=raw, **kwargs) else: raise ValueError(f"Unsupported DB type {backend}.") diff --git a/flaxkv/__main__.py b/flaxkv/__main__.py new file mode 100644 index 0000000..cbc598f --- /dev/null +++ b/flaxkv/__main__.py @@ -0,0 +1,41 @@ +import os +import platform + +import fire +import uvicorn + + +class Cli: + @staticmethod + def run(port=8000, workers=1, **kwargs): + """ + Runs the application using the Uvicorn server. + + Args: + port (int): The port number on which to run the server. Default is 8000. + workers (int): The number of worker processes to run. Default is 1. + + Returns: + None + """ + + if platform.system() == "Windows": + os.environ["TZ"] = "" + + uvicorn.run( + app="flaxkv.serve.app:app", + host="0.0.0.0", + port=port, + workers=workers, + app_dir="..", + ssl_keyfile=kwargs.get("ssl_keyfile", None), + ssl_certfile=kwargs.get("ssl_certfile", None), + ) + + +def main(): + fire.Fire(Cli) + + +if __name__ == "__main__": + main() diff --git a/flaxkv/base.py b/flaxkv/base.py index ae7dc9b..4c25c1b 100644 --- a/flaxkv/base.py +++ b/flaxkv/base.py @@ -46,7 +46,7 @@ class _enc_prefix: dict = b'd' array = b'a' - def __init__(self, db_type, path, rebuild=False, raw=False, **kwargs): + def __init__(self, db_type, path, rebuild=False, raw=False, db_name=None, **kwargs): """ Initializes the BaseDBDict class which provides a dictionary-like interface to a database. @@ -69,6 +69,7 @@ def __init__(self, db_type, path, rebuild=False, raw=False, **kwargs): else: logger.disable('flaxkv') + self._db_name = db_name self._db_manager = DBManager( db_type=db_type, db_path=path, rebuild=rebuild, **kwargs ) @@ -221,7 +222,10 @@ def get_db_value(self, key: str): Returns: value: The encoded value associated with the key. """ - return self._static_view.get(encode(key)) + if self.raw: + return self._static_view.get(key) + else: + return self._static_view.get(encode(key)) def get_batch(self, keys): """ @@ -356,7 +360,7 @@ def _write_buffer_to_db( self.delete_buffer_set = self.delete_buffer_set - delete_buffer_set_snapshot self.buffer_dict = self._diff_buffer(self.buffer_dict, buffer_dict_snapshot) - self._db_manager.close_view(self._static_view) + self._db_manager.close_static_view(self._static_view) self._static_view = self._db_manager.new_static_view() self._logger.info( f"write {self._db_manager.db_type.upper()} buffer to db successfully-{current_write_num=}-{self._latest_write_num=}" @@ -423,7 +427,10 @@ def pop(self, key, default=None): value = self.buffer_dict.pop(key) return value else: - value = self._static_view.get(encode(key)) + if self.raw: + value = self._static_view.get(key) + else: + value = self._static_view.get(encode(key)) return decode(value) else: return default @@ -444,14 +451,17 @@ def __contains__(self, key): if key in self.delete_buffer_set: return False - return self._static_view.get(encode(key)) is not None + if self.raw: + return self._static_view.get(key) is not None + else: + return self._static_view.get(encode(key)) is not None def clear(self): """ Clears the database and resets the buffer. """ with self._buffer_lock: - self._db_manager.close_view(self._static_view) + self._db_manager.close_static_view(self._static_view) self._db_manager.close() self._close_background_worker(write=False) @@ -500,7 +510,7 @@ def close(self, write=True): """ self._close_background_worker(write=write) - self._db_manager.close_view(self._static_view) + self._db_manager.close_static_view(self._static_view) self._db_manager.close() self._logger.info(f"Closed ({self._db_manager.db_type.upper()}) successfully") @@ -549,7 +559,7 @@ class LMDBDict(BaseDBDict): value: int, float, bool, str, list, dict, and np.ndarray, """ - def __init__(self, path, map_size=1024**3, rebuild=False, **kwargs): + def __init__(self, path: str, map_size=1024**3, rebuild=False, **kwargs): super().__init__( "lmdb", path, max_dbs=1, map_size=map_size, rebuild=rebuild, **kwargs ) @@ -564,7 +574,7 @@ def keys(self): lmdb_keys = set( decode_key(key) for key in cursor.iternext(keys=True, values=False) ) - self._db_manager.close_view(session) + self._db_manager.close_static_view(session) return list(lmdb_keys.union(buffer_keys) - delete_buffer_set) @@ -636,7 +646,7 @@ class LevelDBDict(BaseDBDict): value: int, float, bool, str, list, dict and np.ndarray, """ - def __init__(self, path, rebuild=False, **kwargs): + def __init__(self, path: str, rebuild=False, **kwargs): super().__init__("leveldb", path=path, rebuild=rebuild) def keys(self): diff --git a/flaxkv/manager.py b/flaxkv/manager.py index af1ec27..a5d4652 100644 --- a/flaxkv/manager.py +++ b/flaxkv/manager.py @@ -106,7 +106,7 @@ def new_static_view(self): else: raise ValueError(f"Unsupported DB type {self.db_type}.") - def close_view(self, static_view): + def close_static_view(self, static_view): """ Closes the provided static view of the database. diff --git a/flaxkv/pack.py b/flaxkv/pack.py index 421f7e3..b11bc76 100644 --- a/flaxkv/pack.py +++ b/flaxkv/pack.py @@ -1,3 +1,17 @@ +# Copyright (c) 2023 K.Y. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import msgpack import msgspec.msgpack import numpy as np diff --git a/flaxkv/serve/__init__.py b/flaxkv/serve/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flaxkv/serve/app.py b/flaxkv/serve/app.py new file mode 100644 index 0000000..debc533 --- /dev/null +++ b/flaxkv/serve/app.py @@ -0,0 +1,87 @@ +import msgspec +from litestar import Litestar, MediaType, Request, get, post + +from ..pack import encode +from .interface import AttachRequest, DetachRequest, StructSetData +from .manager import DBManager + +db_manager = DBManager(root_path="./FLAXKV_DB") + + +@post(path="/attach") +async def attach(data: AttachRequest) -> dict: + db = db_manager.get(data.db_name) + if db is None or data.rebuild: + db_manager.set_db( + db_name=data.db_name, backend=data.backend, rebuild=data.rebuild + ) + return {"success": True} + + +@post(path="/detach") +async def detach(data: DetachRequest) -> dict: + db = db_manager.detach(db_name=data.db_name) + if db is None: + return {"success": False, "info": "db not found"} + return {"success": True} + + +@post(path="/set_raw") +async def set_raw(db_name: str, request: Request) -> dict: + print(f"{db_name=}") + db = db_manager.get(db_name) + if db is None: + return {"success": False, "info": "db not found"} + data = await request.body() + data = msgspec.msgpack.decode(data, type=StructSetData) + db[data.key] = data.value + db.write_immediately(write=True, wait=True) + return {"success": True} + + +@post("/get_raw", media_type=MediaType.TEXT) +async def get_raw(db_name: str, request: Request) -> bytes: + db = db_manager.get(db_name) + if db is None: + return encode({"success": False, "info": "db not found"}) + key = await request.body() + value = db.get(key) + if value is None: + return encode(None) + return db.get(key) + + +@post("/contains", media_type=MediaType.TEXT) +async def contains(db_name: str, request: Request) -> bytes: + db = db_manager.get(db_name) + if db is None: + return encode({"success": False, "info": "db not found"}) + key = await request.body() + is_contain = key in db + return encode(is_contain) + + +@get("/keys") +async def get_keys(db_name: str) -> dict: + db = db_manager.get(db_name) + if db is None: + return {"success": False, "info": "db not found"} + return {"keys": list(db.keys())} + + +def on_shutdown(): + print("on_shutdown") + + +app = Litestar( + route_handlers=[ + attach, + detach, + set_raw, + get_raw, + contains, + get_keys, + ], + on_startup=[lambda: print("on_startup")], + on_shutdown=[on_shutdown], +) diff --git a/flaxkv/serve/client.py b/flaxkv/serve/client.py new file mode 100644 index 0000000..567dc44 --- /dev/null +++ b/flaxkv/serve/client.py @@ -0,0 +1,65 @@ +import httpx + +from ..pack import decode, encode + + +class RemoteDictDB: + def __init__( + self, url: str, db_name: str, rebuild=False, backend="leveldb", timeout=6 + ): + self._url = url + self._db_name = db_name + self._client = httpx.Client(timeout=timeout) + self._attach_db(rebuild=rebuild, backend=backend) + + def _attach_db(self, rebuild=False, backend="lmdb"): + url = f"{self._url}/attach" + response = self._client.post( + url, json={"db_name": self._db_name, "backend": backend, "rebuild": rebuild} + ) + return response.json() + + def detach_db(self, db_name=None): + if db_name is None: + db_name = self._db_name + + url = f"{self._url}/detach" + response = self._client.post(url, json={"db_name": db_name}) + return response.json() + + def get(self, key): + url = f"{self._url}/get_raw?db_name={self._db_name}" + response = self._client.post(url, data=encode(key)) + return decode(response.read()) + + def set(self, key, value): + url = f"{self._url}/set_raw?db_name={self._db_name}" + data = {"key": encode(key), "value": encode(value)} + response = self._client.post(url, data=encode(data)) + return response.read() + + def keys(self): + url = f"{self._url}/keys?db_name={self._db_name}" + response = self._client.get(url) + return response.json()["keys"] + + def __contains__(self, key): + url = f"{self._url}/contains?db_name={self._db_name}" + response = self._client.post(url, data=encode(key)) + return decode(response.read()) + + def setdefault(self, key, default=None): + value = self.get(key) + if value is None: + self.set(key, default) + return default + return value + + def __setitem__(self, key, value): + self.set(key, value) + + def __getitem__(self, key): + value = self.get(key) + if value is None: + raise KeyError(f"Key `{key}` not found in the database.") + return value diff --git a/flaxkv/serve/interface.py b/flaxkv/serve/interface.py new file mode 100644 index 0000000..a2a9cb9 --- /dev/null +++ b/flaxkv/serve/interface.py @@ -0,0 +1,23 @@ +from dataclasses import dataclass +from typing import Annotated, Any + +import msgspec +from litestar.enums import RequestEncodingType +from litestar.params import Body + + +@dataclass +class AttachRequest: + db_name: str + backend: str + rebuild: bool + + +@dataclass +class DetachRequest: + db_name: str + + +class StructSetData(msgspec.Struct): + key: bytes + value: bytes diff --git a/flaxkv/serve/manager.py b/flaxkv/serve/manager.py new file mode 100644 index 0000000..57cca1d --- /dev/null +++ b/flaxkv/serve/manager.py @@ -0,0 +1,30 @@ +from pathlib import Path + +from .. import dictdb + + +class DBManager: + def __init__(self, root_path="./FLAXKV_DB"): + self._db_dict = {} + self._root_path = Path(root_path) + if not self._root_path.exists(): + self._root_path.mkdir(parents=True) + + def detach(self, db_name: str): + self._db_dict.pop(db_name, None) + + def set_db(self, db_name: str, backend: str, rebuild: bool): + db_path = self._root_path / db_name + self._db_dict[db_name] = dictdb( + path_or_url=db_path.__str__(), + backend=backend, + rebuild=rebuild, + raw=True, + log=True, + ) + + def get(self, db_name: str, raise_key_error=False): + if raise_key_error: + return self._db_dict[db_name] + else: + return self._db_dict.get(db_name, None) diff --git a/pyproject.toml b/pyproject.toml index 279b84e..283582e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "flaxkv" -description = "A persistent database masquerading as a dictionary. Simple and high-performance persistent database solutions." +description = "A high-performance dictionary database." authors = [ { name = "K.Y", email = "beidongjiedeguang@gmail.com" }, ] @@ -52,6 +52,10 @@ database = [ "plyvel>=1.5.0", ] +server = [ + "litestar", +] + leveldb = [ "plyvel>=1.5.0", @@ -59,8 +63,8 @@ leveldb = [ -#[project.scripts] -#flaxkv = "flaxkv.__main__:main" +[project.scripts] +flaxkv = "flaxkv.__main__:main" [tool.hatch.version] path = "flaxkv/__init__.py" diff --git a/tests/test_db.py b/tests/test_db.py index c763294..d925eec 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -106,6 +106,25 @@ def test_numpy_array(temp_db): assert np.array_equal(temp_db[key], value) +def test_large_value(temp_db): + if isinstance(temp_db, type(None)): + pytest.skip("Skipping") + + target_dict = { + 'l1': ["test " for i in range(100 * 100)], + 'l2': ["test " for i in range(100 * 100 * 100)], + } + + for key, value in target_dict.items(): + temp_db[key] = value + + for key, value in target_dict.items(): + assert temp_db[key] == value + temp_db.write_immediately(wait=True) + for key, value in target_dict.items(): + assert temp_db[key] == value + + def test_setdefault(temp_db): if isinstance(temp_db, type(None)): pytest.skip("Skipping") From cea819b427c8a6ffbf7e295f5c0dfee568217cbf Mon Sep 17 00:00:00 2001 From: "K.Y" Date: Mon, 4 Dec 2023 12:55:51 +0800 Subject: [PATCH 2/3] fix --- flaxkv/serve/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flaxkv/serve/manager.py b/flaxkv/serve/manager.py index 57cca1d..c9a33dc 100644 --- a/flaxkv/serve/manager.py +++ b/flaxkv/serve/manager.py @@ -11,7 +11,7 @@ def __init__(self, root_path="./FLAXKV_DB"): self._root_path.mkdir(parents=True) def detach(self, db_name: str): - self._db_dict.pop(db_name, None) + return self._db_dict.pop(db_name, None) def set_db(self, db_name: str, backend: str, rebuild: bool): db_path = self._root_path / db_name From caaa9b3ea9c7dc835581a57cbdd65c6c21fe355f Mon Sep 17 00:00:00 2001 From: "K.Y" Date: Mon, 4 Dec 2023 12:59:31 +0800 Subject: [PATCH 3/3] Update README --- README.md | 5 ++++- README_ZH.md | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 2087681..454ab4c 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ intuitive manner. You can use it just like a Python dictionary without worrying ## TODO -- [ ] Client-Server Architecture +- [x] Client-Server Architecture - [ ] Benchmark --- @@ -85,6 +85,9 @@ from flaxkv import dictdb import numpy as np db = dictdb('./test_db') +# or run server `flaxkv run --port 8000`, then: +# db = dictdb('http://localhost:8000', remote=True) + db[1] = 1 db[1.1] = 1 / 3 db['key'] = 'value' diff --git a/README_ZH.md b/README_ZH.md index 9dbe694..606a2ee 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -72,7 +72,7 @@ ## TODO -- [ ] 客户端-服务器架构 +- [x] 客户端-服务器架构 - [ ] 性能测试 --- @@ -90,6 +90,9 @@ from flaxkv import dictdb import numpy as np db = dictdb('./test_db') +# or run server `flaxkv run --port 8000`, then: +# db = dictdb('http://localhost:8000', remote=True) + db[1] = 1 db[1.1] = 1 / 3 db['key'] = 'value'