diff --git a/docs/developer/clients.rst b/docs/developer/clients.rst index ce30ae6e..02c654d3 100644 --- a/docs/developer/clients.rst +++ b/docs/developer/clients.rst @@ -216,6 +216,32 @@ of a Task:: else: print('Task did not complete successfully') +Check Health of a Process +^^^^^^^^^^^^^^^^^^^^^^^^^ + +Usually it's good when a Process is still running, but some Processes +will also mark themselves as "degraded" if there is some non-fatal +problem that means the acquisition or control is not occurring:: + + from ocs.ocs_client import OCSClient + + client = OCSClient('agent-instance-id') + + client.some_process.start() + time.sleep(2) + response = client.some_process() + + if response.session['success'] is None: + if response.session['degraded']: + print('Process is running, but in a degraded state.') + else: + print('Process is running (and does not report degraded state).') + elif response.session['success']: + print('Process exited without error') + else: + print('Process exited with error') + + Check Latest Data in an Operation ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/docs/developer/session-data.rst b/docs/developer/session-data.rst index 5aa89d7e..83034efc 100644 --- a/docs/developer/session-data.rst +++ b/docs/developer/session-data.rst @@ -56,7 +56,6 @@ Fake Data Agent as an example, specifically at its primary Process, """ ok, msg = self.try_set_job('acq') if not ok: return ok, msg - session.set_status('running') if params is None: params = {} diff --git a/docs/developer/writing_an_agent/logging.rst b/docs/developer/writing_an_agent/logging.rst index 4ec34fa7..9a35637e 100644 --- a/docs/developer/writing_an_agent/logging.rst +++ b/docs/developer/writing_an_agent/logging.rst @@ -96,8 +96,6 @@ Our Agent in full now looks like this: "timestamp":1600448753.9288929} """ - session.set_status('running') - # Initialize the counter self._count=True counter = 0 @@ -139,9 +137,6 @@ Our Agent in full now looks like this: 'last_updated': 1660249321.8729222} """ - # 
Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") diff --git a/docs/developer/writing_an_agent/process.rst b/docs/developer/writing_an_agent/process.rst index 15bfbdbe..a4c7096c 100644 --- a/docs/developer/writing_an_agent/process.rst +++ b/docs/developer/writing_an_agent/process.rst @@ -27,8 +27,6 @@ two more class methods: "timestamp": 1600448753.9288929} """ - session.set_status('running') - # Initialize the counter self._count=True counter = 0 @@ -123,8 +121,6 @@ Our Agent in full now looks like this: "timestamp": 1600448753.9288929} """ - session.set_status('running') - # Initialize the counter self._count=True counter = 0 @@ -166,9 +162,6 @@ Our Agent in full now looks like this: 'last_updated': 1660249321.8729222} """ - # Set operations status to 'running' - session.set_status('running') - # Print the text provided to the Agent logs print(f"{params['text']}") diff --git a/docs/developer/writing_an_agent/publish.rst b/docs/developer/writing_an_agent/publish.rst index 2486b816..51262569 100644 --- a/docs/developer/writing_an_agent/publish.rst +++ b/docs/developer/writing_an_agent/publish.rst @@ -128,8 +128,6 @@ Our Agent in full now looks like this: f"is held by {self.lock.job}") return False - session.set_status('running') - # Initialize last release time for lock last_release = time.time() @@ -197,9 +195,6 @@ Our Agent in full now looks like this: f"is held by {self.lock.job}") return False - # Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") diff --git a/docs/developer/writing_an_agent/task.rst b/docs/developer/writing_an_agent/task.rst index d5cb48b7..359d9e14 100644 --- a/docs/developer/writing_an_agent/task.rst +++ b/docs/developer/writing_an_agent/task.rst @@ -26,9 +26,6 @@ here: 'last_updated': 1660249321.8729222} """ - # Set operations status to 'running' - 
session.set_status('running') - # Print the text provided print(f"{params['text']}") @@ -181,9 +178,6 @@ Our full Agent so far should look like this: 'last_updated': 1660249321.8729222} """ - # Set operations status to 'running' - session.set_status('running') - # Print the text provided print(f"{params['text']}") diff --git a/docs/developer/writing_an_agent/timeoutlock.rst b/docs/developer/writing_an_agent/timeoutlock.rst index e303bb9b..be686ac0 100644 --- a/docs/developer/writing_an_agent/timeoutlock.rst +++ b/docs/developer/writing_an_agent/timeoutlock.rst @@ -19,8 +19,6 @@ Process would look like this: f"{self.lock.job} is already running") return False, "Could not acquire lock" - session.set_status('running') - # Initialize the counter self._count=True counter = 0 @@ -45,9 +43,6 @@ Process would look like this: f"{self.lock.job} is already running") return False, "Could not acquire lock" - # Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") @@ -148,8 +143,6 @@ Our Agent in full now looks like this: f"is held by {self.lock.job}") return False - session.set_status('running') - # Initialize last release time for lock last_release = time.time() @@ -208,9 +201,6 @@ Our Agent in full now looks like this: f"is held by {self.lock.job}") return False - # Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") diff --git a/ocs/agents/aggregator/agent.py b/ocs/agents/aggregator/agent.py index 3f16cb45..b1e2301a 100644 --- a/ocs/agents/aggregator/agent.py +++ b/ocs/agents/aggregator/agent.py @@ -110,7 +110,6 @@ def record(self, session: ocs_agent.OpSession, params): "last_block_received": "temps"}}} """ - session.set_status('starting') self.aggregate = True try: @@ -126,7 +125,6 @@ def record(self, session: ocs_agent.OpSession, params): reactor.callFromThread(reactor.stop) return False, 
"Aggregation not started" - session.set_status('running') while self.aggregate: time.sleep(self.loop_time) aggregator.run() diff --git a/ocs/agents/barebones/agent.py b/ocs/agents/barebones/agent.py index d1e5d5f6..706b0f74 100644 --- a/ocs/agents/barebones/agent.py +++ b/ocs/agents/barebones/agent.py @@ -66,8 +66,6 @@ def count(self, session, params): + f"is held by {self.lock.job}") return False - session.set_status('running') - # Initialize last release time for lock last_release = time.time() @@ -135,9 +133,6 @@ def print(self, session, params): + f"is held by {self.lock.job}") return False - # Set operations status to 'running' - session.set_status('running') - # Log the text provided to the Agent logs self.log.info(f"{params['text']}") diff --git a/ocs/agents/fake_data/agent.py b/ocs/agents/fake_data/agent.py index b2b746e7..b3929df0 100644 --- a/ocs/agents/fake_data/agent.py +++ b/ocs/agents/fake_data/agent.py @@ -51,14 +51,18 @@ def set_job_done(self): # Process functions. @ocs_agent.param('test_mode', default=False, type=bool) + @ocs_agent.param('degradation_period', default=None, type=float) def acq(self, session, params): - """acq(test_mode=False) + """acq(test_mode=False, degradation_period=None) **Process** - Acquire data and write to the feed. Parameters: test_mode (bool, optional): Run the acq Process loop only once. This is meant only for testing. Default is False. + degradation_period (float, optional): If set, then + alternately mark self as degraded / not degraded with + this period (in seconds). Notes: The most recent fake values are stored in the session data object in @@ -80,7 +84,6 @@ def acq(self, session, params): ok, msg = self.try_set_job('acq') if not ok: return ok, msg - session.set_status('running') T = [.100 for c in self.channel_names] block = ocs_feed.Block('temps', self.channel_names) @@ -89,6 +92,10 @@ def acq(self, session, params): reporting_interval = 1. 
next_report = next_timestamp + reporting_interval + next_deg_flip = None + if params['degradation_period'] is not None: + next_deg_flip = 0 + self.log.info("Starting acquisition") while True: @@ -101,6 +108,11 @@ def acq(self, session, params): return 10 now = time.time() + + if next_deg_flip is not None and now > next_deg_flip: + session.degraded = not session.degraded + next_deg_flip = now + params['degradation_period'] + delay_time = next_report - now if delay_time > 0: time.sleep(min(delay_time, 1.)) @@ -170,7 +182,6 @@ def count_seconds(self, session, params): # This process runs entirely in the reactor, as does its stop function. session.data = {'counter': 0, 'last_update': time.time()} - session.set_status('running') while session.status == 'running': yield dsleep(1) session.data['last_update'] = time.time() @@ -233,7 +244,6 @@ def delay_task(self, session, params): session.data = {'requested_delay': delay, 'delay_so_far': 0} - session.set_status('running') t0 = time.time() while session.status == 'running': session.data['delay_so_far'] = time.time() - t0 diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index fc7bd0f2..24bd5f01 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -491,7 +491,6 @@ def manager(self, session, params): } self.running = True - session.set_status('running') if params['reload_config']: self.database = {} @@ -613,7 +612,6 @@ def update(self, session, params): """ if not self.running: return False, 'Manager process is not running; params not updated.' 
- session.set_status('running') if params['reload_config']: yield self._reload_config(session) self._process_target_states(session, params['requests']) @@ -621,7 +619,6 @@ def update(self, session, params): @inlineCallbacks def die(self, session, params): - session.set_status('running') if not self.running: session.add_message('Manager process is not running.') else: diff --git a/ocs/agents/influxdb_publisher/agent.py b/ocs/agents/influxdb_publisher/agent.py index e35ef00a..6fd26572 100644 --- a/ocs/agents/influxdb_publisher/agent.py +++ b/ocs/agents/influxdb_publisher/agent.py @@ -87,7 +87,6 @@ def record(self, session: ocs_agent.OpSession, params): This is meant only for testing. Default is False. """ - session.set_status('starting') self.aggregate = True self.log.debug("Instatiating Publisher class") @@ -100,7 +99,6 @@ def record(self, session: ocs_agent.OpSession, params): operate_callback=lambda: self.aggregate, ) - session.set_status('running') while self.aggregate: time.sleep(self.loop_time) self.log.debug(f"Approx. queue size: {self.incoming_data.qsize()}") diff --git a/ocs/agents/registry/agent.py b/ocs/agents/registry/agent.py index f53d644d..4ead257f 100644 --- a/ocs/agents/registry/agent.py +++ b/ocs/agents/registry/agent.py @@ -194,11 +194,8 @@ def main(self, session: ocs_agent.OpSession, params): } """ - - session.set_status('starting') self._run = True - session.set_status('running') last_publish = time.time() while self._run: yield dsleep(1) diff --git a/ocs/base.py b/ocs/base.py index 277c0086..2418dcaa 100644 --- a/ocs/base.py +++ b/ocs/base.py @@ -38,9 +38,14 @@ class ResponseCode(Enum): class OpCode(Enum): """Enumeration of OpSession "op_code" values. - The op_code corresponds to the session.status, except that if the - session.status == "done" then the op_code will be assigned a value - of either SUCCEEDED or FAILED based on session.success. 
+ The op_code corresponds to the session.status, with the following + extensions: + + - If the session.status == "done" then the op_code will be + assigned a value of either SUCCEEDED or FAILED based on + session.success. + - If the session.status == "running", and session.degraded is + True, then the op_code will be DEGRADED rather than RUNNING. """ @@ -76,3 +81,10 @@ class OpCode(Enum): #: EXPIRED may used to mark session information as invalid in cases #: where the state cannot be determined. EXPIRED = 7 + + #: DEGRADED indicates that an operation meets the requirements for + #: state RUNNING, but is self-reporting as being in a problematic + #: state where it is unable to perform its primary functions (for + #: example, if a Process to operate some hardware is trying to + #: re-establish connection to that hardware). + DEGRADED = 8 diff --git a/ocs/ocs_agent.py b/ocs/ocs_agent.py index efeefd82..5f17a002 100644 --- a/ocs/ocs_agent.py +++ b/ocs/ocs_agent.py @@ -432,13 +432,6 @@ def _management_handler(self, q, **kwargs): if q == 'get_agent_class': return self.class_name - def publish_status(self, message, session): - try: - self.publish(self.agent_address + '.feed', session.encoded()) - except TransportLost: - self.log.error('Unable to publish status. TransportLost. ' - + 'crossbar server likely unreachable.') - def register_task(self, name, func, aborter=None, blocking=True, aborter_blocking=None, startup=False): """Register a Task for this agent. 
@@ -683,8 +676,8 @@ def subscribe_on_start(self, handler, topic, options=None, force_subscribe=None) def _handle_task_return_val(self, *args, **kw): try: (ok, message), session = args - session.success = ok session.add_message(message) + session.success = ok session.set_status('done') except BaseException: print('Failed to decode _handle_task_return_val args:', @@ -766,16 +759,11 @@ def start(self, op_name, params=None): self.next_session_id += 1 self.sessions[op_name] = session - # Launch differently depending on whether op intends to - # block or not. - if op.blocking: - # Launch, soon, in a blockable worker thread. - session.d = threads.deferToThread(op.launcher, session, params) - else: - # Launch, soon, in the main reactor thread. - session.d = task.deferLater(reactor, 0, op.launcher, session, params) + # Schedule op to run (in worker thread or reactor) + session.d = op.launch_deferred(session, params) session.d.addCallback(self._handle_task_return_val, session) session.d.addErrback(self._handle_task_error, session) + return (ocs.OK, msg, session.encoded()) else: @@ -983,7 +971,27 @@ def status(self, op_name, params=None): return (ocs.ERROR, 'No task or process called "%s"' % op_name, {}) -class AgentTask: +class AgentOp: + def launch_deferred(self, session, params): + """Launch the operation using the launcher function, either in + a worker thread (self.blocking) or in the reactor (not + self.blocking). Return a Deferred. Prior to executing the + operation code, set session state to "running". + + """ + def _running_wrapper(session, params): + session.set_status('running') + return self.launcher(session, params) + + if self.blocking: + # Launch, soon, in a blockable worker thread. + return threads.deferToThread(_running_wrapper, session, params) + else: + # Launch, soon, in the main reactor thread. 
+ return task.deferLater(reactor, 0, _running_wrapper, session, params) + + +class AgentTask(AgentOp): def __init__(self, launcher, blocking=None, aborter=None, aborter_blocking=None): self.launcher = launcher @@ -1004,7 +1012,7 @@ def encoded(self): } -class AgentProcess: +class AgentProcess(AgentOp): def __init__(self, launcher, stopper, blocking=None, stopper_blocking=None): self.launcher = launcher self.stopper = stopper @@ -1059,13 +1067,14 @@ class OpSession: """ - def __init__(self, session_id, op_name, status='starting', log_status=True, + def __init__(self, session_id, op_name, status='starting', app=None, purge_policy=None): # Note that some data members are used internally, while others are # communicated over WAMP to Agent control clients. self.messages = [] # entries are time-ordered (timestamp, text). self.data = {} # Operation-specific data structures. + self.degraded = False self.session_id = session_id self.op_name = op_name self.start_time = time.time() @@ -1075,7 +1084,7 @@ def __init__(self, session_id, op_name, status='starting', log_status=True, self.status = None # This has to be the last call since it depends on init... - self.set_status(status, log_status=log_status, timestamp=self.start_time) + self.set_status(status, timestamp=self.start_time) # Set up the log message purge. self.purge_policy = { @@ -1117,11 +1126,15 @@ def encoded(self): op_name : str The OCS Operation name. op_code : int - The OpCode, which combines information from status and - success; see :class:`ocs.base.OpCode`. + The OpCode, which combines information from status, success, + and degraded; see :class:`ocs.base.OpCode`. status : str The Operation run status (e.g. 'starting', 'done', ...). See :data:`ocs.ocs_agent.SESSION_STATUS_CODES`. + degraded : bool + A boolean flag (defaults to False) that an operation may set + to indicate that it is not achieving its primary function + (e.g. if it cannot establish connection to hardware). 
success : bool or None If the Operation Session has completed (`status == 'done'`), this indicates that the Operation was deemed successful. @@ -1198,6 +1211,7 @@ def json_safe(data, check_ok=False): 'op_name': self.op_name, 'op_code': self.op_code.value, 'status': self.status, + 'degraded': self.degraded, 'success': self.success, 'start_time': self.start_time, 'end_time': self.end_time, @@ -1212,22 +1226,26 @@ def op_code(self): """ if self.status is None: return OpCode.NONE - elif self.status in ['starting', 'running', 'stopping']: - return {'starting': OpCode.STARTING, 'running': OpCode.RUNNING, - 'stopping': OpCode.STOPPING}[self.status] + elif self.status == 'starting': + return OpCode.STARTING + elif self.status == 'running': + if self.degraded: + return OpCode.DEGRADED + else: + return OpCode.RUNNING + elif self.status == 'stopping': + return OpCode.STOPPING elif self.success: return OpCode.SUCCEEDED else: return OpCode.FAILED - def set_status(self, status, timestamp=None, log_status=True): + def set_status(self, status, timestamp=None): """Update the OpSession status and possibly post a message about it. Args: status (string): New value for status (see below). timestamp (float): timestamp for the operation. - log_status (bool): Determines whether change is logged in - message buffer. The possible values for status are: @@ -1249,27 +1267,35 @@ def set_status(self, status, timestamp=None, log_status=True): The only valid transitions are forward in the sequence [starting, running, stopping, done]; i.e. it is forbidden for the status of an OpSession to move from stopping to running. + + If this function is called from a worker thread, it will be + scheduled to run in the reactor, and will block until that is + complete. 
+ """ if timestamp is None: timestamp = time.time() if not in_reactor_context(): - return reactor.callFromThread(self.set_status, status, - timestamp=timestamp, - log_status=log_status) + return threads.blockingCallFromThread(reactor, + self.set_status, status, + timestamp=timestamp) # Sanity check the status value. from_index = SESSION_STATUS_CODES.index(self.status) # current status valid? to_index = SESSION_STATUS_CODES.index(status) # new status valid? assert (to_index >= from_index) # Only forward moves in status are permitted. + if to_index == from_index: + return + self.status = status if status == 'done': self.end_time = timestamp - if log_status: - try: - self.add_message('Status is now "%s".' % status, timestamp=timestamp) - except (TransportLost, Disconnected): - self.app.log.error('setting session status to "{s}" failed. ' - + 'transport lost or disconnected', s=status) + + try: + self.add_message('Status is now "%s".' % status, timestamp=timestamp) + except (TransportLost, Disconnected): + self.app.log.error('setting session status to "{s}" failed. ' + + 'transport lost or disconnected', s=status) def add_message(self, message, timestamp=None): """Add a log message to the OpSession messages buffer. @@ -1287,7 +1313,6 @@ def add_message(self, message, timestamp=None): return reactor.callFromThread(self.add_message, message, timestamp=timestamp) self.messages.append((timestamp, message)) - self.app.publish_status('Message', self) # Make the app log this message, too. The op_name and # session_id are an important provenance prefix. 
self.app.log.info('%s:%i %s' % (self.op_name, self.session_id, message)) diff --git a/tests/agents/test_fakedata.py b/tests/agents/test_fakedata.py index f7d7b597..28f1fd1f 100644 --- a/tests/agents/test_fakedata.py +++ b/tests/agents/test_fakedata.py @@ -19,7 +19,8 @@ def test_fake_data_set_heartbeat(agent): def test_fake_data_acq(agent): session = create_session('acq') - params = {'test_mode': True} + params = {'test_mode': True, + 'degradation_period': None} res = agent.acq(session, params=params) assert res[0] is True diff --git a/tests/agents/util.py b/tests/agents/util.py index 284a6e2e..9e931855 100644 --- a/tests/agents/util.py +++ b/tests/agents/util.py @@ -33,7 +33,7 @@ def agent(): def create_session(op_name): """Create an OpSession with a mocked app for testing.""" mock_app = mock.MagicMock() - session = OpSession(1, op_name, app=mock_app) + session = OpSession(1, op_name, app=mock_app, status='running') return session