From e1e43b8d851ccaff9fb520c1b2b3163e2d86ef2c Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Thu, 26 Aug 2021 15:41:01 -0400 Subject: [PATCH 01/19] Write task/process documentation guidelines --- docs/developer/agents.rst | 101 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/docs/developer/agents.rst b/docs/developer/agents.rst index c3866dc9..017f028c 100644 --- a/docs/developer/agents.rst +++ b/docs/developer/agents.rst @@ -623,3 +623,104 @@ When you are done debugging, you can remove the block, or switch the level to the default 'info'. .. _txaio: https://txaio.readthedocs.io/en/latest/programming-guide.html#logging + +Documentation +------------- + +Documentation is important for users writing OCSClients that can interact with +your new Agent. You should aim to be a thorough as possible when writing +documentation for your Agent. Here is a complete example of a well documented +Task (or Process):: + + def demo(self, session, params=None): + """demo(arg1=None, arg2=7) + **Task** (or **Process**) - An example task docstring for illustration purposes. + + Parameters: + arg1 (bool): Useful argument 1. + arg2 (int, optional): Useful argument 2, defaults to 7. For details see + :func:`socs.agent.demo_agent.DemoClass.detailing_method` + + Examples: + Example for calling in a client:: + + client.demo(arg1=False, arg2=5) + + Notes: + An example of the session data:: + + >>> session.data + {"fields": + {"Channel_05": {"T": 293.644, "R": 33.752, "timestamp": 1601924482.722671}, + "Channel_06": {"T": 0, "R": 1022.44, "timestamp": 1601924499.5258765}, + "Channel_08": {"T": 0, "R": 1026.98, "timestamp": 1601924494.8172355}, + "Channel_01": {"T": 293.41, "R": 108.093, "timestamp": 1601924450.9315426}, + "Channel_02": {"T": 293.701, "R": 30.7398, "timestamp": 1601924466.6130798} + } + } + """ + pass + +Keep reading for more details on what's going on in this example. + +Overriding the Method Signature +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +``session`` and ``params`` are both required parameters when writing an OCS +Task or Process, but both are hidden from users writing OCSClients. When +documenting a Task or Process, the method signature should be overridden to +remove both ``session`` and ``params``, and to include any parameters your Task +or Process might take. This is done in the first line of the docstring, by +writing the method name, followed by the parameters in parentheses. In the +above example that looks like:: + + def demo(self, session, params=None): + """demo(arg1=None, arg2=7)""" + +This will render the method description as ``delay_task(arg1=None, +arg2=7)`` within Sphinx, rather than ``delay_task(session, params=None)``. The +default values should be put in this documentation. If a parameter is required, +set the param to ``None`` in the method signature. + +Keyword Arguments +^^^^^^^^^^^^^^^^^ +Internal to OCS the keyword arguments provided to an OCSClient are passed as a +`dict` to ``params``. For the benefit of the end user, these keyword arguments +should be documented in the Agent as if passed as such. So the docstring should +look like:: + + Parameters: + arg1 (bool): Useful argument 1. + arg2 (int, optional): Useful argument 2, defaults to 7. For details see + :func:`socs.agent.lakeshore.LakeshoreClass.the_method` + +Examples +^^^^^^^^ +Examples should be given using the "Examples" header when it would improve the +clarity of how to interact with a given Task or Process:: + + Examples: + Example for calling in a client:: + + client.demo(arg1=False, arg2=5) + +session.data +^^^^^^^^^^^^ +The ``session.data`` object structure is left up to the Agent author. As such, +it needs to be documented so that OCSClient authors know what to expect. If +your Task or Process makes use of ``session.data``, provide an example of the +structure under the "Notes" header:: + + Notes: + An example of the session data:: + + >>> session.data + {"fields": + {"Channel_05": {"T": 293.644, "R": 33.752, "timestamp": 1601924482.722671}, + "Channel_06": {"T": 0, "R": 1022.44, "timestamp": 1601924499.5258765}, + "Channel_08": {"T": 0, "R": 1026.98, "timestamp": 1601924494.8172355}, + "Channel_01": {"T": 293.41, "R": 108.093, "timestamp": 1601924450.9315426}, + "Channel_02": {"T": 293.701, "R": 30.7398, "timestamp": 1601924466.6130798} + } + } + +For more details on the ``session.data`` object see :ref:`session_data`. From 3654173585bab2681432dd1184e6cb0cd4af212a Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Thu, 26 Aug 2021 16:59:53 -0400 Subject: [PATCH 02/19] Restructure Aggregator Agent docs page I also rewrote some small segments and added links to resolve some points from #138. fixes #138 --- docs/agents/aggregator.rst | 129 +++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 50 deletions(-) diff --git a/docs/agents/aggregator.rst b/docs/agents/aggregator.rst index ca7951db..a597b525 100644 --- a/docs/agents/aggregator.rst +++ b/docs/agents/aggregator.rst @@ -6,20 +6,60 @@ Aggregator Agent ================ -First Time Setup ----------------- +The Aggregator Agent, also referred to as the Housekeeping Aggregator (or +HKAggreagtor), is the OCS Agent responsible for recording all data published to +the OCS Network's :ref:`feeds`. The Aggregator collects this data and writes it +to disk in the `.g3` file format. -.. note:: +.. warning:: Be sure to follow the instructions for :ref:`create_ocs_user` during installation to ensure proper permissions for the Aggregator Agent to write data to disk. -SO3G File Format and Usage --------------------------- -To store data we use a modified version of the SPT3G file format -tailored specifically to SO, called SO3g. -Like SPT3G, an SO3G consists of a sequence of *Frame* objects each containing -its own data. Each frame is a free-form mapping from strings to data +.. argparse:: + :module: agents.aggregator.aggregator_agent + :func: make_parser + :prog: aggregator_agent.py + +Dependencies +------------ +The Aggregator Agent depends on both the `spt3g_software`_ and `so3g`_ +packages. + +Description +----------- +The job of the HK aggregator is to take data published by "Providers" and write +it to disk. +The aggregator considers each OCS Feed with ``record=True`` to be a separate +provider, and so any data written by a single OCS Feed will be grouped together +into G3Frames. +See :ref:`the OCS Feed page ` for info on how +to register a feed so that it will be recorded by the aggregator. + +Unregistered providers will automatically be added when they send data, +and stale providers will be removed if no data is received in a specified +time period. +To do this, the aggregator monitors all feeds in the ``observatory`` namespace to find +feeds that should be recorded. If the aggregator receives data from a feed +registered with ``record=True``, it will automatically add that feed as a +Provider, and will start putting incoming data into frames every ``frame_length`` +seconds, where ``frame_length`` is set by the Feed on registration. +Providers will be automatically marked as stale and unregistered if it goes +``fresh_time`` seconds without receiving any data from the feed, where +``fresh_time`` is again set by the feed on registration. + +The Aggregator Agent has a single main process ``record`` in which the +aggregator will continuously loop and write any queued up data to a G3Frame and +to disk. +The ``record`` task's session data object contains information such as the +path of the current G3 file, and the status of active and stale providers. + +File Format and Usage +`````````````````````` +Data is stored using the `spt3g_software`_ and `so3g`_ packages. `so3g`_ +provides the schema that operates on standard `G3 Frames`_. Each file consists +of a sequence of *Frame* objects each containing its own data. Each frame is a +free-form mapping from strings to data of a type derived from G3FrameObject, which behave similarly to a python dictionary. Notably, SPT3G files cannot directly store python lists, tuples, or numpy arrays, but must be wrapped in appropriate G3FrameObject container classes. @@ -40,9 +80,10 @@ Examples of useful G3FrameObjects: - A map of vectors containing co-sampled data, packaged with a vector of timestamps. -If you have SPT3G and so3g installed, you can view a g3 file by calling -``spt3g-dump filename`` from the command line, which will display the -contents of the file as a dict. For instance:: +The `so3g`_ package provides functions for loading data from disk. See +the `so3g Documentation `_ for +details. If you simply need a quick look at the contents of a file you can use +the spt3g utility ``spt3g-dump``. For instance:: $ spt3g-dump 1589310638.g3 Frame (Housekeeping) [ @@ -70,8 +111,12 @@ contents of the file as a dict. For instance:: "timestamp" (spt3g.core.G3Double) => 1.58931e+09 ] +.. _spt3g_software: https://github.com/CMB-S4/spt3g_software +.. _so3g: https://github.com/simonsobs/so3g +.. _G3 Frames: https://cmb-s4.github.io/spt3g_software/frames.html + HK File Structure ------------------ +````````````````` The HK file is made up of three frame types: Session, Status, and Data, labeled with an ``hkagg_type`` value of 0, 1, and 2 respectively. Session frames occur once at the start of every file and contain information @@ -89,37 +134,14 @@ each timesample map corresponds to a group of co-sampled data, grouped by their Each G3TimesampleMap contains a G3Vector for each ``field_name`` specified in the data and a vector of timestamps. -Aggregator Agent ----------------- -The job of the HK aggregator is to take data published by "Providers" and write -it to disk. -The aggregator considers each OCS Feed with ``record=True`` to be a separate -provider, and so any data written by a single OCS Feed will be grouped together -into G3Frames. -See :ref:`the OCS Feed page ` for info on how -to register a feed so that it will be recorded by the aggregator. - -Unregistered providers will automatically be added when they send data, -and stale providers will be removed if no data is received in a specified -time period. -To do this, the aggregator monitors all feeds in the ``observatory`` namespace to find -feeds that should be recorded. If the aggregator receives data from a feed -registered with ``record=True``, it will automatically add that feed as a -Provider, and will start putting incoming data into frames every ``frame_length`` -seconds, where ``frame_length`` is set by the Feed on registration. -Providers will be automatically marked as stale and unregistered if it goes -``fresh_time`` seconds without receiving any data from the feed, where -``fresh_time`` is again set by the feed on registration. - -The Aggregator Agent has a single main process ``record`` in which the -aggregator will continuously loop and write any queued up data to a G3Frame and -to disk. -The ``record`` task's session data object contains information such as the -path of the current G3 file, and the status of active and stale providers. +Configuration File Examples +--------------------------- +Below are configuration examples for the ocs config file and for running the +Agent in a docker container. -Site Config -``````````` +ocs-config +`````````` The aggregator agent takes three site-config arguments. ``--initial-state`` can be either ``record`` or ``idle``, and determines whether or not the aggregator starts recording @@ -160,16 +182,23 @@ Here is an example configuration:: - /path/to/host/data:/data -API ---- +Agent API +--------- +.. autoclass:: agents.aggregator.aggregator_agent.AggregatorAgent + :members: start_aggregate + +Supporting APIs +--------------- .. _agg_provider_api: -Provider -`````````` .. autoclass:: ocs.agent.aggregator.Provider :members: + :noindex: -Aggregator -``````````` -.. autoclass:: agents.aggregator.aggregator_agent.AggregatorAgent - :members: start_aggregate +.. autoclass:: ocs.agent.aggregator.G3FileRotator + :members: + :noindex: + +.. autoclass:: ocs.agent.aggregator.Aggregator + :members: + :noindex: From 8651a11c2f9263b22eb2b8ba9d36f01d034cade3 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Thu, 26 Aug 2021 17:07:28 -0400 Subject: [PATCH 03/19] Resolve sphinx build warning aggregator.py:docstring of ocs.agent.aggregator.g3_cast:5: WARNING: Definition list ends without a blank line; unexpected unindent. --- ocs/agent/aggregator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ocs/agent/aggregator.py b/ocs/agent/aggregator.py index bf21fb54..3b0c3159 100644 --- a/ocs/agent/aggregator.py +++ b/ocs/agent/aggregator.py @@ -31,6 +31,7 @@ def g3_cast(data, time=False): int -> G3Int str -> G3String float -> G3Double + and lists of type X will go to G3VectorX. If ``time`` is set to True, will convert to G3Time or G3VectorTime with the assumption that ``data`` consists of unix timestamps. From 2041c5c9b104b225231881d0c45173a2d6b1e5b0 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Thu, 26 Aug 2021 17:12:03 -0400 Subject: [PATCH 04/19] Update Aggregator Agent's task/process docstrings This follows the new guidelines for best practices in documenting tasks/processes. This renames start_aggregate to record to match the registered task name in ocs. Mark the stop process (also renamed) and other methods as private where appropriate. --- agents/aggregator/aggregator_agent.py | 52 ++++++++++++++------------- docs/agents/aggregator.rst | 2 +- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/agents/aggregator/aggregator_agent.py b/agents/aggregator/aggregator_agent.py index 5a998ee4..9ae70bdf 100644 --- a/agents/aggregator/aggregator_agent.py +++ b/agents/aggregator/aggregator_agent.py @@ -53,16 +53,16 @@ def __init__(self, agent, args): # SUBSCRIBES TO ALL FEEDS!!!! # If this ends up being too much data, we can add a tag '.record' # at the end of the address of recorded feeds, and filter by that. - self.agent.subscribe_on_start(self.enqueue_incoming_data, + self.agent.subscribe_on_start(self._enqueue_incoming_data, 'observatory..feeds.', options={'match': 'wildcard'}) record_on_start = (args.initial_state == 'record') self.agent.register_process('record', - self.start_aggregate, self.stop_aggregate, + self.record, self._stop_record, startup=record_on_start) - def enqueue_incoming_data(self, _data): + def _enqueue_incoming_data(self, _data): """ Data handler for all feeds. This checks to see if the feeds should be recorded, and if they are it puts them into the incoming_data queue @@ -76,27 +76,29 @@ def enqueue_incoming_data(self, _data): self.incoming_data.put((data, feed)) self.log.debug("Enqueued {d} from Feed {f}", d=data, f=feed) - def start_aggregate(self, session: ocs_agent.OpSession, params=None): - """ - Process for starting data aggregation. This process will create an - Aggregator instance, which will collect and write provider data to disk - as long as this process is running. - - The most recent file and active providers will be returned in - session.data:: - - {"current_file": "/data/16020/1602089117.g3", - "providers": { - "observatory.fake-data1.feeds.false_temperatures": { - "last_refresh": 1602089118.8225083, - "sessid": "1602088928.8294137", - "stale": false, - "last_block_received": "temps"}, - "observatory.LSSIM.feeds.temperatures": { - "last_refresh": 1602089118.8223345, - "sessid": "1602088932.335811", - "stale": false, - "last_block_received": "temps"}}} + def record(self, session: ocs_agent.OpSession, params=None): + """record() + + **Process** - This process will create an Aggregator instance, which + will collect and write provider data to disk as long as this process is + running. + + Notes: + The most recent file and active providers will be returned in + session.data:: + + {"current_file": "/data/16020/1602089117.g3", + "providers": { + "observatory.fake-data1.feeds.false_temperatures": { + "last_refresh": 1602089118.8225083, + "sessid": "1602088928.8294137", + "stale": false, + "last_block_received": "temps"}, + "observatory.LSSIM.feeds.temperatures": { + "last_refresh": 1602089118.8223345, + "sessid": "1602088932.335811", + "stale": false, + "last_block_received": "temps"}}} """ session.set_status('starting') @@ -124,7 +126,7 @@ def start_aggregate(self, session: ocs_agent.OpSession, params=None): return True, "Aggregation has ended" - def stop_aggregate(self, session, params=None): + def _stop_record(self, session, params=None): session.set_status('stopping') self.aggregate = False return True, "Stopping aggregation" diff --git a/docs/agents/aggregator.rst b/docs/agents/aggregator.rst index a597b525..91e6fa22 100644 --- a/docs/agents/aggregator.rst +++ b/docs/agents/aggregator.rst @@ -185,7 +185,7 @@ Here is an example configuration:: Agent API --------- .. autoclass:: agents.aggregator.aggregator_agent.AggregatorAgent - :members: start_aggregate + :members: Supporting APIs --------------- From bc72a4fd03aae24c53f5922c57000678e5672a01 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Thu, 26 Aug 2021 18:00:38 -0400 Subject: [PATCH 05/19] Write guidelines for Agent Reference pages --- docs/developer/agents.rst | 41 ++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/docs/developer/agents.rst b/docs/developer/agents.rst index 017f028c..9179b706 100644 --- a/docs/developer/agents.rst +++ b/docs/developer/agents.rst @@ -628,9 +628,16 @@ Documentation ------------- Documentation is important for users writing OCSClients that can interact with -your new Agent. You should aim to be a thorough as possible when writing -documentation for your Agent. Here is a complete example of a well documented -Task (or Process):: +your new Agent. When writing a new Agent you must document the Tasks and +Processes with appropriate docstrings. Additionally a page must be created +within the docs to describe the Agent and provide other key information such as +configuration file examples. You should aim to be a thorough as possible when +writing documentation for your Agent. + +Task and Process Documentation +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Each Task and Process within an Agent must be accompanied by a docstring. Here +is a complete example of a well documented Task (or Process):: def demo(self, session, params=None): """demo(arg1=None, arg2=7) @@ -664,7 +671,7 @@ Task (or Process):: Keep reading for more details on what's going on in this example. Overriding the Method Signature -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +``````````````````````````````` ``session`` and ``params`` are both required parameters when writing an OCS Task or Process, but both are hidden from users writing OCSClients. When documenting a Task or Process, the method signature should be overridden to @@ -682,7 +689,7 @@ default values should be put in this documentation. If a parameter is required, set the param to ``None`` in the method signature. Keyword Arguments -^^^^^^^^^^^^^^^^^ +````````````````` Internal to OCS the keyword arguments provided to an OCSClient are passed as a `dict` to ``params``. For the benefit of the end user, these keyword arguments should be documented in the Agent as if passed as such. So the docstring should @@ -694,7 +701,7 @@ look like:: :func:`socs.agent.lakeshore.LakeshoreClass.the_method` Examples -^^^^^^^^ +```````` Examples should be given using the "Examples" header when it would improve the clarity of how to interact with a given Task or Process:: @@ -704,7 +711,7 @@ clarity of how to interact with a given Task or Process:: client.demo(arg1=False, arg2=5) session.data -^^^^^^^^^^^^ +```````````` The ``session.data`` object structure is left up to the Agent author. As such, it needs to be documented so that OCSClient authors know what to expect. If your Task or Process makes use of ``session.data``, provide an example of the @@ -724,3 +731,23 @@ structure under the "Notes" header:: } For more details on the ``session.data`` object see :ref:`session_data`. + +Agent Reference Pages +^^^^^^^^^^^^^^^^^^^^^ +Now that you have documented your Agent's Tasks and Processes appropriately we +need to make the page that will display that documentation. Agent reference +pages are kept in ``ocs/docs/agents/``. Each Agent has a separate `.rst` file. +Each Agent reference page must contain: + +* Brief description of the Agent +* Example ocs-site-config configuration block +* Example docker-compose configuration block (if Agent is dockerized) +* Agent API reference + +Reference pages can also include: + +* Detailed description of Agent or related material +* Example client scripts +* Supporting APIs + +For examples, see the Reference Pages for other existing Agents. From 60d6526f3cc2a99856606c4effde320f0e3077bd Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Thu, 2 Sep 2021 15:48:00 -0400 Subject: [PATCH 06/19] Create agent docs page template --- docs/developer/agents.rst | 8 ++- example/docs/agent_template.rst | 93 +++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 1 deletion(-) create mode 100644 example/docs/agent_template.rst diff --git a/docs/developer/agents.rst b/docs/developer/agents.rst index 9179b706..41f8b44e 100644 --- a/docs/developer/agents.rst +++ b/docs/developer/agents.rst @@ -750,4 +750,10 @@ Reference pages can also include: * Example client scripts * Supporting APIs -For examples, see the Reference Pages for other existing Agents. +Here is a template for an Agent documentation page. Text starting with a '#' is +there to guide you in writing the page and should be replaced or removed. +Unneeded sections should be removed. + +.. include:: ../../example/docs/agent_template.rst + :code: rst + diff --git a/example/docs/agent_template.rst b/example/docs/agent_template.rst new file mode 100644 index 00000000..4b1b3359 --- /dev/null +++ b/example/docs/agent_template.rst @@ -0,0 +1,93 @@ +.. highlight:: rst + +.. _template: + +============== +Template Agent +============== + +# A brief description of the Agent. + +.. argparse:: + :module: agents.template.template_agent + :func: make_parser + :prog: template_agent.py + +Dependencies +------------ + +# Any external dependencies for agent. Omit if there are none, or they are +# included in the main requirements.txt file. + +Configuration File Examples +--------------------------- + +Below are configuration examples for the ocs config file and for running the +Agent in a docker container. + +ocs-config +`````````` + +An example site-config-file block:: + + {'agent-class': 'TemplateAgent', + 'instance-id': 'template', + 'arguments': [['--argument-1', 'value1'], + ['--argument-2', 42], + ['--argument-3']]}, + +Docker +``````` + +An example docker-compose configuration:: + + ocs-template: + image: simonsobs/ocs-template-agent:latest + hostname: ocs-docker + environment: + - LOGLEVEL=info + volumes: + - ${OCS_CONFIG_DIR}:/config + +Description +----------- + +# Detailed description of the Agent. Include any details the users or developers +# might find valuable. + +Subsection +`````````` + +# Use subsections where appropriate. + +Agent API +--------- + +# Autoclass the Agent, this is for users to reference when writing clients. + +.. autoclass:: agents.template.template_agent.TemplateAgent + :members: + +Example Clients +--------------- + +# If an example client makes use of the Agent more clear, include here in a code block:: + + from ocs import matched_client + client = matched_client.MatchedClient('template') + client.task() + +Supporting APIs +--------------- + +# Autodoc any code supporting the Agent. This is for developers to reference +# when working on the Agent. :noindex: should be used here if code is also +# indexed in the main API page. + +.. autoclass:: ocs.agent.template.Template1 + :members: + :noindex: + +.. autoclass:: ocs.agent.template.Template2 + :members: + :noindex: From 7717cba3f9df2d3078ef286d840b7c815a70120e Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Thu, 2 Sep 2021 15:48:15 -0400 Subject: [PATCH 07/19] Move config file examples higher on aggregator agent page --- docs/agents/aggregator.rst | 96 +++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/docs/agents/aggregator.rst b/docs/agents/aggregator.rst index 91e6fa22..260cdb7e 100644 --- a/docs/agents/aggregator.rst +++ b/docs/agents/aggregator.rst @@ -26,6 +26,54 @@ Dependencies The Aggregator Agent depends on both the `spt3g_software`_ and `so3g`_ packages. +Configuration File Examples +--------------------------- + +Below are configuration examples for the ocs config file and for running the +Agent in a docker container. + +ocs-config +`````````` +The aggregator agent takes three site-config arguments. +``--initial-state`` can be either ``record`` or ``idle``, +and determines whether or not the aggregator starts recording +as soon as it is initialized. +``--time-per-file`` specifies how long each file should be in seconds, +and ``--data-dir`` specifies the default data directory. +Both of these can also be manually specified in ``params`` when +the ``record`` process is started. +An example site-config entry is:: + + {'agent-class': 'AggregatorAgent', + 'instance-id': 'aggregator', + 'arguments': [['--initial-state', 'record'], + ['--time-per-file', '3600'], + ['--data-dir', '/data/hk'] + ]}, + +.. note:: + ``/data/hk`` is used to avoid conflict with other collections of data. In + general, it is recommended to use ``/data/timestreams`` to store detector + timestreams, and ``/data/pysmurf`` to store archived pysmurf files. + + +Docker +``````` +The docker image for the aggregator agent is simonsobs/ocs-aggregator-agent +Here is an example configuration:: + + ocs-aggregator: + image: simonsobs/ocs-aggregator-agent:latest + container_name: ocs-aggregator + hostname: ocs-docker + user: "9000" + environment: + - LOGLEVEL=info + volumes: + - ${OCS_CONFIG_DIR}:/config + - /path/to/host/data:/data + + Description ----------- The job of the HK aggregator is to take data published by "Providers" and write @@ -134,54 +182,6 @@ each timesample map corresponds to a group of co-sampled data, grouped by their Each G3TimesampleMap contains a G3Vector for each ``field_name`` specified in the data and a vector of timestamps. -Configuration File Examples ---------------------------- - -Below are configuration examples for the ocs config file and for running the -Agent in a docker container. - -ocs-config -`````````` -The aggregator agent takes three site-config arguments. -``--initial-state`` can be either ``record`` or ``idle``, -and determines whether or not the aggregator starts recording -as soon as it is initialized. -``--time-per-file`` specifies how long each file should be in seconds, -and ``--data-dir`` specifies the default data directory. -Both of these can also be manually specified in ``params`` when -the ``record`` process is started. -An example site-config entry is:: - - {'agent-class': 'AggregatorAgent', - 'instance-id': 'aggregator', - 'arguments': [['--initial-state', 'record'], - ['--time-per-file', '3600'], - ['--data-dir', '/data/hk'] - ]}, - -.. note:: - ``/data/hk`` is used to avoid conflict with other collections of data. In - general, it is recommended to use ``/data/timestreams`` to store detector - timestreams, and ``/data/pysmurf`` to store archived pysmurf files. - - -Docker -``````` -The docker image for the aggregator agent is simonsobs/ocs-aggregator-agent -Here is an example configuration:: - - ocs-aggregator: - image: simonsobs/ocs-aggregator-agent:latest - container_name: ocs-aggregator - hostname: ocs-docker - user: "9000" - environment: - - LOGLEVEL=info - volumes: - - ${OCS_CONFIG_DIR}:/config - - /path/to/host/data:/data - - Agent API --------- .. autoclass:: agents.aggregator.aggregator_agent.AggregatorAgent From 1e041b5ba7ed6095d775346d318a6f7f3025180d Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Thu, 2 Sep 2021 17:12:26 -0400 Subject: [PATCH 08/19] Standardize the InfluxDB Agent docs --- .../influxdb_publisher/influxdb_publisher.py | 18 +++++---- docs/agents/influxdb_publisher.rst | 38 ++++++++++++++----- 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/agents/influxdb_publisher/influxdb_publisher.py b/agents/influxdb_publisher/influxdb_publisher.py index 7f3debe6..18f972c3 100644 --- a/agents/influxdb_publisher/influxdb_publisher.py +++ b/agents/influxdb_publisher/influxdb_publisher.py @@ -46,16 +46,16 @@ def __init__(self, agent, args): self.incoming_data = queue.Queue() self.loop_time = 1 - self.agent.subscribe_on_start(self.enqueue_incoming_data, + self.agent.subscribe_on_start(self._enqueue_incoming_data, 'observatory..feeds.', options={'match': 'wildcard'}) record_on_start = (args.initial_state == 'record') self.agent.register_process('record', - self.start_aggregate, self.stop_aggregate, + self.record, self._stop_record, startup=record_on_start) - def enqueue_incoming_data(self, _data): + def _enqueue_incoming_data(self, _data): """Data handler for all feeds. This checks to see if the feeds should be recorded, and if they are it puts them into the incoming_data queue to be processed by the Publisher during the next run iteration. @@ -71,10 +71,12 @@ def enqueue_incoming_data(self, _data): self.incoming_data.put((data, feed)) - def start_aggregate(self, session: ocs_agent.OpSession, params=None): - """Process for starting data aggregation. This process will create an - Publisher instance, which will collect and write provider data to disk - as long as this process is running. + def record(self, session: ocs_agent.OpSession, params=None): + """record() + + **Process** - This process will create an Publisher instance, which + will collect and write provider data to disk as long as this process is + running. """ session.set_status('starting') @@ -99,7 +101,7 @@ def start_aggregate(self, session: ocs_agent.OpSession, params=None): return True, "Aggregation has ended" - def stop_aggregate(self, session, params=None): + def _stop_record(self, session, params=None): session.set_status('stopping') self.aggregate = False return True, "Stopping aggregation" diff --git a/docs/agents/influxdb_publisher.rst b/docs/agents/influxdb_publisher.rst index 57ee049d..1c64459e 100644 --- a/docs/agents/influxdb_publisher.rst +++ b/docs/agents/influxdb_publisher.rst @@ -10,8 +10,20 @@ The InfluxDB Publisher Agent acts like the OCS Aggregator, but instead of writing to file it will publish all recorded OCS data feeds to an InfluxDB instance running somewhere on the network. -OCS Configuration ------------------ +.. argparse:: + :module: agents.influxdb_publisher.influxdb_publisher + :func: make_parser + :prog: influxdb_publisher.py + +Configuration File Examples +--------------------------- + +Below are configuration examples for the ocs config file and for running the +Agent in a docker container. Also included is an example for setting up +Grafana to display data from InfluxDB. + +OCS Site Config +````````````````` Add an InfluxDBAgent to your OCS configuration file:: {'agent-class': 'InfluxDBAgent', @@ -23,8 +35,8 @@ Add an InfluxDBAgent to your OCS configuration file:: ['--gzip', True], ['--database', 'ocs_feeds']]}, -docker-compose Configuration ----------------------------- +Docker Compose +`````````````` Add the InfluxDB Publisher Agent container to your docker-compose file:: ocs-influxdb-publisher: @@ -70,8 +82,9 @@ file reference`_. .. _`Compose file reference`: https://docs.docker.com/compose/compose-file/ -Grafana Configuration ---------------------- +Grafana +``````` + Once your InfluxDB container and publisher are configured and running you will need to create an InfluxDB data source in Grafana. To do so, we add an InfluxDB data source with the URL ``http://influxdb:8086``, and the Database @@ -99,10 +112,15 @@ For more information about using InfluxDB in Grafana, see the `Grafana Documenta .. _`Grafana Documentation`: https://grafana.com/docs/features/datasources/influxdb/ -API ---- +Agent API +--------- + +.. autoclass:: agents.influxdb_publisher.influxdb_publisher.InfluxDBAgent + :members: + + +Supporting APIs +--------------- -Publisher -````````` .. autoclass:: ocs.agent.influxdb_publisher.Publisher :members: From 2de1b9fc6335d553f4aece9e0daa324fcd24f5cf Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Tue, 7 Sep 2021 13:08:55 -0400 Subject: [PATCH 09/19] Standardize Registry Agent docs --- agents/registry/registry.py | 50 +++++++++++++------------ docs/agents/registry.rst | 73 +++++++++++++++++++++++++------------ docs/developer/agents.rst | 1 + 3 files changed, 77 insertions(+), 47 deletions(-) diff --git a/agents/registry/registry.py b/agents/registry/registry.py index c4618e3c..d865f4ff 100644 --- a/agents/registry/registry.py +++ b/agents/registry/registry.py @@ -107,28 +107,32 @@ def _register_heartbeat(self, _data): @inlineCallbacks def main(self, session: ocs_agent.OpSession, params=None): - """ - Main run process for the Registry agent. This will loop and keep track of - which agents have expired. It will keep track of current active agents - in the session.data variable so it can be seen by clients. + """main() + + **Process** - Main run process for the Registry agent. This will loop + and keep track of which agents have expired. It will keep track of + current active agents in the session.data variable so it can be seen by + clients. + Notes: The session.data object for this process will be a dictionary containing - the encoded RegisteredAgent object for each agent observed during the - lifetime of the registry. For instance, this might look like - - >>> session.data - {'observatory.aggregator': - {'expired': False, - 'last_updated': 1583179794.5175, - 'time_expired': None}, - 'observatory.faker1': - {'expired': False, - 'last_updated': 1583179795.072248, - 'time_expired': None}, - 'observatory.faker2': - {'expired': True, - 'last_updated': 1583179777.0211036, - 'time_expired': 1583179795.3862052}} + the encoded RegisteredAgent objects for each agent observed during the + lifetime of the Registry. For instance, this might look like:: + + >>> session.data + {'observatory.aggregator': + {'expired': False, + 'last_updated': 1583179794.5175, + 'time_expired': None}, + 'observatory.faker1': + {'expired': False, + 'last_updated': 1583179795.072248, + 'time_expired': None}, + 'observatory.faker2': + {'expired': True, + 'last_updated': 1583179777.0211036, + 'time_expired': 1583179795.3862052}} + """ session.set_status('starting') @@ -166,8 +170,8 @@ def main(self, session: ocs_agent.OpSession, params=None): return True, "Stopped registry main process" - def stop(self, session, params=None): - """Stop function for the 'run' process.""" + def _stop_main(self, session, params=None): + """Stop function for the 'main' process.""" session.set_status('stopping') self._run = False @@ -188,7 +192,7 @@ def _register_agent(self, session, agent_data): agent, runner = ocs_agent.init_site_agent(args) registry = Registry(agent) - agent.register_process('main', registry.main, registry.stop, blocking=False, startup=True) + agent.register_process('main', registry.main, registry._stop_main, blocking=False, startup=True) agent.register_task('register_agent', registry._register_agent, blocking=False) runner.run(agent, auto_reconnect=True) diff --git a/docs/agents/registry.rst b/docs/agents/registry.rst index 6534e133..7441c78e 100644 --- a/docs/agents/registry.rst +++ b/docs/agents/registry.rst @@ -2,8 +2,44 @@ .. _registry: +============== Registry Agent -======================= +============== + +The Registry Agent tracks all currently running Agents on the OCS network, +providing the ability to monitor the status of each Agent's Tasks and Processes +through the :ref:`operation_monitor`. + +Configuration File Examples +---------------------------- + +Below are configuration examples for the ocs config file and for running the +Agent in a docker container. + +ocs-config +`````````` + +An example site-config-file block:: + + { 'agent-class': 'RegistryAgent', + 'instance-id': 'registry', + 'arguments': []}, + +Docker +``````` + +An example docker-compose configuration:: + + ocs-registry: + image: simonsobs/ocs-registry-agent:latest + container_name: ocs-registry + hostname: ocs-docker + user: "9000" + volumes: + - ${OCS_CONFIG_DIR}:/config + +Description +----------- The registry agent is used to keep track of currently running active agents. It listens to the heartbeat feeds of all agents on the crossbar server, @@ -25,6 +61,7 @@ since the registry started running:: status, msg, session = registry_client.main.status() print(session['data']) + which will print a dictionary that might look like:: {'observatory.aggregator': @@ -43,8 +80,10 @@ which will print a dictionary that might look like:: 'time_expired': 1583179795.3862052, 'op_codes': {'acq': 3, 'set_heartbeat': 1, 'delay_task': 1}}} +.. _operation_monitor: + Operation Monitor -------------------- +````````````````` The registry is also used to track the status of each agent's tasks and processes. `Operation codes` for each operation are regularly passed through an @@ -62,28 +101,14 @@ all operations on a network as pictured below: -Configuration --------------------- -To add the registry to your ocs setup, you can add this file to your site-config -yaml file:: - - { 'agent-class': 'RegistryAgent', - 'instance-id': 'registry', - 'arguments': []}, - -Here is an example of a docker service that you can put in your docker-compose -file to run the registry:: - - ocs-registry: - image: simonsobs/ocs-registry-agent:latest - container_name: ocs-registry - hostname: ocs-docker - user: "9000" - volumes: - - ${OCS_CONFIG_DIR}:/config - -API ---- +Agent API +--------- .. autoclass:: agents.registry.registry.Registry :members: + +Supporting APIs +--------------- +.. autoclass:: agents.registry.registry.RegisteredAgent + :members: + diff --git a/docs/developer/agents.rst b/docs/developer/agents.rst index 41f8b44e..642ec8d3 100644 --- a/docs/developer/agents.rst +++ b/docs/developer/agents.rst @@ -641,6 +641,7 @@ is a complete example of a well documented Task (or Process):: def demo(self, session, params=None): """demo(arg1=None, arg2=7) + **Task** (or **Process**) - An example task docstring for illustration purposes. Parameters: From 61974eca64c5e101bf3a581342b0ca30d91ecd65 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Tue, 7 Sep 2021 13:25:05 -0400 Subject: [PATCH 10/19] Standardize FakeDataAgent docs --- agents/fake_data/fake_data_agent.py | 59 ++++++++++++++++------------- docs/agents/fake_data.rst | 12 +++--- 2 files changed, 38 insertions(+), 33 deletions(-) diff --git a/agents/fake_data/fake_data_agent.py b/agents/fake_data/fake_data_agent.py index a0ffcb2e..f860bfc1 100644 --- a/agents/fake_data/fake_data_agent.py +++ b/agents/fake_data/fake_data_agent.py @@ -51,24 +51,25 @@ def set_job_done(self): # Process functions. @ocs_agent.param('_') # Reject all params. - def start_acq(self, session, params): - """**Process:** Acquire data and write to the feed. + def acq(self, session, params): + """acq() - This Process has no useful parameters. + **Process** - Acquire data and write to the feed. - The most recent fake values are stored in the session.data object in - the format:: + Notes: + The most recent fake values are stored in the session.data object in + the format:: - {"fields": - {"channel_00": 0.10250430068515494, - "channel_01": 0.08550903376216404, - "channel_02": 0.10481891991693446, - "channel_03": 0.10793263271024509}, - "timestamp":1600448753.9288929} + {"fields": + {"channel_00": 0.10250430068515494, + "channel_01": 0.08550903376216404, + "channel_02": 0.10481891991693446, + "channel_03": 0.10793263271024509}, + "timestamp":1600448753.9288929} - The channels kept in fields are the 'faked' data, in a similar - structure to the Lakeshore agents. 'timestamp' is the lastest time these values - were updated. + The channels kept in fields are the 'faked' data, in a similar + structure to the Lakeshore agents. 'timestamp' is the lastest time these values + were updated. """ ok, msg = self.try_set_job('acq') @@ -145,7 +146,7 @@ def start_acq(self, session, params): self.set_job_done() return True, 'Acquisition exited cleanly.' - def stop_acq(self, session, params=None): + def _stop_acq(self, session, params=None): ok = False with self.lock: if self.job =='acq': @@ -157,8 +158,10 @@ def stop_acq(self, session, params=None): # Tasks @ocs_agent.param('heartbeat', default=True, type=bool) - def set_heartbeat_state(self, session, params): - """Task to set the state of the agent heartbeat. + def set_heartbeat(self, session, params): + """set_heartbeat(heartbeat=True) + + **Task** - Set the state of the agent heartbeat. Args: heartbeat (bool): True for on (the default), False for off @@ -175,23 +178,27 @@ def set_heartbeat_state(self, session, params): @ocs_agent.param('succeed', default=True, type=bool) @inlineCallbacks def delay_task(self, session, params): - """Task that will take the requested number of seconds to complete. + """delay_task(delay=5, succeed=True) + + **Task** - Sleep (delay) for the requested number of seconds. This can run simultaneously with the acq Process. This Task should run in the reactor thread. - The session data will be updated with the requested delay as - well as the time elapsed so far, for example:: - - {'requested_delay': 5., - 'delay_so_far': 1.2} - Args: delay (float): Time to wait before returning, in seconds. Defaults to 5. succeed (bool): Whether to return success or not. Defaults to True. + Notes: + The session data will be updated with the requested delay as + well as the time elapsed so far, for example:: + + >>> session.data + {'requested_delay': 5., + 'delay_so_far': 1.2} + """ delay = params['delay'] succeed = params['succeed'] is True @@ -242,9 +249,9 @@ def add_agent_args(parser_in=None): num_channels=args.num_channels, sample_rate=args.sample_rate, frame_length=args.frame_length) - agent.register_process('acq', fdata.start_acq, fdata.stop_acq, + agent.register_process('acq', fdata.acq, fdata._stop_acq, blocking=True, startup=startup) - agent.register_task('set_heartbeat', fdata.set_heartbeat_state) + agent.register_task('set_heartbeat', fdata.set_heartbeat) agent.register_task('delay_task', fdata.delay_task, blocking=False) runner.run(agent, auto_reconnect=True) diff --git a/docs/agents/fake_data.rst b/docs/agents/fake_data.rst index f8c4b235..a92b2d58 100644 --- a/docs/agents/fake_data.rst +++ b/docs/agents/fake_data.rst @@ -6,9 +6,6 @@ The Fake Data Agent is provided with OCS to help demonstrate and debug issues with data aggregation and display. It will generate random data and pass it to an OCS feed. -Command-line / site config args -------------------------------- - .. argparse:: :module: agents.fake_data.fake_data_agent :func: add_agent_args @@ -16,7 +13,8 @@ Command-line / site config args Configuration File Examples --------------------------- -Below are configuration examples for the ocs config file and for running the Agent in a docker container. +Below are configuration examples for the ocs config file and for running the +Agent in a docker container. ocs-config `````````` @@ -36,7 +34,7 @@ The Fake Data Agent can also be run in a Docker container. An example docker-compose service configuration is shown here:: fake-data1: - image: simonsobs/ocs-fake-data-agent + image: simonsobs/ocs-fake-data-agent:latest hostname: ocs-docker environment: - LOGLEVEL=info @@ -45,8 +43,8 @@ docker-compose service configuration is shown here:: command: - "--instance-id=fake-data1" -Agent Operations ----------------- +Agent API +--------- .. autoclass:: agents.fake_data.fake_data_agent.FakeDataAgent :members: From f526cdda78ef3d2ebe455fb0f8e0d07b2aee81be Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Tue, 7 Sep 2021 14:12:53 -0400 Subject: [PATCH 11/19] Standardize HostMaster Agent docs --- agents/host_master/host_master.py | 39 +++++++++++++++++-------------- docs/agents/host_master.rst | 17 ++++++++++---- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/agents/host_master/host_master.py b/agents/host_master/host_master.py index ea4b036c..ccc19f14 100644 --- a/agents/host_master/host_master.py +++ b/agents/host_master/host_master.py @@ -191,20 +191,22 @@ def _update_target_states(self, session, params): addressable[key]['target_state'] = state @inlineCallbacks - def master_process(self, session, params=None): - """The "master" Process maintains a list of child Agents for which it - is responsible. In response to requests from a client, the - Proces will launch or terminate child Agents. + def master(self, session, params=None): + """master(params=None) + + **Process** - The "master" Process maintains a list of child Agents for + which it is responsible. In response to requests from a client, the + Process will launch or terminate child Agents. If an Agent process exits unexpectedly, it will be relaunched within a few seconds. - When the master_process receives a Process stop request, it - will terminate all child agents before moving to the 'done' - state. + When the master Process receives a stop request, it will terminate all + child agents before moving to the 'done' state. - The ``params`` dictionary is passed directly to - _update_target_states(); see that docstream. + Parameters: + params (dict): Passed directly to ``_update_target_states()``; see + :func:`HostMaster._update_target_states`. """ self.running = True @@ -334,20 +336,23 @@ def master_process(self, session, params=None): yield dsleep(max(sleep_time, .001)) return True, 'Exited.' - def master_process_stop(self, session, params=None): + def _stop_master(self, session, params=None): if session.status == 'done': return session.set_status('stopping') self.running = False return True, 'Stop initiated.' - def update_task(self, session, params=None): - """Update the master process' child Agent parameters. + def update(self, session, params=None): + """update(params=None) + + **Task** - Update the master process' child Agent parameters. This Task will fail if the master Process is not running. - The ``params`` dictionary is passed directly to - _update_target_states(); see that docstream. + Parameters: + params (dict): Passed directly to ``_update_target_states()``; see + :func:`HostMaster._update_target_states`. """ if not self.running: @@ -434,11 +439,11 @@ def errReceived(self, data): startup_params = {'requests': [('all', args.initial_state)]} agent.register_process('master', - host_master.master_process, - host_master.master_process_stop, + host_master.master, + host_master._stop_master, blocking=False, startup=startup_params) - agent.register_task('update', host_master.update_task, blocking=False) + agent.register_task('update', host_master.update, blocking=False) agent.register_task('die', host_master.die, blocking=False) runner.run(agent, auto_reconnect=True) diff --git a/docs/agents/host_master.rst b/docs/agents/host_master.rst index a650152d..add84f71 100644 --- a/docs/agents/host_master.rst +++ b/docs/agents/host_master.rst @@ -21,14 +21,17 @@ to: Direct user interaction with an HMA can be achieved through the ``ocsbow`` command line script. -Agent Configuration -------------------- +Configuration File Examples +--------------------------- The Host Master Agent is an optional component. In order to function properly, it requires that site_config be in use. It should be listed in the SCF like other agents. There should only be a single HMA per host definition block. +OCS Site Config +``````````````` + Here's an abbreviated SCF showing the correct configuration: .. code-block:: yaml @@ -54,8 +57,12 @@ The ``agent-class`` should be ``HostMaster``. The ``instance-id`` in this example is based on a (recommended) convention that HostMaster live at ``master-{host}``. -API ---- +Agent API +--------- .. autoclass:: agents.host_master.host_master.HostMaster - :members: master_process, master_process_stop, update_task, die + :members: + +Supporting APIs +--------------- +.. automethod:: agents.host_master.host_master.HostMaster._update_target_states From 9f5252890fe9fb3a06a806645d2aae073e3b38fd Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Tue, 7 Sep 2021 14:16:16 -0400 Subject: [PATCH 12/19] Make headings and docker images consistent Headings under "Configure File Examples" were inconsistent across Agents, and not all Agents had :latest listed as a Docker tag. --- docs/agents/aggregator.rst | 10 ++++++---- docs/agents/fake_data.rst | 11 +++++++---- docs/agents/influxdb_publisher.rst | 6 ++++-- docs/agents/registry.rst | 8 ++++---- example/docs/agent_template.rst | 8 ++++---- 5 files changed, 25 insertions(+), 18 deletions(-) diff --git a/docs/agents/aggregator.rst b/docs/agents/aggregator.rst index 260cdb7e..5592e83d 100644 --- a/docs/agents/aggregator.rst +++ b/docs/agents/aggregator.rst @@ -32,8 +32,9 @@ Configuration File Examples Below are configuration examples for the ocs config file and for running the Agent in a docker container. -ocs-config -`````````` +OCS Site Config +``````````````` + The aggregator agent takes three site-config arguments. ``--initial-state`` can be either ``record`` or ``idle``, and determines whether or not the aggregator starts recording @@ -57,8 +58,9 @@ An example site-config entry is:: timestreams, and ``/data/pysmurf`` to store archived pysmurf files. -Docker -``````` +Docker Compose +`````````````` + The docker image for the aggregator agent is simonsobs/ocs-aggregator-agent Here is an example configuration:: diff --git a/docs/agents/fake_data.rst b/docs/agents/fake_data.rst index a92b2d58..a8b29b38 100644 --- a/docs/agents/fake_data.rst +++ b/docs/agents/fake_data.rst @@ -13,11 +13,13 @@ an OCS feed. Configuration File Examples --------------------------- + Below are configuration examples for the ocs config file and for running the Agent in a docker container. -ocs-config -`````````` +OCS Site Config +``````````````` + To configure the Fake Data Agent we need to add a FakeDataAgent block to our ocs configuration file. Here is an example configuration block using all of the available arguments:: @@ -28,8 +30,9 @@ available arguments:: ['--num-channels', '16'], ['--sample-rate', '4']]}, -Docker -`````` +Docker Compose +`````````````` + The Fake Data Agent can also be run in a Docker container. An example docker-compose service configuration is shown here:: diff --git a/docs/agents/influxdb_publisher.rst b/docs/agents/influxdb_publisher.rst index 1c64459e..1fc217c3 100644 --- a/docs/agents/influxdb_publisher.rst +++ b/docs/agents/influxdb_publisher.rst @@ -23,7 +23,8 @@ Agent in a docker container. Also included is an example for setting up Grafana to display data from InfluxDB. OCS Site Config -````````````````` +``````````````` + Add an InfluxDBAgent to your OCS configuration file:: {'agent-class': 'InfluxDBAgent', @@ -37,10 +38,11 @@ Add an InfluxDBAgent to your OCS configuration file:: Docker Compose `````````````` + Add the InfluxDB Publisher Agent container to your docker-compose file:: ocs-influxdb-publisher: - image: simonsobs/ocs-influxdb-publisher-agent + image: simonsobs/ocs-influxdb-publisher-agent:latest hostname: ocs-docker volumes: - ${OCS_CONFIG_DIR}:/config:ro diff --git a/docs/agents/registry.rst b/docs/agents/registry.rst index 7441c78e..4c668619 100644 --- a/docs/agents/registry.rst +++ b/docs/agents/registry.rst @@ -16,8 +16,8 @@ Configuration File Examples Below are configuration examples for the ocs config file and for running the Agent in a docker container. -ocs-config -`````````` +OCS Site Config +``````````````` An example site-config-file block:: @@ -25,8 +25,8 @@ An example site-config-file block:: 'instance-id': 'registry', 'arguments': []}, -Docker -``````` +Docker Compose +`````````````` An example docker-compose configuration:: diff --git a/example/docs/agent_template.rst b/example/docs/agent_template.rst index 4b1b3359..8d9b4987 100644 --- a/example/docs/agent_template.rst +++ b/example/docs/agent_template.rst @@ -25,8 +25,8 @@ Configuration File Examples Below are configuration examples for the ocs config file and for running the Agent in a docker container. -ocs-config -`````````` +OCS Site Config +```````````````` An example site-config-file block:: @@ -36,8 +36,8 @@ An example site-config-file block:: ['--argument-2', 42], ['--argument-3']]}, -Docker -``````` +Docker Compose +`````````````` An example docker-compose configuration:: From 05b0fdb3f37a574cd4bef9a3cb5caa83370f1c19 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Tue, 7 Sep 2021 14:19:36 -0400 Subject: [PATCH 13/19] Mark optional params as such --- agents/fake_data/fake_data_agent.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/agents/fake_data/fake_data_agent.py b/agents/fake_data/fake_data_agent.py index f860bfc1..2655f987 100644 --- a/agents/fake_data/fake_data_agent.py +++ b/agents/fake_data/fake_data_agent.py @@ -164,7 +164,7 @@ def set_heartbeat(self, session, params): **Task** - Set the state of the agent heartbeat. Args: - heartbeat (bool): True for on (the default), False for off + heartbeat (bool, optional): True for on (the default), False for off """ heartbeat_state = params['heartbeat'] @@ -186,9 +186,9 @@ def delay_task(self, session, params): should run in the reactor thread. Args: - delay (float): Time to wait before returning, in seconds. + delay (float, optional): Time to wait before returning, in seconds. Defaults to 5. - succeed (bool): Whether to return success or not. + succeed (bool, optional): Whether to return success or not. Defaults to True. Notes: From 8379e5bf7e126225fbe2dad5c398a502a40cc3c2 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Tue, 7 Sep 2021 14:22:00 -0400 Subject: [PATCH 14/19] Fix only remaining Sphinx build warning --- docs/user/logging.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/user/logging.rst b/docs/user/logging.rst index 70a63a19..f90f58b9 100644 --- a/docs/user/logging.rst +++ b/docs/user/logging.rst @@ -87,6 +87,7 @@ the logging driver in the Docker daemon configuration. For more details see the Once the loki logging driver is installed and configured access to logs should still be accessible via ``docker logs``, however it is no longer available via ``docker-compose logs``. You will instead see a warning:: + WARNING: no logs are available with the 'loki' log driver .. _Loki installation documentation: https://grafana.com/docs/loki/latest/installation/ From 2062087978bb84e6c84b813e44a1ab5a4ec5d63b Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Tue, 7 Sep 2021 14:28:40 -0400 Subject: [PATCH 15/19] Make some small wording adjustments in agent doc guide --- docs/developer/agents.rst | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/developer/agents.rst b/docs/developer/agents.rst index 642ec8d3..6fbce83c 100644 --- a/docs/developer/agents.rst +++ b/docs/developer/agents.rst @@ -674,7 +674,7 @@ Keep reading for more details on what's going on in this example. Overriding the Method Signature ``````````````````````````````` ``session`` and ``params`` are both required parameters when writing an OCS -Task or Process, but both are hidden from users writing OCSClients. When +Task or Process, but both should be hidden from users writing OCSClients. When documenting a Task or Process, the method signature should be overridden to remove both ``session`` and ``params``, and to include any parameters your Task or Process might take. This is done in the first line of the docstring, by @@ -703,7 +703,7 @@ look like:: Examples ```````` -Examples should be given using the "Examples" header when it would improve the +Examples should be given using the "Examples" heading when it would improve the clarity of how to interact with a given Task or Process:: Examples: @@ -716,7 +716,7 @@ session.data The ``session.data`` object structure is left up to the Agent author. As such, it needs to be documented so that OCSClient authors know what to expect. If your Task or Process makes use of ``session.data``, provide an example of the -structure under the "Notes" header:: +structure under the "Notes" heading:: Notes: An example of the session data:: @@ -737,8 +737,9 @@ Agent Reference Pages ^^^^^^^^^^^^^^^^^^^^^ Now that you have documented your Agent's Tasks and Processes appropriately we need to make the page that will display that documentation. Agent reference -pages are kept in ``ocs/docs/agents/``. Each Agent has a separate `.rst` file. -Each Agent reference page must contain: +pages are kept in `ocs/docs/agents/ +`_. Each Agent has a +separate `.rst` file. Each Agent reference page must contain: * Brief description of the Agent * Example ocs-site-config configuration block From c2c4493b78478109c7e06ad7184a5f71a68b7c02 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 10 Sep 2021 14:22:25 -0400 Subject: [PATCH 16/19] Fix small typos --- agents/fake_data/fake_data_agent.py | 4 ++-- docs/agents/aggregator.rst | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/agents/fake_data/fake_data_agent.py b/agents/fake_data/fake_data_agent.py index 2655f987..e4cf8b52 100644 --- a/agents/fake_data/fake_data_agent.py +++ b/agents/fake_data/fake_data_agent.py @@ -68,8 +68,8 @@ def acq(self, session, params): "timestamp":1600448753.9288929} The channels kept in fields are the 'faked' data, in a similar - structure to the Lakeshore agents. 'timestamp' is the lastest time these values - were updated. + structure to the Lakeshore agents. 'timestamp' is the last time + these values were updated. """ ok, msg = self.try_set_job('acq') diff --git a/docs/agents/aggregator.rst b/docs/agents/aggregator.rst index 5592e83d..1ade8e8c 100644 --- a/docs/agents/aggregator.rst +++ b/docs/agents/aggregator.rst @@ -7,7 +7,7 @@ Aggregator Agent ================ The Aggregator Agent, also referred to as the Housekeeping Aggregator (or -HKAggreagtor), is the OCS Agent responsible for recording all data published to +HKAggregator), is the OCS Agent responsible for recording all data published to the OCS Network's :ref:`feeds`. The Aggregator collects this data and writes it to disk in the `.g3` file format. From 0bd07fbf07472d33e9ef38c672b9232d01c7c07f Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 10 Sep 2021 14:22:47 -0400 Subject: [PATCH 17/19] Fix host master agent's method signatures and remove params=None --- agents/host_master/host_master.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/agents/host_master/host_master.py b/agents/host_master/host_master.py index ccc19f14..92ad6154 100644 --- a/agents/host_master/host_master.py +++ b/agents/host_master/host_master.py @@ -97,7 +97,9 @@ def _terminate_instance(self, key): return True, 'Kill requested.' def _update_target_states(self, session, params): - """Update the child Agent management parameters of the master process. + """_update_target_states(params) + + Update the child Agent management parameters of the master process. This function is used both for first-time init of the master Process, but also for subsequent parameter updates while master Process is running. @@ -191,8 +193,8 @@ def _update_target_states(self, session, params): addressable[key]['target_state'] = state @inlineCallbacks - def master(self, session, params=None): - """master(params=None) + def master(self, session, params): + """master(**kwargs) **Process** - The "master" Process maintains a list of child Agents for which it is responsible. In response to requests from a client, the @@ -205,8 +207,9 @@ def master(self, session, params=None): child agents before moving to the 'done' state. Parameters: - params (dict): Passed directly to ``_update_target_states()``; see - :func:`HostMaster._update_target_states`. + **kwargs: Passed directly to + ``_update_target_states(params=kwargs)``; see + :func:`HostMaster._update_target_states`. """ self.running = True @@ -336,23 +339,24 @@ def master(self, session, params=None): yield dsleep(max(sleep_time, .001)) return True, 'Exited.' - def _stop_master(self, session, params=None): + def _stop_master(self, session, params): if session.status == 'done': return session.set_status('stopping') self.running = False return True, 'Stop initiated.' - def update(self, session, params=None): - """update(params=None) + def update(self, session, params): + """update(**kwargs) **Task** - Update the master process' child Agent parameters. This Task will fail if the master Process is not running. Parameters: - params (dict): Passed directly to ``_update_target_states()``; see - :func:`HostMaster._update_target_states`. + **kwargs: Passed directly to + ``_update_target_states(params=kwargs)``; see + :func:`HostMaster._update_target_states`. """ if not self.running: @@ -362,7 +366,7 @@ def update(self, session, params=None): return True, 'Update requested.' @inlineCallbacks - def die(self, session, params=None): + def die(self, session, params): session.set_status('running') if not self.running: session.add_message('Master process is not running.') From 55636cf4ddfc3de6e69fad9985e311f96e42a1d0 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 10 Sep 2021 14:39:41 -0400 Subject: [PATCH 18/19] Clarify session data access on OCSClient side --- agents/aggregator/aggregator_agent.py | 5 +++-- agents/fake_data/fake_data_agent.py | 5 +++-- agents/registry/registry.py | 4 ++-- docs/developer/agents.rst | 19 +++++++++++++------ 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/agents/aggregator/aggregator_agent.py b/agents/aggregator/aggregator_agent.py index 9ae70bdf..c74f95fd 100644 --- a/agents/aggregator/aggregator_agent.py +++ b/agents/aggregator/aggregator_agent.py @@ -84,9 +84,10 @@ def record(self, session: ocs_agent.OpSession, params=None): running. Notes: - The most recent file and active providers will be returned in - session.data:: + The most recent file and active providers will be returned in the + session data:: + >>> response.session['data'] {"current_file": "/data/16020/1602089117.g3", "providers": { "observatory.fake-data1.feeds.false_temperatures": { diff --git a/agents/fake_data/fake_data_agent.py b/agents/fake_data/fake_data_agent.py index e4cf8b52..db5b9911 100644 --- a/agents/fake_data/fake_data_agent.py +++ b/agents/fake_data/fake_data_agent.py @@ -57,9 +57,10 @@ def acq(self, session, params): **Process** - Acquire data and write to the feed. Notes: - The most recent fake values are stored in the session.data object in + The most recent fake values are stored in the session data object in the format:: + >>> response.session['data'] {"fields": {"channel_00": 0.10250430068515494, "channel_01": 0.08550903376216404, @@ -195,7 +196,7 @@ def delay_task(self, session, params): The session data will be updated with the requested delay as well as the time elapsed so far, for example:: - >>> session.data + >>> response.session['data'] {'requested_delay': 5., 'delay_so_far': 1.2} diff --git a/agents/registry/registry.py b/agents/registry/registry.py index d865f4ff..272b38e7 100644 --- a/agents/registry/registry.py +++ b/agents/registry/registry.py @@ -115,11 +115,11 @@ def main(self, session: ocs_agent.OpSession, params=None): clients. Notes: - The session.data object for this process will be a dictionary containing + The session data object for this process will be a dictionary containing the encoded RegisteredAgent objects for each agent observed during the lifetime of the Registry. For instance, this might look like:: - >>> session.data + >>> response.session['data'] {'observatory.aggregator': {'expired': False, 'last_updated': 1583179794.5175, diff --git a/docs/developer/agents.rst b/docs/developer/agents.rst index 6fbce83c..5c676f30 100644 --- a/docs/developer/agents.rst +++ b/docs/developer/agents.rst @@ -331,6 +331,7 @@ containerized Agents together with the command Depending on your host's permissions, this command may need to be run with ``sudo``. +.. _param: Operation Parameters -------------------- @@ -639,7 +640,9 @@ Task and Process Documentation Each Task and Process within an Agent must be accompanied by a docstring. Here is a complete example of a well documented Task (or Process):: - def demo(self, session, params=None): + @ocs_agent.param('arg1', type=bool) + @ocs_agent.param('arg2', default=7, type=int) + def demo(self, session, params): """demo(arg1=None, arg2=7) **Task** (or **Process**) - An example task docstring for illustration purposes. @@ -657,7 +660,7 @@ is a complete example of a well documented Task (or Process):: Notes: An example of the session data:: - >>> session.data + >>> response.session['data'] {"fields": {"Channel_05": {"T": 293.644, "R": 33.752, "timestamp": 1601924482.722671}, "Channel_06": {"T": 0, "R": 1022.44, "timestamp": 1601924499.5258765}, @@ -687,7 +690,8 @@ above example that looks like:: This will render the method description as ``delay_task(arg1=None, arg2=7)`` within Sphinx, rather than ``delay_task(session, params=None)``. The default values should be put in this documentation. If a parameter is required, -set the param to ``None`` in the method signature. +set the param to ``None`` in the method signature. For more info on the +``@ocs_agent.param`` decorator see :ref:`param`. Keyword Arguments ````````````````` @@ -711,17 +715,20 @@ clarity of how to interact with a given Task or Process:: client.demo(arg1=False, arg2=5) -session.data +Session Data ```````````` The ``session.data`` object structure is left up to the Agent author. As such, it needs to be documented so that OCSClient authors know what to expect. If your Task or Process makes use of ``session.data``, provide an example of the -structure under the "Notes" heading:: +structure under the "Notes" heading. On the OCSClient end, this +``session.data`` object is returned in the response under +``response.session['data']``. This is how it should be presented in the example +docstrings:: Notes: An example of the session data:: - >>> session.data + >>> response.session['data'] {"fields": {"Channel_05": {"T": 293.644, "R": 33.752, "timestamp": 1601924482.722671}, "Channel_06": {"T": 0, "R": 1022.44, "timestamp": 1601924499.5258765}, From 7117c259f6ff0616db10a1940390c9bb04b70308 Mon Sep 17 00:00:00 2001 From: Brian Koopman Date: Fri, 10 Sep 2021 14:41:53 -0400 Subject: [PATCH 19/19] Remove None default for params in all tasks/processes --- agents/aggregator/aggregator_agent.py | 4 ++-- agents/fake_data/fake_data_agent.py | 2 +- agents/influxdb_publisher/influxdb_publisher.py | 4 ++-- agents/registry/registry.py | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/agents/aggregator/aggregator_agent.py b/agents/aggregator/aggregator_agent.py index c74f95fd..07002b76 100644 --- a/agents/aggregator/aggregator_agent.py +++ b/agents/aggregator/aggregator_agent.py @@ -76,7 +76,7 @@ def _enqueue_incoming_data(self, _data): self.incoming_data.put((data, feed)) self.log.debug("Enqueued {d} from Feed {f}", d=data, f=feed) - def record(self, session: ocs_agent.OpSession, params=None): + def record(self, session: ocs_agent.OpSession, params): """record() **Process** - This process will create an Aggregator instance, which @@ -127,7 +127,7 @@ def record(self, session: ocs_agent.OpSession, params=None): return True, "Aggregation has ended" - def _stop_record(self, session, params=None): + def _stop_record(self, session, params): session.set_status('stopping') self.aggregate = False return True, "Stopping aggregation" diff --git a/agents/fake_data/fake_data_agent.py b/agents/fake_data/fake_data_agent.py index db5b9911..86796463 100644 --- a/agents/fake_data/fake_data_agent.py +++ b/agents/fake_data/fake_data_agent.py @@ -147,7 +147,7 @@ def acq(self, session, params): self.set_job_done() return True, 'Acquisition exited cleanly.' - def _stop_acq(self, session, params=None): + def _stop_acq(self, session, params): ok = False with self.lock: if self.job =='acq': diff --git a/agents/influxdb_publisher/influxdb_publisher.py b/agents/influxdb_publisher/influxdb_publisher.py index 18f972c3..c1f438e5 100644 --- a/agents/influxdb_publisher/influxdb_publisher.py +++ b/agents/influxdb_publisher/influxdb_publisher.py @@ -71,7 +71,7 @@ def _enqueue_incoming_data(self, _data): self.incoming_data.put((data, feed)) - def record(self, session: ocs_agent.OpSession, params=None): + def record(self, session: ocs_agent.OpSession, params): """record() **Process** - This process will create an Publisher instance, which @@ -101,7 +101,7 @@ def record(self, session: ocs_agent.OpSession, params=None): return True, "Aggregation has ended" - def _stop_record(self, session, params=None): + def _stop_record(self, session, params): session.set_status('stopping') self.aggregate = False return True, "Stopping aggregation" diff --git a/agents/registry/registry.py b/agents/registry/registry.py index 272b38e7..e0a5e96c 100644 --- a/agents/registry/registry.py +++ b/agents/registry/registry.py @@ -106,7 +106,7 @@ def _register_heartbeat(self, _data): self.registered_agents[feed['agent_address']].refresh(op_codes=op_codes) @inlineCallbacks - def main(self, session: ocs_agent.OpSession, params=None): + def main(self, session: ocs_agent.OpSession, params): """main() **Process** - Main run process for the Registry agent. This will loop @@ -170,7 +170,7 @@ def main(self, session: ocs_agent.OpSession, params=None): return True, "Stopped registry main process" - def _stop_main(self, session, params=None): + def _stop_main(self, session, params): """Stop function for the 'main' process.""" session.set_status('stopping') self._run = False