Skip to content

Commit

Permalink
Make walker a Parameterizable
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcusZuber committed Jun 21, 2024
1 parent d0b22db commit 4149771
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 25 deletions.
2 changes: 1 addition & 1 deletion concert/directors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async def _run(self):
handler = None
try:
if self._experiment.walker:
handler = logging.FileHandler(os.path.join(await self._experiment.walker.current,
handler = logging.FileHandler(os.path.join(await self._experiment.walker.get_current(),
'director.log'))
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s '
'- %(message)s')
Expand Down
2 changes: 1 addition & 1 deletion concert/experiments/addons/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async def _process_data(self, stepping, dark):

async def _write_single_image(self, name, image):
async with self._experiment.walker:
file_name = os.path.join(await self._experiment.walker.current, name)
file_name = os.path.join(await self._experiment.walker.get_current(), name)

im_writer = self._experiment.walker.writer(file_name, bytes_per_file=0)
im_writer.write(image)
Expand Down
2 changes: 1 addition & 1 deletion concert/experiments/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ async def _run(self):
if self.walker:
if separate_scans:
await self.walker.descend((await self.get_name_fmt()).format(iteration))
if os.path.exists(await self.walker.current):
if os.path.exists(await self.walker.get_current()):
handler = await self.walker.get_log_handler()
self.log.addHandler(handler)
exp_metadata: str = await self._prepare_metadata_str()
Expand Down
59 changes: 42 additions & 17 deletions concert/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import re
from typing import Optional, AsyncIterable, Awaitable, Type, Iterable, Set
import tifffile
from concert.base import AsyncObject
from concert.base import AsyncObject, Parameterizable, Parameter
from concert.coroutines.base import background
from concert.writers import TiffWriter
from concert.typing import RemoteDirectoryWalkerTangoDevice
Expand Down Expand Up @@ -158,7 +158,7 @@ async def write_images(producer: AsyncIterable[ArrayLike],
prefix.format(start_index + file_index - 1)))


class Walker(AsyncObject):
class Walker(Parameterizable):
"""
A Walker moves through an abstract hierarchy and allows to write data
at a specific location.
Expand All @@ -168,7 +168,12 @@ class Walker(AsyncObject):
_current: str
_log_name: str
_lock: asyncio.Lock
dsetname: str
_dsetname: str

root = Parameter()
current = Parameter()
log_name = Parameter()
dsetname = Parameter()

async def __ainit__(self,
root: str,
Expand All @@ -187,7 +192,8 @@ async def __ainit__(self,
self._root = root
self._log_name = log_name
self._lock = asyncio.Lock()
self.dsetname = dsetname
self._dsetname = dsetname
await super().__ainit__()
await self.home()

async def __aenter__(self) -> Walker:
Expand All @@ -205,7 +211,7 @@ async def _ascend(self) -> None:
"""Ascend from current depth."""
raise NotImplementedError

def _create_writer(self,
async def _create_writer(self,
producer: AsyncIterable[ArrayLike],
dsetname: Optional[str] = None) -> Awaitable:
"""
Expand All @@ -217,14 +223,22 @@ async def _get_current(self) -> str:
"""Fetches the current from internal contex"""
raise NotImplementedError

async def _get_root(self) -> str:
"""Fetches the root from internal context"""
return self._root

async def _get_log_name(self) -> str:
"""Fetches the log name from internal context"""
return self._log_name

async def _get_dsetname(self) -> str:
"""Fetches the dataset name from internal context"""
return self._dsetname

async def home(self) -> None:
"""Return to root"""
self._current = self._root

@property
async def current(self) -> str:
"""Return current position."""
return await self._get_current()

async def exists(self, *paths) -> bool:
"""Return True if path from current position specified by a list of
Expand Down Expand Up @@ -260,7 +274,7 @@ async def create_writer(self,
if name:
await self.descend(name)
try:
return await self._create_writer(producer, dsetname=dsetname)
return await (await self._create_writer(producer, dsetname=dsetname))
finally:
if name:
await self.ascend()
Expand All @@ -274,7 +288,7 @@ async def write(self,
*producer*. The execution starts immediately in the background and
await will block until the images are written.
"""
return await self._create_writer(producer, dsetname=dsetname)
return await (await self._create_writer(producer, dsetname=dsetname))

async def get_log_handler(self) -> AsyncLoggingHandlerCloser:
"""Provides a log handler featuring an asynchronous flush and closure
Expand Down Expand Up @@ -321,7 +335,7 @@ async def _ascend(self) -> None:
if self._current != self._root:
self._current = os.path.dirname(self._current)

def _create_writer(self,
async def _create_writer(self,
producer: AsyncIterable[ArrayLike],
dsetname: Optional[str] = None) -> Awaitable:
dsetname = dsetname or self.dsetname
Expand Down Expand Up @@ -349,6 +363,9 @@ class DirectoryWalker(Walker):
_start_index: int
writer: Type[TiffWriter]

bytes_per_file = Parameter()
start_index = Parameter()

async def __ainit__(self,
root: Optional[str] = None,
dsetname: str = "frame_{:>06}.tif",
Expand Down Expand Up @@ -388,10 +405,18 @@ async def _get_current(self) -> str:
"""Provides current from local context"""
return self._current

def _create_writer(self,
async def _get_bytes_per_file(self) -> int:
"""Provides bytes per file from local context"""
return self._bytes_per_file

async def _get_start_index(self) -> int:
"""Provides start index from local context"""
return self._start_index

async def _create_writer(self,
producer: AsyncIterable[ArrayLike],
dsetname: Optional[str] = None) -> Awaitable:
dsetname = dsetname or self.dsetname
dsetname = dsetname or await self.get_dsetname()
if self._dset_exists(dsetname):
dset_prefix = split_dsetformat(dsetname)
dset_path = os.path.join(self._current, dset_prefix)
Expand All @@ -417,10 +442,10 @@ def _dset_exists(self, dsetname: str) -> bool:

async def exists(self, *paths: str) -> bool:
"""Check if *paths* exist."""
return os.path.exists(os.path.join(await self.current, *paths))
return os.path.exists(os.path.join(await self.get_current(), *paths))

async def get_log_handler(self) -> AsyncLoggingHandlerCloser:
return LoggingHandler(os.path.join(await self.current, self._log_name))
return LoggingHandler(os.path.join(await self.get_current(), self._log_name))

async def log_to_json(self, payload: str) -> None:
"""
Expand All @@ -434,7 +459,7 @@ async def log_to_json(self, payload: str) -> None:
:type payload: str
"""
with open(
file=os.path.join(self._current, "experiment.json"),
file=os.path.join(await self.get_current(), "experiment.json"),
mode="w",
encoding="utf-8") as lgf:
lgf.write(payload)
Expand Down
4 changes: 2 additions & 2 deletions concert/tests/integration/test_experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ async def test_image_writing(self):

# Check if the writing coroutine has been attached
for i in range(self.num_produce):
foo = op.join(data_dir, 'foo', walker.dsetname.format(i))
bar = op.join(data_dir, 'bar', walker.dsetname.format(i))
foo = op.join(data_dir, 'foo', (await walker.get_dsetname()).format(i))
bar = op.join(data_dir, 'bar', (await walker.get_dsetname()).format(i))

self.assertTrue(await walker.exists(foo))
self.assertTrue(await walker.exists(bar))
Expand Down
2 changes: 1 addition & 1 deletion concert/tests/integration/test_image_with_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def test_metadata(self):
await self.walker.descend("metadata")
await self.exp.run()
radio_images = tifffile.TiffReader(
os.path.join(await self.walker.current, "radios", "frame_000000.tif"))
os.path.join(await self.walker.get_current(), "radios", "frame_000000.tif"))
for i in range(await self.exp.get_num_projections()):
self.assertEqual(ast.literal_eval(radio_images.pages[i].description)['index_int'],
int(i))
Expand Down
4 changes: 2 additions & 2 deletions concert/tests/unit/test_walker.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ async def check(self, subdir=''):
async def test_create_writer_no_subdir(self):
await self.walker.create_writer(async_generate(self.data), dsetname='foo')
await self.check()
self.assertEqual(await self.walker.current, '')
self.assertEqual(await self.walker.get_current(), '')

async def test_create_writer_with_subdir(self):
await self.walker.create_writer(async_generate(self.data), name='inside', dsetname='foo')
await self.check(subdir='inside')
self.assertEqual(await self.walker.current, '')
self.assertEqual(await self.walker.get_current(), '')

async def test_coroutine(self):
print(f"Self Data: {self.data}")
Expand Down

0 comments on commit 4149771

Please sign in to comment.