Skip to content

Commit

Permalink
Create ThreadManager
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanmelt committed Feb 16, 2025
1 parent e3de9bb commit dcbbb51
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 58 deletions.
6 changes: 5 additions & 1 deletion openc3/lib/openc3/microservices/cleanup_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,8 @@ def shutdown
end
end

OpenC3::CleanupMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::CleanupMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
7 changes: 6 additions & 1 deletion openc3/lib/openc3/microservices/decom_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def start
@logger.error "#{@microservice_name}: Limits Response thread died: #{e.formatted}"
raise e
end
ThreadManager.instance.register(@thread, stop_object: self)
end

def stop
Expand Down Expand Up @@ -250,4 +251,8 @@ def limits_change_callback(packet, item, old_limits_state, value, log_change)
end
end

OpenC3::DecomMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::DecomMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
8 changes: 7 additions & 1 deletion openc3/lib/openc3/microservices/interface_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def start
@logger.error "#{@interface.name}: Command handler thread died: #{e.formatted}"
raise e
end
ThreadManager.instance.register(@thread, stop_object: self)
end

def stop()
Expand Down Expand Up @@ -346,6 +347,7 @@ def start
@logger.error "#{@router.name}: Telemetry handler thread died: #{e.formatted}"
raise e
end
ThreadManager.instance.register(@thread, stop_object: self)
end

def stop
Expand Down Expand Up @@ -813,4 +815,8 @@ def graceful_kill
end
end

OpenC3::InterfaceMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::InterfaceMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
6 changes: 5 additions & 1 deletion openc3/lib/openc3/microservices/log_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,8 @@ def shutdown
end
end

OpenC3::LogMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::LogMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
35 changes: 21 additions & 14 deletions openc3/lib/openc3/microservices/microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
OpenC3.require_file 'openc3/utilities/secrets'
OpenC3.require_file 'openc3/utilities/sleeper'
OpenC3.require_file 'openc3/utilities/open_telemetry'
OpenC3.require_file 'openc3/utilities/thread_manager'
OpenC3.require_file 'openc3/models/microservice_model'
OpenC3.require_file 'openc3/models/microservice_status_model'
OpenC3.require_file 'tmpdir'
Expand All @@ -49,22 +50,27 @@ class Microservice
def self.run(name = nil)
name = ENV['OPENC3_MICROSERVICE_NAME'] unless name
microservice = self.new(name)
begin
MicroserviceStatusModel.set(microservice.as_json(:allow_nan => true), scope: microservice.scope)
microservice.state = 'RUNNING'
microservice.run
microservice.state = 'FINISHED'
rescue Exception => e
if SystemExit === e or SignalException === e
microservice.state = 'KILLED'
else
microservice.error = e
microservice.state = 'DIED_ERROR'
Logger.fatal("Microservice #{name} dying from exception\n#{e.formatted}")
thread = Thread.new do
begin
MicroserviceStatusModel.set(microservice.as_json(:allow_nan => true), scope: microservice.scope)
microservice.state = 'RUNNING'
microservice.run
microservice.state = 'FINISHED'
rescue Exception => e
if SystemExit === e or SignalException === e
microservice.state = 'KILLED'
else
microservice.error = e
microservice.state = 'DIED_ERROR'
Logger.fatal("Microservice #{name} dying from exception\n#{e.formatted}")
end
ensure
MicroserviceStatusModel.set(microservice.as_json(:allow_nan => true), scope: microservice.scope)
end
ensure
MicroserviceStatusModel.set(microservice.as_json(:allow_nan => true), scope: microservice.scope)
end
ThreadManager.instance.register(thread, shutdown_object: microservice)
ThreadManager.instance.monitor
ThreadManager.instance.shutdown
end

def as_json(*a)
Expand Down Expand Up @@ -192,6 +198,7 @@ def initialize(name, is_plugin: false)
@logger.error "#{@name} status thread died: #{e.formatted}"
raise e
end
ThreadManager.instance.register(@microservice_status_thread)
end
end

Expand Down
25 changes: 10 additions & 15 deletions openc3/lib/openc3/microservices/multi_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

require 'openc3/microservices/microservice'
require 'openc3/topics/topic'
require 'openc3/utilities/thread_manager'

module OpenC3
class MultiMicroservice < Microservice
def run
@threads = []
ARGV.each do |microservice_name|
microservice_model = MicroserviceModel.get_model(name: microservice_name, scope: @scope)
@threads << Thread.new do
thread = Thread.new do
cmd_line = microservice_model.cmd.join(' ')
split_cmd_line = cmd_line.split(' ')
filename = nil
Expand All @@ -42,21 +43,15 @@ def run
klass = filename.filename_to_class_name.to_class
klass.run(microservice_model.name)
end
ThreadManager.instance.register(thread)
end
@threads.each do |thread|
thread.join
end
end

def shutdown
super()
if @threads
@threads.each do |thread|
thread.join
end
end
ThreadManager.instance.monitor
ThreadManager.instance.shutdown
end
end
end

OpenC3::MultiMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::MultiMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
6 changes: 5 additions & 1 deletion openc3/lib/openc3/microservices/periodic_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ def shutdown
end
end

OpenC3::PeriodicMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::PeriodicMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
6 changes: 5 additions & 1 deletion openc3/lib/openc3/microservices/reducer_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -633,4 +633,8 @@ def extract_entry_samples(packet)
end
end

OpenC3::ReducerMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::ReducerMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
6 changes: 5 additions & 1 deletion openc3/lib/openc3/microservices/router_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ def handle_packet(packet)
end
end

OpenC3::RouterMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::RouterMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,8 @@ def get_areas_and_poll_time
end
end

OpenC3::ScopeCleanupMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::ScopeCleanupMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
6 changes: 5 additions & 1 deletion openc3/lib/openc3/microservices/text_log_microservice.rb
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,8 @@ def shutdown
end
end

OpenC3::TextLogMicroservice.run if __FILE__ == $0
if __FILE__ == $0
OpenC3::TextLogMicroservice.run
ThreadManager.instance.shutdown
ThreadManager.instance.join
end
83 changes: 83 additions & 0 deletions openc3/lib/openc3/utilities/thread_manager.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
# encoding: ascii-8bit

# Copyright 2025 OpenC3, Inc.
# All Rights Reserved.
#
# This program is free software; you can modify and/or redistribute it
# under the terms of the GNU Affero General Public License
# as published by the Free Software Foundation; version 3 with
# attribution addendums as found in the LICENSE.txt
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# This file may also be used under the terms of a commercial license
# if purchased from OpenC3, Inc.

module OpenC3
class ThreadManager
MONITOR_SLEEP_SECONDS = 0.25

# Variable that holds the singleton instance
@@instance = nil

# Mutex used to ensure that only one instance of is created
@@instance_mutex = Mutex.new

# Get the singleton instance of ThreadManager
def self.instance
return @@instance if @@instance

@@instance_mutex.synchronize do
return @@instance if @@instance
@@instance ||= self.new
return @@instance
end
end

def initialize
@threads = []
@shutdown_started = false
end

def register(thread, stop_object: nil, shutdown_object: nil)
@threads << [thread, stop_object, shutdown_object]
end

def monitor
while true
@threads.each do |thread, _, _|
if !thread.alive?
return
end
end
sleep(MONITOR_SLEEP_SECONDS)
end
end

def shutdown
@@instance_mutex.synchronize do
return if @shutdown_started
@shutdown_started = true
end
@threads.each do |thread, stop_object, shutdown_object|
if thread.alive?
if stop_object
stop_object.stop
end
if shutdown_object
shutdown_object.shutdown
end
end
end
end

def join
@threads.each do |thread, _, _|
thread.join
end
end
end
end
4 changes: 4 additions & 0 deletions openc3/python/openc3/microservices/decom_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from openc3.topics.telemetry_decom_topic import TelemetryDecomTopic
from openc3.config.config_parser import ConfigParser
from openc3.utilities.time import to_nsec_from_epoch, from_nsec_from_epoch
from openc3.utilities.thread_manager import ThreadManager
from openc3.microservices.interface_decom_common import (
handle_build_cmd,
handle_inject_tlm,
Expand All @@ -48,6 +49,7 @@ def __init__(self, microservice_name, queue, logger, metric, scope):
def start(self):
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
ThreadManager.instance().register(self.thread, stop_object=self)
return self.thread

def stop(self):
Expand Down Expand Up @@ -243,3 +245,5 @@ def limits_change_callback(self, packet, item, old_limits_state, value, log_chan

if os.path.basename(__file__) == os.path.basename(sys.argv[0]):
DecomMicroservice.class_run()
ThreadManager.instance().shutdown()
ThreadManager.instance().join()
5 changes: 5 additions & 0 deletions openc3/python/openc3/microservices/interface_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from openc3.utilities.time import from_nsec_from_epoch
from openc3.utilities.json import JsonDecoder
from openc3.utilities.store_queued import StoreQueued, EphemeralStoreQueued
from openc3.utilities.thread_manager import ThreadManager
from openc3.top_level import kill_thread

try:
Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(self, interface, tlm, logger=None, metric=None, scope=None):
def start(self):
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
ThreadManager.instance().register(self.thread, stop_object=self)
return self.thread

def stop(self):
Expand Down Expand Up @@ -355,6 +357,7 @@ def __init__(self, router, tlm, logger=None, metric=None, scope=None):
def start(self):
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
ThreadManager.instance().register(self.thread, stop_object=self)
return self.thread

def stop(self):
Expand Down Expand Up @@ -824,3 +827,5 @@ def graceful_kill(self):

if os.path.basename(__file__) == os.path.basename(sys.argv[0]):
InterfaceMicroservice.class_run()
ThreadManager.instance().shutdown()
ThreadManager.instance().join()
Loading

0 comments on commit dcbbb51

Please sign in to comment.