Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to session.status #371

Merged
merged 12 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/developer/clients.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,32 @@ of a Task::
else:
print('Task did not complete successfully')

Check Health of a Process
^^^^^^^^^^^^^^^^^^^^^^^^^

Usually it's good when a Process is still running, but some Processes
will also mark themselves as "degraded" if there is some non-fatal
problem that means the acquisition or control is not occurring::

from ocs.ocs_client import OCSClient

client = OCSClient('agent-instance-id')

client.some_process.start()
time.sleep(2)
status = client.some_process()

if response.session['success'] is None:
if response.session['degraded']:
print('Process is running, but in a degraded state.')
else:
print('Process is running (and does not report degraded state).')
elif response.session['success']:
print('Process exited without error')
else:
print('Process exited with error')


Check Latest Data in an Operation
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Expand Down
1 change: 0 additions & 1 deletion docs/developer/session-data.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ Fake Data Agent as an example, specifically at its primary Process,
"""
ok, msg = self.try_set_job('acq')
if not ok: return ok, msg
session.set_status('running')

if params is None:
params = {}
Expand Down
5 changes: 0 additions & 5 deletions docs/developer/writing_an_agent/logging.rst
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ Our Agent in full now looks like this:
"timestamp":1600448753.9288929}

"""
session.set_status('running')

# Initialize the counter
self._count=True
counter = 0
Expand Down Expand Up @@ -139,9 +137,6 @@ Our Agent in full now looks like this:
'last_updated': 1660249321.8729222}

"""
# Set operations status to 'running'
session.set_status('running')

# Log the text provided to the Agent logs
self.log.info(f"{params['text']}")

Expand Down
7 changes: 0 additions & 7 deletions docs/developer/writing_an_agent/process.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ two more class methods:
"timestamp": 1600448753.9288929}

"""
session.set_status('running')

# Initialize the counter
self._count=True
counter = 0
Expand Down Expand Up @@ -123,8 +121,6 @@ Our Agent in full now looks like this:
"timestamp": 1600448753.9288929}

"""
session.set_status('running')

# Initialize the counter
self._count=True
counter = 0
Expand Down Expand Up @@ -166,9 +162,6 @@ Our Agent in full now looks like this:
'last_updated': 1660249321.8729222}

"""
# Set operations status to 'running'
session.set_status('running')

# Print the text provided to the Agent logs
print(f"{params['text']}")

Expand Down
5 changes: 0 additions & 5 deletions docs/developer/writing_an_agent/publish.rst
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ Our Agent in full now looks like this:
f"is held by {self.lock.job}")
return False

session.set_status('running')

# Initialize last release time for lock
last_release = time.time()

Expand Down Expand Up @@ -197,9 +195,6 @@ Our Agent in full now looks like this:
f"is held by {self.lock.job}")
return False

# Set operations status to 'running'
session.set_status('running')

# Log the text provided to the Agent logs
self.log.info(f"{params['text']}")

Expand Down
6 changes: 0 additions & 6 deletions docs/developer/writing_an_agent/task.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ here:
'last_updated': 1660249321.8729222}

"""
# Set operations status to 'running'
session.set_status('running')

# Print the text provided
print(f"{params['text']}")

Expand Down Expand Up @@ -181,9 +178,6 @@ Our full Agent so far should look like this:
'last_updated': 1660249321.8729222}

"""
# Set operations status to 'running'
session.set_status('running')

# Print the text provided
print(f"{params['text']}")

Expand Down
10 changes: 0 additions & 10 deletions docs/developer/writing_an_agent/timeoutlock.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ Process would look like this:
f"{self.lock.job} is already running")
return False, "Could not acquire lock"

session.set_status('running')

# Initialize the counter
self._count=True
counter = 0
Expand All @@ -45,9 +43,6 @@ Process would look like this:
f"{self.lock.job} is already running")
return False, "Could not acquire lock"

# Set operations status to 'running'
session.set_status('running')

# Log the text provided to the Agent logs
self.log.info(f"{params['text']}")

Expand Down Expand Up @@ -148,8 +143,6 @@ Our Agent in full now looks like this:
f"is held by {self.lock.job}")
return False

session.set_status('running')

# Initialize last release time for lock
last_release = time.time()

Expand Down Expand Up @@ -208,9 +201,6 @@ Our Agent in full now looks like this:
f"is held by {self.lock.job}")
return False

# Set operations status to 'running'
session.set_status('running')

# Log the text provided to the Agent logs
self.log.info(f"{params['text']}")

Expand Down
2 changes: 0 additions & 2 deletions ocs/agents/aggregator/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ def record(self, session: ocs_agent.OpSession, params):
"last_block_received": "temps"}}}

"""
session.set_status('starting')
self.aggregate = True

try:
Expand All @@ -126,7 +125,6 @@ def record(self, session: ocs_agent.OpSession, params):
reactor.callFromThread(reactor.stop)
return False, "Aggregation not started"

session.set_status('running')
while self.aggregate:
time.sleep(self.loop_time)
aggregator.run()
Expand Down
5 changes: 0 additions & 5 deletions ocs/agents/barebones/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ def count(self, session, params):
+ f"is held by {self.lock.job}")
return False

session.set_status('running')

# Initialize last release time for lock
last_release = time.time()

Expand Down Expand Up @@ -135,9 +133,6 @@ def print(self, session, params):
+ f"is held by {self.lock.job}")
return False

# Set operations status to 'running'
session.set_status('running')

# Log the text provided to the Agent logs
self.log.info(f"{params['text']}")

Expand Down
18 changes: 14 additions & 4 deletions ocs/agents/fake_data/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ def set_job_done(self):
# Process functions.

@ocs_agent.param('test_mode', default=False, type=bool)
@ocs_agent.param('degradation_period', default=None, type=float)
def acq(self, session, params):
"""acq(test_mode=False)
"""acq(test_mode=False, degradation_period=None)

**Process** - Acquire data and write to the feed.

Parameters:
test_mode (bool, optional): Run the acq Process loop only once.
This is meant only for testing. Default is False.
degradation_period (float, optional): If set, then
alternately mark self as degraded / not degraded with
this period (in seconds).

Notes:
The most recent fake values are stored in the session data object in
Expand All @@ -80,7 +84,6 @@ def acq(self, session, params):
ok, msg = self.try_set_job('acq')
if not ok:
return ok, msg
session.set_status('running')

T = [.100 for c in self.channel_names]
block = ocs_feed.Block('temps', self.channel_names)
Expand All @@ -89,6 +92,10 @@ def acq(self, session, params):
reporting_interval = 1.
next_report = next_timestamp + reporting_interval

next_deg_flip = None
if params['degradation_period'] is not None:
next_deg_flip = 0

self.log.info("Starting acquisition")

while True:
Expand All @@ -101,6 +108,11 @@ def acq(self, session, params):
return 10

now = time.time()

if next_deg_flip is not None and now > next_deg_flip:
session.degraded = not session.degraded
next_deg_flip = now + params['degradation_period']

delay_time = next_report - now
if delay_time > 0:
time.sleep(min(delay_time, 1.))
Expand Down Expand Up @@ -170,7 +182,6 @@ def count_seconds(self, session, params):
# This process runs entirely in the reactor, as does its stop function.
session.data = {'counter': 0,
'last_update': time.time()}
session.set_status('running')
while session.status == 'running':
yield dsleep(1)
session.data['last_update'] = time.time()
Expand Down Expand Up @@ -233,7 +244,6 @@ def delay_task(self, session, params):

session.data = {'requested_delay': delay,
'delay_so_far': 0}
session.set_status('running')
t0 = time.time()
while session.status == 'running':
session.data['delay_so_far'] = time.time() - t0
Expand Down
3 changes: 0 additions & 3 deletions ocs/agents/host_manager/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,6 @@ def manager(self, session, params):
}

self.running = True
session.set_status('running')

if params['reload_config']:
self.database = {}
Expand Down Expand Up @@ -613,15 +612,13 @@ def update(self, session, params):
"""
if not self.running:
return False, 'Manager process is not running; params not updated.'
session.set_status('running')
if params['reload_config']:
yield self._reload_config(session)
self._process_target_states(session, params['requests'])
return True, 'Update requested.'

@inlineCallbacks
def die(self, session, params):
session.set_status('running')
if not self.running:
session.add_message('Manager process is not running.')
else:
Expand Down
2 changes: 0 additions & 2 deletions ocs/agents/influxdb_publisher/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ def record(self, session: ocs_agent.OpSession, params):
This is meant only for testing. Default is False.

"""
session.set_status('starting')
self.aggregate = True

self.log.debug("Instatiating Publisher class")
Expand All @@ -100,7 +99,6 @@ def record(self, session: ocs_agent.OpSession, params):
operate_callback=lambda: self.aggregate,
)

session.set_status('running')
while self.aggregate:
time.sleep(self.loop_time)
self.log.debug(f"Approx. queue size: {self.incoming_data.qsize()}")
Expand Down
3 changes: 0 additions & 3 deletions ocs/agents/registry/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,8 @@ def main(self, session: ocs_agent.OpSession, params):
}

"""

session.set_status('starting')
self._run = True

session.set_status('running')
last_publish = time.time()
while self._run:
yield dsleep(1)
Expand Down
18 changes: 15 additions & 3 deletions ocs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ class ResponseCode(Enum):
class OpCode(Enum):
"""Enumeration of OpSession "op_code" values.

The op_code corresponds to the session.status, except that if the
session.status == "done" then the op_code will be assigned a value
of either SUCCEEDED or FAILED based on session.success.
The op_code corresponds to the session.status, with the following
extensions:

- If the session.status == "done" then the op_code will be
assigned a value of either SUCCEEDED or FAILED based on
session.success.
- If the session.status == "running", and session.degraded is
True, then the op_code will be DEGRADED rather than RUNNING.

"""

Expand Down Expand Up @@ -76,3 +81,10 @@ class OpCode(Enum):
#: EXPIRED may used to mark session information as invalid in cases
#: where the state cannot be determined.
EXPIRED = 7

#: DEGRADED indicates that an operation meets the requirements for
#: state RUNNING, but is self-reporting as being in a problematic
#: state where it is unable to perform its primary functions (for
#: example, if a Process to operate some hardware is trying to
#: re-establish connection to that hardware).
DEGRADED = 8
Loading