Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/decentralize' into decentralize
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcusZuber committed Sep 18, 2024
2 parents 05ff079 + 7e7c608 commit b868f6d
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 106 deletions.
20 changes: 20 additions & 0 deletions bin/concert
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import argparse
import atexit
import contextlib
import logging
import os
Expand Down Expand Up @@ -56,6 +57,12 @@ def get_prompt_config(path):
return MyPrompt


def delete_lock_file(lockfile):
if os.path.exists(lockfile):
os.remove(lockfile)
LOG.debug("Removed lock file %s", lockfile)


class InitCommand(SubCommand):

"""Create a new session."""
Expand Down Expand Up @@ -382,6 +389,19 @@ class StartCommand(SubCommand):

ip_config = traitlets.config.Config()
if path and path.endswith('.py'):
if not cs.is_multiinstance(session):
lockfile = os.path.join(os.path.dirname(path), '.' + os.path.basename(path))
if os.path.exists(lockfile):
print(f"An instance of `{session}' seems to be already running.")
print(
f"If you are sure there are no running instances, delete `{lockfile}' "
"and start again."
)
return
with open(lockfile, 'w'):
pass
atexit.register(delete_lock_file, lockfile)

docstring = cs.get_docstring(path)
if docstring:
print(docstring)
Expand Down
30 changes: 22 additions & 8 deletions concert/devices/cameras/uca.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import numpy as np
import struct
import weakref
from concert.coroutines.base import background, run_in_executor
from concert.quantities import q
from concert.base import check, Parameter, Quantity
Expand Down Expand Up @@ -265,16 +266,10 @@ async def __ainit__(self, params=None):
await Camera.__ainit__(self, 'net', params=params)
self._ucad_host = self.uca.props.host
self._ucad_port = self.uca.props.port
weakref.finalize(self, _ucad_unregister_all, self._ucad_host, self._ucad_port)

async def _communicate(self, request):
reader, writer = await asyncio.open_connection(self._ucad_host, self._ucad_port)
try:
writer.write(request)
await writer.drain()
_construct_ucad_error(await reader.read())
finally:
writer.close()
await writer.wait_closed()
await _ucad_communicate(request, self._ucad_host, self._ucad_port)

async def _send_grab_push_command(self, num_images=1, end=True):
"""Grab images in ucad and push them over network to another receiver than us."""
Expand All @@ -296,6 +291,25 @@ async def register_endpoint(self, endpoint: CommData) -> None:
async def unregister_endpoint(self, endpoint: CommData) -> None:
await self._communicate(struct.pack("I128s", 13, bytes(endpoint.server_endpoint, 'ascii')))

async def unregister_all(self) -> None:
await self._communicate(struct.pack("I", 14))


async def _ucad_communicate(request, host, port):
reader, writer = await asyncio.open_connection(host, port)
try:
writer.write(request)
await writer.drain()
_construct_ucad_error(await reader.read())
finally:
writer.close()
await writer.wait_closed()


def _ucad_unregister_all(host, port):
asyncio.run(_ucad_communicate(struct.pack("I", 14), host, port))
LOG.debug("Unregistered all endpoints on %s:%d", host, port)


def _construct_ucad_error(message):
# struct UcaNetDefaultReply of uca-net-protocol.h
Expand Down
28 changes: 21 additions & 7 deletions concert/experiments/addons/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ def _make_consumers(self, acquisitions):
for acq in acquisitions:
consumers[acq] = AcquisitionConsumer(
self.start_timer,
corofunc_args=(acq.name,)
corofunc_args=(acq.name,),
addon=self if self.start_timer.remote else None
)

return consumers
Expand Down Expand Up @@ -150,8 +151,9 @@ async def prepare_and_write(*args):

return prepare_and_write

addon = self if self.write_sequence.remote else None
for acq in acquisitions:
consumers[acq] = AcquisitionConsumer(prepare_wrapper(acq))
consumers[acq] = AcquisitionConsumer(prepare_wrapper(acq), addon=addon)
consumers[acq].corofunc.remote = self.write_sequence.remote

return consumers
Expand Down Expand Up @@ -182,8 +184,9 @@ async def __ainit__(self, consumer, experiment, acquisitions=None):
def _make_consumers(self, acquisitions):
consumers = {}

addon = self if self.consume.remote else None
for acq in acquisitions:
consumers[acq] = AcquisitionConsumer(self.consume)
consumers[acq] = AcquisitionConsumer(self.consume, addon=addon)

return consumers

Expand All @@ -208,8 +211,9 @@ async def __ainit__(self, viewer, experiment, acquisitions=None):
def _make_consumers(self, acquisitions):
consumers = {}

addon = self if self.consume.remote else None
for acq in acquisitions:
consumers[acq] = AcquisitionConsumer(self.consume)
consumers[acq] = AcquisitionConsumer(self.consume, addon=addon)

return consumers

Expand Down Expand Up @@ -243,11 +247,13 @@ def _make_consumers(self, acquisitions):
shapes = (None,) * len(acquisitions) if self._shapes is None else self._shapes
consumers = {}

addon = self if self.accumulate.remote else None
for i, acq in enumerate(acquisitions):
consumers[acq] = AcquisitionConsumer(
self.accumulate,
corofunc_args=(acq.name,),
corofunc_kwargs={'shape': shapes[i], 'dtype': self._dtype},
addon=addon
)

return consumers
Expand Down Expand Up @@ -316,16 +322,20 @@ async def __ainit__(self, experiment, acquisitions=None, do_normalization=True,
def _make_consumers(self, acquisitions):
consumers = {}

addon = self if self.reconstruct.remote else None
if self._do_normalization:
consumers[get_acq_by_name(acquisitions, 'darks')] = AcquisitionConsumer(
self.update_darks
self.update_darks,
addon=addon
)
consumers[get_acq_by_name(acquisitions, 'flats')] = AcquisitionConsumer(
self.update_flats
self.update_flats,
addon=addon
)

consumers[get_acq_by_name(acquisitions, 'radios')] = AcquisitionConsumer(
self.reconstruct
self.reconstruct,
addon=addon
)

return consumers
Expand Down Expand Up @@ -485,14 +495,18 @@ async def __ainit__(self, experiment, output_directory="contrasts"):

def _make_consumers(self, acquisitions):
consumers = {}
addon = self if self.process_stepping.remote else None
consumers[get_acq_by_name(acquisitions, 'darks')] = AcquisitionConsumer(
self.process_darks,
addon=addon
)
consumers[get_acq_by_name(acquisitions, 'reference_stepping')] = AcquisitionConsumer(
self.process_stepping,
addon=addon
)
consumers[get_acq_by_name(acquisitions, 'object_stepping')] = AcquisitionConsumer(
self.process_stepping,
addon=addon
)

return consumers
Expand Down
55 changes: 36 additions & 19 deletions concert/experiments/addons/tango.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

from concert.experiments.addons import base
from concert.experiments.base import remote
from concert.helpers import CommData
from concert.quantities import q
from typing import Awaitable


LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -40,23 +42,28 @@ async def wrapper(self, *args, **kwargs):

return wrapper

async def __ainit__(self, device):
async def __ainit__(self, device, endpoint: CommData) -> Awaitable:
self._device = device
self.endpoint = endpoint
await self._device.write_attribute('endpoint', self.endpoint.client_endpoint)

async def connect_endpoint(self):
await self._device.connect_endpoint()

async def disconnect_endpoint(self):
await self._device.disconnect_endpoint()

async def cancel(self):
await self._device.cancel()

async def _setup(self):
await self._device.reset_connection()

async def _teardown(self):
await self._device.teardown()
await self._device.disconnect_endpoint()


class Benchmarker(TangoMixin, base.Benchmarker):

async def __ainit__(self, experiment, device, acquisitions=None):
await TangoMixin.__ainit__(self, device)
async def __ainit__(self, experiment, device, endpoint, acquisitions=None):
await TangoMixin.__ainit__(self, device, endpoint)
await base.Benchmarker.__ainit__(self, experiment=experiment, acquisitions=acquisitions)

@TangoMixin.cancel_remote
Expand All @@ -74,8 +81,8 @@ async def _teardown(self):

class ImageWriter(TangoMixin, base.ImageWriter):

async def __ainit__(self, experiment, acquisitions=None):
await TangoMixin.__ainit__(self, experiment.walker.device)
async def __ainit__(self, experiment, endpoint, acquisitions=None):
await TangoMixin.__ainit__(self, experiment.walker.device, endpoint)
await base.ImageWriter.__ainit__(self, experiment=experiment, acquisitions=acquisitions)

@TangoMixin.cancel_remote
Expand All @@ -87,10 +94,16 @@ async def write_sequence(self, name):
class LiveView(base.LiveView):

async def __ainit__(self, viewer, endpoint, experiment, acquisitions=None):
self.endpoint = endpoint
await base.LiveView.__ainit__(self, viewer, experiment=experiment, acquisitions=acquisitions)
self._endpoint = endpoint
self._orig_limits = await viewer.get_limits()

async def connect_endpoint(self):
self._viewer.subscribe(self.endpoint.client_endpoint)

async def disconnect_endpoint(self):
self._viewer.unsubscribe()

@remote
async def consume(self):
try:
Expand All @@ -99,14 +112,10 @@ async def consume(self):
# Force viewer to update the limits by unsubscribing and re-subscribing after
# setting limits to stream
await self._viewer.set_limits('stream')
self._viewer.subscribe(self._endpoint)
self._viewer.subscribe(self.endpoint.client_endpoint)
finally:
self._orig_limits = await self._viewer.get_limits()

async def _teardown(self):
await super()._teardown()
self._viewer.unsubscribe()


class _TangoProxyArgs:
def __init__(self, device):
Expand All @@ -120,10 +129,18 @@ async def get_reco_arg(self, arg):


class OnlineReconstruction(TangoMixin, base.OnlineReconstruction):
async def __ainit__(self, device, experiment, acquisitions=None, do_normalization=True,
average_normalization=True, slice_directory='online-slices',
viewer=None):
await TangoMixin.__ainit__(self, device)
async def __ainit__(
self,
device,
experiment,
endpoint,
acquisitions=None,
do_normalization=True,
average_normalization=True,
slice_directory='online-slices',
viewer=None
):
await TangoMixin.__ainit__(self, device, endpoint)

# Lock the device to prevent other processes from using it
try:
Expand Down
Loading

0 comments on commit b868f6d

Please sign in to comment.