Skip to content

Commit

Permalink
Basic sort of working (not in all cases) parallel table implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
JSKenyon committed Feb 23, 2022
1 parent bf40c65 commit cdd2d3f
Showing 1 changed file with 31 additions and 80 deletions.
111 changes: 31 additions & 80 deletions daskms/parallel_table.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,13 @@
from pyrap.tables import table as Table
import logging
import threading
from pyrap.tables import table as Table
from pathlib import Path
from weakref import finalize


log = logging.getLogger(__name__)


# Names of the CASA Table read methods that are proxied. Each entry is a
# one-element tuple holding just the method name.
_parallel_methods = [
    (method_name,)
    for method_name in (
        "getcol",
        "getcolslice",
        "getcolnp",
        "getvarcol",
        "getcell",
        "getcellslice",
        "getkeywords",
        "getcolkeywords",
    )
]


def parallel_method_factory(method):
    """
    Build a proxy for ``pyrap.tables.table.<method>``.

    The returned function is meant to be installed as a method on a class
    whose instances expose ``_tables``, which is indexed with the calling
    thread's identifier (``threading.get_ident()``); presumably a mapping
    of thread id to table -- confirm against the owning class. The proxy
    calls ``method`` on that entry synchronously and returns the result
    directly.

    Parameters
    ----------
    method : str
        Name of the table method to proxy.

    Returns
    -------
    callable
        A function with signature ``(self, *args, **kwargs)`` whose
        ``__name__`` is ``method``.
    """

    def _impl(table, args, kwargs):
        try:
            return getattr(table, method)(*args, **kwargs)
        except Exception:
            # Only record the traceback when debugging is enabled; the
            # exception is always re-raised for the caller to handle.
            if log.isEnabledFor(logging.DEBUG):
                log.exception("Exception in %s", method)
            raise

    _impl.__name__ = method + "_impl"
    _impl.__doc__ = "Calls table.%s." % (method,)

    def public_method(self, *args, **kwargs):
        """
        Call the proxied method on the entry of ``self._tables`` that
        belongs to the calling thread and return its result.
        """
        return _impl(self._tables[threading.get_ident()], args, kwargs)

    public_method.__name__ = method

    return public_method


class ParallelTableMetaClass(type):
    """
    Metaclass that adds one proxy method to the class namespace for every
    name listed in ``_parallel_methods``.
    """

    def __new__(cls, name, bases, dct):
        # Fill the namespace before the class object is created so that the
        # proxies become ordinary attributes of the new class.
        dct.update(
            (method, parallel_method_factory(method))
            for (method,) in _parallel_methods
        )

        return type.__new__(cls, name, bases, dct)

    # Disabled instance caching, kept for reference:
    # def __call__(cls, *args, **kwargs):
    #     key = arg_hasher((cls,) + args + (kwargs,))
    #
    #     with _table_lock:
    #         try:
    #             return _table_cache[key]
    #         except KeyError:
    #             instance = type.__call__(cls, *args, **kwargs)
    #             _table_cache[key] = instance
    #             return instance


def _parallel_table_finalizer(_linked_tables):

for table in _linked_tables.values():
Expand All @@ -102,13 +31,39 @@ def __init__(self, *args, **kwargs):

finalize(self, _parallel_table_finalizer, self._linked_tables)

def getcol(self, *args, **kwargs):
    """Forward ``getcol`` to the table obtained for the calling thread."""
    thread_id = threading.get_ident()
    return self._get_table(thread_id).getcol(*args, **kwargs)

def getcolslice(self, *args, **kwargs):
    """Forward ``getcolslice`` to the table obtained for the calling thread."""
    thread_id = threading.get_ident()
    return self._get_table(thread_id).getcolslice(*args, **kwargs)

def getcolnp(self, *args, **kwargs):
    """Forward ``getcolnp`` to the table obtained for the calling thread."""
    thread_id = threading.get_ident()
    return self._get_table(thread_id).getcolnp(*args, **kwargs)

def getvarcol(self, *args, **kwargs):
    """Forward ``getvarcol`` to the table obtained for the calling thread."""
    thread_id = threading.get_ident()
    return self._get_table(thread_id).getvarcol(*args, **kwargs)

def getcell(self, *args, **kwargs):
    """Forward ``getcell`` to the table obtained for the calling thread."""
    thread_id = threading.get_ident()
    return self._get_table(thread_id).getcell(*args, **kwargs)

def getcellslice(self, *args, **kwargs):
    """Forward ``getcellslice`` to the table obtained for the calling thread."""
    thread_id = threading.get_ident()
    return self._get_table(thread_id).getcellslice(*args, **kwargs)

thread_id = threading.get_ident()
def getkeywords(self, *args, **kwargs):
    """Forward ``getkeywords`` to the table obtained for the calling thread."""
    thread_id = threading.get_ident()
    return self._get_table(thread_id).getkeywords(*args, **kwargs)

return self.get_table(thread_id).getcolnp(*args, **kwargs)
def getcolkeywords(self, *args, **kwargs):
    """Forward ``getcolkeywords`` to the table obtained for the calling thread."""
    thread_id = threading.get_ident()
    return self._get_table(thread_id).getcolkeywords(*args, **kwargs)

def get_table(self, thread_id):
def _get_table(self, thread_id):

try:
table = self._linked_tables[thread_id]
Expand All @@ -117,11 +72,7 @@ def get_table(self, thread_id):
table_name = table_path.name
link_path = Path(f"/tmp/{thread_id}-{table_name}")

try:
link_path.symlink_to(table_path)
except FileExistsError: # This should raise a warning.
link_path.unlink()
link_path.symlink_to(table_path)
link_path.symlink_to(table_path)

self._linked_tables[thread_id] = table = Table(
str(link_path),
Expand Down

0 comments on commit cdd2d3f

Please sign in to comment.