Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 111 additions & 83 deletions src/fastcs/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,79 +2,68 @@
from collections import defaultdict
from collections.abc import Callable
from types import MethodType
from typing import Any

from softioc.asyncio_dispatcher import AsyncioDispatcher

from .attributes import AttrR, AttrW, Sender, Updater
from .controller import Controller
from .exceptions import FastCSException
from .mapping import Mapping, SingleMapping


def _get_initial_tasks(mapping: Mapping) -> list[Callable]:
initial_tasks: list[Callable] = []
initial_tasks.append(mapping.controller.connect)
return initial_tasks


def _create_periodic_scan_task(period, methods: list[Callable]) -> Callable:
async def scan_task() -> None:
while True:
await asyncio.gather(*[method() for method in methods])
await asyncio.sleep(period)

return scan_task


def _get_periodic_scan_tasks(scan_dict: dict[float, list[Callable]]) -> list[Callable]:
periodic_scan_tasks: list[Callable] = []
for period, methods in scan_dict.items():
periodic_scan_tasks.append(_create_periodic_scan_task(period, methods))

return periodic_scan_tasks


def _add_scan_method_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
):
for method in single_mapping.scan_methods.values():
scan_dict[method.period].append(
MethodType(method.fn, single_mapping.controller)
class Backend:
_initial_tasks: list[Callable] = []
_context: dict[str, Any] = {}

def __init__(
self, controller: Controller, loop: asyncio.AbstractEventLoop | None = None
):
self._dispatcher = AsyncioDispatcher(loop)
self._loop = self._dispatcher.loop
self._controller = controller

self._initial_tasks.append(controller.connect)

asyncio.run_coroutine_threadsafe(
self._controller.initialise(), self._loop
).result()

self._mapping = Mapping(self._controller)
self._link_process_tasks()

self._context.update(
{
"dispatcher": self._dispatcher,
"controller": self._controller,
"mapping": self._mapping,
}
)

def _link_process_tasks(self):
for single_mapping in self._mapping.get_controller_mappings():
_link_single_controller_put_tasks(single_mapping)
_link_attribute_sender_class(single_mapping)

def _create_updater_callback(attribute, controller):
async def callback():
try:
await attribute.updater.update(controller, attribute)
except Exception as e:
print(
f"Update loop in {attribute.updater} stopped:\n"
f"{e.__class__.__name__}: {e}"
)
raise

return callback

def run(self):
self._run_initial_tasks()
self._start_scan_tasks()

def _add_attribute_updater_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
):
for attribute in single_mapping.attributes.values():
match attribute:
case AttrR(updater=Updater(update_period=update_period)) as attribute:
callback = _create_updater_callback(
attribute, single_mapping.controller
)
scan_dict[update_period].append(callback)
self._run()

def _run_initial_tasks(self):
for task in self._initial_tasks:
future = asyncio.run_coroutine_threadsafe(task(), self._loop)
future.result()

def _get_scan_tasks(mapping: Mapping) -> list[Callable]:
scan_dict: dict[float, list[Callable]] = defaultdict(list)
def _start_scan_tasks(self):
scan_tasks = _get_scan_tasks(self._mapping)

for single_mapping in mapping.get_controller_mappings():
_add_scan_method_tasks(scan_dict, single_mapping)
_add_attribute_updater_tasks(scan_dict, single_mapping)
for task in scan_tasks:
asyncio.run_coroutine_threadsafe(task(), self._loop)

scan_tasks = _get_periodic_scan_tasks(scan_dict)
return scan_tasks
def _run(self):
raise NotImplementedError("Specific Backend must implement _run")


def _link_single_controller_put_tasks(single_mapping: SingleMapping) -> None:
Expand All @@ -94,13 +83,6 @@ def _link_single_controller_put_tasks(single_mapping: SingleMapping) -> None:
)


def _create_sender_callback(attribute, controller):
async def callback(value):
await attribute.sender.put(controller, attribute, value)

return callback


def _link_attribute_sender_class(single_mapping: SingleMapping) -> None:
for attr_name, attribute in single_mapping.attributes.items():
match attribute:
Expand All @@ -113,25 +95,71 @@ def _link_attribute_sender_class(single_mapping: SingleMapping) -> None:
attribute.set_process_callback(callback)


class Backend:
def __init__(self, mapping: Mapping, loop: asyncio.AbstractEventLoop):
self._mapping = mapping
self._loop = loop
def _create_sender_callback(attribute, controller):
async def callback(value):
await attribute.sender.put(controller, attribute, value)

def link_process_tasks(self):
for single_mapping in self._mapping.get_controller_mappings():
_link_single_controller_put_tasks(single_mapping)
_link_attribute_sender_class(single_mapping)
return callback

def run_initial_tasks(self):
initial_tasks = _get_initial_tasks(self._mapping)

for task in initial_tasks:
future = asyncio.run_coroutine_threadsafe(task(), self._loop)
future.result()
def _get_scan_tasks(mapping: Mapping) -> list[Callable]:
scan_dict: dict[float, list[Callable]] = defaultdict(list)

def start_scan_tasks(self):
scan_tasks = _get_scan_tasks(self._mapping)
for single_mapping in mapping.get_controller_mappings():
_add_scan_method_tasks(scan_dict, single_mapping)
_add_attribute_updater_tasks(scan_dict, single_mapping)

for task in scan_tasks:
asyncio.run_coroutine_threadsafe(task(), self._loop)
scan_tasks = _get_periodic_scan_tasks(scan_dict)
return scan_tasks


def _add_scan_method_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
):
for method in single_mapping.scan_methods.values():
scan_dict[method.period].append(
MethodType(method.fn, single_mapping.controller)
)


def _add_attribute_updater_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
):
for attribute in single_mapping.attributes.values():
match attribute:
case AttrR(updater=Updater(update_period=update_period)) as attribute:
callback = _create_updater_callback(
attribute, single_mapping.controller
)
scan_dict[update_period].append(callback)


def _create_updater_callback(attribute, controller):
async def callback():
try:
await attribute.updater.update(controller, attribute)
except Exception as e:
print(
f"Update loop in {attribute.updater} stopped:\n"
f"{e.__class__.__name__}: {e}"
)
raise

return callback


def _get_periodic_scan_tasks(scan_dict: dict[float, list[Callable]]) -> list[Callable]:
periodic_scan_tasks: list[Callable] = []
for period, methods in scan_dict.items():
periodic_scan_tasks.append(_create_periodic_scan_task(period, methods))

return periodic_scan_tasks


def _create_periodic_scan_task(period, methods: list[Callable]) -> Callable:
async def scan_task() -> None:
while True:
await asyncio.gather(*[method() for method in methods])
await asyncio.sleep(period)

return scan_task
31 changes: 7 additions & 24 deletions src/fastcs/backends/asyncio_backend.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,13 @@
from softioc import asyncio_dispatcher, softioc
from softioc import softioc

from fastcs.backend import Backend
from fastcs.mapping import Mapping
from fastcs.controller import Controller


class AsyncioBackend:
def __init__(self, mapping: Mapping):
self._mapping = mapping

def run_interactive_session(self):
# Create an asyncio dispatcher; the event loop is now running
dispatcher = asyncio_dispatcher.AsyncioDispatcher()

backend = Backend(self._mapping, dispatcher.loop)

backend.link_process_tasks()
backend.run_initial_tasks()
backend.start_scan_tasks()
class AsyncioBackend(Backend):
def __init__(self, controller: Controller): # noqa: F821
super().__init__(controller)

def _run(self):
# Run the interactive shell
global_variables = globals()
global_variables.update(
{
"dispatcher": dispatcher,
"mapping": self._mapping,
"controller": self._mapping.controller,
}
)
softioc.interactive_ioc(globals())
softioc.interactive_ioc(self._context)
23 changes: 12 additions & 11 deletions src/fastcs/backends/epics/backend.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
from fastcs.mapping import Mapping
from fastcs.backend import Backend
from fastcs.controller import Controller

from .docs import EpicsDocs, EpicsDocsOptions
from .gui import EpicsGUI, EpicsGUIOptions
from .ioc import EpicsIOC
from .ioc import EpicsIOC, EpicsIOCOptions


class EpicsBackend:
def __init__(self, mapping: Mapping, pv_prefix: str = "MY-DEVICE-PREFIX"):
self._mapping = mapping
class EpicsBackend(Backend):
def __init__(self, controller: Controller, pv_prefix: str = "MY-DEVICE-PREFIX"):
super().__init__(controller)

self._pv_prefix = pv_prefix
self._ioc = EpicsIOC(pv_prefix, self._mapping)

def create_docs(self, options: EpicsDocsOptions | None = None) -> None:
docs = EpicsDocs(self._mapping)
docs.create_docs(options)
EpicsDocs(self._mapping).create_docs(options)

def create_gui(self, options: EpicsGUIOptions | None = None) -> None:
gui = EpicsGUI(self._mapping, self._pv_prefix)
gui.create_gui(options)
EpicsGUI(self._mapping, self._pv_prefix).create_gui(options)

def get_ioc(self) -> EpicsIOC:
return EpicsIOC(self._mapping, self._pv_prefix)
def _run(self, options: EpicsIOCOptions | None = None):
self._ioc.run(self._dispatcher, self._context, options)
Loading