-
Notifications
You must be signed in to change notification settings - Fork 0
/
pid.py
222 lines (205 loc) · 5.54 KB
/
pid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
#!/usr/bin/env python
#
# vim: set tabstop=4:
#
# Copyright (c) 2014 David Jander
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
import time
import multiprocessing
import queue
def PidProcess(pid, cmdqueue):
cmd = ""
print("PID: Starting", pid.actuator.name, "...")
while True:
while not cmdqueue.empty():
try:
obj = cmdqueue.get()
except queue.Empty:
break
if not "command" in obj:
print("PID: Unknown command object:", repr(obj))
continue
cmd = obj["command"]
if cmd == "shutdown":
break
else:
print("PID: Unsupported command:", cmd)
if cmd == "shutdown":
break
if not pid.iteration():
break
print("PID: Shutting down", pid.actuator.name, "...")
class PidController(object):
def __init__(self, sensor, actuator, P, I, D, period=1.0):
self.sensor = sensor
self.actuator = actuator
self.P = P
self.I = I * period
self.D = D / period
self.period = period
self.windup_limit = 100.0
self.reset_pid()
self.setvalue = multiprocessing.Value('d')
self.outvalue = multiprocessing.Value('d')
self.invalue = multiprocessing.Value('d')
self.cmdqueue = multiprocessing.Queue()
self.proc = multiprocessing.Process(target=PidProcess,
args=(self, self.cmdqueue))
self.spawned = False
self.validate_previous = None
self.sample_time = time.monotonic() + 0.2
self.sample_count = 0
self.sample_acc = 0
self.error_state = False
def reset_pid(self):
self.integ = 0.0
self.errq = queue.Queue()
self.err0 = 0.0
self.output = 0.0
self.failure_timestamp = 0
def set_setpoint(self, sp):
self.setvalue.value = sp
def validate_sensor(self, current):
if current < 10.0:
if not self.error_state:
print("Temperature too low!")
self.failure_timestamp = time.monotonic()
return None
if current > 300.0:
if not self.error_state:
print("Temperature too high!")
self.failure_timestamp = time.monotonic()
return None
prev = self.validate_previous
if prev is None:
self.validate_previous = current
return current
if abs(prev - current) > 20.0:
print("Temperature sensor unstable!")
self.failure_timestamp = time.monotonic()
self.validate_previous = current
return None
self.validate_previous = current
return current
def _get_tts(self):
tts = self.sample_time - time.monotonic()
return tts
def _set_tts(self):
tts = self.sample_time + 0.2
if tts < time.monotonic():
tts = time.monotonic() + 0.2
self.sample_time = tts
def _filter_sample(self):
current = self.sensor.read()
current = self.validate_sensor(current)
if current is None:
if not self.error_state:
print("Temperature sensor failure detected. Shutting down heater!")
self.error_state = True
self.actuator.set_output(0)
self.outvalue.value = 0
time.sleep(2)
return
self.sample_acc += current
self.sample_count += 1
def sample(self, sec):
while True:
self._filter_sample()
tts = self._get_tts()
if tts >= sec:
break
if tts >= 0.0:
time.sleep(tts)
else:
break
sec -= tts
self._set_tts()
if sec > 0.0:
time.sleep(sec)
def iteration(self):
if self.sample_count <= 0:
if not self.error_state:
print("PID: Error. Did not read any valid samples...")
self.actuator.set_output(0)
self.outvalue.value = 0
self.sample(1)
return True
current = self.sample_acc / self.sample_count
self.sample_acc = 0
self.sample_count = 0
self.invalue.value = current
setpoint = self.setvalue.value
if (time.monotonic() - self.failure_timestamp) < 10 or not setpoint:
self.actuator.set_output(0)
self.outvalue.value = 0
self.sample(1)
return True
if self.failure_timestamp:
print("Temperature sensor back to normal. Resuming heater.")
self.error_state = False
self.reset_pid()
err = setpoint - current
derr = err - self.err0
self.errq.put(err)
if self.errq.qsize() < 3:
self.err0 = err
else:
self.err0 = self.errq.get()
self.integ += err
if self.integ > self.windup_limit:
self.integ = self.windup_limit
if self.integ < -self.windup_limit:
self.integ = -self.windup_limit
self.output = self.P * err + self.I * self.integ + self.D * derr
if self.output > 1.0:
self.output = 1.0
if self.output < 0.0:
self.output = 0.0
ontime = self.output * self.period
if ontime > 0.0:
self.actuator.set_output(1)
self.sample(ontime)
if ontime < self.period:
self.actuator.set_output(0)
self.sample(self.period - ontime)
self.outvalue.value = self.output
return True
def get_output(self):
return self.outvalue.value
def get_input(self):
return self.invalue.value
def spawn(self):
self.proc.start()
self.spawned = True
def shutdown(self):
if not self.spawned:
return
self.cmdqueue.put({"command": "shutdown"})
self.proc.join()
self.actuator.set_output(0)
if __name__ == "__main__":
import signal, sys
from temp100k import Thermistor100k
from hwmon import ScaledSensor
from config import Config
from gpio import GPOutput
s = ScaledSensor(Config("kamaq.conf"), "EXT")
t = Thermistor100k(s)
o = GPOutput("heater_EXT")
p = PidController(t, o, 0.2, 0.002, 0.5)
def signal_handler(signal, frame):
print('You pressed Ctrl+C!')
p.shutdown()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
p.spawn()
sp = 230.0
p.set_setpoint(sp)
while True:
print("Temp =", t.read(), "setpoint =", sp, "Ouput =", p.get_output())
time.sleep(1.0)
p.shutdown()