diff --git a/.gitignore b/.gitignore index b60291285..b31443232 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ openc3/spec/examples.txt coverage/ profile/ plugins/DEFAULT/ +**/python/prof/ # local env files .env.local diff --git a/openc3/lib/openc3/microservices/decom_microservice.rb b/openc3/lib/openc3/microservices/decom_microservice.rb index 3b159db75..1e9a5c36b 100644 --- a/openc3/lib/openc3/microservices/decom_microservice.rb +++ b/openc3/lib/openc3/microservices/decom_microservice.rb @@ -14,19 +14,74 @@ # GNU Affero General Public License for more details. # Modified by OpenC3, Inc. -# All changes Copyright 2024, OpenC3, Inc. +# All changes Copyright 2025, OpenC3, Inc. # All Rights Reserved # # This file may also be used under the terms of a commercial license # if purchased from OpenC3, Inc. require 'time' +require 'thread' require 'openc3/microservices/microservice' require 'openc3/microservices/interface_decom_common' require 'openc3/topics/telemetry_decom_topic' require 'openc3/topics/limits_event_topic' module OpenC3 + class LimitsResponseThread + def initialize(microservice_name:, queue:, logger:, metric:, scope:) + @microservice_name = microservice_name + @queue = queue + @logger = logger + @metric = metric + @scope = scope + @count = 0 + @error_count = 0 + @metric.set(name: 'limits_response_total', value: @count, type: 'counter') + @metric.set(name: 'limits_response_error_total', value: @error_count, type: 'counter') + end + + def start + @thread = Thread.new do + run() + rescue Exception => e + @logger.error "#{@microservice_name}: Limits Response thread died: #{e.formatted}" + raise e + end + end + + def stop + if @thread + OpenC3.kill_thread(self, @thread) + @thread = nil + end + end + + def graceful_kill + @queue << [nil, nil, nil] + end + + def run + while true + packet, item, old_limits_state = @queue.pop() + break if packet.nil? + + begin + item.limits.response.call(packet, item, old_limits_state) + rescue Exception => e + @error_count += 1 + @metric.set(name: 'limits_response_error_total', value: @error_count, type: 'counter') + @logger.error "#{packet.target_name} #{packet.packet_name} #{item.name} Limits Response Exception!" + @logger.error "Called with old_state = #{old_limits_state}, new_state = #{item.limits.state}" + @logger.error e.filtered + end + + @count += 1 + @metric.set(name: 'limits_response_total', value: @count, type: 'counter') + end + end + end + class DecomMicroservice < Microservice include InterfaceDecomCommon LIMITS_STATE_INDEX = { RED_LOW: 0, YELLOW_LOW: 1, YELLOW_HIGH: 2, RED_HIGH: 3, GREEN_LOW: 4, GREEN_HIGH: 5 } @@ -44,9 +99,14 @@ def initialize(*args) @error_count = 0 @metric.set(name: 'decom_total', value: @count, type: 'counter') @metric.set(name: 'decom_error_total', value: @error_count, type: 'counter') + @limits_response_queue = Queue.new + @limits_response_thread = nil end def run + @limits_response_thread = LimitsResponseThread.new(microservice_name: @name, queue: @limits_response_queue, logger: @logger, metric: @metric, scope: @scope) + @limits_response_thread.start() + setup_microservice_topic() while true break if @cancel_thread @@ -81,6 +141,9 @@ def run @logger.error("Decom error: #{e.formatted}") end end + + @limits_response_thread.stop() + @limits_response_thread = nil end def decom_packet(_topic, msg_id, msg_hash, _redis) @@ -119,8 +182,9 @@ def decom_packet(_topic, msg_id, msg_hash, _redis) # but that is rescued separately in the limits_change_callback packet.check_limits(System.limits_set) - # This is what updates the CVT + # This is what actually decommutates the packet and updates the CVT TelemetryDecomTopic.write_packet(packet, scope: @scope) + diff = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start # seconds as a float @metric.set(name: 'decom_duration_seconds', value: diff, type: 'gauge', unit: 'seconds') end @@ -179,16 +243,9 @@ def limits_change_callback(packet, item, old_limits_state, value, log_change) LimitsEventTopic.write(event, scope: @scope) if item.limits.response - begin - # TODO: The limits response is user code and should be run as a separate thread / process - # If this code blocks it will delay TelemetryDecomTopic.write_packet - item.limits.response.call(packet, item, old_limits_state) - rescue Exception => e - @error = e - @logger.error "#{packet.target_name} #{packet.packet_name} #{item.name} Limits Response Exception!" - @logger.error "Called with old_state = #{old_limits_state}, new_state = #{item.limits.state}" - @logger.error e.filtered - end + copied_packet = packet.deep_copy + copied_item = packet.items[item.name] + @limits_response_queue << [copied_packet, copied_item, old_limits_state] end end end diff --git a/openc3/lib/openc3/packets/structure.rb b/openc3/lib/openc3/packets/structure.rb index 983b1072d..e4f670314 100644 --- a/openc3/lib/openc3/packets/structure.rb +++ b/openc3/lib/openc3/packets/structure.rb @@ -35,10 +35,10 @@ class Structure # @return [Hash] Items that make up the structure. # Hash key is the item's name in uppercase - attr_reader :items + attr_accessor :items # @return [Array] Items sorted by bit_offset. - attr_reader :sorted_items + attr_accessor :sorted_items # @return [Integer] Defined length in bytes (not bits) of the structure attr_reader :defined_length @@ -511,11 +511,28 @@ def clone # additional work that isn't necessary here structure.instance_variable_set("@buffer".freeze, @buffer.clone) if @buffer # Need to update reference packet in the Accessor + structure.accessor = @accessor.clone structure.accessor.packet = structure return structure end alias dup clone + # Clone that also deep copies items + # @return [Structure] A deep copy of the structure + def deep_copy + cloned = clone() + cloned_items = [] + cloned.sorted_items.each do |item| + cloned_items << item.clone() + end + cloned.sorted_items = cloned_items + cloned.items = {} + cloned_items.each do |item| + cloned.items[item.name] = item + end + return cloned + end + # Enable the ability to read and write item values as if they were methods # to the class def enable_method_missing diff --git a/openc3/lib/openc3/packets/structure_item.rb b/openc3/lib/openc3/packets/structure_item.rb index 66408e666..df8e05dfc 100644 --- a/openc3/lib/openc3/packets/structure_item.rb +++ b/openc3/lib/openc3/packets/structure_item.rb @@ -303,6 +303,7 @@ def <=>(other) def clone item = super() item.name = self.name.clone if self.name + item.key = self.key.clone if self.key item end alias dup clone diff --git a/openc3/python/openc3/microservices/decom_microservice.py b/openc3/python/openc3/microservices/decom_microservice.py index 2343c242e..66b1ac0b6 100644 --- a/openc3/python/openc3/microservices/decom_microservice.py +++ b/openc3/python/openc3/microservices/decom_microservice.py @@ -1,4 +1,4 @@ -# Copyright 2024 OpenC3, Inc. +# Copyright 2025 OpenC3, Inc. # All Rights Reserved. # # This program is free software; you can modify and/or redistribute it @@ -18,6 +18,8 @@ import sys import time import json +import threading +import queue from openc3.microservices.microservice import Microservice from openc3.system.system import System from openc3.topics.topic import Topic @@ -29,10 +31,68 @@ handle_build_cmd, handle_inject_tlm, ) +from openc3.top_level import kill_thread + + +class LimitsResponseThread: + def __init__(self, microservice_name, queue, logger, metric, scope): + self.microservice_name = microservice_name + self.queue = queue + self.logger = logger + self.metric = metric + self.scope = scope + self.count = 0 + self.error_count = 0 + self.metric.set(name="limits_response_total", value=self.count, type="counter") + self.metric.set(name="limits_response_error_total", value=self.error_count, type="counter") + + def start(self): + self.thread = threading.Thread(target=self.run, daemon=True) + self.thread.start() + return self.thread + + def stop(self): + if self.thread: + kill_thread(self, self.thread) + self.thread = None + + def graceful_kill(self): + self.queue.put([None, None, None]) + + def run(self): + try: + while True: + packet, item, old_limits_state = self.queue.get() + if packet is None: + break + + try: + item.limits.response.call(packet, item, old_limits_state) + except Exception as error: + self.error_count += 1 + self.metric.set(name="limits_response_error_total", value=self.error_count, type="counter") + self.logger.error( + f"{packet.target_name} {packet.packet_name} {item.name} Limits Response Exception!" + ) + self.logger.error(f"Called with old_state = {old_limits_state}, new_state = {item.limits.state}") + self.logger.error(repr(error)) + + self.count += 1 + self.metric.set(name="limits_response_total", value=self.count, type="counter") + except Exception as error: + self.logger.error(f"{self.microservice_name}: Limits Response thread died: {repr(error)}") + raise error class DecomMicroservice(Microservice): - LIMITS_STATE_INDEX = { "RED_LOW": 0, "YELLOW_LOW": 1, "YELLOW_HIGH": 2, "RED_HIGH": 3, "GREEN_LOW": 4, "GREEN_HIGH": 5 } + LIMITS_STATE_INDEX = { + "RED_LOW": 0, + "YELLOW_LOW": 1, + "YELLOW_HIGH": 2, + "RED_HIGH": 3, + "GREEN_LOW": 4, + "GREEN_HIGH": 5, + } def __init__(self, *args): super().__init__(*args) @@ -46,8 +106,19 @@ def __init__(self, *args): self.error_count = 0 self.metric.set(name="decom_total", value=self.count, type="counter") self.metric.set(name="decom_error_total", value=self.error_count, type="counter") + self.limits_response_queue = queue.Queue() + self.limits_response_thread = None def run(self): + self.limits_response_thread = LimitsResponseThread( + microservice_name=self.name, + queue=self.limits_response_queue, + logger=self.logger, + metric=self.metric, + scope=self.scope, + ) + self.limits_response_thread.start() + self.setup_microservice_topic() while True: if self.cancel_thread: @@ -78,6 +149,9 @@ def run(self): self.error = error self.logger.error(f"Decom error {repr(error)}") + self.limits_response_thread.stop() + self.limits_response_thread = None + def decom_packet(self, topic, msg_id, msg_hash, _redis): # OpenC3.in_span("decom_packet") do msgid_seconds_from_epoch = int(msg_id.split("-")[0]) / 1000.0 @@ -139,7 +213,7 @@ def limits_change_callback(self, packet, item, old_limits_state, value, log_chan if item.limits.values: values = item.limits.values[System.limits_set()] # Check if the state is RED_LOW, YELLOW_LOW, YELLOW_HIGH, RED_HIGH, GREEN_LOW, GREEN_HIGH - if DecomMicroservice.LIMITS_STATE_INDEX.get(item.limits.state): + if DecomMicroservice.LIMITS_STATE_INDEX.get(item.limits.state, None) is not None: # Directly index into the values and return the value message += f" ({values[DecomMicroservice.LIMITS_STATE_INDEX[item.limits.state]]})" elif item.limits.state == "GREEN": @@ -153,7 +227,7 @@ def limits_change_callback(self, packet, item, old_limits_state, value, log_chan # Include the packet_time in the log json but not the log message # Can't use isoformat because it appends "+00:00" instead of "Z" - time = { 'packet_time': packet_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ") } + time = {"packet_time": packet_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ")} if log_change: match item.limits.state: case "BLUE" | "GREEN" | "GREEN_LOW" | "GREEN_HIGH": @@ -179,15 +253,9 @@ def limits_change_callback(self, packet, item, old_limits_state, value, log_chan LimitsEventTopic.write(event, scope=self.scope) if item.limits.response is not None: - try: - # TODO: The limits response is user code and should be run as a separate thread / process - # If this code blocks it will delay TelemetryDecomTopic.write_packet - item.limits.response.call(packet, item, old_limits_state) - except Exception as error: - self.error = error - self.logger.error(f"{packet.target_name} {packet.packet_name} {item.name} Limits Response Exception!") - self.logger.error(f"Called with old_state = {old_limits_state}, new_state = {item.limits.state}") - self.logger.error(repr(error)) + copied_packet = packet.deep_copy() + copied_item = packet.items[item.name] + self.limits_response_queue.put([copied_packet, copied_item, old_limits_state]) if os.path.basename(__file__) == os.path.basename(sys.argv[0]): diff --git a/openc3/python/openc3/packets/structure.py b/openc3/python/openc3/packets/structure.py index d7117aea8..7aed97f82 100644 --- a/openc3/python/openc3/packets/structure.py +++ b/openc3/python/openc3/packets/structure.py @@ -481,9 +481,24 @@ def buffer(self, buffer): def clone(self): struct = copy.copy(self) struct._buffer = self.buffer # Makes a copy + struct.accessor = copy.copy(self.accessor) struct.accessor.packet = struct return struct + # Clone that also deep copies items + # @return [Structure] A deep copy of the structure + def deep_copy(self): + cloned = self.clone() + cloned_items = [] + for item in cloned.sorted_items: + cloned_items.append(item.clone()) + + cloned.sorted_items = cloned_items + cloned.items = {} + for item in cloned_items: + cloned.items[item.name] = item + return cloned + CLASS_MUTEX = threading.Lock() def setup_mutex(self): diff --git a/openc3/python/test/microservices/test_decom_microservice.py b/openc3/python/test/microservices/test_decom_microservice.py new file mode 100644 index 000000000..647373d7f --- /dev/null +++ b/openc3/python/test/microservices/test_decom_microservice.py @@ -0,0 +1,201 @@ +# Copyright 2025 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# This file may also be used under the terms of a commercial license +# if purchased from OpenC3, Inc. + +import re +import time +import unittest +import threading +from unittest.mock import patch +from datetime import datetime, timezone +from test.test_helper import Mock, mock_redis, setup_system, capture_io +from openc3.api.tlm_api import tlm +from openc3.system.system import System +from openc3.packets.limits_response import LimitsResponse +from openc3.models.target_model import TargetModel +from openc3.models.microservice_model import MicroserviceModel +from openc3.microservices.decom_microservice import DecomMicroservice +from openc3.topics.limits_event_topic import LimitsEventTopic +from openc3.topics.topic import Topic +from openc3.topics.telemetry_topic import TelemetryTopic +from openc3.processors.processor import Processor + + +class TestDecomMicroservice(unittest.TestCase): + @patch("openc3.system.system.System.limits_set") + @patch("openc3.microservices.microservice.System") + def setUp(self, usystem, limits_set): + redis = mock_redis(self) + setup_system() + + limits_set.return_value = "DEFAULT" + + orig_xread = redis.xread + + # Override xread to ignore the block and count keywords + def xread_side_effect(*args, **kwargs): + if "block" in kwargs: + kwargs.pop("block") + result = None + try: + result = orig_xread(*args, **kwargs) + except RuntimeError: + pass + + # # Create a slight delay to simulate the blocking call + if result and len(result) == 0: + time.sleep(0.01) + return result + + redis.xread = Mock() + redis.xread.side_effect = xread_side_effect + + model = TargetModel(name="INST", scope="DEFAULT") + model.create() + model = TargetModel(name="SYSTEM", scope="DEFAULT") + model.create() + model = MicroserviceModel( + "DEFAULT__DECOM__INST_INT", + scope="DEFAULT", + topics=[ + "DEFAULT__TELEMETRY__{INST}__HEALTH_STATUS", + ], + target_names=["INST"], + ) + model.create() + self.dm = DecomMicroservice("DEFAULT__DECOM__INST_INT") + self.dm_thread = threading.Thread(target=self.dm.run) + self.dm_thread.start() + time.sleep(0.001) # Allow the thread to start + + def tearDown(self): + self.dm.shutdown() + self.dm_thread.join() + + def test_run_decommutates_a_packet_from_raw_to_engineering_values(self): + packet = System.telemetry.packet("INST", "HEALTH_STATUS") + packet.extra = {} + packet.extra["STATUS"] = "OK" + packet.received_time = datetime.now(timezone.utc) + for stdout in capture_io(): + TelemetryTopic.write_packet(packet, scope="DEFAULT") + time.sleep(0.01) + self.assertIn("INST HEALTH_STATUS TEMP1 = -100.0 is RED_LOW (-80.0)", stdout.getvalue()) + self.assertIn("INST HEALTH_STATUS TEMP2 = -100.0 is RED_LOW (-60.0)", stdout.getvalue()) + self.assertIn("INST HEALTH_STATUS TEMP3 = -100.0 is RED_LOW (-25.0)", stdout.getvalue()) + self.assertIn("INST HEALTH_STATUS TEMP4 = -100.0 is RED_LOW (-80.0)", stdout.getvalue()) + self.assertIn("INST HEALTH_STATUS GROUND1STATUS = UNAVAILABLE is YELLOW", stdout.getvalue()) + self.assertIn("INST HEALTH_STATUS GROUND2STATUS = UNAVAILABLE is YELLOW", stdout.getvalue()) + self.assertEqual(tlm("INST HEALTH_STATUS TEMP1"), -100.0) + self.assertEqual(tlm("INST HEALTH_STATUS TEMP2"), -100.0) + self.assertEqual(tlm("INST HEALTH_STATUS TEMP3"), -100.0) + self.assertEqual(tlm("INST HEALTH_STATUS TEMP4"), -100.0) + + events = LimitsEventTopic.read(0, scope="DEFAULT") + self.assertEqual(len(events), 6) + # Check the first one completely + self.assertEqual(events[0][1].get("type"), "LIMITS_CHANGE") + self.assertEqual(events[0][1].get("target_name"), "INST") + self.assertEqual(events[0][1].get("packet_name"), "HEALTH_STATUS") + self.assertEqual(events[0][1].get("item_name"), "TEMP1") + self.assertEqual(events[0][1].get("old_limits_state"), "None") + self.assertEqual(events[0][1].get("new_limits_state"), "RED_LOW") + self.assertEqual(events[0][1].get("message"), "INST HEALTH_STATUS TEMP1 = -100.0 is RED_LOW (-80.0)") + self.assertEqual(events[1][1].get("message"), "INST HEALTH_STATUS TEMP2 = -100.0 is RED_LOW (-60.0)") + self.assertEqual(events[2][1].get("message"), "INST HEALTH_STATUS TEMP3 = -100.0 is RED_LOW (-25.0)") + self.assertEqual(events[3][1].get("message"), "INST HEALTH_STATUS TEMP4 = -100.0 is RED_LOW (-80.0)") + self.assertEqual(events[4][1].get("message"), "INST HEALTH_STATUS GROUND1STATUS = UNAVAILABLE is YELLOW") + self.assertEqual(events[5][1].get("message"), "INST HEALTH_STATUS GROUND2STATUS = UNAVAILABLE is YELLOW") + + packet.disable_limits("TEMP3") + packet.write("TEMP1", 0.0) + packet.write("TEMP2", 0.0) + packet.write("TEMP3", 0.0) + for stdout in capture_io(): + TelemetryTopic.write_packet(packet, scope="DEFAULT") + time.sleep(0.01) + assert re.search(r"INST HEALTH_STATUS TEMP1 = .* is BLUE \(-20.0 to 20.0\)", stdout.getvalue()) + assert re.search(r"INST HEALTH_STATUS TEMP2 = .* is GREEN \(-55.0 to 30.0\)", stdout.getvalue()) + + # Start reading from the last event's ID + events = LimitsEventTopic.read(events[-1][0], scope="DEFAULT") + self.assertEqual(len(events), 3) + self.assertEqual(events[0][1]["type"], "LIMITS_CHANGE") + self.assertEqual(events[0][1]["target_name"], "INST") + self.assertEqual(events[0][1]["packet_name"], "HEALTH_STATUS") + self.assertEqual(events[0][1]["item_name"], "TEMP3") + self.assertEqual(events[0][1]["old_limits_state"], "RED_LOW") + self.assertEqual(events[0][1]["new_limits_state"], "None") + self.assertEqual(events[0][1]["message"], "INST HEALTH_STATUS TEMP3 is disabled") + assert re.search(r"INST HEALTH_STATUS TEMP1 = .* is BLUE \(-20.0 to 20.0\)", events[1][1]["message"]) + assert re.search(r"INST HEALTH_STATUS TEMP2 = .* is GREEN \(-55.0 to 30.0\)", events[2][1]["message"]) + + def test_handles_exceptions_in_the_thread(self): + with patch.object(self.dm, "microservice_cmd") as mock_microservice_cmd: + mock_microservice_cmd.side_effect = Exception("Bad command") + for stdout in capture_io(): + Topic.write_topic("MICROSERVICE__DEFAULT__DECOM__INST_INT", {"connect": "true"}, "*", 100) + time.sleep(0.01) + self.assertIn("Decom error Exception('Bad command')", stdout.getvalue()) + # This is an implementation detail but we want to ensure the error was logged + self.assertEqual(self.dm.metric.data["decom_error_total"]["value"], 1) + + def test_handles_exceptions_in_user_processors(self): + packet = System.telemetry.packet("INST", "HEALTH_STATUS") + processor = Processor() + processor.call = lambda packet, buffer: exec("raise RuntimeError('Bad processor')") + packet.processors["TEMP1"] = processor + packet.received_time = datetime.now(timezone.utc) + for stdout in capture_io(): + TelemetryTopic.write_packet(packet, scope="DEFAULT") + time.sleep(0.01) + self.assertIn("Bad processor", stdout.getvalue()) + # This is an implementation detail but we want to ensure the error was logged + self.assertEqual(self.dm.metric.data["decom_error_total"]["value"], 1) + # CVT is still set + self.assertEqual(tlm("INST HEALTH_STATUS TEMP1"), -100.0) + self.assertEqual(tlm("INST HEALTH_STATUS TEMP2"), -100.0) + + def test_handles_limits_responses_in_another_thread(self): + class DelayedLimitsResponse(LimitsResponse): + def call(self, packet, item, old_limits_state): + time.sleep(0.1) + + packet = System.telemetry.packet("INST", "HEALTH_STATUS") + temp1 = packet.get_item("TEMP1") + temp1.limits.response = DelayedLimitsResponse() + packet.received_time = datetime.now(timezone.utc) + TelemetryTopic.write_packet(packet, scope="DEFAULT") + time.sleep(0.01) + + # Verify that even though the limits response sleeps for 0.1s, the decom thread is not blocked + self.assertLess(self.dm.metric.data["decom_duration_seconds"]["value"], 0.01) + + def test_handles_exceptions_in_limits_responses(self): + class BadLimitsResponse(LimitsResponse): + def call(self, packet, item, old_limits_state): + raise RuntimeError("Bad response") + + packet = System.telemetry.packet("INST", "HEALTH_STATUS") + temp1 = packet.get_item("TEMP1") + temp1.limits.response = BadLimitsResponse() + packet.received_time = datetime.now(timezone.utc) + for stdout in capture_io(): + TelemetryTopic.write_packet(packet, scope="DEFAULT") + time.sleep(0.01) + self.assertIn("INST HEALTH_STATUS TEMP1 Limits Response Exception!", stdout.getvalue()) + self.assertIn("Bad response", stdout.getvalue()) + + self.assertEqual(self.dm.limits_response_thread.metric.data["limits_response_error_total"]["value"], 1) diff --git a/openc3/python/test/packets/test_structure.py b/openc3/python/test/packets/test_structure.py index 2d6b90ec0..9a10699ee 100644 --- a/openc3/python/test/packets/test_structure.py +++ b/openc3/python/test/packets/test_structure.py @@ -660,3 +660,21 @@ def test_duplicates_the_structure_with_a_new_buffer(self): self.assertEqual(s2.read("test1"), [0, 0]) # Ensure we didn't change the original self.assertEqual(s.read("test1"), [1, 2]) + + def test_deep_copy(self): + s = Structure("BIG_ENDIAN") + s.append_item("test1", 8, "UINT", 16) + s.write("test1", [1, 2]) + s.append_item("test2", 16, "UINT") + s.write("test2", 0x0304) + s.append_item("test3", 32, "UINT") + s.write("test3", 0x05060708) + + s2 = s.deep_copy() + self.assertEqual(s.items["TEST1"].overflow, 'ERROR') + self.assertEqual(s2.items["TEST1"].overflow, 'ERROR') + # Change something about the item in the original + s.items["TEST1"].overflow = 'SATURATE' + self.assertEqual(s.items["TEST1"].overflow, 'SATURATE') + # Verify the deep_copy didn't change + self.assertEqual(s2.items["TEST1"].overflow, 'ERROR') diff --git a/openc3/spec/microservices/decom_microservice_spec.rb b/openc3/spec/microservices/decom_microservice_spec.rb new file mode 100644 index 000000000..7ce24e44c --- /dev/null +++ b/openc3/spec/microservices/decom_microservice_spec.rb @@ -0,0 +1,193 @@ +# encoding: ascii-8bit + +# Copyright 2025 OpenC3, Inc. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Affero General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# This file may also be used under the terms of a commercial license +# if purchased from OpenC3, Inc. + +require 'spec_helper' +require 'openc3/microservices/decom_microservice' +require 'openc3/packets/limits_response' +require 'openc3/models/metric_model' +require 'openc3/topics/telemetry_topic' +require 'openc3/topics/limits_event_topic' + +module OpenC3 + describe DecomMicroservice do + class ApiTest + include Extract + include Api + include Authorization + end + + before(:each) do + redis = mock_redis() + setup_system() + allow(redis).to receive(:xread).and_wrap_original do |m, *args| + # Only use the first two arguments as the last argument is keyword block: + result = m.call(*args[0..1]) + # Create a slight delay to simulate the blocking call + sleep 0.001 if result and result.length == 0 + result + end + + model = TargetModel.new(folder_name: 'INST', name: 'INST', scope: "DEFAULT") + model.create + model.update_store(System.new(['INST'], File.join(SPEC_DIR, 'install', 'config', 'targets'))) + model = MicroserviceModel.new(name: "DEFAULT__DECOM__INST_INT", scope: "DEFAULT", topics: ["DEFAULT__TELEMETRY__{INST}__HEALTH_STATUS"], target_names: ["INST"]) + model.create + @dm = DecomMicroservice.new("DEFAULT__DECOM__INST_INT") + @api = ApiTest.new + @dm_thread = Thread.new { @dm.run } + sleep 0.001 # Allow the thread to start + end + + after(:each) do + @dm.shutdown() + @dm_thread.join() + end + + describe "run" do + it "decommutates a packet from raw to engineering values" do + packet = System.telemetry.packet('INST', 'HEALTH_STATUS') + packet.extra ||= {} + packet.extra['STATUS'] = 'OK' + packet.received_time = Time.now.sys + capture_io do |stdout| + TelemetryTopic.write_packet(packet, scope: 'DEFAULT') + sleep 0.01 + expect(stdout.string).to include("INST HEALTH_STATUS TEMP1 = -100.0 is RED_LOW (-80.0)") + expect(stdout.string).to include("INST HEALTH_STATUS TEMP2 = -100.0 is RED_LOW (-60.0)") + expect(stdout.string).to include("INST HEALTH_STATUS TEMP3 = -100.0 is RED_LOW (-25.0)") + expect(stdout.string).to include("INST HEALTH_STATUS TEMP4 = -100.0 is RED_LOW (-80.0)") + expect(stdout.string).to include("INST HEALTH_STATUS GROUND1STATUS = UNAVAILABLE is YELLOW") + expect(stdout.string).to include("INST HEALTH_STATUS GROUND2STATUS = UNAVAILABLE is YELLOW") + end + expect(@api.tlm("INST HEALTH_STATUS TEMP1")).to eql(-100.0) + expect(@api.tlm("INST HEALTH_STATUS TEMP2")).to eql(-100.0) + expect(@api.tlm("INST HEALTH_STATUS TEMP3")).to eql(-100.0) + expect(@api.tlm("INST HEALTH_STATUS TEMP4")).to eql(-100.0) + + events = LimitsEventTopic.read(0, scope: "DEFAULT") + expect(events.length).to eql(6) + # Check the first one completely + expect(events[0][1]['type']).to eql("LIMITS_CHANGE") + expect(events[0][1]['target_name']).to eql("INST") + expect(events[0][1]['packet_name']).to eql("HEALTH_STATUS") + expect(events[0][1]['item_name']).to eql("TEMP1") + expect(events[0][1]['old_limits_state']).to eql("") + expect(events[0][1]['new_limits_state']).to eql("RED_LOW") + expect(events[0][1]['time_nsec']).to be > 0 + expect(events[0][1]['message']).to eql("INST HEALTH_STATUS TEMP1 = -100.0 is RED_LOW (-80.0)") + expect(events[1][1]['message']).to eql("INST HEALTH_STATUS TEMP2 = -100.0 is RED_LOW (-60.0)") + expect(events[2][1]['message']).to eql("INST HEALTH_STATUS TEMP3 = -100.0 is RED_LOW (-25.0)") + expect(events[3][1]['message']).to eql("INST HEALTH_STATUS TEMP4 = -100.0 is RED_LOW (-80.0)") + expect(events[4][1]['message']).to eql("INST HEALTH_STATUS GROUND1STATUS = UNAVAILABLE is YELLOW") + expect(events[5][1]['message']).to eql("INST HEALTH_STATUS GROUND2STATUS = UNAVAILABLE is YELLOW") + + packet.disable_limits("TEMP3") + packet.write("TEMP1", 0.0) + packet.write("TEMP2", 0.0) + packet.write("TEMP3", 0.0) + capture_io do |stdout| + TelemetryTopic.write_packet(packet, scope: 'DEFAULT') + sleep 0.01 + expect(stdout.string).to match(/INST HEALTH_STATUS TEMP1 = .* is BLUE \(-20.0 to 20.0\)/) + expect(stdout.string).to match(/INST HEALTH_STATUS TEMP2 = .* is GREEN \(-55.0 to 30.0\)/) + end + + # Start reading from the last event's ID + events = LimitsEventTopic.read(events[-1][0], scope: "DEFAULT") + expect(events.length).to eql(3) + expect(events[0][1]['type']).to eql("LIMITS_CHANGE") + expect(events[0][1]['target_name']).to eql("INST") + expect(events[0][1]['packet_name']).to eql("HEALTH_STATUS") + expect(events[0][1]['item_name']).to eql("TEMP3") + expect(events[0][1]['old_limits_state']).to eql("RED_LOW") + expect(events[0][1]['new_limits_state']).to eql("") + expect(events[0][1]['time_nsec']).to be > 0 + expect(events[0][1]['message']).to eql("INST HEALTH_STATUS TEMP3 is disabled") + expect(events[1][1]['message']).to match(/INST HEALTH_STATUS TEMP1 = .* is BLUE \(-20.0 to 20.0\)/) + expect(events[2][1]['message']).to match(/INST HEALTH_STATUS TEMP2 = .* is GREEN \(-55.0 to 30.0\)/) + end + + it "handles exceptions in the thread" do + expect(@dm).to receive(:microservice_cmd).and_raise("Bad command") + capture_io do |stdout| + Topic.write_topic("MICROSERVICE__DEFAULT__DECOM__INST_INT", { 'connect' => 'true' }, '*', 100) + sleep 0.01 + expect(stdout.string).to include("Decom error: RuntimeError : Bad command") + end + # This is an implementation detail but we want to ensure the error was logged + expect(@dm.instance_variable_get("@metric").data['decom_error_total']['value']).to eql(1) + end + + it "handles exceptions in user processors" do + packet = System.telemetry.packet('INST', 'HEALTH_STATUS') + processor = double(Processor).as_null_object + expect(processor).to receive(:call).and_raise("Bad processor") + packet.processors['TEMP1'] = processor + packet.received_time = Time.now.sys + capture_io do |stdout| + TelemetryTopic.write_packet(packet, scope: 'DEFAULT') + sleep 0.01 + expect(stdout.string).to include("Bad processor") + end + # This is an implementation detail but we want to ensure the error was logged + expect(@dm.instance_variable_get("@metric").data['decom_error_total']['value']).to eql(1) + # CVT is still set + expect(@api.tlm("INST HEALTH_STATUS TEMP1")).to eql(-100.0) + expect(@api.tlm("INST HEALTH_STATUS TEMP2")).to eql(-100.0) + end + + it "handles limits responses in another thread" do + class DelayedLimitsResponse < LimitsResponse + def call(packet, item, old_limits_state) + sleep 0.1 + end + end + + packet = System.telemetry.packet('INST', 'HEALTH_STATUS') + temp1 = packet.get_item("TEMP1") + temp1.limits.response = DelayedLimitsResponse.new + packet.received_time = Time.now.sys + TelemetryTopic.write_packet(packet, scope: 'DEFAULT') + sleep 0.01 + # Verify that even though the limits response sleeps for 0.1s, the decom thread is not blocked + expect(@dm.instance_variable_get("@metric").data['decom_duration_seconds']['value']).to be < 0.01 + end + + it "handles exceptions in limits responses" do + class BadLimitsResponse < LimitsResponse + def call(packet, item, old_limits_state) + raise "Bad response" + end + end + + packet = System.telemetry.packet('INST', 'HEALTH_STATUS') + temp1 = packet.get_item("TEMP1") + temp1.limits.response = BadLimitsResponse.new + packet.received_time = Time.now.sys + capture_io do |stdout| + TelemetryTopic.write_packet(packet, scope: 'DEFAULT') + sleep 0.01 + expect(stdout.string).to include("INST HEALTH_STATUS TEMP1 Limits Response Exception!") + expect(stdout.string).to include("Bad response") + end + lrt = @dm.instance_variable_get("@limits_response_thread") + expect(lrt.instance_variable_get("@metric").data['limits_response_error_total']['value']).to eql(1) + end + end + end +end diff --git a/openc3/spec/packets/structure_spec.rb b/openc3/spec/packets/structure_spec.rb index 222aab898..f02deb19a 100644 --- a/openc3/spec/packets/structure_spec.rb +++ b/openc3/spec/packets/structure_spec.rb @@ -683,19 +683,13 @@ module OpenC3 it "recalculates the bit offsets for 0 size" do s = Structure.new(:BIG_ENDIAN) - # APPEND_ITEM BLOCK 8000 BLOCK "Raw Data" - # APPEND_ITEM IMAGE 0 BLOCK "Image Data" - s.append_item("test1", 80, :BLOCK) s.append_item("test2", 0, :BLOCK) s.define_item("test3", -16, 16, :UINT) s.buffer = "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" + "\x0a\x0b\x0c\x0d\x0e\x0f\x0f\x0e\x0d\x0c\x0b\x0a\xAA\x55" - puts s.read("test1").formatted - puts s.read("test2").formatted - puts s.read("test3") expect(s.read("test1")).to eql "\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09" - # expect(s.read("test2")).to eql "\x0a\x0b\x0c\x0d\x0e\x0f\x0e\x0d\x0c\x0b\x0a" + expect(s.read("test2")).to eql "\x0a\x0b\x0c\x0d\x0e\x0f\x0f\x0e\x0d\x0c\x0b\x0a\xaa\x55" expect(s.read("test3")).to eql 0xAA55 end end @@ -729,6 +723,27 @@ module OpenC3 end end + describe "deep_copy" do + it "duplicates the structure and items" do + s = Structure.new(:BIG_ENDIAN) + s.append_item("test1", 8, :UINT, 16) + s.write("test1", [1, 2]) + s.append_item("test2", 16, :UINT) + s.write("test2", 0x0304) + s.append_item("test3", 32, :UINT) + s.write("test3", 0x05060708) + + s2 = s.deep_copy() + expect(s.items["TEST1"].overflow).to eql :ERROR + expect(s2.items["TEST1"].overflow).to eql :ERROR + # Change something about the item in the original + s.items["TEST1"].overflow = :SATURATE + expect(s.items["TEST1"].overflow).to eql :SATURATE + # Verify the deep_copy didn't change + expect(s2.items["TEST1"].overflow).to eql :ERROR + end + end + describe "enable_method_missing" do it "enables reading by name" do s = Structure.new(:BIG_ENDIAN)