-
Notifications
You must be signed in to change notification settings - Fork 0
/
rule.py
176 lines (142 loc) · 5.24 KB
/
rule.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import datetime
from loguru import logger
import store
from actions.lut import ACTION_LUT
from instructions import InstructionConstant
from instructions.lut import INSTRUCTION_LUT
import uuid
class RuleParsingException(Exception):
    """Base class for errors raised while parsing a rule definition."""
class InvalidInstructionException(RuleParsingException):
    """Raised when a rule condition names an operation that is not known."""
class Rule:
    """A rule: a list of conditions and the actions attached to them.

    On construction the raw ``conditions`` are turned into instruction
    objects (looked up in ``INSTRUCTION_LUT``) and reordered into postfix
    form, the raw ``actions`` are turned into action objects (looked up in
    ``ACTION_LUT``), and the device ids referenced by the instructions are
    collected into ``dependent_devices``.

    Two rules compare equal when their ``id`` values are equal; a rule also
    compares equal to a bare id value.
    """

    # Defining ``__eq__`` already makes Python drop the inherited
    # ``__hash__``; stating it here makes the "rules are unhashable"
    # consequence explicit instead of implicit.
    __hash__ = None

    def __init__(
        self,
        id=None,
        name=None,
        description=None,
        enabled=True,
        conditions=None,
        actions=None,
        last_execution=None,
        execution_count=0,
    ):
        """Build the rule and immediately parse its conditions and actions.

        Args:
            id: Identifier of the rule; used as the document key in
                ``store`` and for equality comparison.
            name: Rule name (stored as given).
            description: Rule description (stored as given).
            enabled: Enabled flag (stored as given).
            conditions: Iterable of condition mappings, each with an
                ``"operation"`` key. ``None`` means "no conditions".
            actions: Iterable of action mappings, each expected to carry a
                ``"type"`` key. ``None`` means "no actions".
            last_execution: Last execution timestamp (stored as given).
            execution_count: Number of executions so far.

        Raises:
            InvalidInstructionException: If a condition names an operation
                that is not present in ``INSTRUCTION_LUT``.
        """
        # Generate and assign a unique ID for this rule instance.
        # Two Rule objects describing the same rule get different UUIDs.
        self.rule_uuid = uuid.uuid4()

        self.id = id
        self.name = name
        self.description = description
        self.enabled = enabled
        # A fresh list per instance: a literal ``[]`` default would be one
        # list object shared by every rule created without the argument.
        self.conditions = conditions if conditions is not None else []
        self.actions = actions if actions is not None else []
        self.last_execution = last_execution
        self.execution_count = execution_count

        # Devices that this rule uses for final evaluation
        self.dependent_devices = []

        self.instruction_stream = []
        self.action_stream = []

        self.parse_conditions()
        self.parse_actions()
        self.determine_device_dependencies()

        # NOTE(review): this is assigned after parsing, so any value set
        # through set_periodic_execution() while the instructions were being
        # constructed is overwritten here - confirm that is intended.
        self.periodic_execution = True
        logger.debug(f"{self} dependent devices -> {self.dependent_devices}")

    def parse_conditions(self):
        """Turn ``self.conditions`` into instruction objects in postfix order.

        Each condition's ``"operation"`` is upper-cased and looked up in
        ``INSTRUCTION_LUT``; the resulting class is instantiated with the
        condition data and this rule, and appended to
        ``self.instruction_stream``.

        Raises:
            InvalidInstructionException: If an operation is not in
                ``INSTRUCTION_LUT``.
        """
        for ins_data in self.conditions:
            operation = ins_data["operation"].upper()
            if operation not in INSTRUCTION_LUT:
                logger.error(f"Incorrect/Unknown operation: {operation}")
                raise InvalidInstructionException(
                    f"Incorrect/Unknown operation: {operation}"
                )
            instruction_cls = INSTRUCTION_LUT[operation]
            self.instruction_stream.append(instruction_cls(ins_data, self))

        self.infix_to_postfix()

    def infix_to_postfix(self):
        """Perform infix to postfix conversion to make it easier for the VM to evaluate the rule.

        ``LOGICAL_AND`` and ``LOGICAL_OR`` are handled identically: when a
        new operator arrives, the operator currently waiting on the stack is
        emitted first. The stack therefore never holds more than one
        operator.
        """
        stack = []
        postfix = []

        for ins in self.instruction_stream:
            is_operator = (
                ins.instruction_type == InstructionConstant.LOGICAL_AND
                or ins.instruction_type == InstructionConstant.LOGICAL_OR
            )
            if not is_operator:
                # Operands go straight to the output.
                postfix.append(ins)
                continue

            # Emit the operator that was waiting, then park the new one.
            if stack:
                postfix.append(stack.pop())
            stack.append(ins)

        # Put the remaining items from stack
        postfix.extend(stack)
        self.instruction_stream = postfix

    def determine_device_dependencies(self):
        """Lists out all the devices on which the result of rule evaluation could change.

        Appends the ``device_id`` of every instruction that has one to
        ``self.dependent_devices`` (duplicates are kept).
        """
        self.dependent_devices.extend(
            ins.device_id
            for ins in self.instruction_stream
            if hasattr(ins, "device_id")
        )

    def parse_actions(self):
        """Turn ``self.actions`` into action objects in ``self.action_stream``.

        Each action's ``"type"`` is upper-cased and looked up in
        ``ACTION_LUT``. Unlike conditions, an action with no ``"type"`` or an
        unknown type is only logged and skipped; no exception is raised.
        """
        for index, action_data in enumerate(self.actions):
            if "type" not in action_data:
                logger.error(
                    f"{self.id}: Action @ index {index} has no `type` field attached."
                )
                continue

            operation = action_data["type"].upper()
            if operation not in ACTION_LUT:
                logger.error(
                    f"Incorrect/Unknown action type at index {index}: {operation}"
                )
                continue

            action_cls = ACTION_LUT[operation]
            self.action_stream.append(action_cls(action_data))

    async def get_rule_document(self):
        """Fetch this rule's document from the ``"rules"`` collection of ``store``."""
        return await store.get_document("rules", self.id)

    def set_last_execution(self, datetime):
        """Set ``last_execution``.

        The parameter name shadows the ``datetime`` module inside this
        method; it is kept because it is part of the public signature.
        """
        self.last_execution = datetime

    def set_execution_count(self, count):
        """Set ``execution_count``."""
        self.execution_count = count

    def set_periodic_execution(self, value):
        """Set ``periodic_execution``."""
        self.periodic_execution = value

    def update_rule_uuid(self):
        """Replace ``rule_uuid`` with a newly generated UUID."""
        self.rule_uuid = uuid.uuid4()

    def create_clone(self):
        """Create a new copy of this object.

        The clone is built through ``Rule(...)``, so its conditions and
        actions are parsed again and it receives its own ``rule_uuid``. The
        ``conditions`` and ``actions`` lists themselves are passed by
        reference, not copied.
        """
        return Rule(
            id=self.id,
            name=self.name,
            description=self.description,
            enabled=self.enabled,
            conditions=self.conditions,
            actions=self.actions,
            last_execution=self.last_execution,
            execution_count=self.execution_count,
        )

    async def update_execution_info(self):
        """Record one more execution, in memory and in ``store``.

        Increments ``execution_count``, sets ``last_execution`` to the
        current time in the ``Asia/Kolkata`` timezone, then writes both to
        this rule's document in the ``"rules"`` collection.
        """
        import pytz

        india_tz = pytz.timezone('Asia/Kolkata')

        self.execution_count += 1
        self.last_execution = datetime.datetime.now(india_tz)

        await store.update_document(
            "rules",
            self.id,
            {
                "last_executed": self.last_execution,
                "execution_count": self.execution_count,
            },
        )
        logger.debug(
            f"Rule execution count({self.execution_count}) and last executed datetime ({self.last_execution}) updated."
        )

    def __str__(self):
        return f"<Rule({self.rule_uuid}): {self.id}>"

    def __eq__(self, other):
        """Compare by ``id``; ``other`` may be a ``Rule`` or a bare id value."""
        if isinstance(other, Rule):
            # Compare ids directly rather than relying on the id type to
            # defer to the reflected ``Rule.__eq__``.
            return self.id == other.id
        return self.id == other

    def __repr__(self):
        return self.__str__()