Skip to content

Commit

Permalink
Merge pull request #221 from simonsobs/agent-docs-best-practices
Browse files Browse the repository at this point in the history
Write Agent documentation best practices
  • Loading branch information
BrianJKoopman authored Sep 10, 2021
2 parents e9eb428 + 7117c25 commit 2d1f1c6
Show file tree
Hide file tree
Showing 14 changed files with 597 additions and 249 deletions.
53 changes: 28 additions & 25 deletions agents/aggregator/aggregator_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,16 @@ def __init__(self, agent, args):
# SUBSCRIBES TO ALL FEEDS!!!!
# If this ends up being too much data, we can add a tag '.record'
# at the end of the address of recorded feeds, and filter by that.
self.agent.subscribe_on_start(self.enqueue_incoming_data,
self.agent.subscribe_on_start(self._enqueue_incoming_data,
'observatory..feeds.',
options={'match': 'wildcard'})

record_on_start = (args.initial_state == 'record')
self.agent.register_process('record',
self.start_aggregate, self.stop_aggregate,
self.record, self._stop_record,
startup=record_on_start)

def enqueue_incoming_data(self, _data):
def _enqueue_incoming_data(self, _data):
"""
Data handler for all feeds. This checks to see if the feeds should
be recorded, and if they are it puts them into the incoming_data queue
Expand All @@ -76,27 +76,30 @@ def enqueue_incoming_data(self, _data):
self.incoming_data.put((data, feed))
self.log.debug("Enqueued {d} from Feed {f}", d=data, f=feed)

def start_aggregate(self, session: ocs_agent.OpSession, params=None):
"""
Process for starting data aggregation. This process will create an
Aggregator instance, which will collect and write provider data to disk
as long as this process is running.
The most recent file and active providers will be returned in
session.data::
{"current_file": "/data/16020/1602089117.g3",
"providers": {
"observatory.fake-data1.feeds.false_temperatures": {
"last_refresh": 1602089118.8225083,
"sessid": "1602088928.8294137",
"stale": false,
"last_block_received": "temps"},
"observatory.LSSIM.feeds.temperatures": {
"last_refresh": 1602089118.8223345,
"sessid": "1602088932.335811",
"stale": false,
"last_block_received": "temps"}}}
def record(self, session: ocs_agent.OpSession, params):
"""record()
**Process** - This process will create an Aggregator instance, which
will collect and write provider data to disk as long as this process is
running.
Notes:
The most recent file and active providers will be returned in the
session data::
>>> response.session['data']
{"current_file": "/data/16020/1602089117.g3",
"providers": {
"observatory.fake-data1.feeds.false_temperatures": {
"last_refresh": 1602089118.8225083,
"sessid": "1602088928.8294137",
"stale": false,
"last_block_received": "temps"},
"observatory.LSSIM.feeds.temperatures": {
"last_refresh": 1602089118.8223345,
"sessid": "1602088932.335811",
"stale": false,
"last_block_received": "temps"}}}
"""
session.set_status('starting')
Expand Down Expand Up @@ -124,7 +127,7 @@ def start_aggregate(self, session: ocs_agent.OpSession, params=None):

return True, "Aggregation has ended"

def stop_aggregate(self, session, params=None):
def _stop_record(self, session, params):
    """Stop function for the 'record' Process.

    Marks the session as stopping and clears ``self.aggregate``; the
    running record loop is expected to poll that flag and exit on its
    next iteration (loop body not visible in this diff — confirm).

    Returns:
        tuple: (True, "Stopping aggregation") acknowledging the request.
    """
    session.set_status('stopping')
    # Signal the record loop to wind down; actual teardown happens there.
    self.aggregate = False
    return True, "Stopping aggregation"
Expand Down
66 changes: 37 additions & 29 deletions agents/fake_data/fake_data_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,26 @@ def set_job_done(self):
# Process functions.

@ocs_agent.param('_') # Reject all params.
def start_acq(self, session, params):
"""**Process:** Acquire data and write to the feed.
def acq(self, session, params):
"""acq()
This Process has no useful parameters.
**Process** - Acquire data and write to the feed.
The most recent fake values are stored in the session.data object in
the format::
Notes:
The most recent fake values are stored in the session data object in
the format::
{"fields":
{"channel_00": 0.10250430068515494,
"channel_01": 0.08550903376216404,
"channel_02": 0.10481891991693446,
"channel_03": 0.10793263271024509},
"timestamp":1600448753.9288929}
>>> response.session['data']
{"fields":
{"channel_00": 0.10250430068515494,
"channel_01": 0.08550903376216404,
"channel_02": 0.10481891991693446,
"channel_03": 0.10793263271024509},
"timestamp":1600448753.9288929}
The channels kept in fields are the 'faked' data, in a similar
structure to the Lakeshore agents. 'timestamp' is the lastest time these values
were updated.
The channels kept in fields are the 'faked' data, in a similar
structure to the Lakeshore agents. 'timestamp' is the last time
these values were updated.
"""
ok, msg = self.try_set_job('acq')
Expand Down Expand Up @@ -145,7 +147,7 @@ def start_acq(self, session, params):
self.set_job_done()
return True, 'Acquisition exited cleanly.'

def stop_acq(self, session, params=None):
def _stop_acq(self, session, params):
ok = False
with self.lock:
if self.job =='acq':
Expand All @@ -157,11 +159,13 @@ def stop_acq(self, session, params=None):
# Tasks

@ocs_agent.param('heartbeat', default=True, type=bool)
def set_heartbeat_state(self, session, params):
"""Task to set the state of the agent heartbeat.
def set_heartbeat(self, session, params):
"""set_heartbeat(heartbeat=True)
**Task** - Set the state of the agent heartbeat.
Args:
heartbeat (bool): True for on (the default), False for off
heartbeat (bool, optional): True for on (the default), False for off
"""
heartbeat_state = params['heartbeat']
Expand All @@ -175,23 +179,27 @@ def set_heartbeat_state(self, session, params):
@ocs_agent.param('succeed', default=True, type=bool)
@inlineCallbacks
def delay_task(self, session, params):
"""Task that will take the requested number of seconds to complete.
"""delay_task(delay=5, succeed=True)
**Task** - Sleep (delay) for the requested number of seconds.
This can run simultaneously with the acq Process. This Task
should run in the reactor thread.
The session data will be updated with the requested delay as
well as the time elapsed so far, for example::
{'requested_delay': 5.,
'delay_so_far': 1.2}
Args:
delay (float): Time to wait before returning, in seconds.
delay (float, optional): Time to wait before returning, in seconds.
Defaults to 5.
succeed (bool): Whether to return success or not.
succeed (bool, optional): Whether to return success or not.
Defaults to True.
Notes:
The session data will be updated with the requested delay as
well as the time elapsed so far, for example::
>>> response.session['data']
{'requested_delay': 5.,
'delay_so_far': 1.2}
"""
delay = params['delay']
succeed = params['succeed'] is True
Expand Down Expand Up @@ -242,9 +250,9 @@ def add_agent_args(parser_in=None):
num_channels=args.num_channels,
sample_rate=args.sample_rate,
frame_length=args.frame_length)
agent.register_process('acq', fdata.start_acq, fdata.stop_acq,
agent.register_process('acq', fdata.acq, fdata._stop_acq,
blocking=True, startup=startup)
agent.register_task('set_heartbeat', fdata.set_heartbeat_state)
agent.register_task('set_heartbeat', fdata.set_heartbeat)
agent.register_task('delay_task', fdata.delay_task, blocking=False)

runner.run(agent, auto_reconnect=True)
47 changes: 28 additions & 19 deletions agents/host_master/host_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ def _terminate_instance(self, key):
return True, 'Kill requested.'

def _update_target_states(self, session, params):
"""Update the child Agent management parameters of the master process.
"""_update_target_states(params)
Update the child Agent management parameters of the master process.
This function is used both for first-time init of the master
Process, but also for subsequent parameter updates while
master Process is running.
Expand Down Expand Up @@ -191,20 +193,23 @@ def _update_target_states(self, session, params):
addressable[key]['target_state'] = state

@inlineCallbacks
def master_process(self, session, params=None):
"""The "master" Process maintains a list of child Agents for which it
is responsible. In response to requests from a client, the
Proces will launch or terminate child Agents.
def master(self, session, params):
"""master(**kwargs)
**Process** - The "master" Process maintains a list of child Agents for
which it is responsible. In response to requests from a client, the
Process will launch or terminate child Agents.
If an Agent process exits unexpectedly, it will be relaunched
within a few seconds.
When the master_process receives a Process stop request, it
will terminate all child agents before moving to the 'done'
state.
When the master Process receives a stop request, it will terminate all
child agents before moving to the 'done' state.
The ``params`` dictionary is passed directly to
_update_target_states(); see that docstream.
Parameters:
**kwargs: Passed directly to
``_update_target_states(params=kwargs)``; see
:func:`HostMaster._update_target_states`.
"""
self.running = True
Expand Down Expand Up @@ -334,20 +339,24 @@ def master_process(self, session, params=None):
yield dsleep(max(sleep_time, .001))
return True, 'Exited.'

def master_process_stop(self, session, params=None):
def _stop_master(self, session, params):
    """Stop function for the 'master' Process.

    If the session is already 'done' this is a no-op. Otherwise it marks
    the session as stopping and clears ``self.running`` so the master
    loop can terminate its child agents and exit.

    Returns:
        tuple: (True, 'Stop initiated.') on a real stop request.
        None when the session was already 'done' — NOTE(review): this
        branch returns None rather than an (ok, msg) tuple like the
        other handlers; confirm callers tolerate that.
    """
    if session.status == 'done':
        return
    session.set_status('stopping')
    # The master Process loop polls self.running and shuts children down.
    self.running = False
    return True, 'Stop initiated.'

def update_task(self, session, params=None):
"""Update the master process' child Agent parameters.
def update(self, session, params):
"""update(**kwargs)
**Task** - Update the master process' child Agent parameters.
This Task will fail if the master Process is not running.
The ``params`` dictionary is passed directly to
_update_target_states(); see that docstream.
Parameters:
**kwargs: Passed directly to
``_update_target_states(params=kwargs)``; see
:func:`HostMaster._update_target_states`.
"""
if not self.running:
Expand All @@ -357,7 +366,7 @@ def update_task(self, session, params=None):
return True, 'Update requested.'

@inlineCallbacks
def die(self, session, params=None):
def die(self, session, params):
session.set_status('running')
if not self.running:
session.add_message('Master process is not running.')
Expand Down Expand Up @@ -434,11 +443,11 @@ def errReceived(self, data):

startup_params = {'requests': [('all', args.initial_state)]}
agent.register_process('master',
host_master.master_process,
host_master.master_process_stop,
host_master.master,
host_master._stop_master,
blocking=False,
startup=startup_params)
agent.register_task('update', host_master.update_task, blocking=False)
agent.register_task('update', host_master.update, blocking=False)
agent.register_task('die', host_master.die, blocking=False)
runner.run(agent, auto_reconnect=True)

18 changes: 10 additions & 8 deletions agents/influxdb_publisher/influxdb_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ def __init__(self, agent, args):
self.incoming_data = queue.Queue()
self.loop_time = 1

self.agent.subscribe_on_start(self.enqueue_incoming_data,
self.agent.subscribe_on_start(self._enqueue_incoming_data,
'observatory..feeds.',
options={'match': 'wildcard'})

record_on_start = (args.initial_state == 'record')
self.agent.register_process('record',
self.start_aggregate, self.stop_aggregate,
self.record, self._stop_record,
startup=record_on_start)

def enqueue_incoming_data(self, _data):
def _enqueue_incoming_data(self, _data):
"""Data handler for all feeds. This checks to see if the feeds should
be recorded, and if they are it puts them into the incoming_data queue
to be processed by the Publisher during the next run iteration.
Expand All @@ -71,10 +71,12 @@ def enqueue_incoming_data(self, _data):

self.incoming_data.put((data, feed))

def start_aggregate(self, session: ocs_agent.OpSession, params=None):
"""Process for starting data aggregation. This process will create an
Publisher instance, which will collect and write provider data to disk
as long as this process is running.
def record(self, session: ocs_agent.OpSession, params):
"""record()
**Process** - This process will create a Publisher instance, which
will collect and write provider data to disk as long as this process is
running.
"""
session.set_status('starting')
Expand All @@ -99,7 +101,7 @@ def start_aggregate(self, session: ocs_agent.OpSession, params=None):

return True, "Aggregation has ended"

def stop_aggregate(self, session, params=None):
def _stop_record(self, session, params):
    """Stop function for the InfluxDB publisher's 'record' Process.

    Marks the session as stopping and clears ``self.aggregate``; the
    record loop polls that flag and exits on its next pass (loop body
    not visible in this diff — confirm).

    Returns:
        tuple: (True, "Stopping aggregation") acknowledging the request.
    """
    session.set_status('stopping')
    # Signal the publisher loop to stop; it handles its own cleanup.
    self.aggregate = False
    return True, "Stopping aggregation"
Expand Down
Loading

0 comments on commit 2d1f1c6

Please sign in to comment.