Skip to content

Commit

Permalink
fix session.data + threading in PID agent
Browse files Browse the repository at this point in the history
  • Loading branch information
jlashner committed Jan 16, 2024
1 parent 790b228 commit 6f05671
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions socs/agents/hwp_pid/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import txaio
from ocs import ocs_agent, site_config
from ocs.ocs_twisted import TimeoutLock
from twisted.internet import defer
from twisted.internet import defer, threads, reactor

txaio.use_twisted()

Expand All @@ -15,6 +15,18 @@
import socs.agents.hwp_pid.drivers.pid_controller as pd


def parse_action_result(res):
"""
Parses the result of an action to ensure it is a dictionary so it can be
stored in session.data
"""
if res is None:
return {}
elif isinstance(res, dict):
return res
else:
return {'result': res}

class Actions:
@dataclass
class BaseAction:
Expand Down Expand Up @@ -116,10 +128,14 @@ def _process_actions(self, pid):
try:
self.log.info(f"Running action {action}")
res = action.process(pid)
action.deferred.callback(res)
threads.blockingCallFromThread(
reactor, action.deferred.callback, res
)
except Exception as e:
self.log.error(f"Error processing action: {action}")
action.deferred.errback(e)
threads.blockingCallFromThread(
reactor, action.deferred.errback, e
)

def _clear_queue(self):
while not self.action_queue.empty():
Expand Down Expand Up @@ -176,7 +192,8 @@ def tune_stop(self, session, params):
"""
action = Actions.TuneStop(**params)
self.action_queue.put(action)
session.data = yield action.deferred
res = yield action.deferred
session.data = parse_action_result(res)
return True, f"Completed: {str(action)}"

@defer.inlineCallbacks
Expand All @@ -189,7 +206,8 @@ def tune_freq(self, session, params):
"""
action = Actions.TuneFreq(**params)
self.action_queue.put(action)
session.data = yield action.deferred
res = yield action.deferred
session.data = parse_action_result(res)
return True, f"Completed: {str(action)}"

@defer.inlineCallbacks
Expand All @@ -206,7 +224,8 @@ def declare_freq(self, session, params):
"""
action = Actions.DeclareFreq(**params)
self.action_queue.put(action)
session.data = yield action.deferred
res = yield action.deferred
session.data = parse_action_result(res)
return True, f"Completed: {str(action)}"

@defer.inlineCallbacks
Expand All @@ -227,7 +246,8 @@ def set_pid(self, session, params):
"""
action = Actions.DeclareFreq(**params)
self.action_queue.put(action)
session.data = yield action.deferred
res = yield action.deferred
session.data = parse_action_result(res)
return True, f"Completed: {str(action)}"

@defer.inlineCallbacks
Expand All @@ -243,7 +263,8 @@ def set_direction(self, session, params):
"""
action = Actions.SetDirection(**params)
self.action_queue.put(action)
session.data = yield action.deferred
res = yield action.deferred
session.data = parse_action_result(res)
return True, f"Completed: {str(action)}"

@defer.inlineCallbacks
Expand All @@ -266,7 +287,8 @@ def set_scale(self, session, params):
"""
action = Actions.SetScale(**params)
self.action_queue.put(action)
session.data = yield action.deferred
res = yield action.deferred
session.data = parse_action_result(res)
return True, f"Completed: {str(action)}"


Expand Down

0 comments on commit 6f05671

Please sign in to comment.