From e02af52f47b19120cd294575435625715bc9d723 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 12:26:55 -0500 Subject: [PATCH 01/12] ocs_agent: set_status now blocks for completion if run in thread This means that set_status('running') won't return until the status has actually been updated to 'running'. --- ocs/ocs_agent.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index efeefd82..64cc3057 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -1249,13 +1249,19 @@ def set_status(self, status, timestamp=None, log_status=True): The only valid transitions are forward in the sequence [starting, running, stopping, done]; i.e. it is forbidden for the status of an OpSession to move from stopping to running. + + If this function is called from a worker thread, it will be + scheduled to run in the reactor, and will block until that is + complete. + """ if timestamp is None: timestamp = time.time() if not in_reactor_context(): - return reactor.callFromThread(self.set_status, status, - timestamp=timestamp, - log_status=log_status) + return threads.blockingCallFromThread(reactor, + self.set_status, status, + timestamp=timestamp, + log_status=log_status) # Sanity check the status value. from_index = SESSION_STATUS_CODES.index(self.status) # current status valid? to_index = SESSION_STATUS_CODES.index(status) # new status valid? From 55813729ed16a621680615864bd4ea51166f1ec7 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 12:43:00 -0500 Subject: [PATCH 02/12] ocs_agent: don't push half-baked session to .feed --- ocs/ocs_agent.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 64cc3057..f3e37d8e 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -683,8 +683,9 @@ def subscribe_on_start(self, handler, topic, options=None, force_subscribe=None) def _handle_task_return_val(self, *args, **kw): try: (ok, message), session = args + # Add message to queue but don't publish -- set_status will publish. + session.add_message(message, publish_now=False) session.success = ok - session.add_message(message) session.set_status('done') except BaseException: print('Failed to decode _handle_task_return_val args:', @@ -698,7 +699,7 @@ def _handle_task_error(self, *args, **kw): message = 'ERROR: {}'.format(ex.getErrorMessage()) else: message = 'CRASH: %s' % str(ex) - session.add_message(message) + session.add_message(message, publish_now=False) session.success = False session.set_status('done') except BaseException: @@ -1277,7 +1278,7 @@ def set_status(self, status, timestamp=None, log_status=True): self.app.log.error('setting session status to "{s}" failed. ' + 'transport lost or disconnected', s=status) - def add_message(self, message, timestamp=None): + def add_message(self, message, timestamp=None, publish_now=True): """Add a log message to the OpSession messages buffer. Args: @@ -1285,6 +1286,9 @@ def add_message(self, message, timestamp=None): timestamp (float): timestamp to tag the message. The default, which is None, will cause the timestamp to be computed here and should be used in most cases. + publish_now (bool): if True, publish_status is called to + alert any listeners of the new log message. + Otherwise, it isn't. """ if timestamp is None: @@ -1293,7 +1297,8 @@ def add_message(self, message, timestamp=None): return reactor.callFromThread(self.add_message, message, timestamp=timestamp) self.messages.append((timestamp, message)) - self.app.publish_status('Message', self) + if publish_now: + self.app.publish_status('Message', self) # Make the app log this message, too. The op_name and # session_id are an important provenance prefix. self.app.log.info('%s:%i %s' % (self.op_name, self.session_id, message)) From e7f2ff093b9230e4438c6fac694002d8107c818b Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 12:44:09 -0500 Subject: [PATCH 03/12] ocs_agent: actually, no one uses .feed so discontinue it --- ocs/ocs_agent.py | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index f3e37d8e..dda0f494 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -432,13 +432,6 @@ def _management_handler(self, q, **kwargs): if q == 'get_agent_class': return self.class_name - def publish_status(self, message, session): - try: - self.publish(self.agent_address + '.feed', session.encoded()) - except TransportLost: - self.log.error('Unable to publish status. TransportLost. ' - + 'crossbar server likely unreachable.') - def register_task(self, name, func, aborter=None, blocking=True, aborter_blocking=None, startup=False): """Register a Task for this agent. @@ -683,8 +676,7 @@ def subscribe_on_start(self, handler, topic, options=None, force_subscribe=None) def _handle_task_return_val(self, *args, **kw): try: (ok, message), session = args - # Add message to queue but don't publish -- set_status will publish. - session.add_message(message, publish_now=False) + session.add_message(message) session.success = ok session.set_status('done') except BaseException: @@ -699,7 +691,7 @@ def _handle_task_error(self, *args, **kw): message = 'ERROR: {}'.format(ex.getErrorMessage()) else: message = 'CRASH: %s' % str(ex) - session.add_message(message, publish_now=False) + session.add_message(message) session.success = False session.set_status('done') except BaseException: @@ -1278,7 +1270,7 @@ def set_status(self, status, timestamp=None, log_status=True): self.app.log.error('setting session status to "{s}" failed. ' + 'transport lost or disconnected', s=status) - def add_message(self, message, timestamp=None, publish_now=True): + def add_message(self, message, timestamp=None): """Add a log message to the OpSession messages buffer. Args: @@ -1286,9 +1278,6 @@ def add_message(self, message, timestamp=None, publish_now=True): timestamp (float): timestamp to tag the message. The default, which is None, will cause the timestamp to be computed here and should be used in most cases. - publish_now (bool): if True, publish_status is called to - alert any listeners of the new log message. - Otherwise, it isn't. """ if timestamp is None: @@ -1297,8 +1286,6 @@ def add_message(self, message, timestamp=None, publish_now=True): return reactor.callFromThread(self.add_message, message, timestamp=timestamp) self.messages.append((timestamp, message)) - if publish_now: - self.app.publish_status('Message', self) # Make the app log this message, too. The op_name and # session_id are an important provenance prefix. self.app.log.info('%s:%i %s' % (self.op_name, self.session_id, message)) From f75b2255c7b4d09a03ca6a562eb2b112638583e3 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 14:48:52 -0500 Subject: [PATCH 04/12] ocs_agent: ops enter agent code with status='running' This removes the need to set_status('running') "manually" at the start of every op. --- ocs/ocs_agent.py | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index dda0f494..b0abbfb1 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -759,16 +759,11 @@ def start(self, op_name, params=None): self.next_session_id += 1 self.sessions[op_name] = session - # Launch differently depending on whether op intends to - # block or not. - if op.blocking: - # Launch, soon, in a blockable worker thread. - session.d = threads.deferToThread(op.launcher, session, params) - else: - # Launch, soon, in the main reactor thread. - session.d = task.deferLater(reactor, 0, op.launcher, session, params) + # Schedule op to run (in worker thread or reactor) + session.d = op.launch_deferred(session, params) session.d.addCallback(self._handle_task_return_val, session) session.d.addErrback(self._handle_task_error, session) + return (ocs.OK, msg, session.encoded()) else: @@ -976,7 +971,27 @@ def status(self, op_name, params=None): return (ocs.ERROR, 'No task or process called "%s"' % op_name, {}) -class AgentTask: +class AgentOp: + def launch_deferred(self, session, params): + """Launch the operation using the launcher function, either in + a worker thread (self.blocking) or in the reactor (not + self.blocking). Return a Deferred. Prior to executing the + operation code, set session state to "running". + + """ + def _running_wrapper(session, params): + session.set_status('running') + return self.launcher(session, params) + + if self.blocking: + # Launch, soon, in a blockable worker thread. + return threads.deferToThread(_running_wrapper, session, params) + else: + # Launch, soon, in the main reactor thread. + return task.deferLater(reactor, 0, _running_wrapper, session, params) + + +class AgentTask(AgentOp): def __init__(self, launcher, blocking=None, aborter=None, aborter_blocking=None): self.launcher = launcher @@ -997,7 +1012,7 @@ def encoded(self): } -class AgentProcess: +class AgentProcess(AgentOp): def __init__(self, launcher, stopper, blocking=None, stopper_blocking=None): self.launcher = launcher self.stopper = stopper @@ -1260,6 +1275,8 @@ def set_status(self, status, timestamp=None, log_status=True): to_index = SESSION_STATUS_CODES.index(status) # new status valid? assert (to_index >= from_index) # Only forward moves in status are permitted. + log_status = log_status and (to_index > from_index) + self.status = status if status == 'done': self.end_time = timestamp From ad3b2ee65c4b985499a734a13ea903ebcda0724e Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 14:51:10 -0500 Subject: [PATCH 05/12] ocs_agent: remove unused log_status option from set_status --- ocs/ocs_agent.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index b0abbfb1..8a554ae2 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -1067,7 +1067,7 @@ class OpSession: """ - def __init__(self, session_id, op_name, status='starting', log_status=True, + def __init__(self, session_id, op_name, status='starting', app=None, purge_policy=None): # Note that some data members are used internally, while others are # communicated over WAMP to Agent control clients. @@ -1083,7 +1083,7 @@ def __init__(self, session_id, op_name, status='starting', log_status=True, self.status = None # This has to be the last call since it depends on init... - self.set_status(status, log_status=log_status, timestamp=self.start_time) + self.set_status(status, timestamp=self.start_time) # Set up the log message purge. self.purge_policy = { @@ -1228,14 +1228,12 @@ def op_code(self): else: return OpCode.FAILED - def set_status(self, status, timestamp=None, log_status=True): + def set_status(self, status, timestamp=None): """Update the OpSession status and possibly post a message about it. Args: status (string): New value for status (see below). timestamp (float): timestamp for the operation. - log_status (bool): Determines whether change is logged in - message buffer. The possible values for status are: @@ -1268,24 +1266,24 @@ def set_status(self, status, timestamp=None, log_status=True): if not in_reactor_context(): return threads.blockingCallFromThread(reactor, self.set_status, status, - timestamp=timestamp, - log_status=log_status) + timestamp=timestamp) # Sanity check the status value. from_index = SESSION_STATUS_CODES.index(self.status) # current status valid? to_index = SESSION_STATUS_CODES.index(status) # new status valid? assert (to_index >= from_index) # Only forward moves in status are permitted. - log_status = log_status and (to_index > from_index) + if to_index == from_index: + return self.status = status if status == 'done': self.end_time = timestamp - if log_status: - try: - self.add_message('Status is now "%s".' % status, timestamp=timestamp) - except (TransportLost, Disconnected): - self.app.log.error('setting session status to "{s}" failed. ' - + 'transport lost or disconnected', s=status) + + try: + self.add_message('Status is now "%s".' % status, timestamp=timestamp) + except (TransportLost, Disconnected): + self.app.log.error('setting session status to "{s}" failed. ' + + 'transport lost or disconnected', s=status) def add_message(self, message, timestamp=None): """Add a log message to the OpSession messages buffer. From e051ef37d95b3662acd32eb3292f78739d6fd0d7 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 14:55:47 -0500 Subject: [PATCH 06/12] agents: remove unnecessary session.set_status() calls Mostly "running" but also some "starting" --- ocs/agents/aggregator/agent.py | 2 -- ocs/agents/barebones/agent.py | 5 ----- ocs/agents/fake_data/agent.py | 3 --- ocs/agents/host_manager/agent.py | 3 --- ocs/agents/influxdb_publisher/agent.py | 2 -- ocs/agents/registry/agent.py | 3 --- 6 files changed, 18 deletions(-) diff --git a/ocs/agents/aggregator/agent.py b/ocs/agents/aggregator/agent.py index 3f16cb45..b1e2301a 100644 --- a/ocs/agents/aggregator/agent.py +++ b/ocs/agents/aggregator/agent.py @@ -110,7 +110,6 @@ def record(self, session: ocs_agent.OpSession, params): "last_block_received": "temps"}}} """ - session.set_status('starting') self.aggregate = True try: @@ -126,7 +125,6 @@ def record(self, session: ocs_agent.OpSession, params): reactor.callFromThread(reactor.stop) return False, "Aggregation not started" - session.set_status('running') while self.aggregate: time.sleep(self.loop_time) aggregator.run() diff --git a/ocs/agents/barebones/agent.py b/ocs/agents/barebones/agent.py index d1e5d5f6..706b0f74 100644 --- a/ocs/agents/barebones/agent.py +++ b/ocs/agents/barebones/agent.py @@ -66,8 +66,6 @@ def count(self, session, params): + f"is held by {self.lock.job}") return False - session.set_status('running') - # Initialize last release time for lock last_release = time.time() @@ -135,9 +133,6 @@ def print(self, session, params): + f"is held by {self.lock.job}") return False - # Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index b2b746e7..cd47d796 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -80,7 +80,6 @@ def acq(self, session, params): ok, msg = self.try_set_job('acq') if not ok: return ok, msg - session.set_status('running') T = [.100 for c in self.channel_names] block = ocs_feed.Block('temps', self.channel_names) @@ -170,7 +169,6 @@ def count_seconds(self, session, params): # This process runs entirely in the reactor, as does its stop function. session.data = {'counter': 0, 'last_update': time.time()} - session.set_status('running') while session.status == 'running': yield dsleep(1) session.data['last_update'] = time.time() @@ -233,7 +231,6 @@ def delay_task(self, session, params): session.data = {'requested_delay': delay, 'delay_so_far': 0} - session.set_status('running') t0 = time.time() while session.status == 'running': session.data['delay_so_far'] = time.time() - t0 diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index fc7bd0f2..24bd5f01 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -491,7 +491,6 @@ def manager(self, session, params): } self.running = True - session.set_status('running') if params['reload_config']: self.database = {} @@ -613,7 +612,6 @@ def update(self, session, params): """ if not self.running: return False, 'Manager process is not running; params not updated.' - session.set_status('running') if params['reload_config']: yield self._reload_config(session) self._process_target_states(session, params['requests']) @@ -621,7 +619,6 @@ def update(self, session, params): @inlineCallbacks def die(self, session, params): - session.set_status('running') if not self.running: session.add_message('Manager process is not running.') else: diff --git a/ocs/agents/influxdb_publisher/agent.py b/ocs/agents/influxdb_publisher/agent.py index e35ef00a..6fd26572 100644 --- a/ocs/agents/influxdb_publisher/agent.py +++ b/ocs/agents/influxdb_publisher/agent.py @@ -87,7 +87,6 @@ def record(self, session: ocs_agent.OpSession, params): This is meant only for testing. Default is False. """ - session.set_status('starting') self.aggregate = True self.log.debug("Instatiating Publisher class") @@ -100,7 +99,6 @@ def record(self, session: ocs_agent.OpSession, params): operate_callback=lambda: self.aggregate, ) - session.set_status('running') while self.aggregate: time.sleep(self.loop_time) self.log.debug(f"Approx. queue size: {self.incoming_data.qsize()}") diff --git a/ocs/agents/registry/agent.py b/ocs/agents/registry/agent.py index f53d644d..4ead257f 100644 --- a/ocs/agents/registry/agent.py +++ b/ocs/agents/registry/agent.py @@ -194,11 +194,8 @@ def main(self, session: ocs_agent.OpSession, params): } """ - - session.set_status('starting') self._run = True - session.set_status('running') last_publish = time.time() while self._run: yield dsleep(1) From 9659faf70a27b1686f76876455ba7b48367662d2 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 14:59:16 -0500 Subject: [PATCH 07/12] Update docs to remove examples of set_status('running') --- docs/developer/session-data.rst | 1 - docs/developer/writing_an_agent/logging.rst | 5 ----- docs/developer/writing_an_agent/process.rst | 7 ------- docs/developer/writing_an_agent/publish.rst | 5 ----- docs/developer/writing_an_agent/task.rst | 6 ------ docs/developer/writing_an_agent/timeoutlock.rst | 10 ---------- 6 files changed, 34 deletions(-) diff --git a/docs/developer/session-data.rst b/docs/developer/session-data.rst index 5aa89d7e..83034efc 100644 --- a/docs/developer/session-data.rst +++ b/docs/developer/session-data.rst @@ -56,7 +56,6 @@ Fake Data Agent as an example, specifically at its primary Process, """ ok, msg = self.try_set_job('acq') if not ok: return ok, msg - session.set_status('running') if params is None: params = {} diff --git a/docs/developer/writing_an_agent/logging.rst b/docs/developer/writing_an_agent/logging.rst index 4ec34fa7..9a35637e 100644 --- a/docs/developer/writing_an_agent/logging.rst +++ b/docs/developer/writing_an_agent/logging.rst @@ -96,8 +96,6 @@ Our Agent in full now looks like this: "timestamp":1600448753.9288929} """ - session.set_status('running') - # Initialize the counter self._count=True counter = 0 @@ -139,9 +137,6 @@ Our Agent in full now looks like this: 'last_updated': 1660249321.8729222} """ - # Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") diff --git a/docs/developer/writing_an_agent/process.rst b/docs/developer/writing_an_agent/process.rst index 15bfbdbe..a4c7096c 100644 --- a/docs/developer/writing_an_agent/process.rst +++ b/docs/developer/writing_an_agent/process.rst @@ -27,8 +27,6 @@ two more class methods: "timestamp": 1600448753.9288929} """ - session.set_status('running') - # Initialize the counter self._count=True counter = 0 @@ -123,8 +121,6 @@ Our Agent in full now looks like this: "timestamp": 1600448753.9288929} """ - session.set_status('running') - # Initialize the counter self._count=True counter = 0 @@ -166,9 +162,6 @@ Our Agent in full now looks like this: 'last_updated': 1660249321.8729222} """ - # Set operations status to 'running' - session.set_status('running') - # Print the text provided to the Agent logs print(f"{params['text']}") diff --git a/docs/developer/writing_an_agent/publish.rst b/docs/developer/writing_an_agent/publish.rst index 2486b816..51262569 100644 --- a/docs/developer/writing_an_agent/publish.rst +++ b/docs/developer/writing_an_agent/publish.rst @@ -128,8 +128,6 @@ Our Agent in full now looks like this: f"is held by {self.lock.job}") return False - session.set_status('running') - # Initialize last release time for lock last_release = time.time() @@ -197,9 +195,6 @@ Our Agent in full now looks like this: f"is held by {self.lock.job}") return False - # Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") diff --git a/docs/developer/writing_an_agent/task.rst b/docs/developer/writing_an_agent/task.rst index d5cb48b7..359d9e14 100644 --- a/docs/developer/writing_an_agent/task.rst +++ b/docs/developer/writing_an_agent/task.rst @@ -26,9 +26,6 @@ here: 'last_updated': 1660249321.8729222} """ - # Set operations status to 'running' - session.set_status('running') - # Print the text provided print(f"{params['text']}") @@ -181,9 +178,6 @@ Our full Agent so far should look like this: 'last_updated': 1660249321.8729222} """ - # Set operations status to 'running' - session.set_status('running') - # Print the text provided print(f"{params['text']}") diff --git a/docs/developer/writing_an_agent/timeoutlock.rst b/docs/developer/writing_an_agent/timeoutlock.rst index e303bb9b..be686ac0 100644 --- a/docs/developer/writing_an_agent/timeoutlock.rst +++ b/docs/developer/writing_an_agent/timeoutlock.rst @@ -19,8 +19,6 @@ Process would look like this: f"{self.lock.job} is already running") return False, "Could not acquire lock" - session.set_status('running') - # Initialize the counter self._count=True counter = 0 @@ -45,9 +43,6 @@ Process would look like this: f"{self.lock.job} is already running") return False, "Could not acquire lock" - # Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") @@ -148,8 +143,6 @@ Our Agent in full now looks like this: f"is held by {self.lock.job}") return False - session.set_status('running') - # Initialize last release time for lock last_release = time.time() @@ -208,9 +201,6 @@ Our Agent in full now looks like this: f"is held by {self.lock.job}") return False - # Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") From c2864426c3e0a93e0524f1006ad349f5c7dcefdd Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 15:09:51 -0500 Subject: [PATCH 08/12] For tests, OpSession should init in state 'running' --- tests/agents/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/agents/util.py b/tests/agents/util.py index 284a6e2e..9e931855 100644 --- a/tests/agents/util.py +++ b/tests/agents/util.py @@ -33,7 +33,7 @@ def agent(): def create_session(op_name): """Create an OpSession with a mocked app for testing.""" mock_app = mock.MagicMock() - session = OpSession(1, op_name, app=mock_app) + session = OpSession(1, op_name, app=mock_app, status='running') return session From 4f6cd76a50dd07344211d2af8923c4e2e0bf190d Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 15:33:54 -0500 Subject: [PATCH 09/12] Extend OpCode to report "DEGRADED" state under certain conditions --- ocs/base.py | 19 ++++++++++++++++--- ocs/ocs_agent.py | 7 ++++++- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/ocs/base.py b/ocs/base.py index 277c0086..547c9fbf 100644 --- a/ocs/base.py +++ b/ocs/base.py @@ -38,9 +38,15 @@ class ResponseCode(Enum): class OpCode(Enum): """Enumeration of OpSession "op_code" values. - The op_code corresponds to the session.status, except that if the - session.status == "done" then the op_code will be assigned a value - of either SUCCEEDED or FAILED based on session.success. + The op_code corresponds to the session.status, with the following + extensions: + + - If the session.status == "done" then the op_code will be + assigned a value of either SUCCEEDED or FAILED based on + session.success. + - If the session.status == "running", and the session.data + contains a key "degraded" that evaluate to True, then the + op_code will be DEGRADED rather than RUNNING. """ @@ -76,3 +82,10 @@ class OpCode(Enum): #: EXPIRED may used to mark session information as invalid in cases #: where the state cannot be determined. EXPIRED = 7 + + #: DEGRADED indicates that an operation meets the requirements for + #: state RUNNING, but is self-reporting as being in a problematic + #: state where it is unable to perform its primary functions (for + #: example, if a Process to operate some hardware is trying to + #: re-establish connection to that hardware). + DEGRADED = 8 diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 8a554ae2..13ed6b2b 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -1220,9 +1220,14 @@ def op_code(self): """ if self.status is None: return OpCode.NONE - elif self.status in ['starting', 'running', 'stopping']: + elif self.status in ['starting', 'stopping']: return {'starting': OpCode.STARTING, 'running': OpCode.RUNNING, 'stopping': OpCode.STOPPING}[self.status] + elif self.status == 'running': + if self.data.get('degraded', False): + return OpCode.DEGRADED + else: + return OpCode.RUNNING elif self.success: return OpCode.SUCCEEDED else: From abeb233b2cc8e1aff85489eccbb402862c3c9e4e Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 8 Jan 2024 15:34:19 -0500 Subject: [PATCH 10/12] FakeDataAgent: acq method can mark self as degraded --- ocs/agents/fake_data/agent.py | 18 +++++++++++++++++- tests/agents/test_fakedata.py | 3 ++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index cd47d796..fdde6927 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -51,14 +51,18 @@ def set_job_done(self): # Process functions. @ocs_agent.param('test_mode', default=False, type=bool) + @ocs_agent.param('degradation_period', default=None, type=float) def acq(self, session, params): - """acq(test_mode=False) + """acq(test_mode=False, degradation_period=None) **Process** - Acquire data and write to the feed. Parameters: test_mode (bool, optional): Run the acq Process loop only once. This is meant only for testing. Default is False. + degradation_period (float, optional): If set, then + alternately mark self as degraded / not degraded with + this period (in seconds). Notes: The most recent fake values are stored in the session data object in @@ -88,6 +92,12 @@ def acq(self, session, params): reporting_interval = 1. next_report = next_timestamp + reporting_interval + is_degraded = False + next_deg_flip = None + if params['degradation_period'] is not None: + next_deg_flip = 0 + session.data['degraded'] = is_degraded + self.log.info("Starting acquisition") while True: @@ -100,6 +110,12 @@ def acq(self, session, params): return 10 now = time.time() + + if next_deg_flip is not None and now > next_deg_flip: + is_degraded = not is_degraded + next_deg_flip = now + params['degradation_period'] + session.data['degraded'] = is_degraded + delay_time = next_report - now if delay_time > 0: time.sleep(min(delay_time, 1.)) diff --git a/tests/agents/test_fakedata.py b/tests/agents/test_fakedata.py index f7d7b597..28f1fd1f 100644 --- a/tests/agents/test_fakedata.py +++ b/tests/agents/test_fakedata.py @@ -19,7 +19,8 @@ def test_fake_data_set_heartbeat(agent): def test_fake_data_acq(agent): session = create_session('acq') - params = {'test_mode': True} + params = {'test_mode': True, + 'degradation_period': None} res = agent.acq(session, params=params) assert res[0] is True From 73e2b2a81102f21dac9c3019cddd1a1ecc5b0151 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Wed, 10 Jan 2024 15:02:47 -0500 Subject: [PATCH 11/12] Use session.degraded instead of session.data['degraded'] --- ocs/agents/fake_data/agent.py | 5 +---- ocs/base.py | 5 ++--- ocs/ocs_agent.py | 19 +++++++++++++------ 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index fdde6927..b3929df0 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -92,11 +92,9 @@ def acq(self, session, params): reporting_interval = 1. next_report = next_timestamp + reporting_interval - is_degraded = False next_deg_flip = None if params['degradation_period'] is not None: next_deg_flip = 0 - session.data['degraded'] = is_degraded self.log.info("Starting acquisition") @@ -112,9 +110,8 @@ def acq(self, session, params): now = time.time() if next_deg_flip is not None and now > next_deg_flip: - is_degraded = not is_degraded + session.degraded = not session.degraded next_deg_flip = now + params['degradation_period'] - session.data['degraded'] = is_degraded delay_time = next_report - now if delay_time > 0: diff --git a/ocs/base.py b/ocs/base.py index 547c9fbf..2418dcaa 100644 --- a/ocs/base.py +++ b/ocs/base.py @@ -44,9 +44,8 @@ class OpCode(Enum): - If the session.status == "done" then the op_code will be assigned a value of either SUCCEEDED or FAILED based on session.success. - - If the session.status == "running", and the session.data - contains a key "degraded" that evaluate to True, then the - op_code will be DEGRADED rather than RUNNING. + - If the session.status == "running", and session.degraded is + True, then the op_code will be DEGRADED rather than RUNNING. """ diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index 13ed6b2b..5f17a002 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -1074,6 +1074,7 @@ def __init__(self, session_id, op_name, status='starting', self.messages = [] # entries are time-ordered (timestamp, text). self.data = {} # Operation-specific data structures. + self.degraded = False self.session_id = session_id self.op_name = op_name self.start_time = time.time() @@ -1125,11 +1126,15 @@ def encoded(self): op_name : str The OCS Operation name. op_code : int - The OpCode, which combines information from status and - success; see :class:`ocs.base.OpCode`. + The OpCode, which combines information from status, success, + and degraded; see :class:`ocs.base.OpCode`. status : str The Operation run status (e.g. 'starting', 'done', ...). See :data:`ocs.ocs_agent.SESSION_STATUS_CODES`. + degraded: bool + A boolean flag (defaults to False) that an operation may set + to indicate that it is not achieving its primary function + (e.g. if it cannot establish connection to hardware). success : bool or None If the Operation Session has completed (`status == 'done'`), this indicates that the Operation was deemed successful. @@ -1206,6 +1211,7 @@ def json_safe(data, check_ok=False): 'op_name': self.op_name, 'op_code': self.op_code.value, 'status': self.status, + 'degraded': self.degraded, 'success': self.success, 'start_time': self.start_time, 'end_time': self.end_time, @@ -1220,14 +1226,15 @@ def op_code(self): """ if self.status is None: return OpCode.NONE - elif self.status in ['starting', 'stopping']: - return {'starting': OpCode.STARTING, 'running': OpCode.RUNNING, - 'stopping': OpCode.STOPPING}[self.status] + elif self.status == 'starting': + return OpCode.STARTING elif self.status == 'running': - if self.data.get('degraded', False): + if self.degraded: return OpCode.DEGRADED else: return OpCode.RUNNING + elif self.status == 'stopping': + return OpCode.STOPPING elif self.success: return OpCode.SUCCEEDED else: From fb11e5a13b131ce9b22994f396396f8631b25431 Mon Sep 17 00:00:00 2001 From: Matthew Hasselfield Date: Mon, 22 Jan 2024 14:03:06 -0500 Subject: [PATCH 12/12] docs: show client checking for degraded process --- docs/developer/clients.rst | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/docs/developer/clients.rst b/docs/developer/clients.rst index ce30ae6e..02c654d3 100644 --- a/docs/developer/clients.rst +++ b/docs/developer/clients.rst @@ -216,6 +216,32 @@ of a Task:: else: print('Task did not complete successfully') +Check Health of a Process +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Usually it's good when a Process is still running, but some Processes +will also mark themselves as "degraded" if there is some non-fatal +problem that means the acquisition or control is not occurring:: + + from ocs.ocs_client import OCSClient + + client = OCSClient('agent-instance-id') + + client.some_process.start() + time.sleep(2) + status = client.some_process() + + if response.session['success'] is None: + if response.session['degraded']: + print('Process is running, but in a degraded state.') + else: + print('Process is running (and does not report degraded state).') + elif response.session['success']: + print('Process exited without error') + else: + print('Process exited with error') + + Check Latest Data in an Operation ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^