diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7ed6a5a7..d818adfb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,116 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 Generated by [`auto-changelog`](https://github.com/CookPete/auto-changelog).
 
+## [2.3.6-rc1](https://github.com/NASA-AMMOS/AIT-Core/compare/2.3.5...2.3.6-rc1) - 2022-04-07
+
+### Merged
+
+- Issue #168 - Add ability for plugins to publish to arbitrary topic [`#435`](https://github.com/NASA-AMMOS/AIT-Core/pull/435)
+- Issue #427 - Update AIT OpenMCT comp. provider to serve packets as fold… [`#433`](https://github.com/NASA-AMMOS/AIT-Core/pull/433)
+- Issue #358 - OpenMCT Plugin support for multiple clients and filtered telemetry [`#426`](https://github.com/NASA-AMMOS/AIT-Core/pull/426)
+- Issue #401 - Minor fixes to issues found in master [`#422`](https://github.com/NASA-AMMOS/AIT-Core/pull/422)
+- Issue #371 - Fix calls to logger in util pkl handling [`#423`](https://github.com/NASA-AMMOS/AIT-Core/pull/423)
+- Issue #420 - Add Packet Accumulator [`#418`](https://github.com/NASA-AMMOS/AIT-Core/pull/418)
+- Issue #419 - Add Packet Padder Plugin [`#417`](https://github.com/NASA-AMMOS/AIT-Core/pull/417)
+- Issue #328 - Update getDefaultDict to return CmdDict extensions when applicable [`#404`](https://github.com/NASA-AMMOS/AIT-Core/pull/404)
+- Issue #304 - Improve time handling in database insert method [`#421`](https://github.com/NASA-AMMOS/AIT-Core/pull/421)
+- Issue #371 - Testing nested includes during test for 'dirty' (pickle) [`#416`](https://github.com/NASA-AMMOS/AIT-Core/pull/416)
+- Issue #394 - Add DSN paths to ait-create-dirs [`#396`](https://github.com/NASA-AMMOS/AIT-Core/pull/396)
+- issue #401 - Fix ArrayType encode bug [`#402`](https://github.com/NASA-AMMOS/AIT-Core/pull/402)
+- Issue #399 - Fix error when IPV6 is disabled at kernel level [`#400`](https://github.com/NASA-AMMOS/AIT-Core/pull/400)
+- Issue #383 - update poetry builds [`#385`](https://github.com/NASA-AMMOS/AIT-Core/pull/385)
+- Issue #395 - Add try except when attempting to pack opcode into struct. [`#397`](https://github.com/NASA-AMMOS/AIT-Core/pull/397)
+- Issue #352 - Ait-tlm-csv fields argument fix [`#387`](https://github.com/NASA-AMMOS/AIT-Core/pull/387)
+- Issue #351 - Update ait-tlm-csv cmd packet option description [`#388`](https://github.com/NASA-AMMOS/AIT-Core/pull/388)
+- Issue #382 - Change CSV-generating fields file to open 'r' mode [`#386`](https://github.com/NASA-AMMOS/AIT-Core/pull/386)
+- Issue #364 - Update Core package handling [`#381`](https://github.com/NASA-AMMOS/AIT-Core/pull/381)
+- Issue #364 - Dev Tool Updates [`#379`](https://github.com/NASA-AMMOS/AIT-Core/pull/379)
+- Issue #362 - Add "core.seq" to extensions documentation [`#378`](https://github.com/NASA-AMMOS/AIT-Core/pull/378)
+- Issue #360 - Fix time handling in table encode / decode [`#361`](https://github.com/NASA-AMMOS/AIT-Core/pull/361)
+- Issue #354 - Rework table support [`#355`](https://github.com/NASA-AMMOS/AIT-Core/pull/355)
+- Issue #345 - Document 64-bit integer limitation in GUI [`#346`](https://github.com/NASA-AMMOS/AIT-Core/pull/346)
+- Issue #341 - Update database API docs page [`#343`](https://github.com/NASA-AMMOS/AIT-Core/pull/343)
+- Issue #302 - Add historical query support to the OpenMCT plugin [`#349`](https://github.com/NASA-AMMOS/AIT-Core/pull/349)
+- Issue #339 - Document Packet / Field naming constraints [`#340`](https://github.com/NASA-AMMOS/AIT-Core/pull/340)
+
+### Fixed
+
+- Update Core package handling [`#364`](https://github.com/NASA-AMMOS/AIT-Core/issues/364)
+- Issue #360 - Fix time handling in table encode / decode [`#360`](https://github.com/NASA-AMMOS/AIT-Core/issues/360)
+- Issue #345 - Document 64-bit integer limitation in GUI [`#345`](https://github.com/NASA-AMMOS/AIT-Core/issues/345)
+- Issue #341 - Update database API docs page [`#341`](https://github.com/NASA-AMMOS/AIT-Core/issues/341)
+
+### Commits
+
+- Run black across codebase [`2834d47`](https://github.com/NASA-AMMOS/AIT-Core/commit/2834d4753bc247fe716665e7de69b0e6294ebf21)
+- Initial pre-commit additions and cleanup. mypy compatibility fixes [`4573211`](https://github.com/NASA-AMMOS/AIT-Core/commit/4573211dc7949041b5ea60ba3a7769ab941468be)
+- Issue #302 - Support historical queries in OpenMCT plugin [`39ecd2a`](https://github.com/NASA-AMMOS/AIT-Core/commit/39ecd2ad3ad1f4a72f4404e2460a9b43698d2ed3)
+- Port tests to pytest [`6b5a779`](https://github.com/NASA-AMMOS/AIT-Core/commit/6b5a7798015bfc6d2f441ea6fd6518cce5fd42eb)
+- Issue 427: Update AIT OpenMCT comp. provider to serve packets as folders of fields [`bd5cfd6`](https://github.com/NASA-AMMOS/AIT-Core/commit/bd5cfd6eb1c863bc25e90356c8676009f89679f0)
+- Fix linting errors in dmc module [`61fbf57`](https://github.com/NASA-AMMOS/AIT-Core/commit/61fbf573c6f06e695f2c46c3f7ba16c248f9f3a0)
+- bin script linting error cleanup [`b458412`](https://github.com/NASA-AMMOS/AIT-Core/commit/b458412363e6697f4ee965abddda6c74c4257678)
+- Issue #354 - Table documentation update [`2867e2a`](https://github.com/NASA-AMMOS/AIT-Core/commit/2867e2a1be68a45278f0f8e5f0232fd07ff22731)
+- Fix linting errors in db module [`89d38b5`](https://github.com/NASA-AMMOS/AIT-Core/commit/89d38b547af07e9160f59b96864d80e4d1ffa5c4)
+- Linting related cleanup [`d557cef`](https://github.com/NASA-AMMOS/AIT-Core/commit/d557cefb79d95de77e624cf00f730906a6985c6e)
+- Fix linting errors in tlm module. [`c5ae415`](https://github.com/NASA-AMMOS/AIT-Core/commit/c5ae415398ea2de2f58586d3d1d2c1dc93c61878)
+- Fix linting errors in API module [`20aeb70`](https://github.com/NASA-AMMOS/AIT-Core/commit/20aeb70d576c8b71fdb80528061bc1b29ae8c4ac)
+- Additional Core server linting fixes [`42b6539`](https://github.com/NASA-AMMOS/AIT-Core/commit/42b6539f2482252bb01cdef9886e2db70a4cd91e)
+- issue-#420 Add Packet Accumulator [`9255006`](https://github.com/NASA-AMMOS/AIT-Core/commit/9255006bba7ce1762acf9a694877c08a6b717621)
+- Fix linting errors in util module [`526b5f4`](https://github.com/NASA-AMMOS/AIT-Core/commit/526b5f4f9b71ef1bd62ce5b7ba60cebf2b9db985)
+- Additional cfg.py module linting cleanup [`2820f1d`](https://github.com/NASA-AMMOS/AIT-Core/commit/2820f1ddcb8f99c7fab3cb10b19f194f2d2be9ed)
+- Make sure sphinx-build is poetry compatible update README and installation doc [`bfdb9a6`](https://github.com/NASA-AMMOS/AIT-Core/commit/bfdb9a6a1cee2b8739d5fa59fdcda33b7b6d3af5)
+- minor nose->pytest cleanup [`7a7e3bf`](https://github.com/NASA-AMMOS/AIT-Core/commit/7a7e3bf2cbb655965769bdf8364ced71695762e2)
+- Additional mypy related updates and cleanup for full pipeline check [`ce725b5`](https://github.com/NASA-AMMOS/AIT-Core/commit/ce725b5d9d828a740f23d9092d8a314e2604d52f)
+- Update ait-tlm-csv command and add documentation [`7e02fa8`](https://github.com/NASA-AMMOS/AIT-Core/commit/7e02fa875f8e113f0a7875c7a683d63c4a832aa7)
+- Fix linting errors in seq module [`8f0b17f`](https://github.com/NASA-AMMOS/AIT-Core/commit/8f0b17f6c976ec28bf892ea877250e823a8f1988)
+- Fix linting errors in val module [`896efd5`](https://github.com/NASA-AMMOS/AIT-Core/commit/896efd5e7eb40042afc6066a33c182396cc5e2f8)
+- Fix linting errors in gds module [`8bb3ffa`](https://github.com/NASA-AMMOS/AIT-Core/commit/8bb3ffa33335e64113253f0c35f973cfd36d6ea9)
+- Fix linting errors in dtype module. [`22f2efb`](https://github.com/NASA-AMMOS/AIT-Core/commit/22f2efb2d4ab4f9455a8099a6c85dfaa9a22918e)
+- issue-#419 Add Packet Padder Plugin [`86bf9ce`](https://github.com/NASA-AMMOS/AIT-Core/commit/86bf9cebf77ee990f11d5ebef1d1e31799958467)
+- Revert testdata changes [`511fa7e`](https://github.com/NASA-AMMOS/AIT-Core/commit/511fa7efc4db80799fad4409b746da8e93bef61f)
+- Fix linting errors in cmd module [`093d26b`](https://github.com/NASA-AMMOS/AIT-Core/commit/093d26bb0a73c7e4a97205e4fad322753ed9f471)
+- Update optional package install handling and add new depdencies [`6919de8`](https://github.com/NASA-AMMOS/AIT-Core/commit/6919de8aa3c40696e1d22229b9719c561c2f45ae)
+- Fix unit-test related issues [`d6b088d`](https://github.com/NASA-AMMOS/AIT-Core/commit/d6b088d49da6227ff77df953f2dbd94793af2e72)
+- Add getDefaultDict with extension unit tests [`4cd31ca`](https://github.com/NASA-AMMOS/AIT-Core/commit/4cd31cab11dc31b7d23942aec23b5a4944fddfac)
+- Add tox support and rearrange test structuring [`aec5df4`](https://github.com/NASA-AMMOS/AIT-Core/commit/aec5df424b7b9e5234b1ddc2887ee850b87311ab)
+- Fix linting errors in evr module [`a2b0013`](https://github.com/NASA-AMMOS/AIT-Core/commit/a2b0013a803437e82b4fe382026d6d69c8ef864f)
+- Drop custom git hook install in prep for pre-commit changes [`2414dff`](https://github.com/NASA-AMMOS/AIT-Core/commit/2414dff8053c8edafea510f39c7f483c07d94feb)
+- Fix linting erros in json module [`23d3076`](https://github.com/NASA-AMMOS/AIT-Core/commit/23d3076cf5e5d234a4f6cd377f755374d8e53890)
+- Fix linting errors in limits module [`c5158d4`](https://github.com/NASA-AMMOS/AIT-Core/commit/c5158d41d0945af97ed2b457b0ef7c183ce4b418)
+- Fix linting errors in geom module [`e805c64`](https://github.com/NASA-AMMOS/AIT-Core/commit/e805c64e7fa558dd13fa0400a95f2eebbe05895b)
+- Fix linting errors in log module [`9baf9ea`](https://github.com/NASA-AMMOS/AIT-Core/commit/9baf9ea3cae4ba13495d8a1ea5d40435b83f347d)
+- Clean up server.broker linting errors [`7e5c646`](https://github.com/NASA-AMMOS/AIT-Core/commit/7e5c646a0619b5f3f17e9377404b00c1ef1c0301)
+- Cleanup linting errors in server.stream [`fb0a137`](https://github.com/NASA-AMMOS/AIT-Core/commit/fb0a1374ce74ac22ec971e887b08ed5d081936d1)
+- Issue #434 Add ability for plugins to publish to arbitrary topic [`36f60a6`](https://github.com/NASA-AMMOS/AIT-Core/commit/36f60a6ced0c30b00eefe50090ebe323130ac6cb)
+- Cleanup server.__init__ linting errors [`3cbd17d`](https://github.com/NASA-AMMOS/AIT-Core/commit/3cbd17d8bdf49a7ccc921bb32b8f8def3eb7a8f7)
+- Fix linting errors in coord module [`3abd4d9`](https://github.com/NASA-AMMOS/AIT-Core/commit/3abd4d93ab9b1eb74cfd0b0eee3c96eef130aeb2)
+- More black cleanup [`7939298`](https://github.com/NASA-AMMOS/AIT-Core/commit/79392987ea3e9638e707d9dea67e9ced45f597cd)
+- core.__init__ linting error cleanup [`92a1f65`](https://github.com/NASA-AMMOS/AIT-Core/commit/92a1f65800ce8657ca183d5545895216e07fa59b)
+- Issue #394 Add DSN paths to ait-create-dirs [`b0073a6`](https://github.com/NASA-AMMOS/AIT-Core/commit/b0073a64b5ab98cf039d07a68d5ab4d0bbd768fe)
+- Issue #395 Add try except when attempting to pack opcode into struct. [`1e34a6d`](https://github.com/NASA-AMMOS/AIT-Core/commit/1e34a6d49012b8de7edf4f78db1e36e6423ea096)
+- Cleanup server.server linting errors [`9bf5c51`](https://github.com/NASA-AMMOS/AIT-Core/commit/9bf5c5123030ea57c8c7ee55b2bb7a90055cfa39)
+- Drop nose from setup.cfg [`f43a156`](https://github.com/NASA-AMMOS/AIT-Core/commit/f43a156a57029ff9414ddcb9ca47654a0d157688)
+- Cleanup server.plugin linting errors [`688ead2`](https://github.com/NASA-AMMOS/AIT-Core/commit/688ead279a189151471dc1d1a8ffa91484765ef8)
+- Fix linting errors in bsc module [`9277ed2`](https://github.com/NASA-AMMOS/AIT-Core/commit/9277ed20fadb11b6af148bf2e4fb7274bb59ae13)
+- Issue 371: Fix calls to logger [`648d84d`](https://github.com/NASA-AMMOS/AIT-Core/commit/648d84dcb60c2107582aee4e0e088e5c16c8240e)
+- Add support for array type in ArgDefn encode method [`fc9e5a3`](https://github.com/NASA-AMMOS/AIT-Core/commit/fc9e5a33a5cda0835fac7eb0a62f3f08a3041099)
+- Clean up server.utils linting errors [`7c0e38b`](https://github.com/NASA-AMMOS/AIT-Core/commit/7c0e38b5b978502f2c467feaa5480f91e19b7cc0)
+- server.plugins.__init__ linting error cleanup [`8f30508`](https://github.com/NASA-AMMOS/AIT-Core/commit/8f30508629b31daf61e2a2a04925086449c37d6e)
+- Add flake8 exclusions for module level import errors [`390f223`](https://github.com/NASA-AMMOS/AIT-Core/commit/390f223636896ef5ecba83dfd3e3a272af5d0c6f)
+- added comments to mame files causing new pickle cache [`3f2f0a4`](https://github.com/NASA-AMMOS/AIT-Core/commit/3f2f0a4472866a9705cf21b320eccc8e77a23f16)
+- change print() statements to ait.log.debug() [`9265e49`](https://github.com/NASA-AMMOS/AIT-Core/commit/9265e49fbfef717f55bd6c16a6dfce5c09522601)
+- Method cmd.getDefaultDict shall return CmdDict extensions if applicable [`9f3c91b`](https://github.com/NASA-AMMOS/AIT-Core/commit/9f3c91b99e98fc53ad3457cffd6f723181272561)
+- Cleanup server.plugins.data_archive linting errors [`a67a88c`](https://github.com/NASA-AMMOS/AIT-Core/commit/a67a88cc5898fbc77eb1175ca3229e6d77b64bc4)
+- server.handlers.__init__ linting error cleanup [`75a9826`](https://github.com/NASA-AMMOS/AIT-Core/commit/75a9826605dd472ef5c327bc4635d5be2ffdc2f2)
+- Drop pylint and pep-related config [`cf506e5`](https://github.com/NASA-AMMOS/AIT-Core/commit/cf506e5ea58079f2677916b7621fc8a5a15acc07)
+- Use values variable database execution [`04ec870`](https://github.com/NASA-AMMOS/AIT-Core/commit/04ec8705c929b2494985949c8b781842b8f63836)
+- Remove invalid log statements in util.py [`5eb57d5`](https://github.com/NASA-AMMOS/AIT-Core/commit/5eb57d5d1d7069b16b67e303540ce889df47aca0)
+- Update ait-tlm-csv cmd option description [`0677dd9`](https://github.com/NASA-AMMOS/AIT-Core/commit/0677dd9303f73853bad959cc97d22b9b4dfb7c8d)
+- Fix linting errors in table module [`63e6502`](https://github.com/NASA-AMMOS/AIT-Core/commit/63e6502eb0d58385528d9196852a9e81b6d4287c)
+- Fix linting errors in pcap module [`e0151e3`](https://github.com/NASA-AMMOS/AIT-Core/commit/e0151e3dc0aa67302f2e67c7a33b58e9bbee5089)
+- Add tag files to .gitignore [`4768a7e`](https://github.com/NASA-AMMOS/AIT-Core/commit/4768a7eea7ece04b7eb82d5bba20b2f32cb1bf32)
+- final nose removal cleanup [`9a648fd`](https://github.com/NASA-AMMOS/AIT-Core/commit/9a648fd6e0c42c9146ad0d74a28b64dd96c10ca7)
+
 ## [2.3.5](https://github.com/NASA-AMMOS/AIT-Core/compare/2.3.4...2.3.5)
 
 ### Merged
 
diff --git a/ait/core/api.py b/ait/core/api.py
index 26915764..d0ae7d1d 100644
--- a/ait/core/api.py
+++ b/ait/core/api.py
@@ -91,7 +91,8 @@ def __str__(self):
 
     @property
     def msg(self):
-        s = 'FalseWaitError: "False" boolean passed as argument to wait. Ensure wait condition args are surounded by lambda or " "'
+        s = 'FalseWaitError: "False" boolean passed as argument to wait. Ensure wait ' \
+            'condition args are surrounded by lambda or " "'
 
         if self._msg:
             s += ": " + self._msg
@@ -115,7 +116,7 @@ def __init__(self, udp_dest=None, cmddict=None, verbose=False, cmdtopic=None):
         self._cmddict = cmddict
         self._verbose = verbose
 
-        # Setup the destination of our commands and arguments based on destination
+        # Set up the destination of our commands and arguments based on destination
         # information.
         if udp_dest:
             # Convert partial info to full tuple
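# Illustration of the FalseWaitError message above: a bare comparison is
# evaluated once, before wait() ever runs, so wait() may receive a plain False.
# Sketch only, assuming a wait() helper from this module and a hypothetical
# telemetry field; not part of the patch.
api.wait(tlm_pkt.voltage > 5.0)          # evaluated immediately; can raise FalseWaitError
api.wait(lambda: tlm_pkt.voltage > 5.0)  # deferred; re-evaluated until true or timeout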
diff --git a/ait/core/cfg.py b/ait/core/cfg.py
index ec95babd..d5175c8a 100644
--- a/ait/core/cfg.py
+++ b/ait/core/cfg.py
@@ -333,12 +333,13 @@ def _datapaths(self):
             for k in data:
                 paths[k] = data[k]["path"]
 
-            data = self._config.get("dsn",{}).get("cfdp",{}).get("datasink",{})
+            data = self._config.get("dsn", {}).get("cfdp", {}).get("datasink", {})
             for k in data:
                 paths[k] = data[k]["path"]
 
-            data = self._config.get("dsn",{}).get("cfdp",{})
-            if data: paths["mib"] = data["mib"]["path"]
+            data = self._config.get("dsn", {}).get("cfdp", {})
+            if data:
+                paths["mib"] = data["mib"]["path"]
 
         except KeyError as e:
             raise AitConfigMissing(str(e))
diff --git a/ait/core/cmd.py b/ait/core/cmd.py
index 584a175d..d3ca09b0 100644
--- a/ait/core/cmd.py
+++ b/ait/core/cmd.py
@@ -126,7 +126,6 @@ def encode(self, value):
             value = self.enum[value]
         return self.type.encode(*value) if type(value) in [tuple, list] else self.type.encode(value)
 
-
     def slice(self, offset=0):
         """Returns a Python slice object (e.g. for array indexing) indicating
         the start and stop byte position of this Command argument.  The
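# The defaulted .get(..., {}) chain above is what lets _datapaths tolerate a
# missing "dsn" or "cfdp" block. Standalone sketch with an invented config dict:
config = {"dsn": {"cfdp": {"mib": {"path": "/some/mib/path"}}}}
datasink = config.get("dsn", {}).get("cfdp", {}).get("datasink", {})  # -> {} rather than KeyError
cfdp = config.get("dsn", {}).get("cfdp", {})
if cfdp:
    mib_path = cfdp["mib"]["path"]  # can still raise KeyError, which the caller above catches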
diff --git a/ait/core/db.py b/ait/core/db.py
index e9d6de3d..0fcce1d7 100644
--- a/ait/core/db.py
+++ b/ait/core/db.py
@@ -395,7 +395,7 @@ def query(self, query, **kwargs):
             db_res = self._query(query)
             return AITDBResult(query=query, results=db_res)
         except self._backend.exceptions.InfluxDBClientError as e:
-            log.error(f"query_time_range failed with exception: {e}")
+            log.error(f"db.InfluxDBBackend.query failed with exception: {e}")
             return AITDBResult(query=query, errors=[str(e)])
 
     def query_packets(self, packets=None, start_time=None, end_time=None, **kwargs):
@@ -467,7 +467,7 @@ def query_packets(self, packets=None, start_time=None, end_time=None, **kwargs):
         try:
             db_res = self._query(query_string, **kwargs)
         except self._backend.exceptions.InfluxDBClientError as e:
-            log.error(f"query_time_range failed with exception: {e}")
+            log.error(f"db.InfluxDBBackend.query failed with exception: {e}")
             return AITDBResult(query=query_string, errors=[str(e)])
 
         def influx_results_gen(db_res, **kwargs):
@@ -748,7 +748,7 @@ def query(self, query, **kwargs):
             results = self._query(query, **kwargs)
             return AITDBResult(query=query, results=results)
         except self._backend.OperationalError as e:
-            log.error(f"query_time_range failed with exception: {e}")
+            log.error(f"db.SQLiteBackend.query failed with exception: {e}")
             return AITDBResult(query=query, errors=[str(e)])
 
     def query_packets(self, packets=None, start_time=None, end_time=None, **kwargs):
@@ -829,7 +829,7 @@ def query_packets(self, packets=None, start_time=None, end_time=None, **kwargs):
                 try:
                     results.append((pkt, self._query(query_string)))
                 except self._backend.OperationalError as e:
-                    log.error(f"query_time_range failed with exception: {e}")
+                    log.error(f"db.SQLiteBackend.query failed with exception: {e}")
                     errs.append(str(e))
 
         def sqlite_results_gen(results, **kwargs):
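# With the renamed messages above, the log now identifies which backend's
# query failed. Callers get an AITDBResult either way; a hedged usage sketch
# (backend setup and the query text are illustrative only):
result = backend.query("SELECT * FROM telemetry")
if result.errors:
    print("query failed:", result.errors)   # already logged as db.<Backend>.query ...
else:
    print("query ok:", result.results)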
diff --git a/ait/core/pcap.py b/ait/core/pcap.py
index 5f03e320..29d3ca67 100644
--- a/ait/core/pcap.py
+++ b/ait/core/pcap.py
@@ -14,7 +14,7 @@
 """
 This module, pcap.py, is a library to read/write PCAP-formatted files with
-simple open, read, write, close functions
+simple open, read, write, close functions.  (PCAP - packet capture)
 """
 import builtins
@@ -53,7 +53,6 @@ class PCapGlobalHeader:
     https://wiki.wireshark.org/Development/LibpcapFileFormat
     """
-
     def __init__(self, stream=None):
         """Creates a new PCapGlobalHeader with default values.  If a stream
         is given, the global header data is read from it.
         """
         self._swap = "@"
 
         if stream is None:
-            self.magic_number = 0xA1B2C3D4
+            self.magic_number = 0xA1B2C3D4  # detects file format and byte ordering
             self.version_major = 2
             self.version_minor = 4
-            self.thiszone = 0
-            self.sigfigs = 0
-            self.snaplen = 65535
-            self.network = 147
+            self.thiszone = 0  # GMT to local correction (0 == GMT)
+            self.sigfigs = 0  # accuracy of timestamps
+            self.snaplen = 65535  # max length of captured packets, in octets
+            self.network = 147  # data link type (147-162 are reserved for private use)
             self._data = self.pack()
         else:
             self.read(stream)
@@ -80,17 +79,10 @@ def __len__(self):
 
     def __str__(self):
-        """Returns this PCapGlobalHeader as a binary string."""
-
-        return struct.pack(
-            self._format,
-            self.magic_number,
-            self.version_major,
-            self.version_minor,
-            self.thiszone,
-            self.sigfigs,
-            self.snaplen,
-            self.network,
-        )
+        """Returns this PCapGlobalHeader as a human-readable string."""
+        return f'PCapGlobalHeader Class: \r\n format={self._format}, magic number={self.magic_number},'\
+            f'major version={self.version_major}, minor version={self.version_minor}, \r\n' \
+            f' time zone={self.thiszone}, timestamp accuracy={self.sigfigs}, max packet size={self.snaplen}, '\
+            f'network={self.network}'
 
     def pack(self):
         return struct.pack(
@@ -146,6 +138,7 @@ def __init__(self, stream=None, swap=None, orig_len=0, maxlen=65535):
         """Creates a new PCapPacketHeader with default values.  If a stream
         is given, the packet header data is read from it.
         """
+
         if swap is None:
             swap = "@"
 
@@ -167,9 +160,10 @@ def __len__(self):
 
     def __str__(self):
-        """Returns this PCapPacketHeader as a binary string."""
-        return struct.pack(
-            self._format, self.ts_sec, self.ts_usec, self.incl_len, self.orig_len
-        )
+        """Returns this PCapPacketHeader as a human-readable string."""
+
+        return f'PCapPacketHeader Class: \r\n format={self._format}, timestamp seconds={self.ts_sec},' \
+            f'timestamp microseconds={self.ts_usec}.\r\n number of octets in file={self.incl_len}, ' \
+            f'actual length of packet={self.orig_len}'
 
     def pack(self):
         """Returns this PCapPacketHeader as a binary string."""
@@ -500,8 +494,7 @@ def query(starttime, endtime, output=None, *filenames):
             for header, packet in stream:
                 if packet is not None:
                     if (
-                        header.timestamp >= starttime
-                        and header.timestamp <= endtime
+                        starttime <= header.timestamp <= endtime
                     ):
                         outfile.write(packet, header=header)
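# After this change, str() on the pcap headers yields a human-readable dump
# while pack() remains the binary serializer. Minimal sketch, assuming
# ait.core.pcap; not part of the patch:
from ait.core import pcap

hdr = pcap.PCapGlobalHeader()
print(str(hdr))    # readable field summary (the new __str__ above)
raw = hdr.pack()   # 24-byte binary global header, unchanged behavior
assert len(raw) == len(hdr)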
diff --git a/ait/core/server/client.py b/ait/core/server/client.py
index 338eedec..6c673caf 100644
--- a/ait/core/server/client.py
+++ b/ait/core/server/client.py
@@ -37,11 +37,17 @@ def __init__(
         # calls gevent.Greenlet or gs.DatagramServer __init__
         super(ZMQClient, self).__init__(**kwargs)
 
-    def publish(self, msg):
+    def publish(self, msg, topic=None):
         """
-        Publishes input message with client name as topic.
+        Publishes the input message. The client name is used as the
+        topic unless an explicit topic argument is provided; the topic
+        can be an arbitrary string.
         """
-        msg = utils.encode_message(self.name, msg)
+        if not topic:
+            topic = self.name
+        msg = utils.encode_message(topic, msg)
         if msg is None:
             log.error(f"{self} unable to encode msg {msg} for send.")
             return
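# This is the change that lets plugins publish to arbitrary topics
# (Issue #168 / PR #435). Inside a plugin or client subclass, both forms
# are now valid; "custom_topic" is an invented name:
self.publish(msg)                         # topic defaults to self.name, as before
self.publish(msg, topic="custom_topic")   # route to an arbitrary topic string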
diff --git a/ait/core/server/plugins/apid_routing.py b/ait/core/server/plugins/apid_routing.py
new file mode 100644
index 00000000..fcbfb799
--- /dev/null
+++ b/ait/core/server/plugins/apid_routing.py
@@ -0,0 +1,206 @@
+'''
+Implements a plugin which routes CCSDS packets by APID
+'''
+import os
+import yaml
+from ait.core.server.plugins import Plugin
+from ait.core import tlm, log
+
+
+class APIDRouter(Plugin):
+    '''
+    Routes CCSDS packets by APID according to a routing table defined by a yaml file.
+    Arguments to the range operator are inclusive.
+    (i.e. range[40,50] adds APIDs 40-50 inclusive to the topic, not 40-49)
+    The exclude operator must come after the range operator.
+
+    example in config.yaml:
+
+    - plugin:
+        name: ait.core.server.plugins.apid_routing.APIDRouter
+        inputs:
+            - AOS_to_CCSDS
+        default_topic: default_ccsds_tlm_topic
+        routing_table:
+            path: packet_routing_table.yaml
+
+    example routing table .yaml file:
+
+    output_topics:
+        - telem_topic_1:
+            - 1
+            - 2
+        - telem_stream_1:
+            - 3
+            - 4
+            - range:
+                - 40
+                - 50
+            - exclude:
+                - 43
+        - telem_topic_2:
+            - 5
+            - range:
+                - 12
+                - 19
+            - exclude:
+                - 14
+                - 18
+        - GUI_input_stream:
+            - range:
+                - 1
+                - 100
+            - exclude:
+                - 87
+        - DataArchive:
+            - range:
+                - 1
+                - 138
+    '''
+    def __init__(self, inputs=None, outputs=None, zmq_args=None, routing_table=None, default_topic=None):
+
+        super().__init__(inputs, outputs, zmq_args)
+
+        self.default_topic = default_topic
+
+        if routing_table and 'path' in routing_table:
+            self.routing_table_object = self.load_table_yaml(routing_table['path'], tlm.getDefaultDict())
+        else:
+            self.routing_table_object = None
+            log.error("no path specified for routing table")
+        if self.routing_table_object is None:
+            log.error("Unable to load routing table .yaml file")
+
+    def process(self, input_data):
+        '''
+        Publishes incoming CCSDS packets to the routes specified in the routing table.
+
+        :param input_data: CCSDS packet as bytes
+        :type input_data: bytes, bytearray
+        '''
+        packet_apid = self.get_packet_apid(input_data)
+        # Fall back to the default topic for APIDs missing from the table;
+        # direct indexing would raise KeyError on an unknown APID.
+        topics = self.routing_table_object.get(packet_apid, [self.default_topic])
+        for route in topics:
+            self.publish(input_data, route)
+
+    def get_packet_apid(self, packet):
+        '''
+        Returns the APID (as integer) for a given packet (bytearray).
+        Assumes that the APID is the last 11 bits of the first two bytes.
+
+        :param packet: CCSDS packet as bytes
+        :type packet: bytes, bytearray
+        :returns: packet APID
+        :rtype: int
+        '''
+        packet_apid_bits = bytearray(b1 & b2 for b1, b2 in zip(packet[0:2], bytearray(b'\x07\xff')))
+        apid = int.from_bytes(packet_apid_bits, byteorder='big')
+        return apid
+
+    def add_topic_to_table(self, routing_table, apid, topic_name):
+        '''
+        Returns an updated table with the topic_name added to the entry for the specified APID.
+
+        :param routing_table: routing table to be updated
+        :param apid: entry in routing table
+        :param topic_name: topic name to add to entry in routing table
+        :type routing_table: dict
+        :type apid: int
+        :type topic_name: string
+        :returns: updated routing table
+        :rtype: dict
+        '''
+        temp_entry = routing_table[apid]
+        temp_entry.append(topic_name)
+        routing_table[apid] = temp_entry
+        return routing_table
+
+    def add_range_to_table(self, routing_table, range_array, topic_name):
+        '''
+        Adds a range of APIDs to the routing table.
+        The range_array argument is an array of form [beginning, end].
+        This function is inclusive of all values.
+        I.e. if range_array is [5, 9], APIDs 5-9 inclusive will be added (not 5-8).
+
+        :param routing_table: routing table to be updated
+        :param range_array: list containing beginning and end values for entries to update
+        :param topic_name: topic name to add to entries in routing table
+        :type routing_table: dict
+        :type range_array: list
+        :type topic_name: string
+        :returns: updated routing table
+        :rtype: dict
+        '''
+        beginning = range_array[0]
+        end = range_array[1]
+        for apid in range(beginning, end + 1):
+            routing_table = self.add_topic_to_table(routing_table, apid, topic_name)
+        return routing_table
+
+    def remove_from_table(self, routing_table, apid_array, topic_name):
+        '''
+        Removes a topic name from all the APIDs in the apid_array argument.
+
+        :param routing_table: routing table to be updated
+        :param apid_array: list containing entries to update
+        :param topic_name: topic name to remove from entries in routing table
+        :type routing_table: dict
+        :type apid_array: list
+        :type topic_name: string
+        :returns: updated routing table
+        :rtype: dict
+        '''
+        for apid in apid_array:
+            temp_entry = routing_table[apid]
+            if topic_name in temp_entry:
+                temp_entry.remove(topic_name)
+            routing_table[apid] = temp_entry
+        return routing_table
+
+    def load_table_yaml(self, routing_table_path, tlm_dict):
+        '''
+        Reads a .yaml file and returns a dictionary of format {apid1: [streams], apid2: [streams]}.
+
+        :param routing_table_path: path to yaml file containing routing table
+        :param tlm_dict: AIT telemetry dictionary
+        :type routing_table_path: string
+        :type tlm_dict: dict
+        :returns: routing table
+        :rtype: dict
+        '''
+        routing_table = {}
+        error = None
+
+        for packet_name in tlm_dict:
+            packet_apid = tlm_dict[packet_name].apid  # assuming apid is defined in dictionary
+            routing_table[packet_apid] = [self.default_topic]
+
+        if routing_table_path is None:
+            error = "No path specified for routing_table_path parameter"
+            log.error(error)
+            return None
+
+        if os.path.isfile(routing_table_path):
+            with open(routing_table_path, "rb") as stream:
+                yaml_file_as_dict = yaml.load(stream, Loader=yaml.Loader)
+        else:
+            error = f"File path {routing_table_path} does not exist"
+            log.error(error)
+            return None
+
+        for telem_stream_entry in yaml_file_as_dict["output_topics"]:
+            # telem_stream_entry is a dict with one entry
+            for telem_stream_name in telem_stream_entry:
+                for value in telem_stream_entry[telem_stream_name]:
+                    if isinstance(value, int):  # assume integer value is apid
+                        apid = value
+                        routing_table = self.add_topic_to_table(routing_table, apid, telem_stream_name)
+                    elif isinstance(value, dict):
+                        for operator in value:
+                            if operator == "range":
+                                routing_table = self.add_range_to_table(routing_table, value["range"], telem_stream_name)
+                            if operator == "exclude":
+                                routing_table = self.remove_from_table(routing_table, value["exclude"], telem_stream_name)
+                    else:
+                        log.error("Error while parsing table.yaml: encountered a value which is neither an integer nor a dictionary")
+
+        return routing_table
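# The bytewise AND in get_packet_apid keeps the low 11 bits of the first two
# CCSDS header bytes. Equivalent arithmetic on invented sample bytes:
first_two = bytes([0x08, 0x2A])
apid = int.from_bytes(first_two, byteorder="big") & 0x07FF
assert apid == 0x2A  # matches the zip/bytearray masking used by the plugin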
diff --git a/ait/core/server/plugins/openmct.py b/ait/core/server/plugins/openmct.py
index 6ec402cb..7fbbebd7 100644
--- a/ait/core/server/plugins/openmct.py
+++ b/ait/core/server/plugins/openmct.py
@@ -27,7 +27,6 @@
 import random
 import struct
 import sys
-import time
 import webbrowser
 
 import gevent
@@ -35,6 +34,7 @@
 gevent.monkey.patch_all()
 
 import geventwebsocket
+from gevent import sleep as gsleep, Timeout, Greenlet
 import bottle
 import importlib
@@ -44,6 +44,294 @@
 from ait.core.server.plugin import Plugin
 
 
+class ManagedWebSocket():
+    """
+    A data structure to maintain state for OpenMCT websockets
+    """
+    idCounter = 0  # to assign unique ids
+
+    PACKET_ID_WILDCARD = "*"
+
+    def __init__(self, web_socket, client_ip=None):
+        self.web_socket = web_socket
+        self.client_ip = client_ip
+        self._subscribed_dict = dict()  # Dict from packetId to set of fieldIds
+        self.is_closed = False
+        self.is_error = False
+        self.id = ManagedWebSocket._generate_id()
+
+    @staticmethod
+    def _generate_id():
+        tmp_id = f"{ManagedWebSocket.idCounter}/{id(gevent.getcurrent())}"
+        ManagedWebSocket.idCounter += 1
+        return tmp_id
+
+    def subscribe_field(self, openmct_field_id):
+        """
+        Adds a subscription to an OpenMCT field
+        :param openmct_field_id: OpenMCT Field id
+        """
+        pkt_id, fld_id = DictUtils.parse_mct_pkt_id(openmct_field_id)
+        if pkt_id and fld_id:
+            # If packet id is not in dict, add it with empty set as value
+            if pkt_id not in self._subscribed_dict.keys():
+                self._subscribed_dict[pkt_id] = set()
+            field_set = self._subscribed_dict.get(pkt_id, None)
+            if field_set is not None:  # Unnecessary, but paranoid
+                field_set.add(fld_id)
+
+    def unsubscribe_field(self, openmct_field_id):
+        """
+        Removes a subscription to an OpenMCT field
+        :param openmct_field_id: OpenMCT Field id
+        """
+        pkt_id, fld_id = DictUtils.parse_mct_pkt_id(openmct_field_id)
+        if pkt_id and fld_id:
+            field_set = self._subscribed_dict.get(pkt_id, None)
+            if field_set:
+                field_set.remove(fld_id)
+                # If there are no more fields, then remove packet id
+                if len(field_set) == 0:
+                    self._subscribed_dict.pop(pkt_id)
+
+    @property
+    def is_alive(self):
+        """
+        Returns True if web-socket is active, False otherwise
+        :return: Managed web-socket state
+        """
+        self._check_state()
+        return not self.is_closed
+
+    def _check_state(self):
+        """
+        Checks internal flags as well as state of underlying websocket
+        to see if this instance can be considered closed
+        """
+        if not self.is_closed:
+            if self.is_error:
+                self.is_closed = True
+            elif self.web_socket and self.web_socket.closed:
+                self.is_closed = True
+
+    def set_error(self):
+        """
+        Sets error flag
+        """
+        self.is_error = True
+
+    def accepts_packet(self, pkt_id):
+        """
+        Returns True if pkt_id is considered subscribed to by this websocket.
+        If pkt_id is PACKET_ID_WILDCARD, it will be automatically accepted.
+        :param pkt_id: AIT Packet name
+        :return: True if packet id is accepted, False otherwise
+        """
+        if pkt_id == ManagedWebSocket.PACKET_ID_WILDCARD:
+            return True
+        field_set = self._subscribed_dict.get(pkt_id, None)
+        if field_set:  # Should be true if set is non-empty
+            return True
+        return False
+
+    def create_subscribed_packet(self, omc_packet):
+        """
+        Returns a modified OpenMCT packet that contains only fields
+        for which the web-socket is subscribed
+        :param omc_packet: Full OpenMCT packet with all fields
+        :return: New modified packet if any match in fields, else None
+        """
+        packet_id = omc_packet['packet']
+        if not self.accepts_packet(packet_id):
+            return None
+
+        # Grab the original field data dict
+        orig_fld_dict = omc_packet['data']
+        if not orig_fld_dict:
+            return None
+
+        sub_pkt = None
+
+        # Get set of fields of the packet to which session is subscribed
+        field_set = self._subscribed_dict.get(packet_id, None)
+
+        # Filter the original field value dict to only fields session is subscribed to
+        if field_set:
+            filt_fld_dict = {k: v for k, v in orig_fld_dict.items() if k in field_set}
+            # If filtered dict is non-empty, then build new packet for return
+            if filt_fld_dict:
+                sub_pkt = {'packet': packet_id, 'data': filt_fld_dict}
+
+        return sub_pkt
+
+
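# How the per-socket subscription filter above behaves, with invented field
# ids. No real socket is needed because subscription state is tracked
# separately from I/O; sketch only:
mws = ManagedWebSocket(web_socket=None)
mws.subscribe_field("HS_Packet.Voltage_A")  # "<packet>.<field>" form
assert mws.accepts_packet("HS_Packet")
assert not mws.accepts_packet("Other_Packet")
assert mws.accepts_packet(ManagedWebSocket.PACKET_ID_WILDCARD)  # "*" always accepted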
+class DictUtils(object):
+    """
+    Encapsulates dictionary utilities, primarily for translating between
+    AIT and OpenMCT dictionaries and packets
+    """
+
+    @staticmethod
+    def create_mct_pkt_id(ait_pkt_id, ait_field_id):
+        return ait_pkt_id + "." + ait_field_id
+
+    @staticmethod
+    def parse_mct_pkt_id(mct_pkt_id):
+        if "." in mct_pkt_id:
+            return mct_pkt_id.split(".")
+        else:
+            return None, None
+
+    @staticmethod
+    def create_uid_pkt_map(ait_dict):
+        """Creates a dictionary from packet def UID to packet definition"""
+        uid_map = dict()
+        for _k, v in ait_dict.items():
+            uid_map[v.uid] = v
+        return uid_map
+
+    @staticmethod
+    def format_tlmpkt_for_openmct(ait_pkt):
+        """Formats an AIT telemetry packet instance as an
+        OpenMCT telemetry packet structure"""
+
+        mct_dict = dict()
+        ait_pkt_def = ait_pkt._defn
+        ait_pkt_id = ait_pkt_def.name
+
+        mct_pkt_value_dict = dict()
+
+        mct_dict["packet"] = ait_pkt_id
+        mct_dict["data"] = mct_pkt_value_dict
+
+        ait_pkt_fieldmap = ait_pkt_def.fieldmap
+        for ait_field_id in ait_pkt_fieldmap:
+            tlm_pt_value = getattr(ait_pkt, ait_field_id)
+            mct_pkt_value_dict[ait_field_id] = tlm_pt_value
+
+        return mct_dict
+
+    @staticmethod
+    def format_tlmdict_for_openmct(ait_tlm_dict):
+        """Formats the AIT telemetry dictionary as an
+        OpenMCT telemetry dictionary"""
+
+        mct_dict = dict()
+        mct_dict["name"] = "AIT Telemetry"
+        mct_dict["key"] = "ait_telemetry_dictionary"
+        mct_dict["measurements"] = []
+
+        for ait_pkt_id in ait_tlm_dict:
+            ait_pkt_def = ait_tlm_dict[ait_pkt_id]
+            ait_pkt_fieldmap = ait_pkt_def.fieldmap
+            for ait_field_id in ait_pkt_fieldmap:
+                ait_field_def = ait_pkt_fieldmap[ait_field_id]
+
+                mct_field_dict = dict()
+                # mct_field_dict['key'] = ait_pkt_id + "." + ait_field_id
+                mct_field_dict["key"] = DictUtils.create_mct_pkt_id(ait_pkt_id, ait_field_id)
+
+                mct_field_dict["name"] = ait_field_def.name
+                mct_field_dict["name"] = ait_pkt_id + ":" + ait_field_def.name
+
+                mct_field_value_list = []
+
+                mct_field_val_range = DictUtils.create_mct_fieldmap(ait_field_def)
+
+                mct_field_val_domain = {
+                    "key": "utc",
+                    "source": "timestamp",
+                    "name": "Timestamp",
+                    "format": "utc",
+                    "hints": {"domain": 1},
+                }
+
+                mct_field_value_list.append(mct_field_val_range)
+                mct_field_value_list.append(mct_field_val_domain)
+
+                mct_field_dict["values"] = mct_field_value_list
+
+                mct_dict["measurements"].append(mct_field_dict)
+
+        return mct_dict
+
+    @staticmethod
+    def create_mct_fieldmap(ait_pkt_fld_def):
+        """Constructs an OpenMCT field declaration struct from an AIT packet definition"""
+        mct_field_map = {"key": "value", "name": "Value", "hints": {"range": 1}}
+
+        # Handle units
+        if hasattr(ait_pkt_fld_def, "units"):
+            if ait_pkt_fld_def.units is not None:
+                mct_field_map["units"] = ait_pkt_fld_def.units
+
+        # Type and min/max
+        # Borrowed code from AIT dtype to infer info from type-NAME
+        if hasattr(ait_pkt_fld_def, "type"):
+            if ait_pkt_fld_def.type is not None:
+                ttype = ait_pkt_fld_def.type
+
+                typename = ttype.name
+
+                tfloat = False
+                tmin = None
+                tmax = None
+                tsigned = False
+                tstring = False
+                tnbits = 0
+
+                if typename.startswith("LSB_") or typename.startswith("MSB_"):
+                    tsigned = typename[4] != "U"
+                    tfloat = typename[4] == "F" or typename[4] == "D"
+                    tnbits = int(typename[-2:])
+                elif typename.startswith("S"):
+                    tnbits = int(typename[1:]) * 8
+                    tstring = True
+                else:
+                    tsigned = typename[0] != "U"
+                    tnbits = int(typename[-1:])
+
+                if tfloat:
+                    tmax = +sys.float_info.max
+                    tmin = -sys.float_info.max
+                elif tsigned:
+                    tmax = 2 ** (tnbits - 1)
+                    tmin = -1 * (tmax - 1)
+                elif not tstring:
+                    tmax = 2 ** tnbits - 1
+                    tmin = 0
+
+                if tmin is not None:
+                    mct_field_map["min"] = tmin
+                if tmax is not None:
+                    mct_field_map["max"] = tmax
+
+                if tfloat:
+                    mct_field_map["format"] = "float"
+                elif tstring:
+                    mct_field_map["format"] = "string"
+                else:
+                    mct_field_map["format"] = "integer"
+
+                # array types not supported
+
+        # Handle enumerations
+        if hasattr(ait_pkt_fld_def, "enum"):
+            if ait_pkt_fld_def.enum is not None:
+                del mct_field_map["min"]
+                del mct_field_map["max"]
+                mct_field_map["format"] = "enum"
+                mct_enum_array = []
+                enum_dict = ait_pkt_fld_def.enum
+                for e_number in enum_dict:
+                    e_name = enum_dict.get(e_number)
+                    enum_entry = {"string": e_name, "value": e_number}
+                    mct_enum_array.append(enum_entry)
+                mct_field_map["enumerations"] = mct_enum_array
+
+        return mct_field_map
+
+
 class AITOpenMctPlugin(Plugin):
     """This is the implementation of the AIT plugin for interaction
     with OpenMCT framework.  Telemetry dispatched from AIT server/broker
@@ -55,6 +343,14 @@ class AITOpenMctPlugin(Plugin):
 
     DEFAULT_DEBUG_MAX_LEN = 512
     DEFAULT_DATABASE_ENABLED = False
 
+    DEFAULT_WS_RECV_TIMEOUT_SECS = 0.1
+    DEFAULT_TELEM_QUEUE_TIMEOUT_SECS = 10
+
+    DEFAULT_TELEM_CHECK_SLEEP_SECS = 2
+    DEFAULT_WEBSOCKET_CHECK_SLEEP_SECS = 2
+
+    DEFAULT_WS_EMPTY_MESSAGE = json.dumps(dict())  # Empty JSON string
+
     def __init__(
         self,
         inputs,
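# Worked example of the type-name inference in create_mct_fieldmap above: an
# assumed AIT dtype named "MSB_U16" is unsigned (the "U" after the byte-order
# prefix) and 16 bits wide, so the advertised OpenMCT range is [0, 65535]
# with format "integer":
typename = "MSB_U16"
tsigned = typename[4] != "U"   # False -> unsigned
tnbits = int(typename[-2:])    # 16
assert (0, 2 ** tnbits - 1) == (0, 65535)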
@@ -101,18 +397,23 @@ def __init__(
 
         # Queues for AIT events events
         self._tlmQueue = api.GeventDeque(maxlen=100)
-        self._logQueue = api.GeventDeque(maxlen=100)
 
         # Load AIT tlm dict and create OpenMCT format of it
         self._aitTlmDict = tlm.getDefaultDict()
-        self._mctTlmDict = self.format_tlmdict_for_openmct(self._aitTlmDict)
+        self._mctTlmDict = DictUtils.format_tlmdict_for_openmct(self._aitTlmDict)
 
         # Create lookup from packet-uid to packet def
-        self._uidToPktDefMap = self.create_uid_pkt_map(self._aitTlmDict)
+        self._uidToPktDefMap = DictUtils.create_uid_pkt_map(self._aitTlmDict)
 
         # Attempt to initialize database, None if no DB
         self._database = self.load_database(**kwargs)
 
+        # Maintains a set of active websocket structs
+        self._socket_set = set()
+
+        # Spawn greenlets to poll telemetry
+        self.tlm_poll_greenlet = Greenlet.spawn(self.poll_telemetry_periodically)
+
         gevent.spawn(self.init)
 
     def _check_config(self):
@@ -123,13 +424,7 @@ def _check_config(self):
         if isinstance(self.debug_enabled, bool):
             self._debugEnabled = self.debug_enabled
         elif isinstance(self.debug_enabled, str):
-            self._debugEnabled = self.debug_enabled in [
-                "true",
-                "1",
-                "TRUE",
-                "enabled",
-                "ENABLED",
-            ]
+            self._debugEnabled = self.debug_enabled.upper() == "TRUE"
         self.dbg_message("Debug flag = " + str(self._debugEnabled))
 
         # Check if port is assigned
@@ -145,13 +440,7 @@ def _check_config(self):
         if isinstance(self.database_enabled, bool):
             self._databaseEnabled = self.database_enabled
         elif isinstance(self.database_enabled, str):
-            self._databaseEnabled = self.database_enabled in [
-                "true",
-                "1",
-                "TRUE",
-                "enabled",
-                "ENABLED",
-            ]
+            self._databaseEnabled = self.database_enabled.upper() == "TRUE"
         self.dbg_message("Database flag = " + str(self._databaseEnabled))
 
     def load_database(self, **kwargs):
@@ -173,7 +462,8 @@ def load_database(self, **kwargs):
             db_cfg = ait.config.get("database", kwargs.get("database", None))
             if not db_cfg:
                 log.error(
-                    "[OpenMCT] Plugin configured to use database but no database configuration was found"
+                    "[OpenMCT] Plugin configured to use database but "
+                    "no database configuration was found"
                 )
                 log.warn("Disabling historical queries.")
             else:
@@ -182,7 +472,7 @@ def load_database(self, **kwargs):
                     dbconn = getattr(importlib.import_module(db_mod), db_cls)()
                     dbconn.connect(**kwargs)
                 except Exception as ex:
-                    log.error("Error connecting to database: {}".format(ex))
+                    log.error(f"Error connecting to database: {ex}")
                     log.warn("Disabling historical queries.")
                 else:
                     msg = (
@@ -196,45 +486,40 @@ def load_database(self, **kwargs):
 
         return dbconn
 
     def process(self, input_data, topic=None):
-        """Process received input message
+        """
+        Process received input message.
+
+        This plugin should be configured to only receive telemetry.
 
         Received messaged is expected to be a tuple of the form
         produced by AITPacketHandler.
-
-        Handle telem messages based on topic
-        - Look for topic in list of telem stream names first
-        - If those lists don't exist or topic is not in them, try matching text
-          in topic name to "telem_stream"
-        """
+        """
         processed = False
 
-        if hasattr(self, "telem_stream_names"):
-            if topic in self.telem_stream_names:
-                self._process_telem_msg(input_data)
-                processed = True
-
-        if not processed:
-            if "telem_stream" in topic:
-                self._process_telem_msg(input_data)
+        try:
+            pkl_load = pickle.loads(input_data)
+            pkt_id, pkt_data = int(pkl_load[0]), pkl_load[1]
+            packet_def = self._get_tlm_packet_def(pkt_id)
+            if packet_def:
+                packet_def = self._uidToPktDefMap[pkt_id]
+                tlm_packet = tlm.Packet(packet_def, data=bytearray(pkt_data))
+                self._process_telem_msg(tlm_packet)
                 processed = True
+            else:
+                log.error("OpenMCT Plugin received telemetry message with unknown "
+                          f"packet id {pkt_id}. Skipping input...")
+        except Exception as e:
+            log.error(f"OpenMCT Plugin: {e}")
+            log.error("OpenMCT Plugin received input_data that it is unable to "
+                      "process. Skipping input ...")
 
-        if not processed:
-            raise ValueError(
-                "Topic of received message not recognized as telem stream."
-            )
+        return processed
 
-    def _process_telem_msg(self, input_data):
-
-        # Use pickle to recover message
-        msg = pickle.loads(input_data)
-
-        uid = int(msg[0])
-        packet = msg[1]
-
-        # Package as a tuple, then add to queue
-        tlm_entry = (uid, packet)
-        self._tlmQueue.append(tlm_entry)
+    def _process_telem_msg(self, tlm_packet):
+        """
+        Places tlm_packet in telem queue
+        """
+        self._tlmQueue.append(tlm_packet)
 
     # We report our special debug messages on the 'Info' log level
     # so we dont have to turn on DEBUG logging globally
@@ -258,8 +543,10 @@ def get_browser_name(browser):
 
     def _get_tlm_packet_def(self, uid):
         """Return packet definition based on packet unique id"""
-        pkt_defn = self._uidToPktDefMap[uid]
-        return pkt_defn
+        if uid in self._uidToPktDefMap.keys():
+            return self._uidToPktDefMap[uid]
+        else:
+            return None
 
     def init(self):
         """Initialize the web-server state"""
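# The reworked process() above unpickles the (uid, bytes) tuple that AIT
# packet handlers publish. A compatible message can be built like this
# (the packet chosen is whichever happens to be first in the loaded
# telemetry dictionary; sketch only):
import pickle
from ait.core import tlm

defn = list(tlm.getDefaultDict().values())[0]
input_data = pickle.dumps((defn.uid, bytes(defn.nbytes)))  # what process() expects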
@@ -302,161 +589,9 @@ def start_browser(self, url, name=None):
         log.info("Starting browser: %s" % self.getBrowserName(browser))
         browser.open_new(url)
 
-    def create_mct_pkt_id(self, ait_pkt_id, ait_field_id):
-        return ait_pkt_id + "." + ait_field_id
-
-    def parse_mct_pkt_id(self, mct_pkt_id):
-        return mct_pkt_id.split(".")
-
     def wait(self):
         gevent.wait()
 
-    @staticmethod
-    def create_uid_pkt_map(ait_dict):
-        """Creates a dictionary from packet def UID to package definition"""
-        uid_map = dict()
-        for _k, v in ait_dict.items():
-            uid_map[v.uid] = v
-        return uid_map
-
-    def format_tlmpkt_for_openmct(self, ait_pkt):
-        """Formats an AIT telemetry packet instance as an
-        OpenMCT telemetry packet structure"""
-
-        mct_dict = dict()
-        ait_pkt_def = ait_pkt._defn
-        ait_pkt_id = ait_pkt_def.name
-
-        mct_pkt_value_dict = dict()
-
-        mct_dict["packet"] = ait_pkt_id
-        mct_dict["data"] = mct_pkt_value_dict
-
-        ait_pkt_fieldmap = ait_pkt_def.fieldmap
-        for ait_field_id in ait_pkt_fieldmap:
-            tlm_pt_value = getattr(ait_pkt, ait_field_id)
-            mct_pkt_value_dict[ait_field_id] = tlm_pt_value
-
-        return mct_dict
-
-    def format_tlmdict_for_openmct(self, ait_tlm_dict):
-        """Formats the AIT telemetry dictionary as an
-        OpenMCT telemetry dictionary"""
-
-        mct_dict = dict()
-        mct_dict["name"] = "AIT Telemetry"
-        mct_dict["key"] = "ait_telemetry_dictionary"
-        mct_dict["measurements"] = []
-
-        for ait_pkt_id in ait_tlm_dict:
-            ait_pkt_def = ait_tlm_dict[ait_pkt_id]
-            ait_pkt_fieldmap = ait_pkt_def.fieldmap
-            for ait_field_id in ait_pkt_fieldmap:
-                ait_field_def = ait_pkt_fieldmap[ait_field_id]
-
-                mct_field_dict = dict()
-                # mct_field_dict['key'] = ait_pkt_id + "." + ait_field_id
-                mct_field_dict["key"] = self.create_mct_pkt_id(ait_pkt_id, ait_field_id)
-
-                mct_field_dict["name"] = ait_field_def.name
-                mct_field_dict["name"] = ait_pkt_id + ":" + ait_field_def.name
-
-                mct_field_value_list = []
-
-                mct_field_val_range = self.create_mct_fieldmap(ait_field_def)
-
-                mct_field_val_domain = {
-                    "key": "utc",
-                    "source": "timestamp",
-                    "name": "Timestamp",
-                    "format": "utc",
-                    "hints": {"domain": 1},
-                }
-
-                mct_field_value_list.append(mct_field_val_range)
-                mct_field_value_list.append(mct_field_val_domain)
-
-                mct_field_dict["values"] = mct_field_value_list
-
-                mct_dict["measurements"].append(mct_field_dict)
-
-        return mct_dict
-
-    def create_mct_fieldmap(self, ait_pkt_fld_def):
-        """Constructs an OpenMCT field declaration struct from an AIT packet definition"""
-        mct_field_map = {"key": "value", "name": "Value", "hints": {"range": 1}}
-
-        # Handle units
-        if hasattr(ait_pkt_fld_def, "units"):
-            if ait_pkt_fld_def.units is not None:
-                mct_field_map["units"] = ait_pkt_fld_def.units
-
-        # Type and min/max
-        # Borrowed code from AIT dtype to infer info form type-NAME
-        if hasattr(ait_pkt_fld_def, "type"):
-            if ait_pkt_fld_def.type is not None:
-                ttype = ait_pkt_fld_def.type
-
-                typename = ttype.name
-
-                tfloat = False
-                tmin = None
-                tmax = None
-                tsigned = False
-                tstring = False
-                tnbits = 0
-
-                if typename.startswith("LSB_") or typename.startswith("MSB_"):
-                    tsigned = typename[4] != "U"
-                    tfloat = typename[4] == "F" or typename[4] == "D"
-                    tnbits = int(typename[-2:])
-                elif typename.startswith("S"):
-                    tnbits = int(typename[1:]) * 8
-                    tstring = True
-                else:
-                    tsigned = typename[0] != "U"
-                    tnbits = int(typename[-1:])
-
-                if tfloat:
-                    tmax = +sys.float_info.max
-                    tmin = -sys.float_info.max
-                elif tsigned:
-                    tmax = 2 ** (tnbits - 1)
-                    tmin = -1 * (tmax - 1)
-                elif not tstring:
-                    tmax = 2 ** tnbits - 1
-                    tmin = 0
-
-                if tmin is not None:
-                    mct_field_map["min"] = tmin
-                if tmax is not None:
-                    mct_field_map["max"] = tmax
-
-                if tfloat:
-                    mct_field_map["format"] = "float"
-                elif tstring:
-                    mct_field_map["format"] = "string"
-                else:
-                    mct_field_map["format"] = "integer"
-
-                # TODO - handle array types?
-
"integer" - - # TODO - handle array types? - - # Handle enumerations - if hasattr(ait_pkt_fld_def, "enum"): - if ait_pkt_fld_def.enum is not None: - del mct_field_map["min"] - del mct_field_map["max"] - mct_field_map["format"] = "enum" - mct_enum_array = [] - enum_dict = ait_pkt_fld_def.enum - for e_number in enum_dict: - e_name = enum_dict.get(e_number) - enum_entry = {"string": e_name, "value": e_number} - mct_enum_array.append(enum_entry) - mct_field_map["enumerations"] = mct_enum_array - - return mct_field_map - # --------------------------------------------------------------------- # Section of methods to which bottle requests will be routed @@ -477,7 +612,7 @@ def get_tlm_dict_raw_json(self): """Returns the AIT-formatted dictionary""" return json.dumps(self._aitTlmDict.toJSON()) - def get_realtime_tlm(self): + def get_realtime_tlm_original_dumb(self): """Handles realtime packet dispatch via websocket layers""" websocket = bottle.request.environ.get("wsgi.websocket") @@ -498,23 +633,32 @@ def get_realtime_tlm(self): try: while not websocket.closed: + + message = None + with Timeout(3, False): + message = websocket.receive() + if message: + self.dbg_message("Received websocket message: "+message) + else: + self.dbg_message("Received NO websocket message") + try: self.dbg_message("Polling Telemtry queue...") - uid, data = self._tlmQueue.popleft(timeout=30) + uid, data = self._tlmQueue.popleft(timeout=3) pkt_defn = self._get_tlm_packet_def(uid) if not pkt_defn: continue ait_pkt = ait.core.tlm.Packet(pkt_defn, data=data) - openmct_pkt = self.format_tlmpkt_for_openmct(ait_pkt) + packet_id, openmct_pkt = DictUtils.format_tlmpkt_for_openmct(ait_pkt) openmct_pkt_jsonstr = json.dumps( openmct_pkt, default=self.datetime_jsonifier ) self.dbg_message( - "Sending realtime telemtry websocket msg: " + "Sending realtime telemetry websocket msg: " + openmct_pkt_jsonstr ) @@ -541,6 +685,49 @@ def get_realtime_tlm(self): + str(wser) ) + def get_realtime_tlm(self): + """Handles realtime packet dispatch via websocket layers""" + websocket = bottle.request.environ.get("wsgi.websocket") + + if not websocket: + bottle.abort(400, "Expected WebSocket request.") + return + + req_env = bottle.request.environ + client_ip = ( + req_env.get("HTTP_X_FORWARDED_FOR") + or req_env.get("REMOTE_ADDR") + or "(unknown)" + ) + + if websocket and not websocket.closed: + mws = ManagedWebSocket(websocket, client_ip) + self.manage_web_socket(mws) + + def manage_web_socket(self, mws): + """ + Adds mws instance to managed set (for receiving telemetry), + and then continuously checks web socket for new messages, which + may affect its state. + When web-socket is considered closed, it is removed from the + managed set and this method returns + :param mws: Managed web-socket instance + """ + self.dbg_message(f"Adding record for new web-socket ID:{mws.id} with IP: {mws.client_ip}") + self._socket_set.add(mws) + + while mws.is_alive: + self.dbg_message(f"Polling web-socket record ID {mws.id} ") + msg_processed = self.poll_websocket(mws) + if not msg_processed: + # If no message received, then sleep a lil + gsleep(AITOpenMctPlugin.DEFAULT_WEBSOCKET_CHECK_SLEEP_SECS) + + # Web-socket is considered closed, so remove from set and return + rem_msg_state = 'err' if mws.is_error else 'closed' + self.dbg_message(f"Removing {rem_msg_state} web-socket record ID {mws.id}") + self._socket_set.remove(mws) + def get_historical_tlm(self, mct_pkt_id): """ Handling of historical queries. Time range is retrieved from bottle request query. 
@@ -553,11 +740,8 @@ def get_historical_tlm(self, mct_pkt_id):
 
         # Set the content type of response for OpenMct to know its JSON
         bottle.response.content_type = "application/json"
 
-        self.dbg_message(
-            "Received request for historical tlm: Ids={} Start={} End={}".format(
-                mct_pkt_id, str(start_time_ms), str(end_time_ms)
-            )
-        )
+        self.dbg_message("Received request for historical tlm: "
+                         f"Ids={mct_pkt_id} Start={start_time_ms} End={end_time_ms}")
 
         # The tutorial indicated that this could be a comma-separated list of ids...
         # If its a single, then this will create a list with one entry
@@ -570,11 +754,8 @@
         # Dump results to JSON string
         json_result = json.dumps(results)
 
-        self.dbg_message(
-            "Result for historical tlm ( {} - {} ): {}".format(
-                str(start_time_ms), str(end_time_ms), json_result
-            )
-        )
+        self.dbg_message(f"Result for historical tlm ( {start_time_ms} "
+                         f"- {end_time_ms} ): {json_result}")
 
         return json_result
 
@@ -598,7 +779,7 @@ def get_historical_tlm_for_range(self, mct_pkt_ids, start_epoch_ms, end_epoch_ms
         # Collect fields that share the same AIT packet (for more efficient queries)
         ait_pkt_fields_dict = {}  # Dict of pkt_id to list of field ids
         for mct_pkt_id_entry in mct_pkt_ids:
-            ait_pkt_id, ait_field_name = self.parse_mct_pkt_id(mct_pkt_id_entry)
+            ait_pkt_id, ait_field_name = DictUtils.parse_mct_pkt_id(mct_pkt_id_entry)
 
             # Add new list if this is the first time we see AIT pkt id
             if ait_pkt_id not in ait_pkt_fields_dict:
@@ -670,10 +851,9 @@ def get_historical_tlm_for_packet_fields(
             end_timestamp_secs, tz=datetime.timezone.utc
         )
 
-        query_args_str = "Packets = {}; Start = {}; End = {}".format(
-            packet_ids, start_date, end_date
-        )
-        self.dbg_message("Query args : {}".format(query_args_str))
+        query_args_str = f"Packets = {packet_ids}; Start = {start_date};" \
+                         f" End = {end_date}"
+        self.dbg_message(f"Query args : {query_args_str}")
 
         # default response is empty
         res_pkts = list()
@@ -700,11 +880,8 @@
                 res_pkts = list(ait_db_result.get_packets())
 
             # Debug result size
-            self.dbg_message(
-                "Number of results for query {} : {}".format(
-                    query_args_str, str(len(res_pkts))
-                )
-            )
+            self.dbg_message(f"Number of results for query "
+                             f"{query_args_str} : {len(res_pkts)}")
 
         except Exception as e:
             log.error("[OpenMCT] Database query failed.
Error: " + str(e)) @@ -719,7 +896,7 @@ def get_historical_tlm_for_packet_fields( # Add a record for each requested field for this timestamp for cur_field_name in field_names: record = {"timestamp": unix_timestamp_msec} - record["id"] = self.create_mct_pkt_id(ait_pkt_id, cur_field_name) + record["id"] = DictUtils.create_mct_pkt_id(ait_pkt_id, cur_field_name) record["value"] = getattr(cur_pkt, cur_field_name) result_list.append(record) @@ -746,7 +923,6 @@ def mimic_tlm(self, ait_tlm_pkt_name, ait_tlm_pkt_fill=None): ait_pkt_defn = tlm.getDefaultDict().values()[0] # Create the expected message format - pkt_def_uid = ait_pkt_defn.uid pkt_size_bytes = ait_pkt_defn.nbytes # if self._debugMimicRepeat: @@ -775,24 +951,21 @@ def mimic_tlm(self, ait_tlm_pkt_name, ait_tlm_pkt_fill=None): random_num, random_num, random_num, random_num, random_num ) - msg_serial = pickle.dumps((pkt_def_uid, dummy_data), 2) - msg_str_fmt = "{}".format( - msg_serial - ) # Lesson learned: AIT ZMQClient formats to string before emitting message - self._process_telem_msg(msg_str_fmt) + tlm_pkt = tlm.Packet(ait_pkt_defn, data=bytearray(dummy_data)) + self._process_telem_msg(tlm_pkt) info_msg = ( "AIT OpenMct Plugin submitted mimicked telemetry for " + ait_pkt_defn.name + " (" + str(datetime.datetime.now()) - + ")" + + ") to telem queue" ) self.dbg_message(info_msg) # sleep if mimic on if self._debugMimicRepeat: - time.sleep(5) + gsleep(5) # either it was immediate or we woke up, check break condition if not self._debugMimicRepeat: @@ -801,6 +974,192 @@ def mimic_tlm(self, ait_tlm_pkt_name, ait_tlm_pkt_fill=None): # Return last status message as result to client return info_msg + # --------------------------------------------------------------------- + + # Greelet-invoked functions + + def poll_telemetry_periodically(self): + while True: + real_tlm_emitted = self.poll_telemetry() + if not real_tlm_emitted: + gsleep(AITOpenMctPlugin.DEFAULT_TELEM_CHECK_SLEEP_SECS) + + def poll_telemetry(self): + """ + Polls the telemetry queue for next available telem entry. + If found, it is broadcast to all of the managed web-sockets, + where they decide if they are interested in the telemetry. + If nothing on queue, then empty probe messag is sent. + :return: True if real telemetry emitted, False otherwise. + """ + try: + self.dbg_message("Polling Telemetry queue...") + ait_pkt = self._tlmQueue.popleft(timeout=self.DEFAULT_TELEM_QUEUE_TIMEOUT_SECS) + openmct_pkt = DictUtils.format_tlmpkt_for_openmct(ait_pkt) + self.dbg_message(f"Broadcasting {openmct_pkt} to managed web-sockets...") + self.broadcast_packet(openmct_pkt) + return True + + except IndexError: + # If no telemetry has been received by the server + # after timeout seconds, "probe" the client + # websocket connection to make sure it's still + # active and if so, keep it alive. This is + # accomplished by sending an empty JSON object. + self.dbg_message("Telemetry queue is empty.") + self.broadcast_message(self.DEFAULT_WS_EMPTY_MESSAGE) + return False + + def broadcast_packet(self, openmct_pkt): + """ + Attempt to broadcast OpenMCT packet to web-socket clients, + the managed web-socket themselves determine if the Packet will + be emitted. + :param openmct_pkt: Instance of OpenMCT packet to be emitted + :return: True if packet was emitted by at least one web-socket, + False otherwise. 
+ """ + pkt_emitted_by_any = False + openmct_pkt_id = openmct_pkt["packet"] + + for mws in self._socket_set: + pkt_emitted_by_cur = self.send_socket_pkt_mesg(mws, + openmct_pkt_id, openmct_pkt) + pkt_emitted_by_any = pkt_emitted_by_cur or pkt_emitted_by_any + return pkt_emitted_by_any + + def broadcast_message(self, message): + """ + Broadcast OpenMCT packet to web-socket clients + :param openmct_pkt: Instance of OpenMCT packet to be emitted + :return: + """ + for mws in self._socket_set: + self.managed_web_socket_send(mws, message) + + def send_socket_pkt_mesg(self, mws, pkt_id, mct_pkt): + """ + Attempts to send socket message if managed web-socket is alive + and accepts the message by inspecting the pkt_id value + :param mws: Managed web-socket + :param pkt_id: Packet ID associated with message + :param mct_pkt: OpenMCT telem packet + :return: True if message sent to web-socket, False otherwise + """ + if mws.is_alive and mws.accepts_packet(pkt_id): + # Collect only fields the subscription cares about + subscribed_pkt = mws.create_subscribed_packet(mct_pkt) + # If that new packet still has fields, stringify and send + if subscribed_pkt: + pkt_mesg = json.dumps(subscribed_pkt, + default=self.datetime_jsonifier) + self.dbg_message("Sending realtime telemetry web-socket msg " + f"to websocket {mws.id}: {pkt_mesg}") + self.managed_web_socket_send(mws, pkt_mesg) + return True + + return False + + # --------------------------------------------------------------------- + + @staticmethod + def managed_web_socket_recv(mws): + ''' + Attempts to read message from the websocket with timeout. + :param mws: Managed web-socket instance + :return: Message retrieved from underlying-websocket, or None + ''' + message = None + try: + with Timeout(AITOpenMctPlugin.DEFAULT_WS_RECV_TIMEOUT_SECS, False): + message = mws.web_socket.receive() + except geventwebsocket.WebSocketError as wserr: + log.warn(f"Error while reading from web-socket {mws.id}; Error: {wserr}") + mws.set_error() + return message + + @staticmethod + def managed_web_socket_send(mws, message): + ''' + Sends message to underlying web-socket + :param mws: Managed web-socket instance + :param message: Message to be sent + ''' + if mws.is_alive: + try: + mws.web_socket.send(message) + except geventwebsocket.WebSocketError as wserr: + log.warn(f"Error while writing to web-socket {mws.id}; Message:'{message}'; Error: {wserr}") + mws.set_error() + + # --------------------------------------------------------------------- + + def poll_websocket_periodically_while_alive(self, mws): + while mws.is_alive: + gsleep(self.DEFAULT_WEBSOCKET_CHECK_SLEEP_SECS) + self.poll_websocket(mws) + + def poll_websockets(self): + """ + Polls set of maintained web-sockets to test for: + - web-socket is considered closed, in which case its removed from internal set; + - web-socket has message available that affects its state. 
+ """ + removal_set = set() + + if len(self._socket_set) == 0: + self.dbg_message("No websockets to poll") + else: + for mws in self._socket_set: + if mws.is_alive: + self.poll_websocket(mws) + else: + removal_set.add(mws) + + # Remove the closed/error entries from our set + if len(removal_set) > 0: + for rip_mws in removal_set: + rem_msg = f"Removing closed web-socket record ID {rip_mws.id}" + if mws.is_error: + rem_msg = f"Removing err web-socket record ID {rip_mws.id}" + self.dbg_message(rem_msg) + self._socket_set.remove(rip_mws) + + def poll_websocket(self, mws): + """ + Polls instance of web-socket for message + :return True if message was processed, False otherwise + """ + # attempt to read message from websocket and process + if mws.is_alive: + message = self.managed_web_socket_recv(mws) + if message: + self.process_websocket_mesg(mws, message) + return True + else: + return False + + def process_websocket_mesg(self, mws, message): + """ + Processes message received from a web-socket. + Handles the following directives: close, subscribe, unsubscribe + :param mws: Managed web-socket instance associated with message + :param message: Web-socket message + """ + msg_parts = message.split(" ", 1) + directive = msg_parts[0] + if directive == 'close': + self.dbg_message(f"Received 'close' message. Marking web-socket ID {mws.id} as closed") + mws.is_closed = True + elif directive == 'subscribe' and len(msg_parts) > 1: + self.dbg_message(f"Subscribing websocket {mws.id} to: {msg_parts[1]}") + mws.subscribe_field(msg_parts[1]) + elif directive == 'unsubscribe': + self.dbg_message(f"Unsubscribing websocket {mws.id} from: {msg_parts[1]}") + mws.unsubscribe_field(msg_parts[1]) + else: + self.dbg_message(f"Unrecognized web-socket message: {message}") + # --------------------------------------------------------------------- # Routing rules diff --git a/ait/core/table.py b/ait/core/table.py index 857456b6..5d3a6b83 100644 --- a/ait/core/table.py +++ b/ait/core/table.py @@ -16,9 +16,10 @@ import io import os import pickle +import yaml import ait -import yaml + from ait.core import dmc from ait.core import dtype from ait.core import log @@ -452,37 +453,34 @@ def __init__(self, filename=None): filename = ait.config.get("table.filename") self.filename = filename - self.pcklname = os.path.splitext(filename)[0] + ".pkl" + self.cachename = os.path.splitext(filename)[0] + ".pkl" self.fswtabdict = None + @property def dirty(self): - return not os.path.exists(self.pcklname) or os.path.getmtime( - self.filename - ) > os.path.getmtime(self.pcklname) + """True if the pickle cache needs to be regenerated, False to use current pickle binary""" + return util.check_yaml_timestamps(self.filename, self.cachename) def load(self): if self.fswtabdict is None: - if self.dirty(): + if self.dirty: self.fswtabdict = FSWTabDict(self.filename) - self.update() + util.update_cache(self.filename, self.cachename, self.fswtabdict) + log.info(f'Loaded new pickle file: {self.cachename}') else: - with open(self.pcklname, "rb") as stream: + with open(self.cachename, "rb") as stream: self.fswtabdict = pickle.load(stream) + log.info(f'Current pickle file loaded: {self.cachename.split("/")[-1]}') return self.fswtabdict - def update(self): - msg = "Saving updates from more recent '%s' to '%s'" - log.info(msg, self.filename, self.pcklname) - with open(self.pcklname, "wb") as output: - pickle.dump(self.fswtabdict, output, -1) - _DefaultFSWTabDictCache = FSWTabDictCache() def getDefaultFSWTabDict(): # noqa: N802 fswtabdict = None + filename 
     try:
         filename = _DefaultFSWTabDictCache.filename
         fswtabdict = _DefaultFSWTabDictCache.load()
diff --git a/ait/core/util.py b/ait/core/util.py
index 6464b4bf..173f21bd 100755
--- a/ait/core/util.py
+++ b/ait/core/util.py
@@ -26,6 +26,8 @@
 import sys
 import time
 import zlib
+import tempfile
+import warnings
 
 import pickle
 
@@ -35,7 +37,8 @@ class ObjectCache(object):
     def __init__(self, filename, loader):
-        """Creates a new ObjectCache
+        """
+        Creates a new ObjectCache
 
         Caches the Python object returned by loader(filename), using
         Python's pickle object serialization mechanism.  An ObjectCache
@@ -59,81 +62,31 @@ def cachename(self):
 
     @property
     def dirty(self):
-        """True if the pickle cache needs to be updated, False to use pickle binary"""
-        return self.check_yaml_timestamps(self.filename, self.cachename)
+        """True if the pickle cache needs to be regenerated, False to use the current pickle binary"""
+        return check_yaml_timestamps(self.filename, self.cachename)
 
     @property
     def filename(self):
         """The filename to cache via loader(filename)"""
         return self._filename
 
-    def cache(self):
-        """Caches the result of loader(filename) to cachename."""
-        msg = 'Saving updates from more recent "%s" to "%s"'
-        log.info(msg, self.filename, self.cachename)
-        with open(self.cachename, "wb") as output:
-            pickle.dump(self._dict, output, -1)
-
-    def check_yaml_timestamps(self, yaml_file_name, cache_name):
-        """
-        Checks YAML configuration file timestamp and any 'included' YAML configuration file
-        timestamps against the pickle cache file.
-        The term 'dirty' means that a yaml file has a more recent timestamp than the pickle
-        cache file. If a file is found to be dirty the response will indicate that a new
-        pickle cache file must be generated. As soon as one file is found to be 'dirty'
-        the flag indicating 'dirty' will be returned. If a file_name is found to be 'clean'
-        the pickle binary will be loaded.
-
-        param yaml_file_name: str
-            Name of the yaml configuration file to be tested
-        param cache_name: str
-            Filename with path to the cached pickle file for this config file.
-
-        return: boolean - True/False indicating 'dirty' (update pickle cache)
-
-        """
-
-        # If no pickle cache exists return True to make a new one.
-        if not os.path.exists(cache_name):
-            log.debug(f'No pickle cache exists, make a new one')
-            return True
-        # Has the yaml config file has been modified since the creation of the pickle cache
-        if os.path.getmtime(yaml_file_name) > os.path.getmtime(cache_name):
-            log.debug(f'{yaml_file_name} modified - make a new pickle cash')
-            return True
-        # Get the directory of the yaml config file to be parsed
-        dir_name = os.path.dirname(yaml_file_name)
-        # Open the yaml config file to look for '!includes' to be tested on the next iteration
-        with open(yaml_file_name, "r") as file:
-            try:
-                for line in file:
-                    if not line.strip().startswith("#") and "!include" in line:
-                        check = self.check_yaml_timestamps(
-                            os.path.join(dir_name, line.strip().split(" ")[2]), cache_name)
-                        if check:
-                            return True
-            except RecursionError as e:  # TODO Python 3.7 does not catch this error.
-                print(f'ERROR: {e}: Infinite loop: check that yaml config files are not looping '
-                      f'back and forth to one another thought the "!include" statements.')
-        log.debug('Load pickle binary.')
-        return False
-
     def load(self):
-        """Loads the Python object
+        """
+        Loads the Python object
 
-        Loads the Python object, either via loader(filename) or the
+        Loads the Python object, either via loader(filename) or the
         pickled cache file, whichever was modified most recently.
         """
 
         if self._dict is None:
             if self.dirty:
                 self._dict = self._loader(self.filename)
-                self.cache()
+                update_cache(self.filename, self.cachename, self._dict)
+                log.info(f'Loaded new pickle file: {self.cachename}')
             else:
                 with open(self.cachename, "rb") as stream:
                     self._dict = pickle.load(stream)
-
+                log.info(f'Current pickle file loaded: {self.cachename.split("/")[-1]}')
         return self._dict
 
@@ -145,8 +98,77 @@ def load(self):
 timer = time.time
 
 
+def check_yaml_timestamps(yaml_file_name, cache_name):
+    """
+    Checks the YAML configuration file timestamp, and the timestamp of any
+    'included' YAML configuration files, against the pickle cache file timestamp.
+    The term 'dirty' means that a yaml config file has a more recent timestamp
+    than the pickle cache file. If the pickle cache file is found to be 'dirty'
+    (return True) it is not up-to-date, and a new pickle cache file must be
+    generated. If the cache file is not 'dirty' (return False) the existing
+    pickle binary will be loaded.
+
+    param: yaml_file_name: str
+        Name of the yaml configuration file to be tested
+    param: cache_name: str
+        Filename with path to the cached pickle file for this config file.
+
+    return: boolean
+        True:
+            Indicates a 'dirty' pickle cache, i.e. the file is not current; generate a new binary
+        False:
+            Load the current cache file
+
+    """
+    # If no pickle cache exists return True to make a new one.
+    if not os.path.exists(cache_name):
+        log.debug('No pickle cache exists, make a new one')
+        return True
+    # Has the yaml config file been modified since the creation of the pickle cache?
+    if os.path.getmtime(yaml_file_name) > os.path.getmtime(cache_name):
+        log.info(f'{yaml_file_name} modified - make a new binary pickle cache file.')
+        return True
+    # Get the directory of the yaml config file to be parsed
+    dir_name = os.path.dirname(yaml_file_name)
+    # Open the yaml config file to look for '!include's to be tested on the next iteration
+    with open(yaml_file_name, "r") as file:
+        try:
+            for line in file:
+                if not line.strip().startswith("#") and "!include" in line:
+                    check = check_yaml_timestamps(
+                        os.path.join(dir_name, line.strip().split(" ")[2]), cache_name)
+                    if check:
+                        return True
+        except RecursionError as e:
+            print(f'ERROR: {e}: Infinite loop: check that yaml config files are not looping '
+                  f'back and forth to one another through the "!include" statements.')
+    return False
+
+
+def update_cache(yaml_file_name, cache_file_name, object_to_serialize):
+    """
+    Caches the result of loader(yaml_file_name) to a pickle binary (cache_file_name),
+    if the yaml config file has been modified since the last pickle cache was created,
+    i.e. the binary pickle cache is declared to be 'dirty' by 'check_yaml_timestamps()'.
+
+    param: yaml_file_name: str
+        Name of the yaml configuration file to be serialized ('pickled')
+    param: cache_file_name: str
+        File name with path to the new serialized cached pickle file for this config file.
+    param: object_to_serialize: object
+        Object to serialize ('pickle'), e.g. an instance of 'ait.core.cmd.CmdDict'
+
+    """
+    msg = f'Saving updates from more recent {yaml_file_name} to {cache_file_name}.'
+    log.info(msg)
+    with open(cache_file_name, "wb") as output:
+        pickle.dump(object_to_serialize, output, -1)
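# A short usage sketch of the module-level helpers defined above, mirroring
# what ObjectCache.load() now does. The YAML/pickle paths and the choice of
# FSWTabDict as the loader are illustrative.

import pickle

from ait.core import util
from ait.core.table import FSWTabDict

yaml_path = "config/table.yaml"   # hypothetical YAML dictionary
cache_path = "config/table.pkl"   # its pickle cache

if util.check_yaml_timestamps(yaml_path, cache_path):
    obj = FSWTabDict(yaml_path)                    # cache is 'dirty': reload
    util.update_cache(yaml_path, cache_path, obj)  # and re-pickle it
else:
    with open(cache_path, "rb") as stream:         # cache is current
        obj = pickle.load(stream)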
+
+
 def __init_extensions__(modname, modsyms):  # noqa
-    """Initializes a module (given its name and :func:`globals()` symbol
+    """
+    Initializes a module (given its name and :func:`globals()` symbol
     table) for AIT extensions.
 
     For every Python class defined in the given module, a
@@ -168,7 +190,8 @@ def __init_extensions__(modname, modsyms):  # noqa
     """
 
     def createFunc(cls, extname):  # noqa
-        """Creates and returns a new ``createXXX()`` function to instantiate
+        """
+        Creates and returns a new ``createXXX()`` function to instantiate
         either the given class by class object (*cls*) or extension
         class name (*extname*).
 
@@ -214,7 +237,8 @@ def create(*args, **kwargs):
 
 
 def __load_functions__(symtbl):  # noqa
-    """Loads all Python functions from the module specified in the
+    """
+    Loads all Python functions from the module specified in the
     ``functions`` configuration parameter (in config.yaml) into the
     given symbol table (Python dictionary).
     """
@@ -234,7 +258,8 @@ def __load_functions__(symtbl):  # noqa
 
 
 def crc32File(filename, skip=0):  # noqa
-    """Computes the CRC-32 of the contents of filename, optionally
+    """
+    Computes the CRC-32 of the contents of filename, optionally
     skipping a certain number of bytes at the beginning of the file.
     """
     with open(filename, "rb") as stream:
@@ -250,7 +275,8 @@ def endianSwapU16(bytes):  # noqa
 
 
 def setDictDefaults(d, defaults):  # noqa
-    """Sets all defaults for the given dictionary to those contained in a
+    """
+    Sets all defaults for the given dictionary to those contained in a
     second defaults dictionary.  This convenience method calls:
 
         d.setdefault(key, value)
@@ -264,7 +290,8 @@ def setDictDefaults(d, defaults):  # noqa
 
 
 def getDefaultDict(modname, config_key, loader, reload=False, filename=None):  # noqa
-    """Returns default AIT dictonary for modname
+    """
+    Returns the default AIT dictionary for modname
 
     This helper function encapulates the core logic necessary to
     (re)load, cache (via util.ObjectCache), and return the default
@@ -311,7 +338,8 @@ def toBCD(n):  # noqa
 
 
 def toFloat(str, default=None):  # noqa
-    """toFloat(str[, default]) -> float | default
+    """
+    toFloat(str[, default]) -> float | default
 
     Converts the given string to a floating-point value.  If the
     string could not be converted, default (None) is returned.
@@ -342,7 +370,8 @@ def toFloat(str, default=None):  # noqa
 
 
 def toNumber(str, default=None):  # noqa
-    """toNumber(str[, default]) -> integer | float | default
+    """
+    toNumber(str[, default]) -> integer | float | default
 
     Converts the given string to a numeric value.  The string may be
     a hexadecimal, integer, or floating number.  If string could not be
@@ -472,6 +501,55 @@ def listAllFiles(directory, suffix=None, abspath=False):  # noqa
     return files
 
 
+class TestFile:
+    """TestFile
+
+    TestFile is a Python Context Manager for quickly creating test
+    data files that are deleted when a test completes, whether it
+    succeeds or fails.
+
+    Example:
+
+        with TestFile(data, "wt") as filename:
+            # filename (likely something like '/var/tmp/tmp.1.uNqbVJ') now
+            # contains data.
+            assert load(filename)
+
+    Whether the above assert passes or throws AssertionError, filename
+    will be deleted.
+    """
+
+    def __init__(self, data, options):
+        """
+        Creates a new TestFile and writes data to a temporary file.
+
+        Parameters:
+            data: text/binary
+                the data to be written to the temporary file
+
+            options: str
+                the file mode used for the temporary file (as passed to
+                tempfile.NamedTemporaryFile, e.g. 'wt' or 'wb')
+        """
+        self._filename = None
+
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore")
+            self._tfile = tempfile.NamedTemporaryFile(mode=options)
+            self._filename = self._tfile.name
+
+        with open(self._filename, "w") as output:
+            output.write(data)
+
+    def __enter__(self):
+        """Enter the runtime context and return filename."""
+        return self._filename
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """Exit the runtime context and delete filename."""
+        self._tfile.close()
+        self._filename = None
+
+
 class YAMLValidationError(Exception):
     def __init__(self, arg):
         # Set some exception infomation
diff --git a/doc/source/ait.core.server.plugins.apid_routing.rst b/doc/source/ait.core.server.plugins.apid_routing.rst
new file mode 100644
index 00000000..22bdadca
--- /dev/null
+++ b/doc/source/ait.core.server.plugins.apid_routing.rst
@@ -0,0 +1,7 @@
+ait.core.server.plugins.apid_routing module
+============================================
+
+.. automodule:: ait.core.server.plugins.apid_routing
+   :members:
+   :undoc-members:
+   :show-inheritance:
\ No newline at end of file
diff --git a/doc/source/ait.core.server.plugins.rst b/doc/source/ait.core.server.plugins.rst
index a0d6b7ca..9f1d8b45 100644
--- a/doc/source/ait.core.server.plugins.rst
+++ b/doc/source/ait.core.server.plugins.rst
@@ -9,6 +9,7 @@ Submodules
    ait.core.server.plugins.data_archive
    ait.core.server.plugins.limit_monitor
    ait.core.server.plugins.openmct
+   ait.core.server.plugins.apid_routing
 
 Module contents
 ---------------
diff --git a/doc/source/conf.py b/doc/source/conf.py
index 1acd86b6..07cee373 100644
--- a/doc/source/conf.py
+++ b/doc/source/conf.py
@@ -65,9 +65,9 @@
 # built documents.
 #
 # The short X.Y version.
-version = u'2.3.5'
+version = u'2.3.6-rc1'
 # The full version, including alpha/beta/rc tags.
-release = u'2.3.5'
+release = u'2.3.6-rc1'
 
 # The language for content autogenerated by Sphinx.  Refer to documentation
 # for a list of supported languages.
diff --git a/doc/source/plugin_openmct.rst b/doc/source/plugin_openmct.rst
index b0b2009f..9dc3c786 100644
--- a/doc/source/plugin_openmct.rst
+++ b/doc/source/plugin_openmct.rst
@@ -1,15 +1,15 @@
 AIT OpenMCT Plugin
 ========================
 
-The 'openmct' directory (AIT-Core/openmct/) contains files needed by your
-OpenMCT installation that will expose AIT realtime and historical telemetry
+The *openmct* directory (AIT-Core/openmct/) contains files needed to expose AIT realtime and historical telemetry
 endpoints to the OpenMCT framework.
 
-There is a two step process:
+This is a two-step process:
 
 * Activate the OpenMCT plugin within the AIT server configuration. This step creates the data source from which OpenMCT will pull data.
 
-* Integrate AIT extensions in OpenMCT. This step installs Javascript extensions into the OpenMCT framework that will access the AIT OpenMCT plugin service.
+* Deploy a web-server with the AIT-OpenMCT integration. This step sets up a Node web-server for the OpenMCT framework that will access the AIT OpenMCT plugin service. A sketch of the resulting plugin configuration is shown below.
+
 
 .. _Ait_openmct_plugin:
 
@@ -20,8 +20,8 @@ Activating the OpenMCT Plugin
 Update your AIT configuration file :ref:`config.yaml ` to add the AITOpenMctPlugin in the 'server:plugins:' section.
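For reference, a minimal sketch of such a plugin entry is shown below. The
stream name *openmct_telem_stream* is hypothetical, and the exact set of keys
should be checked against your deployment's configuration:

.. code-block:: none

    server:
        plugins:
            - plugin:
                name: ait.core.server.plugins.openmct.AITOpenMctPlugin
                inputs:
                    - openmct_telem_stream
                service_port: 8082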
 
 .. _Ait_openmct_port:
-The plugin's 'service_port' value defaults to 8082, but can be overridden in the configuration. If something other than the default is used, you will also need to include this in
-the OpenMCT frameworks's setup configuration.
+
+The plugin's 'service_port' value defaults to 8082, but can be overridden in the configuration. If something other than the default is used, you will also need to include this in the OpenMCT framework's setup configuration.
 
 Currently, the server is assumed to run on 'localhost'.
 
@@ -74,48 +74,71 @@ Integrating with OpenMCT Framework
 ----------------------------------
 
 **Note:**
-At this time, the AIT-Integration is capatible with OpenMCT v0.14.0. Setup step 1 will address this.
+Earlier versions of the AIT-OpenMCT integration required explicit
+installation of OpenMCT, and adding AIT extensions to that deployment.
+This has since been simplified: OpenMCT is now treated as a dependency.
 
 **Note:**
-The AIT extension requires 'http.js', a library that was included in the OpenMCT Tutorial (Apache License, Version 2.0).
-The source location of this file is: https://github.com/nasa/openmct-tutorial/tree/completed/lib/http.js
+The AIT extension requires 'http.js', a library that was
+included in the OpenMCT Tutorial (Apache License, Version 2.0).
+The source location of this file is:
+https://github.com/nasa/openmct-tutorial/tree/completed/lib/http.js
+It is currently included with our example OpenMCT server.
 
-Setup
-^^^^^
+Server Setup
+^^^^^^^^^^^^
 
-1. Install OpenMCT (https://nasa.github.io/openmct/getting-started/)
+While treating OpenMCT as a dependency, a Node web-server capable of running
+the OpenMCT service is still needed. AIT provides a basic example
+server which fulfills this need (based on the OpenMCT tutorial).
+See `AIT-Core/openmct/example-server
+`_.
 
-To ensure you get the capatible version of the software, after performing the git-clone step, you will need to checkout the v0.14.0 version.
+The example server includes:
 
-.. code-block:: none
+* *package.json* - with all dependencies (including OpenMCT) and the service launcher.
+
+* *server.js* - entry point for the web-server that will host the OpenMCT service.
+
+* *index.html* - sets up OpenMCT and AIT extensions.
+
+* *lib/http.js* - modified library required by the integration.
 
-    git clone https://github.com/nasa/openmct.git  #Download OpenMCT
-    git checkout v0.14.0  #Checkout required version
-    npm install  #Install dependencies
+* *ait_integration.js* - symlink to the AIT-OpenMct service integration.
 
-We will assume that OpenMCT is installed in a directory referenced
-by the environment variable ${OPENMCT_DIR}
+**Setup steps:**
 
+These steps assume you will be setting up the web-server in a directory identified by $OPENMCT_DIR.
 
-2. Copy the downloaded 'http.js' library file to your OpenMCT installation:
+1. Copy *example-server* to a directory referenced by *$OPENMCT_DIR*:
 
 .. code-block:: none
 
-    mkdir ${OPENMCT_DIR}/lib
-    cp http.js ${OPENMCT_DIR}/lib/
+    cp -RL ./example-server $OPENMCT_DIR    #Recursive copy, resolve symlinks to real files
+
+
+2. Install service dependencies (including OpenMCT) via NPM and package.json:
+
+.. code-block:: none
+
+    cd $OPENMCT_DIR
+    npm install
+
+**Running steps:**
 
-3. Copy the 'ait_integration.js' file to your OpenMCT installation:
+The web-server can be launched via Node-NPM:
 
 .. code-block:: none
 
-    cp ait_integration.js ${OPENMCT_DIR}
+    npm start
 
+Notes on the OpenMCT Extensions
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-4. Edit the existing OpenMCT 'index.html' file to include references to the 'http.js' and 'ait_integration.js' (prior
-to the script tag that initializes OpenMCT):
+The index.html includes the import of the required Javascript files:
 
 .. code-block:: none
 
@@ -123,16 +146,17 @@ to the script tag that initializes OpenMCT):
     <script src="lib/http.js"></script>
     <script src="ait_integration.js"></script>
 
-5. Install AIT extensions to the openmct framework (prior to the openmct.start() function call). Value of 'port' should match the value used in the :ref:`previous section`.
+...as well as the OpenMCT installation of the AIT integration and data endpoints:
 
 .. code-block:: none
 
-    openmct.install(AITIntegration({
-        host: 'localhost',
-        port : 8082 }));
-    openmct.install(AITHistoricalTelemetryPlugin());
-    openmct.install(AITRealtimeTelemetryPlugin());
+       openmct.install(AITIntegration({
+            host: 'localhost',
+            port : 8082 }));
+       openmct.install(AITHistoricalTelemetryPlugin());
+       openmct.install(AITRealtimeTelemetryPlugin());
 
+**Note:** If you change the AIT-OpenMCT plugin's *service_port* in your AIT config, the same value should be used for the *port* above.
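The integration also recognizes a *full_field_names* option (see
*ait_integration.js*): when set to the string "true", telemetry fields are
labeled with their full 'packet.field' names rather than the bare field
names. A sketch of passing it alongside the other options:

.. code-block:: none

       openmct.install(AITIntegration({
            host: 'localhost',
            port : 8082,
            full_field_names : "true" }));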
diff --git a/openmct/README b/openmct/README
index 0d584ed8..d6c844a3 100644
--- a/openmct/README
+++ b/openmct/README
@@ -1,6 +1,7 @@
-This directory contains files need by your OpenMCT installation that
-will expose AIT realtime and historical telemetry endpoints to the
-OpenMCT framework.
+This directory contains files needed to expose AIT realtime and historical
+telemetry endpoints to the OpenMCT framework.
+
+You can learn more about OpenMCT by visiting: https://nasa.github.io/openmct/
 
 ------------------------
 
@@ -16,41 +17,51 @@ host 'localhost' and port 8082, which are two configuration
 options that can be passed to the OpenMct AIT extension during
 session setup.
 
+- Earlier versions of the AIT-OpenMCT integration required explicit
+installation of OpenMCT, and adding AIT extensions to that deployment.
+This has since been simplified: OpenMCT is now treated as a dependency.
+
 - The AIT extension requires 'http.js', a library that was
 included in the OpenMCT Tutorial (Apache License, Version 2.0).
 The source location of this file is:
 https://github.com/nasa/openmct-tutorial/tree/completed/lib/http.js
+It is currently included with our example OpenMCT server.
 
 ------------------------
 
 Setup:
 
-1) Install OpenMCT (https://nasa.github.io/openmct/getting-started/)
-We will assume that OpenMCT is installed in a directory referenced
-by the environment variable ${OPENMCT_DIR}
+While treating OpenMCT as a dependency, a Node web-server capable of running
+the OpenMCT service is still needed. AIT provides a basic example
+server which fulfills this need (based on OpenMCT's tutorial).
+See AIT-Core/openmct/example-server/.
+
+The example server includes:
+- package.json; with all dependencies (including OpenMCT) and the service launcher.
+- server.js; entry point for the web-server that will host the OpenMCT service.
+- index.html; sets up OpenMCT and AIT extensions.
+- lib/http.js; a modified library required by the integration.
+- ait_integration.js; symlink to the AIT-OpenMct service integration.
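After the copy in step 1 below, ${OPENMCT_DIR} should contain real files
(symlinks resolved by 'cp -RL'), roughly:

  ${OPENMCT_DIR}/
    package.json
    server.js
    index.html
    ait_integration.js
    lib/http.js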
+Setup via the example-server: -2) Copy the downloaded 'http.js' library file to your OpenMCT installation: +1) Copy 'example-server' to a directory referenced by the environment variable ${OPENMCT_DIR} -> mkdir ${OPENMCT_DIR}/lib -> cp http.js ${OPENMCT_DIR}/lib/ +> cp -RL ./example-server ${OPENMCT_DIR} #Recursive copy, resolve symlinks to real files -3) Copy the 'ait_integration.js' file to your OpenMCT installation: +2) Install service dependencies (including OpenMCT) via NPM and package.json: -> cp ait_integration.js ${OPENMCT_DIR} +> cd ${OPENMCT_DIR} +> npm install -4) Edit the existing OpenMCT 'index.html' file to include -references to the 'http.js' and 'ait_integration.js' (prior -to the script tag that initializes OpenMCT): +The index.html includes the import of the required Javascript files: - -5) Install AIT extensions to the openmct framework (prior -to the openmct.start() function call): +...as well as the OpenMCT installation of the integration and data endpoints: openmct.install(AITIntegration({ host: 'localhost', @@ -59,10 +70,14 @@ to the openmct.start() function call): openmct.install(AITRealtimeTelemetryPlugin()); +The web-server can be launched via Node-NPM: + +> npm start + -------------------------- -Running: +Running AIT and OpenMCT: 1) Start the AIT server (configured to run AIT's OpenMct plugin) -2) Start OpenMCT server. +2) Start OpenMCT server 3) Open browser to location of the OpenMCT UI endpoint. diff --git a/openmct/ait_integration.js b/openmct/ait_integration.js index fd8bcca3..a4df8215 100644 --- a/openmct/ait_integration.js +++ b/openmct/ait_integration.js @@ -48,25 +48,43 @@ * 3) Point your browser to localhost:8080 */ -//DEFAULTS + + +//----- +//Constants + +let OBJ_NAMESPACE = "ait-ns"; +let OBJ_ROOT = "tlm-dict"; +let OBJ_NS_ROOT = OBJ_NAMESPACE+":"+OBJ_ROOT; + +//----- +//Defaults + // AIT connection settings const AIT_HOST_DEFAULT = 'localhost'; const AIT_PORT_DEFAULT = 8082; //Debug const DEBUG_ENABLED_DEFAULT = false; +//Full-Field names (field names contain packet) +const FULL_FIELD_NAMES_DEFAULT = false; +//----- // State variables for connections, debug, ws-reconnect + let ait_host = AIT_HOST_DEFAULT; let ait_post = AIT_PORT_DEFAULT; let ait_debug = DEBUG_ENABLED_DEFAULT; +//controls if field names are full (with packet) or not +let full_field_names = FULL_FIELD_NAMES_DEFAULT; + +// Web-socket reconnection settings let ws_reconnect_enabled = true; let ws_reconnect_wait_millis = 10000; // Keep a reference to our promise for tlmdict let tlmdictPromise = null; - //--------------------------------------------- // Updated OpenMCT endpoint returns a converted version of the original AIT @@ -92,6 +110,8 @@ function debugMsg(msg) { console.log("OpenMct/AIT: "+msg); } +//--------------------------------------------- + //--------------------------------------------- @@ -99,10 +119,11 @@ function debugMsg(msg) { //for AIT function AITIntegration(config) { - + //set values to default ait_host = AIT_HOST_DEFAULT; ait_post = AIT_PORT_DEFAULT; ait_debug = DEBUG_ENABLED_DEFAULT; + full_field_names = FULL_FIELD_NAMES_DEFAULT; //check for configuration overrides if (config != null) @@ -122,90 +143,160 @@ function AITIntegration(config) { ait_port = (Number.isInteger(config.port)) ? 
config.port : (parseInt(config.port) || AIT_PORT_DEFAULT); } + if (config.hasOwnProperty('full_field_names')) + { + full_field_names = (config.full_field_names === "true"); + } } //You will only see these if DEBUG was enabled - debugMsg("AIT Debug set to: "+ait_debug); - debugMsg("AIT Host set to: "+ait_host); - debugMsg("AIT Port set to: "+ait_port); + debugMsg("AIT Debug set to: "+ait_debug); + debugMsg("AIT Host set to: "+ait_host); + debugMsg("AIT Port set to: "+ait_port); + debugMsg("Full-Fields set to: "+full_field_names); tlmdictPromise = getDictionary(ait_host, ait_port); + //AIT Object Provider let objectProvider = { get: function (identifier) { return tlmdictPromise.then(function (dictionary) { - if (identifier.key === 'spacecraft') { - return { + + const id_key = identifier.key.toString(); + let rval = null; + + if (identifier.key === OBJ_ROOT) { + + //Identifier is Root. + //Provide information about the root + rval = { identifier: identifier, name: dictionary.name, type: 'folder', location: 'ROOT' }; - } else { - let measurement = dictionary.measurements.filter(function (m) { - return m.key === identifier.key; - })[0]; - return { + } else if (!id_key.includes(".") ) { + + //Identifier is packet + //Provide information about the packet (which contains fields) + rval = { identifier: identifier, - name: measurement.name, - type: 'telemetry', - telemetry: { - values: measurement.values - }, - location: 'taxonomy:spacecraft' + name: identifier.key, + type: 'folder', + location: OBJ_NS_ROOT }; + } else { + + //Identifier is packet-field + //Provide information about the packet-field + let measurement = dictionary.measurements.find(function (m) { + return m.key === identifier.key;}); + if (measurement != null) { + const pkt_fld_array = id_key.split("."); + const packet_name = pkt_fld_array[0]; + const field_name = pkt_fld_array[1]; + + // Include full field name (pkt.fld) or just name (fld) + const name_value = full_field_names ? measurement.name : field_name; + + rval = { + identifier: identifier, + name: name_value, + type: 'telemetry', + telemetry: { + values: measurement.values + }, + location: OBJ_NS_ROOT + }; + } else { + console.warn("AIT-objectProvider received unknown " + + "measurement key: "+id_key) + return null; + } } + + return rval; }); } }; + //Composition provides a tree structure, where each packet is a folder + //containing packet fields telemetry. + //TODO: Consider another layer for subsystem or other? let compositionProvider = { appliesTo: function (domainObject) { - return domainObject.identifier.namespace === 'taxonomy' && - domainObject.type === 'folder'; + + let id_key = domainObject.identifier.key.toString(); + //This applies to our namespace, for folder types, + //but not when key includes '.' 
(so packets ok, fields not ok) + return domainObject.identifier.namespace === OBJ_NAMESPACE && + domainObject.type === 'folder' && + !id_key.includes("."); }, load: function (domainObject) { - return tlmdictPromise - .then(function (dictionary) { - return dictionary.measurements.map(function (m) { + + let id_key = domainObject.identifier.key.toString(); + + if (id_key === OBJ_ROOT) { + return tlmdictPromise.then(function (dictionary) { + + //Top level, so collect all of the Packet names (no fields) + + //create array of unique packet names (by examining all field names) + let keySet = new Set(dictionary.measurements.map(item => + item.key.substr(0, + item.key.indexOf(".")))); + let keyArr = [...keySet]; + + //return array of packet-name structs + let rval = keyArr.map(function (key) { return { - namespace: 'taxonomy', + namespace: OBJ_NAMESPACE, + key: key + }; + }); + return rval; + }); + } else { + return tlmdictPromise.then(function (dictionary) { + + //Collect all fields that are part of the packet identified + //by id_key + let pkt_fields = dictionary.measurements.filter(function(m) { + return m.key.startsWith(id_key)}); + + //return array of field structs + let rval = pkt_fields.map(function (m) { + return { + namespace: OBJ_NAMESPACE, key: m.key }; }); + return rval; }); - } //, - // loadHeirarchy: function (domainObject) { - // return tlmdictPromise - // .then(function (dictionary) { - // domainObject.identifier.key - // function checkParent(age) { - // return age >= 18; - // } - // return dictionary.measurements.map(function (m) { - // return { - // namespace: 'taxonomy', - // key: m.key - // }; - // }); - // }); - // } + } + } }; return function install(openmct) { + + //Add AIT Root openmct.objects.addRoot({ - namespace: 'taxonomy', - key: 'spacecraft' + namespace: OBJ_NAMESPACE, + key: OBJ_ROOT }); - openmct.objects.addProvider('taxonomy', objectProvider); + //Add Provider for AIT Objects + openmct.objects.addProvider(OBJ_NAMESPACE, objectProvider); + //Add Provider to handle tree structure of telem fields openmct.composition.addProvider(compositionProvider); + //Add telemetry type for AIT fields openmct.types.addType('telemetry', { name: 'Telemetry Point', - description: 'Spacecraft Telemetry point', + description: 'AIT Telemetry point', cssClass: 'icon-telemetry' }); }; @@ -214,6 +305,7 @@ function AITIntegration(config) { //--------------------------------------------- //Historical telemetry +//TODO: support the other OpenMCT historical query parameters function AITHistoricalTelemetryPlugin() { return function install (openmct) { @@ -222,13 +314,13 @@ function AITHistoricalTelemetryPlugin() { return domainObject.type === 'telemetry'; }, request: function (domainObject, options) { - let histUrlRoot = 'http://' + ait_host + ':' + ait_port + '/tlm/history/' + let histUrlRoot = 'http://' + ait_host + ':' + ait_port + '/tlm/history/'; let histUrl = histUrlRoot + domainObject.identifier.key + '?start=' + options.start + '&end=' + options.end; return http.get(histUrl) .then(function (resp) { - return resp.data + return resp.data; }); } }; @@ -264,7 +356,7 @@ let connectRealtime = function() return; } - let msg_json = JSON.parse(event.data) + let msg_json = JSON.parse(event.data); //Check that JSON object contains telemetry info if (msg_json.hasOwnProperty('packet') && @@ -310,7 +402,7 @@ function AITRealtimeTelemetryPlugin() { //attach to listener map declared above let listener = realtimeListeners; - connectRealtime(); + web_socket = connectRealtime(); let provider = { 
supportsSubscribe: function (domainObject) { @@ -318,10 +410,12 @@ function AITRealtimeTelemetryPlugin() { }, subscribe: function (domainObject, callback) { debugMsg("Adding realtime subscriber for key "+domainObject.identifier.key); + web_socket.send('subscribe ' + domainObject.identifier.key); listener[domainObject.identifier.key] = callback; return function unsubscribe() { debugMsg("Removing realtime subscriber for key "+domainObject.identifier.key); delete listener[domainObject.identifier.key]; + web_socket.send('unsubscribe ' + domainObject.identifier.key); }; } }; diff --git a/openmct/example-server/ait_integration.js b/openmct/example-server/ait_integration.js new file mode 120000 index 00000000..a0ca631c --- /dev/null +++ b/openmct/example-server/ait_integration.js @@ -0,0 +1 @@ +../ait_integration.js \ No newline at end of file diff --git a/openmct/example-server/index.html b/openmct/example-server/index.html new file mode 100644 index 00000000..ac458be5 --- /dev/null +++ b/openmct/example-server/index.html @@ -0,0 +1,34 @@ + + + + Open MCT Tutorials + + + + + + + + + + + diff --git a/openmct/example-server/lib/http.js b/openmct/example-server/lib/http.js new file mode 100644 index 00000000..2c063d66 --- /dev/null +++ b/openmct/example-server/lib/http.js @@ -0,0 +1,66 @@ +(function (root, factory) { + if (typeof define === 'function' && define.amd) { + define(factory); + } else if (typeof exports === 'object') { + module.exports = factory; + } else { + root.http = factory(root); + } +})(this, function (root) { + + 'use strict'; + + var exports = {}; + + var generateResponse = function (req) { + var response = { + data: req.responseText, + status: req.status, + request: req + }; + if (req.getResponseHeader('Content-Type').indexOf('application/json') !== -1) { + response.data = JSON.parse(response.data); + } + return response; + }; + + var xhr = function (type, url, data) { + var promise = new Promise(function (resolve, reject) { + var XHR = XMLHttpRequest || ActiveXObject; + var request = new XHR('MSXML2.XMLHTTP.3.0'); + + request.open(type, url, true); + request.onreadystatechange = function () { + var req; + if (request.readyState === 4) { + req = generateResponse(request); + if (request.status >= 200 && request.status < 300) { + resolve(req); + } else { + reject(req); + } + } + }; + request.send(data); + }); + return promise; + }; + + exports.get = function (src) { + return xhr('GET', src); + }; + + exports.put = function (url, data) { + return xhr('PUT', url, data); + }; + + exports.post= function (url, data) { + return xhr('POST', url, data); + }; + + exports.delete = function (url) { + return xhr('DELETE', url); + }; + + return exports; +}); diff --git a/openmct/example-server/package.json b/openmct/example-server/package.json new file mode 100644 index 00000000..db801a4f --- /dev/null +++ b/openmct/example-server/package.json @@ -0,0 +1,25 @@ +{ + "name": "ait-openmct-basic-server", + "version": "0.0.1", + "description": "AIT Web-Server for Open MCT", + "main": "server.js", + "scripts": { + "start": "node server.js" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/nasa/openmct-tutorial.git" + }, + "author": "", + "license": "MIT", + "bugs": { + "url": "https://github.com/NASA-AMMOS/AIT-Core/issues" + }, + "homepage": "https://github.com/NASA-AMMOS/AIT-Core/issues", + "dependencies": { + "express": "^4.16.4", + "express-ws": "^4.0.0", + "openmct": "latest", + "ws": "^6.1.2" + } +} diff --git a/openmct/example-server/server.js 
b/openmct/example-server/server.js new file mode 100644 index 00000000..0827ddb2 --- /dev/null +++ b/openmct/example-server/server.js @@ -0,0 +1,24 @@ +var express = require('express'); + +function StaticServer() { + var router = express.Router(); + + router.use('/', express.static(__dirname + '/.')); + + return router +} + +var expressWs = require('express-ws'); +var app = express(); + +expressWs(app); + +var staticServer = new StaticServer(); +app.use('/', staticServer); + +var port = process.env.PORT || 8080 + +app.listen(port, function () { + console.log('Open MCT hosted at http://localhost:' + port); +}); + diff --git a/pyproject.toml b/pyproject.toml index 12beb098..1ac90c88 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = 'ait-core' -version = '2.3.5' +version = '2.3.6-rc1' description = "NASA JPL's Ground Data System toolkit for Instrument and CubeSat Missions" license = 'MIT' readme = 'README.rst' diff --git a/scripts/example_script.py b/scripts/example_script.py index a7fbf68c..10d92b1a 100644 --- a/scripts/example_script.py +++ b/scripts/example_script.py @@ -1,15 +1,20 @@ #!/usr/bin/env python -''' +""" Check out the AIT API Documentation for a more detailed look at the scripting API. https://ait-core.readthedocs.io/en/latest/api_intro.html -''' + +""" from ait.core.api import Instrument +from ait.core.table import FSWTabDictCache inst = Instrument() # Send a command inst.cmd.send('NO_OP') + +cache = FSWTabDictCache() +tab_dict = cache.load() diff --git a/tests/ait/core/__init__.py b/tests/ait/core/__init__.py index c229aa9c..d7d02924 100644 --- a/tests/ait/core/__init__.py +++ b/tests/ait/core/__init__.py @@ -1,7 +1,7 @@ # Advanced Multi-Mission Operations System (AMMOS) Instrument Toolkit (AIT) # Bespoke Link to Instruments and Small Satellites (BLISS) # -# Copyright 2014, by the California Institute of Technology. ALL RIGHTS +# Copyright 2022, by the California Institute of Technology. ALL RIGHTS # RESERVED. United States Government Sponsorship acknowledged. Any # commercial use must be negotiated with the Office of Technology Transfer # at the California Institute of Technology. @@ -11,71 +11,23 @@ # laws and regulations. User has the responsibility to obtain export licenses, # or other export authority as may be required before exporting such # information to foreign countries or providing access to foreign persons. -""" -AIT Unit and Functional Tests -The ait.test module provides functional and unit tests for ait modules. -""" +"""AIT Unit and Functional Tests""" import logging -import os -import tempfile -import warnings - -import ait.core def setUp(): - """Set up tests. - - Turn logging level to CRITICAL: due to failure test cases, there - are many verbose log messages that are useful in context. + """ + Set up tests: + Turn logging level to CRITICAL: due to failure test cases, there + are many verbose log messages that are useful in context. """ logging.getLogger("ait").setLevel(logging.CRITICAL) def tearDown(): - """Tear down tests. - - Turn logging level back to INFO. """ - logging.getLogger("ait").setLevel(logging.INFO) - - -class TestFile: - """TestFile - - TestFile is a Python Context Manager for quickly creating test - data files that delete when a test completes, either successfully - or unsuccessfully. - - Example: - - with TestFile(data) as filename: - # filename (likely something like '/var/tmp/tmp.1.uNqbVJ') now - # contains data. 
- assert load(filename) - - Whether the above assert passes or throws AssertionError, filename - will be deleted. + Tear down tests: + Turn logging level back to INFO. """ - - def __init__(self, data): - """Creates a new TestFile and writes data to a temporary file.""" - self._filename = None - - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - self._tfile = tempfile.NamedTemporaryFile(mode="wt") - self._filename = self._tfile.name - - with open(self._filename, "wt") as output: - output.write(data) - - def __enter__(self): - """Enter the runtime context and return filename.""" - return self._filename - - def __exit__(self, exc_type, exc_value, traceback): - """Exit the runtime context and delete filename.""" - self._tfile.close() - self._filename = None + logging.getLogger("ait").setLevel(logging.INFO) diff --git a/tests/ait/core/server/test_apid_routing.py b/tests/ait/core/server/test_apid_routing.py new file mode 100644 index 00000000..7c1af3a2 --- /dev/null +++ b/tests/ait/core/server/test_apid_routing.py @@ -0,0 +1,84 @@ +import unittest + +from unittest.mock import patch +from ait.core import log +from ait.core.server.plugins.apid_routing import APIDRouter +from ait.core.util import TestFile + + +class TestPacket: + def __init__(self, apid=0): + self.apid = apid + + +def create_test_dict(number_of_packets = 150): + test_dict = {} + for i in range(1, number_of_packets+1): + packet_name = f"Packet_{i}" + test_dict[packet_name] = TestPacket(apid = i) + return test_dict + + +class TestPacketRouting(unittest.TestCase): + def routing_table_yaml(): + """ + # Call this function to return the yaml string below + + output_topics: + - telem_topic_1: + - 1 + - 7 + - telem_topic_2: + - 2 + - range: + - 4 + - 9 + - exclude: + - 6 + """ + + pass + + with TestFile(routing_table_yaml.__doc__, "wt") as filename: + def new_init(self, routing_table=None, default_topic=None): + self.default_topic = default_topic + + if 'path' in routing_table: + self.routing_table_object = self.load_table_yaml(routing_table['path'], create_test_dict(10)) + else: + self.routing_table_object = None + log.error("no path specified for routing table") + if self.routing_table_object is None: + log.error("Unable to load routing table .yaml file") + + with patch.object(APIDRouter, '__init__', new_init): + router_plugin_instance = APIDRouter(routing_table={'path': filename}, default_topic= "test_default_topic") + + def test_routing_table(self): + test_routing_table_dict = { + 1: ['test_default_topic', 'telem_topic_1'], + 2: ['test_default_topic', 'telem_topic_2'], + 3: ['test_default_topic'], + 4: ['test_default_topic', 'telem_topic_2'], + 5: ['test_default_topic', 'telem_topic_2'], + 6: ['test_default_topic'], + 7: ['test_default_topic', 'telem_topic_1', 'telem_topic_2'], + 8: ['test_default_topic', 'telem_topic_2'], + 9: ['test_default_topic', 'telem_topic_2'], + 10: ['test_default_topic'] + } + self.assertEqual(self.router_plugin_instance.routing_table_object, test_routing_table_dict) + + def test_apid_extraction1(self): + test_bytearray = bytearray(b'\x00\x1f\x75\x94\xfa\xdc\x43\x90\x9a\x8c\xff\xe0') + self.assertEqual(self.router_plugin_instance.get_packet_apid(test_bytearray), 31) + + def test_apid_extraction2(self): + test_bytearray = bytearray(b'\x01\x03\x75\x94\xfa\xdc\x43\x90\x9a\x8c\xff\xe0') + self.assertEqual(self.router_plugin_instance.get_packet_apid(test_bytearray), 259) + + def test_get_topics(self): + test_bytearray = bytearray(b'\x00\x07\x75\x94\xfa\xdc\x43\x90\x9a\x8c\xff\xe0') + 
test_bytearray_apid = self.router_plugin_instance.get_packet_apid(test_bytearray) + expected_topics = ['test_default_topic', 'telem_topic_1', 'telem_topic_2'] + self.assertEqual(self.router_plugin_instance.routing_table_object[test_bytearray_apid], expected_topics) diff --git a/tests/ait/core/test_cfg.py b/tests/ait/core/test_cfg.py index 46c44248..b89e60f6 100644 --- a/tests/ait/core/test_cfg.py +++ b/tests/ait/core/test_cfg.py @@ -16,7 +16,7 @@ import time from ait.core import cfg -from tests.ait.core import TestFile +from ait.core.util import TestFile def YAML(): @@ -232,7 +232,7 @@ def test_flatten(): def test_load_yaml(): - with TestFile(data=YAML()) as filename: + with TestFile(YAML(), "w+") as filename: assert cfg.load_yaml(filename) == cfg.load_yaml(data=YAML()) @@ -287,9 +287,8 @@ def test_AitConfig(): path = "bin/ait-orbits" assert_AitConfig(config, path) - with TestFile(data=YAML()) as filename: + with TestFile(YAML(), "w+") as filename: config = cfg.AitConfig(filename) assert_AitConfig(config, path, filename) - config.reload() assert_AitConfig(config, path, filename) diff --git a/tests/ait/core/test_pcap.py b/tests/ait/core/test_pcap.py index e7ad26e5..11ad7722 100644 --- a/tests/ait/core/test_pcap.py +++ b/tests/ait/core/test_pcap.py @@ -11,29 +11,29 @@ # laws and regulations. User has the responsibility to obtain export licenses, # or other export authority as may be required before exporting such # information to foreign countries or providing access to foreign persons. -import gevent.monkey - -gevent.monkey.patch_all() import datetime import os import struct -import time import warnings import time -import tempfile from unittest import mock +from gevent import monkey + from ait.core import dmc, pcap +from ait.core.util import TestFile + +monkey.patch_all() TmpFile = None TmpFilename = None with warnings.catch_warnings(): warnings.simplefilter("ignore") - TmpFile = tempfile.NamedTemporaryFile(mode="wb") - TmpFilename = TmpFile.name + with TestFile('', "wb") as filename: + TmpFilename = filename def testPCapGlobalHeader(): diff --git a/tests/ait/core/test_table.py b/tests/ait/core/test_table.py index 0e76d767..a02f3047 100644 --- a/tests/ait/core/test_table.py +++ b/tests/ait/core/test_table.py @@ -1,12 +1,11 @@ import datetime as dt import inspect -import os.path -import tempfile import unittest import ait.core.dmc as dmc import ait.core.dtype as dtype from ait.core import table +from ait.core.util import TestFile test_table = """ - !FSWTable @@ -72,12 +71,10 @@ class TestTableEncode(unittest.TestCase): @classmethod def setUpClass(cls): - cls.table_defn_path = os.path.join(tempfile.gettempdir(), "test_table.yaml") - - with open(cls.table_defn_path, "w") as infile: - infile.write(test_table) - - cls.tabdict = table.FSWTabDict(cls.table_defn_path) + # The TestFile class is in util.py uses tempfile.NamedTemporaryFile() to create + # a temporary file that will be automatically deleted at the end of the tests + with TestFile(test_table, "rt") as file_name: + cls.tabdict = table.FSWTabDict(file_name) def test_enum_encode(self): defn = self.tabdict["test_type"] @@ -172,12 +169,12 @@ def test_encode_file_no_hdr(self): 4,5,6 """ - tmp_table_input = os.path.join(tempfile.gettempdir(), "test_table.txt") - with open(tmp_table_input, "w") as in_file: - in_file.write(table_data) + with TestFile("test_table.txt", "w+") as file_name: + with open(file_name, "w") as in_file: + assert in_file.write(table_data) - with open(tmp_table_input, "r") as in_file: - encoded = 
defn.encode(file_in=in_file) + with open(file_name, "r") as in_file: + encoded = defn.encode(file_in=in_file) assert len(encoded) == 13 @@ -200,7 +197,7 @@ def test_encode_file_no_hdr(self): assert encoded[11] == 5 assert encoded[12] == 6 - os.remove(tmp_table_input) + self.assertRaises(StopIteration) def test_encode_file_with_hdr(self): defn = self.tabdict["test_type"] @@ -212,14 +209,14 @@ def test_encode_file_with_hdr(self): hdr_vals = [13, 12, 11] - tmp_table_input = os.path.join(tempfile.gettempdir(), "test_table.txt") - with open(tmp_table_input, "w") as in_file: - in_file.write(table_data) + with TestFile("test_table.txt", "a+") as temp_file_name: + with open(temp_file_name, "w") as in_file: + assert in_file.write(table_data) - with open(tmp_table_input, "r") as in_file: - encoded = defn.encode(file_in=in_file, hdr_vals=hdr_vals) + with open(temp_file_name, "r") as in_file: + encoded = defn.encode(file_in=in_file, hdr_vals=hdr_vals) - assert len(encoded) == 13 + assert len(encoded) == 13 # Check header assert encoded[0] == 13 @@ -240,18 +237,12 @@ def test_encode_file_with_hdr(self): assert encoded[11] == 5 assert encoded[12] == 6 - os.remove(tmp_table_input) - class TestTableDecode(unittest.TestCase): @classmethod def setUpClass(cls): - cls.table_defn_path = os.path.join(tempfile.gettempdir(), "test_table.yaml") - - with open(cls.table_defn_path, "w") as infile: - infile.write(test_table) - - cls.tabdict = table.FSWTabDict(cls.table_defn_path) + with TestFile(test_table, "rt") as file_name: + cls.tabdict = table.FSWTabDict(file_name) def test_decode_binary(self): defn = self.tabdict["test_type"] @@ -283,12 +274,12 @@ def test_enum_decode(self): table_data = ["13,12,11", "1,2,TEST_ENUM_3", "4,5,TEST_ENUM_0"] encoded = defn.encode(text_in=table_data) - bin_file = os.path.join(tempfile.gettempdir(), "test_table_in.bin") - with open(bin_file, "wb") as out_file: - out_file.write(encoded) + with TestFile("test_table_in.bin", "wb+") as temp_bin_file: + with open(temp_bin_file, "wb") as out_file: + out_file.write(encoded) - with open(bin_file, "rb") as out_file: - decoded = defn.decode(file_in=out_file) + with open(temp_bin_file, "rb") as out_file: + decoded = defn.decode(file_in=out_file) # Check header assert decoded[0][0] == 13 @@ -311,12 +302,12 @@ def test_enum_decode_raw_values(self): table_data = ["13,12,11", "1,2,TEST_ENUM_3", "4,5,TEST_ENUM_0"] encoded = defn.encode(text_in=table_data) - bin_file = os.path.join(tempfile.gettempdir(), "test_table_in.bin") - with open(bin_file, "wb") as out_file: - out_file.write(encoded) + with TestFile("test_table_in.bin", "wb+") as temp_bin_file: + with open(temp_bin_file, "wb") as out_file: + assert out_file.write(encoded) - with open(bin_file, "rb") as out_file: - decoded = defn.decode(file_in=out_file, raw=True) + with open(temp_bin_file, "rb") as out_file: + decoded = defn.decode(file_in=out_file, raw=True) # Check header assert decoded[0][0] == 13 @@ -364,69 +355,63 @@ def test_table_duplicate_enum_load(self): 3: TEST_ENUM_3 4: TEST_ENUM_0 """ - table_defn_path = os.path.join( - tempfile.gettempdir(), "test_table_dupe_enum.yaml" - ) - with open(table_defn_path, "w") as infile: - infile.write(test_table) + with TestFile("test_table_dupe_enum.yaml", "wt") as temp_test_file: + with open(temp_test_file, "w") as infile: + assert infile.write(test_table) - with self.assertLogs("ait", level="ERROR") as cm: - tabdict = table.FSWTabDict(table_defn_path) - assert len(cm.output) == 1 + with self.assertLogs("ait", level="ERROR") as cm: + tabdict = 
table.FSWTabDict(temp_test_file) + assert len(cm.output) == 1 class TestTableTimeHandling(unittest.TestCase): @classmethod def setUpClass(cls): - cls.table_defn_path = os.path.join( - tempfile.gettempdir(), "test_time_table.yaml" - ) - - with open(cls.table_defn_path, "w") as infile: - infile.write( - inspect.cleandoc( + with TestFile("test_time_table.yaml", "wt") as temp_test_file: + with open(temp_test_file, "w") as infile: + assert infile.write( + inspect.cleandoc( + """ + - !FSWTable + name: test_type + delimiter: "," + header: + - !FSWColumn + name: MAGIC_NUM + desc: The first column in our header + type: U8 + bytes: 0 + - !FSWColumn + name: UPTYPE + desc: The second column in our header + type: U8 + bytes: 1 + - !FSWColumn + name: VERSION + desc: The third column in our header + type: U8 + bytes: 2 + columns: + - !FSWColumn + name: COLUMN_ONE + desc: First FSW Table Column + type: TIME64 + bytes: [0,7] + - !FSWColumn + name: COLUMN_TWO + desc: Second FSW Table Column + type: TIME40 + bytes: [8,12] + - !FSWColumn + name: COLUMN_THREE + desc: Third FSW Table Column + type: TIME32 + bytes: [13,16] """ - - !FSWTable - name: test_type - delimiter: "," - header: - - !FSWColumn - name: MAGIC_NUM - desc: The first column in our header - type: U8 - bytes: 0 - - !FSWColumn - name: UPTYPE - desc: The second column in our header - type: U8 - bytes: 1 - - !FSWColumn - name: VERSION - desc: The third column in our header - type: U8 - bytes: 2 - columns: - - !FSWColumn - name: COLUMN_ONE - desc: First FSW Table Column - type: TIME64 - bytes: [0,7] - - !FSWColumn - name: COLUMN_TWO - desc: Second FSW Table Column - type: TIME40 - bytes: [8,12] - - !FSWColumn - name: COLUMN_THREE - desc: Third FSW Table Column - type: TIME32 - bytes: [13,16] - """ + ) ) - ) - - cls.tabdict = table.FSWTabDict(cls.table_defn_path) + cls.tabdict = table.FSWTabDict(temp_test_file) def test_end_to_end_time_handling(self): defn = self.tabdict["test_type"] diff --git a/tests/ait/core/test_tlm.py b/tests/ait/core/test_tlm.py index 6a423944..e84b18a3 100644 --- a/tests/ait/core/test_tlm.py +++ b/tests/ait/core/test_tlm.py @@ -11,18 +11,19 @@ # laws and regulations. User has the responsibility to obtain export licenses, # or other export authority as may be required before exporting such # information to foreign countries or providing access to foreign persons. -import gevent.monkey +# gevent.monkey.patch_all() -gevent.monkey.patch_all() - -import os import csv +import os import struct +from gevent import monkey import pytest from ait.core import tlm +monkey.patch_all() + class TestTlmDictWriter(object): test_yaml_file = "/tmp/test.yaml"
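# A brief usage sketch of the TestFile(data, options) context manager that the
# test modules above now import from ait.core.util; the YAML content below is
# illustrative.

from ait.core.util import TestFile

SAMPLE_YAML = "default:\n    key: value\n"

with TestFile(SAMPLE_YAML, "wt") as filename:
    # filename names a NamedTemporaryFile seeded with SAMPLE_YAML; the file
    # is deleted automatically when the with-block exits.
    with open(filename) as stream:
        assert stream.read() == SAMPLE_YAML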