Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run Limits Responses in Thread to Prevent Blocking Decom #1901

Merged
merged 5 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 67 additions & 11 deletions openc3/lib/openc3/microservices/decom_microservice.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# Copyright 2025 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand All @@ -21,12 +21,67 @@
# if purchased from OpenC3, Inc.

require 'time'
require 'thread'
require 'openc3/microservices/microservice'
require 'openc3/microservices/interface_decom_common'
require 'openc3/topics/telemetry_decom_topic'
require 'openc3/topics/limits_event_topic'

module OpenC3
class LimitsResponseThread
def initialize(microservice_name:, queue:, logger:, metric:, scope:)
@microservice_name = microservice_name
@queue = queue
@logger = logger
@metric = metric
@scope = scope
@count = 0
@error_count = 0
@metric.set(name: 'limits_response_total', value: @count, type: 'counter')
@metric.set(name: 'limits_response_error_total', value: @error_count, type: 'counter')
end

def start
@thread = Thread.new do
run()
rescue Exception => e
@logger.error "#{@microservice_name}: Limits Response thread died: #{e.formatted}"
raise e
end
end

def stop
if @thread
OpenC3.kill_thread(self, @thread)
@thread = nil
end
end

def graceful_kill
@queue << [nil, nil, nil]
end

def run
while true
packet, item, old_limits_state = @queue.pop()
break if packet.nil?

begin
item.limits.response.call(packet, item, old_limits_state)
rescue Exception => e
@error_count += 1
@metric.set(name: 'limits_response_error_total', value: @error_count, type: 'counter')
@logger.error "#{packet.target_name} #{packet.packet_name} #{item.name} Limits Response Exception!"
@logger.error "Called with old_state = #{old_limits_state}, new_state = #{item.limits.state}"
@logger.error e.filtered
end

@count += 1
@metric.set(name: 'limits_response_total', value: @count, type: 'counter')
end
end
end

class DecomMicroservice < Microservice
include InterfaceDecomCommon
LIMITS_STATE_INDEX = { RED_LOW: 0, YELLOW_LOW: 1, YELLOW_HIGH: 2, RED_HIGH: 3, GREEN_LOW: 4, GREEN_HIGH: 5 }
Expand All @@ -44,9 +99,14 @@ def initialize(*args)
@error_count = 0
@metric.set(name: 'decom_total', value: @count, type: 'counter')
@metric.set(name: 'decom_error_total', value: @error_count, type: 'counter')
@limits_response_queue = Queue.new
@limits_response_thread = nil
end

def run
@limits_response_thread = LimitsResponseThread.new(microservice_name: @name, queue: @limits_response_queue, logger: @logger, metric: @metric, scope: @scope)
@limits_response_thread.start()

setup_microservice_topic()
while true
break if @cancel_thread
Expand Down Expand Up @@ -81,6 +141,9 @@ def run
@logger.error("Decom error: #{e.formatted}")
end
end

@limits_response_thread.stop()
@limits_response_thread = nil
end

def decom_packet(_topic, msg_id, msg_hash, _redis)
Expand Down Expand Up @@ -179,16 +242,9 @@ def limits_change_callback(packet, item, old_limits_state, value, log_change)
LimitsEventTopic.write(event, scope: @scope)

if item.limits.response
begin
# TODO: The limits response is user code and should be run as a separate thread / process
# If this code blocks it will delay TelemetryDecomTopic.write_packet
item.limits.response.call(packet, item, old_limits_state)
rescue Exception => e
@error = e
@logger.error "#{packet.target_name} #{packet.packet_name} #{item.name} Limits Response Exception!"
@logger.error "Called with old_state = #{old_limits_state}, new_state = #{item.limits.state}"
@logger.error e.filtered
end
copied_packet = packet.deep_copy
copied_item = packet.items[item.name]
@limits_response_queue << [copied_packet, copied_item, old_limits_state]
end
end
end
Expand Down
21 changes: 19 additions & 2 deletions openc3/lib/openc3/packets/structure.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ class Structure

# @return [Hash] Items that make up the structure.
# Hash key is the item's name in uppercase
attr_reader :items
attr_accessor :items

# @return [Array] Items sorted by bit_offset.
attr_reader :sorted_items
attr_accessor :sorted_items

# @return [Integer] Defined length in bytes (not bits) of the structure
attr_reader :defined_length
Expand Down Expand Up @@ -511,11 +511,28 @@ def clone
# additional work that isn't necessary here
structure.instance_variable_set("@buffer".freeze, @buffer.clone) if @buffer
# Need to update reference packet in the Accessor
structure.accessor = @accessor.clone
structure.accessor.packet = structure
return structure
end
alias dup clone

# Clone that also deep copies items
# @return [Structure] A deep copy of the structure
def deep_copy
cloned = clone()
cloned_items = []
cloned.sorted_items.each do |item|
cloned_items << item.clone()
end
cloned.sorted_items = cloned_items
cloned.items = {}
cloned_items.each do |item|
cloned.items[item.name] = item
end
return cloned
end

# Enable the ability to read and write item values as if they were methods
# to the class
def enable_method_missing
Expand Down
1 change: 1 addition & 0 deletions openc3/lib/openc3/packets/structure_item.rb
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ def <=>(other)
def clone
item = super()
item.name = self.name.clone if self.name
item.key = self.key.clone if self.key
item
end
alias dup clone
Expand Down
71 changes: 61 additions & 10 deletions openc3/python/openc3/microservices/decom_microservice.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand All @@ -18,6 +18,8 @@
import sys
import time
import json
import threading
import queue
from openc3.microservices.microservice import Microservice
from openc3.system.system import System
from openc3.topics.topic import Topic
Expand All @@ -29,7 +31,54 @@
handle_build_cmd,
handle_inject_tlm,
)
from openc3.top_level import kill_thread

class LimitsResponseThread:
def __init__(self, microservice_name, queue, logger, metric, scope):
self.microservice_name = microservice_name
self.queue = queue
self.logger = logger
self.metric = metric
self.scope = scope
self.count = 0
self.error_count = 0
self.metric.set(name="limits_response_total", value=self.count, type="counter")
self.metric.set(name="limits_response_error_total", value=self.error_count, type="counter")

def start(self):
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
return self.thread

def stop(self):
if self.thread:
kill_thread(self, self.thread)
self.thread = None

def graceful_kill(self):
self.queue.put([None, None, None])

def run(self):
try:
while True:
packet, item, old_limits_state = self.queue.get()
if packet is None:
break

try:
item.limits.response.call(packet, item, old_limits_state)
except Exception as error:
self.error_count += 1
self.metric.set(name='limits_response_error_total', value=self.error_count, type='counter')
self.logger.error(f"{packet.target_name} {packet.packet_name} {item.name} Limits Response Exception!")
self.logger.error(f"Called with old_state = {old_limits_state}, new_state = {item.limits.state}")
self.logger.error(repr(error))

self.count += 1
self.metric.set(name='limits_response_total', value=self.count, type='counter')
except Exception as error:
self.logger.error(f"{self.microservice_name}: Limits Response thread died: {repr(error)}")
raise error

class DecomMicroservice(Microservice):
LIMITS_STATE_INDEX = { "RED_LOW": 0, "YELLOW_LOW": 1, "YELLOW_HIGH": 2, "RED_HIGH": 3, "GREEN_LOW": 4, "GREEN_HIGH": 5 }
Expand All @@ -46,8 +95,13 @@ def __init__(self, *args):
self.error_count = 0
self.metric.set(name="decom_total", value=self.count, type="counter")
self.metric.set(name="decom_error_total", value=self.error_count, type="counter")
self.limits_response_queue = queue.Queue()
self.limits_response_thread = None

def run(self):
self.limits_response_thread = LimitsResponseThread(microservice_name=self.name, queue=self.limits_response_queue, logger=self.logger, metric=self.metric, scope=self.scope)
self.limits_response_thread.start()

self.setup_microservice_topic()
while True:
if self.cancel_thread:
Expand Down Expand Up @@ -78,6 +132,9 @@ def run(self):
self.error = error
self.logger.error(f"Decom error {repr(error)}")

self.limits_response_thread.stop()
self.limits_response_thread = None

def decom_packet(self, topic, msg_id, msg_hash, _redis):
# OpenC3.in_span("decom_packet") do
msgid_seconds_from_epoch = int(msg_id.split("-")[0]) / 1000.0
Expand Down Expand Up @@ -179,15 +236,9 @@ def limits_change_callback(self, packet, item, old_limits_state, value, log_chan
LimitsEventTopic.write(event, scope=self.scope)

if item.limits.response is not None:
try:
# TODO: The limits response is user code and should be run as a separate thread / process
# If this code blocks it will delay TelemetryDecomTopic.write_packet
item.limits.response.call(packet, item, old_limits_state)
except Exception as error:
self.error = error
self.logger.error(f"{packet.target_name} {packet.packet_name} {item.name} Limits Response Exception!")
self.logger.error(f"Called with old_state = {old_limits_state}, new_state = {item.limits.state}")
self.logger.error(repr(error))
copied_packet = packet.deep_copy()
copied_item = packet.items[item.name]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python already has deepcopy: https://docs.python.org/3/library/copy.html#copy.deepcopy. Should this just be packet.deepcopy?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Its a "mostly" deep copy. At least in Ruby can't do a real deep copy because that would marshal Mutex which isn't allowed.

self.limits_response_queue.put([copied_packet, copied_item, old_limits_state])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a reference right? Should it be a copy()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because of the packet.deep_copy it is a reference to the correct new item.



if os.path.basename(__file__) == os.path.basename(sys.argv[0]):
Expand Down
15 changes: 15 additions & 0 deletions openc3/python/openc3/packets/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,24 @@ def buffer(self, buffer):
def clone(self):
struct = copy.copy(self)
struct._buffer = self.buffer # Makes a copy
struct.accessor = copy.copy(self.accessor)
struct.accessor.packet = struct
return struct

# Clone that also deep copies items
# @return [Structure] A deep copy of the structure
def deep_copy(self):
cloned = self.clone()
cloned_items = []
for item in cloned.sorted_items:
cloned_items.append(item.clone())

cloned.sorted_items = cloned_items
cloned.items = {}
for item in cloned_items:
cloned.items[item.name] = item
return cloned

CLASS_MUTEX = threading.Lock()

def setup_mutex(self):
Expand Down
Loading