Skip to content

Commit

Permalink
crazy ideas
Browse files Browse the repository at this point in the history
  • Loading branch information
TomHodson committed Oct 18, 2024
1 parent 5f7eec7 commit 00ed05c
Showing 1 changed file with 104 additions and 75 deletions.
179 changes: 104 additions & 75 deletions pyfdb/pyfdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
# limitations under the License.
import io
import json
import multiprocessing
import os
from functools import wraps

import cffi
import findlibs
Expand All @@ -32,6 +32,9 @@ class FDBException(RuntimeError):
pass


# True once a PatchedLib has dlopen'ed the FDB5 shared library in this process;
# read by FDB.__init__ to decide whether a second instance must be sandboxed in
# a separate worker process (RemotePatchedLib) instead of loaded again here.
fdb_loaded_in_this_process = False


class PatchedLib:
"""
Patch a CFFI library with error handling
Expand All @@ -40,14 +43,16 @@ class PatchedLib:
and patches the accessors with automatic python-C error handling.
"""

def __init__(self):
self.path = findlibs.find("fdb5")
def __init__(self, libpath=None):
self.path = libpath or findlibs.find("fdb5")

if self.path is None:
raise RuntimeError("FDB5 library not found")

ffi.cdef(self.__read_header())
self.__lib = ffi.dlopen(self.path)
global fdb_loaded_in_this_process
fdb_loaded_in_this_process = True

# All of the executable members of the CFFI-loaded library are functions in the FDB
# C API. These should be wrapped with the correct error handling. Otherwise forward
Expand Down Expand Up @@ -102,15 +107,63 @@ def __repr__(self):
return f"<pyfdb.pyfdb.PatchedLib FDB5 version {self.version} from {self.path}>"


# Bootstrap the library
class RemotePatchedLib:
    """Proxy that hosts a PatchedLib in a dedicated worker process.

    Lets a process that has already dlopen'ed one FDB5 library use a second
    one (see ``fdb_loaded_in_this_process``): the second library is loaded
    only inside the worker process, and attribute calls on this object are
    forwarded to it over multiprocessing queues.

    NOTE(review): arguments and results cross a process boundary, so they
    must be picklable; raw cffi handles are not — confirm which PatchedLib
    attributes callers actually route through this proxy.
    """

    def __init__(self, libpath):
        self.libpath = libpath
        # Two queues, not one: with a single shared queue the caller could
        # dequeue its own request before the worker had a chance to see it.
        self.requests = multiprocessing.Queue()
        self.responses = multiprocessing.Queue()
        self.process = multiprocessing.Process(
            target=self._worker, args=(self.requests, self.responses, self.libpath)
        )
        self.process.start()

    @classmethod
    def _worker(cls, requests, responses, libpath):
        """Worker process: load the library ONCE, then serve method calls."""
        # Loading inside the loop (as the original did) would re-dlopen the
        # library for every single request.
        lib = PatchedLib(libpath)

        while True:
            method_name, args, kwargs = requests.get()  # receive method calls
            if method_name == "TERMINATE":
                break  # cleanly exit the worker process

            try:
                result = getattr(lib, method_name)(*args, **kwargs)
                responses.put(("RESULT", result))  # send the result back
            except Exception as e:
                responses.put(("ERROR", str(e)))  # send error message back

    def __getattr__(self, attr):
        """Proxy method calls to the worker process."""

        def method_proxy(*args, **kwargs):
            # Send the method call to the worker and block for its reply.
            self.requests.put((attr, args, kwargs))
            result_type, result = self.responses.get()

            if result_type == "ERROR":
                raise RuntimeError(f"Error in method {attr}: {result}")
            return result

        return method_proxy

    def close(self):
        """Cleanly shut down the worker process."""
        self.requests.put(("TERMINATE", (), {}))
        self.process.join()

    def __repr__(self):
        return "<RemotePatchedLib with separate process handling>"


class Key:
__key = None

def __init__(self, keys):
def __init__(self, lib, keys):
self.lib = lib
key = ffi.new("fdb_key_t**")
lib.fdb_new_key(key)
# Set free function
Expand All @@ -120,7 +173,7 @@ def __init__(self, keys):
self.set(k, v)

def set(self, param, value):
lib.fdb_key_add(
self.lib.fdb_key_add(
self.__key,
ffi.new("const char[]", param.encode("ascii")),
ffi.new("const char[]", value.encode("ascii")),
Expand All @@ -134,11 +187,12 @@ def ctype(self):
class Request:
__request = None

def __init__(self, request):
def __init__(self, lib, request):
self.lib = lib
newrequest = ffi.new("fdb_request_t**")

# we assume a retrieve request represented as a dictionary
lib.fdb_new_request(newrequest)
self.lib.fdb_new_request(newrequest)
self.__request = ffi.gc(newrequest[0], lib.fdb_delete_request)

for name, values in request.items():
Expand All @@ -155,15 +209,15 @@ def value(self, name, values):
cval = ffi.new("const char[]", value.encode("ascii"))
cvals.append(cval)

lib.fdb_request_add(
self.lib.fdb_request_add(
self.__request,
ffi.new("const char[]", name.encode("ascii")),
ffi.new("const char*[]", cvals),
len(values),
)

def expand(self):
    """Expand this request in place via the FDB5 C API (fdb_expand_request)."""
    # Keep only the post-commit call through self.lib (the stale pre-commit
    # module-global `lib` call was interleaved in the diff).
    self.lib.fdb_expand_request(self.__request)

@property
def ctype(self):
Expand All @@ -174,10 +228,11 @@ class ListIterator:
__iterator = None
__key = False

def __init__(self, fdb, request, duplicates, key=False, expand=True):
def __init__(self, lib: PatchedLib | RemotePatchedLib, fdb: "FDB", request, duplicates, key=False, expand=True):
self.lib = lib
iterator = ffi.new("fdb_listiterator_t**")
if request:
req = Request(request)
req = Request(lib, request)
if expand:
req.expand()
lib.fdb_list(fdb.ctype, req.ctype, iterator, duplicates)
Expand All @@ -192,27 +247,27 @@ def __init__(self, fdb, request, duplicates, key=False, expand=True):
self.len = ffi.new("size_t*")

def __next__(self) -> dict:
err = lib.fdb_listiterator_next(self.__iterator)
err = self.lib.fdb_listiterator_next(self.__iterator)

if err != 0:
raise StopIteration

lib.fdb_listiterator_attrs(self.__iterator, self.path, self.off, self.len)
self.lib.fdb_listiterator_attrs(self.__iterator, self.path, self.off, self.len)
el = dict(path=ffi.string(self.path[0]).decode("utf-8"), offset=self.off[0], length=self.len[0])

if self.__key:
splitkey = ffi.new("fdb_split_key_t**")
lib.fdb_new_splitkey(splitkey)
key = ffi.gc(splitkey[0], lib.fdb_delete_splitkey)
self.lib.fdb_new_splitkey(splitkey)
key = ffi.gc(splitkey[0], self.lib.fdb_delete_splitkey)

lib.fdb_listiterator_splitkey(self.__iterator, key)
self.lib.fdb_listiterator_splitkey(self.__iterator, key)

k = ffi.new("const char**")
v = ffi.new("const char**")
level = ffi.new("size_t*")

meta = dict()
while lib.fdb_splitkey_next_metadata(key, k, v, level) == 0:
while self.lib.fdb_splitkey_next_metadata(key, k, v, level) == 0:
meta[ffi.string(k[0]).decode("utf-8")] = ffi.string(v[0]).decode("utf-8")
el["keys"] = meta

Expand All @@ -226,31 +281,32 @@ class DataRetriever(io.RawIOBase):
__dataread = None
__opened = False

def __init__(self, lib, fdb, request, expand=True):
    """Build a reader streaming the data matching *request* from *fdb*.

    Args:
        lib: PatchedLib (or RemotePatchedLib proxy) exposing the FDB5 C API.
        fdb: owning FDB instance; its handle is passed to fdb_retrieve.
        request: dict describing the retrieve request (see Request).
        expand: when True, expand the request before retrieving.
    """
    self.lib = lib
    dataread = ffi.new("fdb_datareader_t **")
    self.lib.fdb_new_datareader(dataread)
    # Free the C-side reader automatically when this object is collected.
    self.__dataread = ffi.gc(dataread[0], self.lib.fdb_delete_datareader)
    req = Request(self.lib, request)
    if expand:
        req.expand()
    self.lib.fdb_retrieve(fdb.ctype, req.ctype, self.__dataread)

mode = "rb"

def open(self):
    """Open the underlying FDB data reader (idempotent)."""
    if not self.__opened:
        self.__opened = True
        self.lib.fdb_datareader_open(self.__dataread, ffi.NULL)

def close(self):
    """Close the underlying FDB data reader (idempotent)."""
    if self.__opened:
        self.__opened = False
        self.lib.fdb_datareader_close(self.__dataread)

def skip(self, count):
    """Advance the read position by *count* bytes.

    Non-int counts are silently ignored (preserved original behaviour).
    """
    self.open()
    if isinstance(count, int):
        self.lib.fdb_datareader_skip(self.__dataread, count)

def seek(self, where, whence=io.SEEK_SET):
if whence != io.SEEK_SET:
Expand All @@ -259,20 +315,20 @@ def seek(self, where, whence=io.SEEK_SET):
)
self.open()
if isinstance(where, int):
lib.fdb_datareader_seek(self.__dataread, where)
self.lib.fdb_datareader_seek(self.__dataread, where)

def tell(self):
    """Return the current read position (int), opening the reader if needed."""
    self.open()
    where = ffi.new("long*")
    self.lib.fdb_datareader_tell(self.__dataread, where)
    return where[0]

def read(self, count=-1) -> bytes:
    """Read up to *count* bytes; with the default count=-1, read to EOF.

    Returns a bytearray, possibly shorter than *count* at end of stream.
    Non-int counts return an empty bytearray (preserved original behaviour).
    """
    self.open()
    if not isinstance(count, int):
        return bytearray()
    if count < 0:
        # Bug fix: the original passed the negative default straight to
        # bytearray(count), which raises ValueError. Read to EOF in chunks.
        out = bytearray()
        while True:
            chunk = self.read(65536)
            if not chunk:
                return out
            out += chunk
    buf = bytearray(count)
    read = ffi.new("long*")
    self.lib.fdb_datareader_read(self.__dataread, ffi.from_buffer(buf), count, read)
    return buf[0 : read[0]]

Expand All @@ -296,7 +352,14 @@ class FDB:

__fdb = None

def __init__(self, config=None, user_config=None):
def __init__(self, config=None, user_config=None, libpath=None):
global fdb_loaded_in_this_process
if fdb_loaded_in_this_process:
print("Loading second fdb library in a sandbox process")
self.lib = RemotePatchedLib(libpath)
else:
self.lib = PatchedLib(libpath)

fdb = ffi.new("fdb_handle_t**")

if config is not None or user_config is not None:
Expand All @@ -311,16 +374,16 @@ def prepare_config(c):
config = prepare_config(config)
user_config = prepare_config(user_config)

lib.fdb_new_handle_from_yaml(
self.lib.fdb_new_handle_from_yaml(
fdb,
ffi.new("const char[]", config.encode("utf-8")),
ffi.new("const char[]", user_config.encode("utf-8")),
)
else:
lib.fdb_new_handle(fdb)
self.lib.fdb_new_handle(fdb)

# Set free function
self.__fdb = ffi.gc(fdb[0], lib.fdb_delete_handle)
self.__fdb = ffi.gc(fdb[0], self.lib.fdb_delete_handle)

def archive(self, data, request=None) -> None:
"""Archive data into the FDB5 database
Expand All @@ -331,13 +394,15 @@ def archive(self, data, request=None) -> None:
if not provided the key will be constructed from the data.
"""
if request is None:
lib.fdb_archive_multiple(self.ctype, ffi.NULL, ffi.from_buffer(data), len(data))
self.lib.fdb_archive_multiple(self.ctype, ffi.NULL, ffi.from_buffer(data), len(data))
else:
lib.fdb_archive_multiple(self.ctype, Request(request).ctype, ffi.from_buffer(data), len(data))
self.lib.fdb_archive_multiple(
self.ctype, Request(self.lib, request).ctype, ffi.from_buffer(data), len(data)
)

def flush(self) -> None:
    """Flush any archived data to disk."""
    # Post-commit call goes through the per-instance library handle.
    self.lib.fdb_flush(self.ctype)

def list(self, request=None, duplicates=False, keys=False) -> ListIterator:
    """List entries in the FDB5 database.

    Args:
        request: dict describing what to list; None lists everything.
        duplicates: passed to fdb_list; when True, duplicate entries are kept.
        keys: when True, each entry also carries its split-key metadata.

    Returns:
        ListIterator: an iterator over the matching entries.
    """
    return ListIterator(self.lib, self, request, duplicates, keys)

def retrieve(self, request) -> DataRetriever:
    """Retrieve the data matching *request* as a stream.

    Args:
        request: dict describing the retrieve request.

    Returns:
        DataRetriever: a file-like object over the resulting data stream.
    """
    return DataRetriever(self.lib, self, request)

@property
def ctype(self):
    # Raw cffi handle (fdb_handle_t*) handed to the C API calls.
    return self.__fdb


# Lazily-created module-level FDB instance backing the convenience
# functions (archive/list/retrieve/flush) below.
fdb = None


# Use functools.wraps to copy over the docstring from FDB.xxx to the module level functions
@wraps(FDB.archive)
def archive(data, request=None) -> None:
    # Module-level convenience wrapper: lazily create the shared FDB handle.
    # Generalized backward-compatibly: the original dropped the `request`
    # argument that FDB.archive accepts; it now forwards it (default None
    # preserves the old behaviour).
    global fdb
    if not fdb:
        fdb = FDB()
    fdb.archive(data, request)


@wraps(FDB.list)
def list(request, duplicates=False, keys=False) -> ListIterator:
    # Module-level convenience: delegate to a lazily created shared FDB.
    global fdb
    if fdb is None:
        fdb = FDB()
    return fdb.list(request, duplicates, keys)


@wraps(FDB.retrieve)
def retrieve(request) -> DataRetriever:
    # Module-level convenience: delegate to a lazily created shared FDB.
    global fdb
    if fdb is None:
        fdb = FDB()
    return fdb.retrieve(request)


@wraps(FDB.flush)
def flush():
    # Module-level convenience: flush the shared FDB, creating it if needed.
    global fdb
    if fdb is None:
        fdb = FDB()
    return fdb.flush()

0 comments on commit 00ed05c

Please sign in to comment.