Skip to content

Commit

Permalink
added stop handling
Browse files Browse the repository at this point in the history
  • Loading branch information
hahahannes committed May 8, 2024
1 parent dddff9c commit b910efb
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 15 deletions.
26 changes: 12 additions & 14 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import dotenv
dotenv.load_dotenv()

from operator_lib.util import OperatorBase, logger, InitPhase, setup_operator_starttime, todatetime, timestamp_to_str
from operator_lib.util import OperatorBase, logger, InitPhase, todatetime, timestamp_to_str
from operator_lib.util.persistence import save, load
import os
import pandas as pd
from algo import utils
import pickle

UNUSUAL_FILENAME = "unusual_drop_detections.pickle"
WINDOW_FILENAME = "window_closing_times.pickle"

from operator_lib.util import Config
class CustomConfig(Config):
data_path = "/opt/data"
Expand Down Expand Up @@ -57,21 +60,11 @@ def init(self, *args, **kwargs):
self.first_data_time = load(self.config.data_path, "first_data_time.pickle")

self.sliding_window = [] # This contains the data from the last hour. Entries of the list are pairs of the form {"timestamp": ts, "value": humidity}
self.unsusual_drop_detections = []
self.unsusual_drop_detections_path = f"{data_path}/unusual_drop_detections.pickle"

if os.path.exists(self.unsusual_drop_detections_path):
with open(self.unsusual_drop_detections_path, "rb") as f:
self.unsusual_drop_detections = pickle.load(f)

self.unsusual_drop_detections = load(f"{data_path}/{UNUSUAL_FILENAME}", [])

self.window_open = False
self.window_closing_times = []
self.window_closing_times_path = f"{data_path}/window_closing_times.pickle"
self.window_closing_times = load(f"{data_path}/{WINDOW_FILENAME}", [])

if os.path.exists(self.window_closing_times_path):
with open(self.window_closing_times_path, "rb") as f:
self.window_closing_times = pickle.load(f)

init_phase_duration = pd.Timedelta(self.config.init_phase_length, self.config.init_phase_level)
self.init_phase_handler = InitPhase(data_path, init_phase_duration, self.first_data_time)
value = {
Expand All @@ -81,6 +74,11 @@ def init(self, *args, **kwargs):
if self.init_phase_handler.first_init_msg_needs_to_send():
init_msg = self.init_phase_handler.generate_first_init_msg(value)
self.produce(init_msg)

def stop(self):
super().stop()
save(self.data_path, UNUSUAL_FILENAME, self.unsusual_drop_detections)
save(self.data_path, WINDOW_FILENAME, self.window_closing_times)

def run(self, data, selector = None, device_id=None):
current_timestamp = todatetime(data['Humidity_Time'])
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
git+https://github.com/SENERGY-Platform/analytics-operator-lib-python@v1.0.17
git+https://github.com/SENERGY-Platform/analytics-operator-lib-python@v1.0.18

confluent_kafka<2
pandas<2
Expand Down

0 comments on commit b910efb

Please sign in to comment.