Skip to content

Commit

Permalink
Add timeout parameter to stream.steps(). Necessary for controlling ti…
Browse files Browse the repository at this point in the history
…meout in in situ processing. Also very useful for not being stuck forever in processing files that were not closed properly. Default is still infinity, which may not be the ideal setup for python scripts in practice. Usage:

    for _ in f.steps(timeout=1.0):
        ...
  • Loading branch information
pnorbert authored and vicentebolea committed Feb 28, 2024
1 parent 82b9fff commit 4e50476
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions python/adios2/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def string_to_mode(mode: str) -> [bindings.Mode, bool]:
class Stream:
"""High level implementation of the Stream class from the core API"""

# Default timeout for stream.begin_step()
DEFAULT_TIMEOUT_SEC = -1.0

@singledispatchmethod
def __init__(self, path, mode, comm=None):
# pylint: disable=R0912 # Too many branches
Expand All @@ -71,6 +74,7 @@ def __init__(self, path, mode, comm=None):
self.index = -1
self.max_steps = maxsize
self._step_status = bindings.StepStatus.EndOfStream
self._step_timeout_sec = self.DEFAULT_TIMEOUT_SEC

# e.g. Stream(io: adios2.IO, path, mode)
@__init__.register(IO)
Expand All @@ -82,6 +86,7 @@ def _(self, io: IO, path, mode, comm=None):
self.index = -1
self.max_steps = maxsize
self._step_status = bindings.StepStatus.EndOfStream
self._step_timeout_sec = self.DEFAULT_TIMEOUT_SEC

@property
def mode(self):
Expand Down Expand Up @@ -123,10 +128,17 @@ def __next__(self):
raise StopIteration

self.index += 1
self._step_status = self.begin_step()
self._step_status = self.begin_step(timeout=self._step_timeout_sec)
if self._step_status == bindings.StepStatus.EndOfStream:
raise StopIteration

if self._step_status == bindings.StepStatus.NotReady:
print(
"ERROR: Stream returned no new step within the time limit of"
f" {self._step_timeout_sec} seconds. Ending the loop"
)
raise StopIteration

if self._step_status == bindings.StepStatus.OtherError:
print("ERROR: Stream returned an error. Ending the loop")
raise StopIteration
Expand Down Expand Up @@ -241,7 +253,9 @@ def inquire_attribute(self, name, variable_name="", separator="/"):
@singledispatchmethod
def write(self, variable: Variable, content):
"""
writes a variable
Writes a variable.
Note that the content will be available for consumption only at
the end of the for loop or in the end_step() call.
Parameters
variable
Expand All @@ -257,7 +271,7 @@ def write(self, variable: Variable, content):
@write.register(str)
def _(self, name, content, shape=[], start=[], count=[], operations=None):
"""
writes a variable
Writes a variable
Parameters
name
Expand Down Expand Up @@ -485,25 +499,29 @@ def read_attribute_string(self, name, variable_name="", separator="/"):

return attribute.data_string()

def begin_step(self):
def begin_step(self, *, timeout=DEFAULT_TIMEOUT_SEC):
"""
Write mode: advances to the next step. Convenient when declaring
variable attributes as advancing to the next step is not attached
to any variable.
Write mode: declare the starting of an output step. Pass data in
stream.write() and stream.write_attribute(). All data will be published
in end_step().
Read mode: in streaming mode releases the current step (no effect
in file based engines)
"""
if self._read_mode:
mode = bindings.StepMode.Read
else:
mode = bindings.StepMode.Append

if not self._engine.between_step_pairs():
return self._engine.begin_step()
return self._engine.begin_step(mode=mode, timeoutSeconds=timeout)
return bindings.StepStatus.OtherError

def end_step(self):
"""
Write mode: advances to the next step. Convenient when declaring
variable attributes as advancing to the next step is not attached
to any variable.
Write mode: declaring the end of an output step. All data passed in
stream.write() and all attributes passed in stream.write_attribute()
will be published for consumers.
Read mode: in streaming mode releases the current step (no effect
in file based engines)
Expand Down Expand Up @@ -549,7 +567,7 @@ def loop_index(self):
"""
return self.index

def steps(self, num_steps=0):
def steps(self, num_steps=0, *, timeout=DEFAULT_TIMEOUT_SEC):
"""
Returns an interator that can be use to itererate throught the steps.
In each iteration begin_step() and end_step() will be internally called.
Expand Down Expand Up @@ -580,6 +598,8 @@ def steps(self, num_steps=0):
else:
self.max_steps = maxsize # engine steps will limit the loop

self._step_timeout_sec = timeout

# in write mode we can run yet another loop
self.index = -1

Expand Down

0 comments on commit 4e50476

Please sign in to comment.