Skip to content

Commit

Permalink
Run Limits Responses in Thread to Prevent Blocking Decom
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanmelt committed Feb 15, 2025
1 parent 92b60ed commit 3d52546
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 23 deletions.
78 changes: 67 additions & 11 deletions openc3/lib/openc3/microservices/decom_microservice.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# encoding: ascii-8bit

# Copyright 2022 Ball Aerospace & Technologies Corp.
# Copyright 2025 Ball Aerospace & Technologies Corp.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand All @@ -21,12 +21,67 @@
# if purchased from OpenC3, Inc.

require 'time'
require 'thread'
require 'openc3/microservices/microservice'
require 'openc3/microservices/interface_decom_common'
require 'openc3/topics/telemetry_decom_topic'
require 'openc3/topics/limits_event_topic'

module OpenC3
class LimitsResponseThread
def initialize(microservice_name:, queue:, logger:, metric:, scope:)
@microservice_name = microservice_name
@queue = queue
@logger = logger
@metric = metric
@scope = scope
@count = 0
@error_count = 0
@metric.set(name: 'limits_response_total', value: @count, type: 'counter')
@metric.set(name: 'limits_response_error_total', value: @error_count, type: 'counter')
end

def start
@thread = Thread.new do
run()
rescue Exception => e
@logger.error "#{@microservice_name}: Limits Response thread died: #{e.formatted}"
raise e
end
end

def stop
if @thread
OpenC3.kill_thread(self, @thread)
@thread = nil
end
end

def graceful_kill
@queue << [nil, nil, nil]
end

def run
while true
packet, item, old_limits_state = @queue.pop()
break if packet.nil?

begin
item.limits.response.call(packet, item, old_limits_state)
rescue Exception => e
@error_count += 1
@metric.set(name: 'limits_response_error_total', value: @error_count, type: 'counter')
@logger.error "#{packet.target_name} #{packet.packet_name} #{item.name} Limits Response Exception!"
@logger.error "Called with old_state = #{old_limits_state}, new_state = #{item.limits.state}"
@logger.error e.filtered
end

@count += 1
@metric.set(name: 'limits_response_total', value: @count, type: 'counter')
end
end
end

class DecomMicroservice < Microservice
include InterfaceDecomCommon
LIMITS_STATE_INDEX = { RED_LOW: 0, YELLOW_LOW: 1, YELLOW_HIGH: 2, RED_HIGH: 3, GREEN_LOW: 4, GREEN_HIGH: 5 }
Expand All @@ -44,9 +99,14 @@ def initialize(*args)
@error_count = 0
@metric.set(name: 'decom_total', value: @count, type: 'counter')
@metric.set(name: 'decom_error_total', value: @error_count, type: 'counter')
@limits_response_queue = Queue.new
@limits_response_thread = nil
end

def run
@limits_response_thread = LimitsResponseThread.new(microservice_name: @name, queue: @limits_response_queue, logger: @logger, metric: @metric, scope: @scope)
@limits_response_thread.start()

setup_microservice_topic()
while true
break if @cancel_thread
Expand Down Expand Up @@ -81,6 +141,9 @@ def run
@logger.error("Decom error: #{e.formatted}")
end
end

@limits_response_thread.stop()
@limits_response_thread = nil
end

def decom_packet(_topic, msg_id, msg_hash, _redis)
Expand Down Expand Up @@ -179,16 +242,9 @@ def limits_change_callback(packet, item, old_limits_state, value, log_change)
LimitsEventTopic.write(event, scope: @scope)

if item.limits.response
begin
# TODO: The limits response is user code and should be run as a separate thread / process
# If this code blocks it will delay TelemetryDecomTopic.write_packet
item.limits.response.call(packet, item, old_limits_state)
rescue Exception => e
@error = e
@logger.error "#{packet.target_name} #{packet.packet_name} #{item.name} Limits Response Exception!"
@logger.error "Called with old_state = #{old_limits_state}, new_state = #{item.limits.state}"
@logger.error e.filtered
end
copied_packet = packet.deep_copy
copied_item = packet.items[item.name]
@limits_response_queue << [copied_packet, copied_item, old_limits_state]
end
end
end
Expand Down
21 changes: 19 additions & 2 deletions openc3/lib/openc3/packets/structure.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ class Structure

# @return [Hash] Items that make up the structure.
# Hash key is the item's name in uppercase
attr_reader :items
attr_accessor :items

# @return [Array] Items sorted by bit_offset.
attr_reader :sorted_items
attr_accessor :sorted_items

# @return [Integer] Defined length in bytes (not bits) of the structure
attr_reader :defined_length
Expand Down Expand Up @@ -511,11 +511,28 @@ def clone
# additional work that isn't necessary here
structure.instance_variable_set("@buffer".freeze, @buffer.clone) if @buffer
# Need to update reference packet in the Accessor
structure.accessor = @accessor.clone
structure.accessor.packet = structure
return structure
end
alias dup clone

# Clone that also deep copies items
# @return [Structure] A deep copy of the structure
def deep_copy
cloned = clone()
cloned_items = []
cloned.sorted_items.each do |item|
cloned_items << item.clone()
end
cloned.sorted_items = cloned_items
cloned.items = {}
cloned_items.each do |item|
cloned.items[item.name] = item
end
return cloned
end

# Enable the ability to read and write item values as if they were methods
# to the class
def enable_method_missing
Expand Down
1 change: 1 addition & 0 deletions openc3/lib/openc3/packets/structure_item.rb
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ def <=>(other)
def clone
item = super()
item.name = self.name.clone if self.name
item.key = self.key.clone if self.key
item
end
alias dup clone
Expand Down
71 changes: 61 additions & 10 deletions openc3/python/openc3/microservices/decom_microservice.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2024 OpenC3, Inc.
# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
Expand All @@ -18,6 +18,8 @@
import sys
import time
import json
import threading
import queue
from openc3.microservices.microservice import Microservice
from openc3.system.system import System
from openc3.topics.topic import Topic
Expand All @@ -29,7 +31,54 @@
handle_build_cmd,
handle_inject_tlm,
)
from openc3.top_level import kill_thread

class LimitsResponseThread:
def __init__(self, microservice_name, queue, logger, metric, scope):
self.microservice_name = microservice_name
self.queue = queue
self.logger = logger
self.metric = metric
self.scope = scope
self.count = 0
self.error_count = 0
self.metric.set(name="limits_response_total", value=self.count, type="counter")
self.metric.set(name="limits_response_error_total", value=self.error_count, type="counter")

def start(self):
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
return self.thread

def stop(self):
if self.thread:
kill_thread(self, self.thread)
self.thread = None

def graceful_kill(self):
self.queue.put([None, None, None])

def run(self):
try:
while True:
packet, item, old_limits_state = self.queue.get()
if packet == None:

Check failure on line 65 in openc3/python/openc3/microservices/decom_microservice.py

View workflow job for this annotation

GitHub Actions / unit-test (3.10)

Ruff (E711)

openc3/microservices/decom_microservice.py:65:30: E711 Comparison to `None` should be `cond is None`

Check failure on line 65 in openc3/python/openc3/microservices/decom_microservice.py

View workflow job for this annotation

GitHub Actions / unit-test (3.11)

Ruff (E711)

openc3/microservices/decom_microservice.py:65:30: E711 Comparison to `None` should be `cond is None`
break

try:
item.limits.response.call(packet, item, old_limits_state)
except Exception as error:
self.error_count += 1
self.metric.set(name='limits_response_error_total', value=self.error_count, type='counter')
self.logger.error(f"{packet.target_name} {packet.packet_name} {item.name} Limits Response Exception!")
self.logger.error(f"Called with old_state = {old_limits_state}, new_state = {item.limits.state}")
self.logger.error(repr(error))

self.count += 1
self.metric.set(name='limits_response_total', value=self.count, type='counter')
except Exception as error:
self.logger.error(f"{self.microservice_name}: Limits Response thread died: {repr(error)}")
raise error

class DecomMicroservice(Microservice):
LIMITS_STATE_INDEX = { "RED_LOW": 0, "YELLOW_LOW": 1, "YELLOW_HIGH": 2, "RED_HIGH": 3, "GREEN_LOW": 4, "GREEN_HIGH": 5 }
Expand All @@ -46,8 +95,13 @@ def __init__(self, *args):
self.error_count = 0
self.metric.set(name="decom_total", value=self.count, type="counter")
self.metric.set(name="decom_error_total", value=self.error_count, type="counter")
self.limits_response_queue = queue.Queue()
self.limits_response_thread = None

def run(self):
self.limits_response_thread = LimitsResponseThread(microservice_name=self.name, queue=self.limits_response_queue, logger=self.logger, metric=self.metric, scope=self.scope)
self.limits_response_thread.start()

self.setup_microservice_topic()
while True:
if self.cancel_thread:
Expand Down Expand Up @@ -78,6 +132,9 @@ def run(self):
self.error = error
self.logger.error(f"Decom error {repr(error)}")

self.limits_response_thread.stop()
self.limits_response_thread = None

def decom_packet(self, topic, msg_id, msg_hash, _redis):
# OpenC3.in_span("decom_packet") do
msgid_seconds_from_epoch = int(msg_id.split("-")[0]) / 1000.0
Expand Down Expand Up @@ -179,15 +236,9 @@ def limits_change_callback(self, packet, item, old_limits_state, value, log_chan
LimitsEventTopic.write(event, scope=self.scope)

if item.limits.response is not None:
try:
# TODO: The limits response is user code and should be run as a separate thread / process
# If this code blocks it will delay TelemetryDecomTopic.write_packet
item.limits.response.call(packet, item, old_limits_state)
except Exception as error:
self.error = error
self.logger.error(f"{packet.target_name} {packet.packet_name} {item.name} Limits Response Exception!")
self.logger.error(f"Called with old_state = {old_limits_state}, new_state = {item.limits.state}")
self.logger.error(repr(error))
copied_packet = packet.deep_copy()
copied_item = packet.items[item.name]
self.limits_response_queue.put([copied_packet, copied_item, old_limits_state])


if os.path.basename(__file__) == os.path.basename(sys.argv[0]):
Expand Down
15 changes: 15 additions & 0 deletions openc3/python/openc3/packets/structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,24 @@ def buffer(self, buffer):
def clone(self):
struct = copy.copy(self)
struct._buffer = self.buffer # Makes a copy
struct.accessor = copy.copy(self.accessor)
struct.accessor.packet = struct
return struct

# Clone that also deep copies items
# @return [Structure] A deep copy of the structure
def deep_copy(self):
cloned = self.clone()
cloned_items = []
for item in cloned.sorted_items:
cloned_items.append(item.clone())

cloned.sorted_items = cloned_items
cloned.items = {}
for item in cloned_items:
cloned.items[item.name] = item
return cloned

CLASS_MUTEX = threading.Lock()

def setup_mutex(self):
Expand Down

0 comments on commit 3d52546

Please sign in to comment.