Skip to content

Commit

Permalink
#202 Clean up Opus module code
Browse files Browse the repository at this point in the history
  • Loading branch information
dostuffthatmatters committed Nov 8, 2023
1 parent df2e42e commit 28abb88
Showing 1 changed file with 14 additions and 31 deletions.
45 changes: 14 additions & 31 deletions packages/core/modules/opus_measurement.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,19 @@ def run(self, new_config: types.Config) -> None:

# loads latest config
self.config = new_config
if self.config.general.test_mode or (sys.platform != "win32"):
logger.debug(
"Skipping OpusMeasurement in test mode and on non-windows systems"
)
return

logger.info("Running OpusMeasurement")
logger.debug("Updating JSON Config Variables")

# check for PYRA Test Mode status
# everything afterwards will be skipped if PYRA Test Mode is active
if self.config.general.test_mode:
logger.info("Test mode active.")
logger.info("Skipping OpusMeasurement in test mode.")
return

if sys.platform != "win32":
logger.debug("Skipping OpusMeasurement on non-windows systems")
return

logger.info("Running OpusMeasurement")

if not self.initialized:
self.__initialize()

Expand All @@ -117,16 +115,13 @@ def run(self, new_config: types.Config) -> None:
measurements_should_be_running: bool = False
state = interfaces.StateInterface.load_state()
if state.measurements_should_be_running == True:
if new_config.tum_plc is None:
current_cover_angle = state.plc_state.actors.current_angle
if (new_config.tum_plc is None) or (current_cover_angle is None):
measurements_should_be_running = True
else:
current_cover_angle = state.plc_state.actors.current_angle
if current_cover_angle is None:
measurements_should_be_running = True
else:
measurements_should_be_running = (
abs(current_cover_angle) % 360
) > 30
measurements_should_be_running = (
abs(current_cover_angle) % 360
) > 30

if self.last_cycle_automation_status != measurements_should_be_running:
if measurements_should_be_running:
Expand Down Expand Up @@ -223,7 +218,6 @@ def close_opus(self) -> None:
assert sys.platform == "win32"

self.__connect_to_dde_opus()

if not self.__test_dde_connection():
return
answer = self.conversation.Request("CLOSE_OPUS")
Expand All @@ -234,7 +228,6 @@ def __shutdown_dde_server(self) -> None:
and Items) are not cleaned up by this call."""

assert sys.platform == "win32"

self.server.Shutdown()

def __destroy_dde_server(self) -> None:
Expand All @@ -249,24 +242,22 @@ def __is_em27_responsive(self) -> bool:
True -> Connected
False -> Not Connected"""

assert sys.platform == "win32"

response = os.system("ping -n 1 " + self.config.opus.em27_ip.root)
return response == 0

def start_opus(self) -> None:
"""Starts the OPUS.exe with os.startfile(). This simulates
a user click on the executable."""

assert sys.platform == "win32"
interfaces.ActivityHistoryInterface.add_datapoint(opus_startups=1)

opus_path = self.config.opus.executable_path.root
opus_username = self.config.opus.username
opus_password = self.config.opus.password

# works only > python3.10
# works only >= python3.10
# without cwd CT will have trouble loading its internal database)
assert sys.platform == "win32"
try:
os.startfile( # type: ignore
os.path.basename(opus_path),
Expand All @@ -293,8 +284,6 @@ def sun_angle_is_too_low(self) -> bool:
"""Checks defined sun angle in config. Closes OPUS at
the end of the day to start up fresh the next day."""

assert sys.platform == "win32"

return (
utils.Astronomy.get_current_sun_elevation(self.config)
< self.config.general.min_sun_elevation
Expand All @@ -305,8 +294,6 @@ def automated_process_handling(self) -> None:
satisfied. Shuts down OPUS.exe if running and sun angle
conditions not satisfied."""

assert sys.platform == "win32"

if self.sun_angle_is_too_low():
# Close OPUS if running
if self.opus_is_running():
Expand All @@ -332,8 +319,6 @@ def wait_for_opus_startup(self) -> None:
"""Checks for OPUS to be running. Breaks out of the loop
after a defined time."""

assert sys.platform == "win32"

start_time = time.time()
while True:
# brakes when OPUS is up and running
Expand All @@ -350,8 +335,6 @@ def check_for_experiment_change(self) -> None:
active experiment. To reload an experiment during an active
macro the macro needs to be stopped first."""

assert sys.platform == "win32"

if self.config.opus.experiment_path.root != self.current_experiment:
self.stop_macro()
time.sleep(5)
Expand Down

0 comments on commit 28abb88

Please sign in to comment.