diff --git a/dev-environment.yml b/dev-environment.yml index 3e35226..e7f9d2c 100644 --- a/dev-environment.yml +++ b/dev-environment.yml @@ -18,6 +18,7 @@ name: appose-dev channels: - conda-forge - defaults + - forklift dependencies: - python >= 3.10 # Developer tools diff --git a/src/appose/__init__.py b/src/appose/__init__.py index 93c44c3..db1141a 100644 --- a/src/appose/__init__.py +++ b/src/appose/__init__.py @@ -134,6 +134,7 @@ def task_listener(event): from pathlib import Path from .environment import Builder, Environment +from .types import NDArray, SharedMemory # noqa: F401 def base(directory: Path) -> Builder: diff --git a/src/appose/python_worker.py b/src/appose/python_worker.py index 551e931..97f2605 100644 --- a/src/appose/python_worker.py +++ b/src/appose/python_worker.py @@ -28,7 +28,14 @@ ### """ -TODO +The Appose worker for running Python scripts. + +Like all Appose workers, this program conforms to the Appose worker process +contract, meaning it accepts requests on stdin and produces responses on +stdout, both formatted according to Appose's assumptions. + +For details, see the Appose README: +https://github.com/apposed/appose/blob/-/README.md#workers """ import ast @@ -39,7 +46,7 @@ # NB: Avoid relative imports so that this script can be run standalone. from appose.service import RequestType, ResponseType -from appose.types import Args, decode, encode +from appose.types import Args, _set_worker, decode, encode class Task: @@ -80,7 +87,6 @@ def _start(self, script: str, inputs: Optional[Args]) -> None: def execute_script(): # Populate script bindings. binding = {"task": self} - # TODO: Magically convert shared memory image inputs. 
if inputs is not None: binding.update(inputs) @@ -156,6 +162,8 @@ def _respond(self, response_type: ResponseType, args: Optional[Args]) -> None: def main() -> None: + _set_worker(True) + tasks = {} while True: @@ -181,8 +189,6 @@ def main() -> None: case RequestType.CANCEL: task = tasks.get(uuid) if task is None: - # TODO: proper logging - # Maybe should stdout the error back to Appose calling process. print(f"No such task: {uuid}", file=sys.stderr) continue task.cancel_requested = True diff --git a/src/appose/types.py b/src/appose/types.py index 803f642..363ea7e 100644 --- a/src/appose/types.py +++ b/src/appose/types.py @@ -28,14 +28,170 @@ ### import json -from typing import Any, Dict +import re +from math import ceil, prod +from multiprocessing import resource_tracker, shared_memory +from typing import Any, Dict, Sequence, Union Args = Dict[str, Any] +class SharedMemory(shared_memory.SharedMemory): + """ + An enhanced version of Python's multiprocessing.shared_memory.SharedMemory + class which can be used with a `with` statement. When the program flow + exits the `with` block, this class's `dispose()` method will be invoked, + which might call `close()` or `unlink()` depending on the value of its + `unlink_on_dispose` flag. + """ + + def __init__(self, name: str = None, create: bool = False, size: int = 0): + super().__init__(name=name, create=create, size=size) + self._unlink_on_dispose = create + if _is_worker: + # HACK: Remove this shared memory block from the resource_tracker, + # which wants to clean up shared memory blocks after all known + # references are done using them. + # + # There is one resource_tracker per Python process, and they will + # each try to delete shared memory blocks known to them when they + # are shutting down, even when other processes still need them. + # + # As such, the rule Appose follows is: let the service process + # always handle cleanup of shared memory blocks, regardless of + # which process initially allocated it. 
+ resource_tracker.unregister(self._name, "shared_memory") + + def unlink_on_dispose(self, value: bool) -> None: + """ + Set whether the `unlink()` method should be invoked to destroy + the shared memory block when the `dispose()` method is called. + + Note: dispose() is the method called when exiting a `with` block. + + By default, shared memory objects constructed with `create=True` + will behave this way, whereas shared memory objects constructed + with `create=False` will not. But this method allows overriding + the behavior. + """ + self._unlink_on_dispose = value + + def dispose(self) -> None: + if self._unlink_on_dispose: + self.unlink() + else: + self.close() + + def __enter__(self) -> "SharedMemory": + return self + + def __exit__(self, exc_type, exc_value, exc_tb) -> None: + self.dispose() + + def encode(data: Args) -> str: - return json.dumps(data) + return json.dumps(data, cls=_ApposeJSONEncoder, separators=(",", ":")) def decode(the_json: str) -> Args: - return json.loads(the_json) + return json.loads(the_json, object_hook=_appose_object_hook) + + +class NDArray: + """ + Data structure for a multi-dimensional array. + The array contains elements of a data type, arranged in + a particular shape, and flattened into SharedMemory. + """ + + def __init__(self, dtype: str, shape: Sequence[int], shm: SharedMemory = None): + """ + Create an NDArray. + :param dtype: The type of the data elements; e.g. int8, uint8, float32, float64. + :param shape: The dimensional extents; e.g. a stack of 7 image planes + with resolution 512x512 would have shape [7, 512, 512]. + :param shm: The SharedMemory containing the array data, or None to create it. 
+ """ + self.dtype = dtype + self.shape = shape + self.shm = ( + SharedMemory( + create=True, size=ceil(prod(shape) * _bytes_per_element(dtype)) + ) + if shm is None + else shm + ) + + def __str__(self): + return ( + f"NDArray(" + f"dtype='{self.dtype}', " + f"shape={self.shape}, " + f"shm='{self.shm.name}' ({self.shm.size}))" + ) + + def ndarray(self): + """ + Create a NumPy ndarray object for working with the array data. + No array data is copied; the NumPy array wraps the same SharedMemory. + Requires the numpy package to be installed. + """ + try: + import numpy + + return numpy.ndarray( + prod(self.shape), dtype=self.dtype, buffer=self.shm.buf + ).reshape(self.shape) + except ModuleNotFoundError: + raise ImportError("NumPy is not available.") + + def __enter__(self) -> "NDArray": + return self + + def __exit__(self, exc_type, exc_value, exc_tb) -> None: + self.shm.dispose() + + +class _ApposeJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, SharedMemory): + return { + "appose_type": "shm", + "name": obj.name, + "size": obj.size, + } + if isinstance(obj, NDArray): + return { + "appose_type": "ndarray", + "dtype": obj.dtype, + "shape": obj.shape, + "shm": obj.shm, + } + return super().default(obj) + + +def _appose_object_hook(obj: Dict): + atype = obj.get("appose_type") + if atype == "shm": + # Attach to existing shared memory block. 
+ return SharedMemory(name=(obj["name"]), size=(obj["size"])) + elif atype == "ndarray": + return NDArray(obj["dtype"], obj["shape"], obj["shm"]) + else: + return obj + + +def _bytes_per_element(dtype: str) -> Union[int, float]: + try: + bits = int(re.sub("[^0-9]", "", dtype)) + except ValueError: + raise ValueError(f"Invalid dtype: {dtype}") + return bits / 8 + + +_is_worker = False + + +def _set_worker(value: bool) -> None: + global _is_worker + _is_worker = value diff --git a/tests/test_shm.py b/tests/test_shm.py new file mode 100644 index 0000000..68dcf8b --- /dev/null +++ b/tests/test_shm.py @@ -0,0 +1,60 @@ +### +# #%L +# Appose: multi-language interprocess cooperation with shared memory. +# %% +# Copyright (C) 2023 Appose developers. +# %% +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# #L% +### + +import appose +from appose.service import TaskStatus + +ndarray_inspect = """ +task.outputs["size"] = data.shm.size +task.outputs["dtype"] = data.dtype +task.outputs["shape"] = data.shape +task.outputs["sum"] = sum(v for v in data.shm.buf) +""" + + +def test_ndarray(): + env = appose.system() + with env.python() as service: + with appose.SharedMemory(create=True, size=2 * 2 * 20 * 25) as shm: + # Construct the data. + shm.buf[0] = 123 + shm.buf[456] = 78 + shm.buf[1999] = 210 + data = appose.NDArray("uint16", [2, 20, 25], shm) + + # Run the task. + task = service.task(ndarray_inspect, {"data": data}) + task.wait_for() + + # Validate the execution result. 
+ assert TaskStatus.COMPLETE == task.status + assert 2 * 20 * 25 * 2 == task.outputs["size"] + assert "uint16" == task.outputs["dtype"] + assert [2, 20, 25] == task.outputs["shape"] + assert 123 + 78 + 210 == task.outputs["sum"] diff --git a/tests/test_types.py b/tests/test_types.py new file mode 100644 index 0000000..dc38d23 --- /dev/null +++ b/tests/test_types.py @@ -0,0 +1,102 @@ +import unittest + +import appose + + +class TypesTest(unittest.TestCase): + JSON = ( + "{" + '"posByte":123,"negByte":-98,' + '"posDouble":9.876543210123456,"negDouble":-1.234567890987654e+302,' + '"posFloat":9.876543,"negFloat":-1.2345678,' + '"posInt":1234567890,"negInt":-987654321,' + '"posLong":12345678987654321,"negLong":-98765432123456789,' + '"posShort":32109,"negShort":-23456,' + '"trueBoolean":true,"falseBoolean":false,' + '"nullChar":"\\u0000",' + '"aString":"-=[]\\\\;\',./_+{}|:\\"<>?' + "AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz" + '~!@#$%^&*()",' + '"numbers":[1,1,2,3,5,8],' + '"words":["quick","brown","fox"],' + # fmt: off + '"ndArray":{' + '"appose_type":"ndarray",' # noqa: E131 + '"dtype":"float32",' # noqa: E131 + '"shape":[2,20,25],' # noqa: E131 + '"shm":{' # noqa: E131 + '"appose_type":"shm",' # noqa: E131 + '"name":"SHM_NAME",' # noqa: E131 + '"size":4000' # noqa: E131 + "}" # noqa: E131 + "}" + # fmt: on + "}" + ) + + STRING = ( + "-=[]\\;',./_+{}|:\"<>?" 
+ "AaBbCcDdEeFfGgHhIiJjKkLlMmNnOoPpQqRrSsTtUuVvWwXxYyZz" + "~!@#$%^&*()" + ) + + NUMBERS = [1, 1, 2, 3, 5, 8] + + WORDS = ["quick", "brown", "fox"] + + def test_encode(self): + data = { + "posByte": 123, + "negByte": -98, + "posDouble": 9.876543210123456, + "negDouble": -1.234567890987654e302, + "posFloat": 9.876543, + "negFloat": -1.2345678, + "posInt": 1234567890, + "negInt": -987654321, + "posLong": 12345678987654321, + "negLong": -98765432123456789, + "posShort": 32109, + "negShort": -23456, + "trueBoolean": True, + "falseBoolean": False, + "nullChar": "\0", + "aString": self.STRING, + "numbers": self.NUMBERS, + "words": self.WORDS, + } + with appose.NDArray("float32", [2, 20, 25]) as ndarray: + shm_name = ndarray.shm.name + data["ndArray"] = ndarray + json_str = appose.types.encode(data) + self.assertIsNotNone(json_str) + expected = self.JSON.replace("SHM_NAME", shm_name) + self.assertEqual(expected, json_str) + + def test_decode(self): + with appose.SharedMemory(create=True, size=4000) as shm: + shm_name = shm.name + data = appose.types.decode(self.JSON.replace("SHM_NAME", shm_name)) + self.assertIsNotNone(data) + self.assertEqual(19, len(data)) + self.assertEqual(123, data["posByte"]) + self.assertEqual(-98, data["negByte"]) + self.assertEqual(9.876543210123456, data["posDouble"]) + self.assertEqual(-1.234567890987654e302, data["negDouble"]) + self.assertEqual(9.876543, data["posFloat"]) + self.assertEqual(-1.2345678, data["negFloat"]) + self.assertEqual(1234567890, data["posInt"]) + self.assertEqual(-987654321, data["negInt"]) + self.assertEqual(12345678987654321, data["posLong"]) + self.assertEqual(-98765432123456789, data["negLong"]) + self.assertEqual(32109, data["posShort"]) + self.assertEqual(-23456, data["negShort"]) + self.assertTrue(data["trueBoolean"]) + self.assertFalse(data["falseBoolean"]) + self.assertEqual("\0", data["nullChar"]) + self.assertEqual(self.STRING, data["aString"]) + self.assertEqual(self.NUMBERS, data["numbers"]) + 
self.assertEqual(self.WORDS, data["words"]) + ndArray = data["ndArray"] + self.assertEqual("float32", ndArray.dtype) + self.assertEqual([2, 20, 25], ndArray.shape)