-
Notifications
You must be signed in to change notification settings - Fork 0
/
smart_meter.py
704 lines (574 loc) · 27.4 KB
/
smart_meter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
# Should include
# - Algorithm
# - Communication between all nodes
# - Polling price from NordPool
# May have these structures:
# - WaitingList = {id, time, power, (group id), deadline}
# - Price = {Hour, avg price}
import socketserver
import json
import download_price
import socket
import threading
import time
import select
import matplotlib.pyplot as plt
import sys
class SmartMeter():
def __init__(self):
    """Load the price lists, open the listening socket and reset all scheduler state."""
    # Prices for the following 24 hours. The second list is the source that
    # update_pricelist copies from; the original author notes that it should
    # differ from the first one, but both come from the same fetch call.
    self.pricelist = self.fetch_pricelist()
    self.next_pricelist = self.fetch_pricelist()

    # Listening socket that nodes connect to in order to register
    listen_address = ("localhost", 9000)
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.server_socket = listener
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(listen_address)
    listener.listen(10)
    self.sockets = {}  # node id -> connected client socket
    print(f"Listening on port: {listen_address[1]}")

    # Book-keeping dictionaries, all keyed by node id
    self.node_list = {}        # all known devices
    self.waiting_list = {}     # background loads waiting to run
    self.active_list = {}      # all active devices
    self.background_list = {}  # all known background devices (active and inactive)
    self.background_load = {}  # active background devices
    self.deadline_load = {}    # active deadline tasks

    # Power accounting in Watt, both start at zero
    self.current_power = 0
    self.deadline_power = 0
    self.threshold = 1200      # maximum allowed power

    # Simulated time: the day is split into blocks, starting at hour 12
    self.blocks_per_hour = 6
    self.current_hour = 12
    self.clock = self.blocks_per_hour * self.current_hour
    blocks_per_day = self.blocks_per_hour * 24
    # One list of scheduled {'id', 'power'} entries per block of the day
    self.block_schedule = [[] for _ in range(blocks_per_day)]

    # Accumulated prices used for the savings report
    self.worst_case_price = 0
    self.scheduled_price = 0
###########################################################################
# Price functions #
###########################################################################
"""
Fetch a pricelist.
"""
def fetch_pricelist(self):
    """Return the price list produced by download_price.downloadPrice.

    The rest of this class indexes the result by hour and calls ``.items()``
    on it, so it is used as an hour -> price mapping.
    """
    price_file = "elspot_prices.xls"
    return download_price.downloadPrice(price_file)
"""
Update the pricelist every hour.
"""
def update_pricelist(self, current_hour):
    """Overwrite the price of the hour before ``current_hour`` with the value from next_pricelist.

    Hour 0 wraps around to hour 23.
    """
    previous_hour = 23 if current_hour == 0 else current_hour - 1
    self.pricelist[previous_hour] = self.next_pricelist[previous_hour]
"""
Calculate the total price if we would have started now instead of
scheduling it.
"""
def calculate_worstcase_price(self, duration, power):
    """Return the price of running a load for ``duration`` hours starting at the current hour.

    duration: number of whole hours the device needs to run (an integer).
    power: consumption in Watt; converted to kW because prices are per kWh.

    The hours wrap around midnight. Walking a fixed number of hours (instead
    of looping until a computed stop hour is reached) also prices a duration
    of 24 hours or more correctly; the stop-hour comparison treated a full
    day as zero hours.
    """
    # Make it kW instead of Watt since price is per kWh
    kilowatts = power / 1000
    total_price = 0
    for offset in range(duration):
        hour = (self.current_hour + offset) % 24
        total_price += self.pricelist[hour] * kilowatts
    return total_price
"""
Calculate the total price of a scheduled task.
"""
def calculate_price(self, hours, power):
    """Return the price of running ``power`` Watt during each hour listed in ``hours``."""
    # Prices are per kWh, so convert Watt to kW once up front
    kilowatts = power / 1000
    cost = 0
    for scheduled_hour in hours:
        hourly_rate = self.pricelist[scheduled_hour]
        cost = cost + hourly_rate * kilowatts
    return cost
###########################################################################
# Scheduling a task with deadline #
###########################################################################
"""
Find the cheapest hour in the pricelist.
Should maybe consider how much power that is scheduled to that hour already?
Could be used if there are blocks with the same price.
"""
def find_cheapest_hour(self):
    """Return ``(hour, price)`` for the lowest price in the pricelist.

    When several hours share the lowest price, the first one in the
    pricelist's iteration order is returned.
    """
    cheapest = min(self.pricelist, key=lambda hour: self.pricelist[hour])
    return cheapest, self.pricelist[cheapest]
"""
Find hours between start and deadline (length of duration blocks) with the best price.
"""
def find_hours(self, duration, deadline):
    """Pick the cheapest hours between the next hour and the deadline.

    duration: number of hours the task needs (an integer).
    deadline: hour of the day (taken modulo 24) by which the task must be
        done; the deadline hour itself is not part of the window.

    Returns a list of hours ordered from cheapest to most expensive. Hours
    with equal prices keep the pricelist's iteration order. If the window
    holds fewer hours than ``duration``, only the available hours are
    returned, so no hour is ever chosen twice.
    """
    # The window starts at the hour after the current one. Wrapping the start
    # as well keeps hour 0 valid when the current hour is 23.
    first_hour = (self.current_hour + 1) % 24
    window_length = (deadline - first_hour) % 24
    valid_hours = {(first_hour + offset) % 24 for offset in range(window_length)}

    # list.sort is stable, so ties keep the pricelist order, exactly like a
    # repeated "first strictly cheaper hour" scan would choose them.
    candidates = [hour for hour in self.pricelist if hour in valid_hours]
    candidates.sort(key=lambda hour: self.pricelist[hour])
    # max() guards against a negative duration turning into a tail slice
    return candidates[:max(duration, 0)]
"""
Schedule a task with a deadline.
"""
def schedule_deadline_task(self, node_id, deadline, duration):
    """Reserve the cheapest hours before ``deadline`` for a node and record the prices.

    The node's power is appended to every block of each chosen hour in
    block_schedule, and both price totals are increased.
    """
    print(self.pricelist)
    # TODO: Should it include power as well? Checking threshold and so on
    chosen_hours = self.find_hours(duration, deadline)
    print(f"hours = {chosen_hours}")
    watts = self.node_list[node_id]['power']
    print(f"power : {watts}")

    per_hour = self.blocks_per_hour
    for chosen_hour in chosen_hours:
        # Every hour covers a run of consecutive blocks in the schedule
        first_block = chosen_hour * per_hour
        for block_index in range(first_block, first_block + per_hour):
            # Store the id as well so the node can be activated later
            self.block_schedule[block_index].append({'id': node_id, 'power': watts})
    print(f"Node {node_id} scheduled!")

    # Add both prices to the totals for this day
    self.worst_case_price += self.calculate_worstcase_price(duration, watts)
    self.scheduled_price += self.calculate_price(chosen_hours, watts)
    print(f"Total price if scheduled now: {self.calculate_worstcase_price(duration, watts)}")
    print(f"Total price when scheduled now: {self.calculate_price(chosen_hours, watts)}")
"""
Check if there is a scheduled task that should be started this block.
"""
def check_scheduled_tasks(self):
    """Activate every node scheduled for the next block that is not active yet."""
    blocks_per_day = self.blocks_per_hour * 24
    upcoming = self.block_schedule[(self.clock + 1) % blocks_per_day]
    for entry in upcoming:
        node_id = entry['id']
        # Entries look like {'id': ..., 'power': ...}; already running nodes are left alone
        if node_id in self.active_list:
            continue
        print("ID not in active list, should start it now instead")
        # Tell the node to start
        message = json.dumps({'action':'activate'}).encode('utf-8')
        self.sockets[node_id].send(message)
        # Track it as both an active device and an active deadline task
        self.active_list[node_id] = {'id': node_id}
        self.deadline_load[node_id] = {'id': node_id}
        # Deadline tasks are accounted for separately from current_power
        self.deadline_power += entry['power']
###########################################################################
# Background / Interactive Scheduling #
###########################################################################
"""
Helper function that finds the best background load to pause: the one
with the shortest time left, since it is easier to schedule it later.
"""
def find_least_slack(self, temp_list):
    """Return ``(node_id, details)`` of the load with the fewest blocks left to run.

    temp_list: dict of node id -> details, where details holds at least
        'time' (blocks left) and 'power'.

    Only loads with ``time < blocks_per_hour`` are considered: a load that
    needs every block of the hour has no slack and cannot be moved. Ties on
    time are broken by lowest power, then by iteration order of temp_list.

    Returns ``(None, None)`` when temp_list is empty or no load has slack.
    """
    candidates = [
        (node_id, details)
        for node_id, details in temp_list.items()
        if details['time'] < self.blocks_per_hour
    ]
    if not candidates:
        return None, None
    # min() keeps the first of equal keys, so insertion order decides full ties
    return min(candidates, key=lambda item: (item[1]['time'], item[1]['power']))
"""
Start a background load if the threshold is okay.
Also check if a background load MUST be started
in order to make it this hour.
"""
def schedule_background(self, clock):
    """Start waiting background loads for the current block.

    clock: index of the current block within the hour (the main loop passes
        ``self.clock % self.blocks_per_hour``).

    Two passes are made over the waiting list:
    1. Every waiting load whose remaining 'time' equals
       ``blocks_per_hour - clock - 1`` is started unconditionally, without
       checking the threshold.
    2. While current_power is below the threshold, the load returned by
       find_least_slack is started if it fits below the threshold; each
       waiting load is considered at most once per call.

    NOTE(review): the indentation of this file was lost; the nesting below
    (in particular the ``tries`` counter at loop level and the final ``else``
    belonging to the ``while``) is a reconstruction - confirm against the
    original source.
    """
    activate_list = []
    # Check if there are any loads that have to be turned on this round
    if (self.waiting_list):
        time_left = self.blocks_per_hour - clock - 1
        for k, v in self.waiting_list.items():
            if (v['time'] == time_left):
                print("Turn on emergency background for node: " + str(k))
                # Send activate msg to the background node
                payload = json.dumps({'action':'approved'}).encode('utf-8')
                self.sockets[k].send(payload)
                # Update current power
                self.current_power += v['power']
                # Add it to the active list and remove it from waiting list
                self.active_list[k] = {'id': k}
                # Add it to background loads to be able to see active backgrounds
                self.background_load[k] = v
                # Collect the ids in a temporary list, since the dict cannot
                # change size while it is being iterated over
                activate_list.append(k)
    # Remove all loads that are started
    for k in activate_list:
        self.waiting_list.pop(k)
    wait_lenght = len(self.waiting_list) # Number of waiting items
    tries = 0
    # Working copy: loads are popped from it once they have been considered,
    # whether or not they were started
    temp_waitlist = self.waiting_list.copy()
    while ((self.current_power < self.threshold) and self.waiting_list):
        # find the background node that should be turned on
        node_id, node_details = self.find_least_slack(temp_waitlist)
        print("Want to turn on " + str(node_id))
        if((self.current_power + node_details['power']) < self.threshold):
            print("Turn on " + str(node_id))
            # Send activate msg to the background node
            payload = json.dumps({'action':'approved'}).encode('utf-8')
            self.sockets[node_id].send(payload)
            # Update current power
            self.current_power += node_details['power']
            # Add it to the active list and remove it from waiting list
            self.active_list[node_id] = {'id': node_id}
            self.waiting_list.pop(node_id)
            temp_waitlist.pop(node_id)
            # Add it to background loads to be able to see active backgrounds
            self.background_load[node_id] = node_details
        else:
            # Does not fit right now; keep it waiting but do not retry it in this call
            temp_waitlist.pop(node_id)
        tries += 1
        # Stop once every load that was waiting at the start has been considered
        if (tries == wait_lenght):
            break
    else:
        # Reached when the while condition is false without a break, i.e. the
        # power is at/above the threshold or nothing is waiting
        print("Uses to much power to enable background")
"""
Function that runs every hour in order to reset background loads.
"""
def reset_backgrounds(self):
    """Reset every background load at the start of a new hour.

    The remaining 'time' of each known background device is restored from
    node_list. If any background load is still waiting or running, it was not
    finished during the past hour: those devices are removed from the active
    list and their power is subtracted from current_power. Finally all
    background devices are put back on the waiting list.

    NOTE(review): no 'disconnect' message is sent to a device that is removed
    from the active list here - confirm whether the nodes expect one.
    """
    # Restore the per-hour run time; the detail dicts are mutated in place
    for node_id, details in self.background_list.items():
        details['time'] = self.node_list[node_id]['time']

    # Leftovers mean some load did not get all of its blocks this hour
    if self.waiting_list or self.background_load:
        for node_id in self.background_list:
            # Only devices that are actually running hold power to give back.
            # An explicit check replaces a catch-all ``except`` that would
            # also have hidden unrelated errors.
            if node_id not in self.active_list:
                continue
            self.active_list.pop(node_id)
            self.current_power -= self.node_list[node_id]['power']
        self.waiting_list.clear()
        self.background_load.clear()
        print("Opps! Missed to schedule some background loads")

    # Add all reset items to the waiting list again
    for node_id, details in self.background_list.items():
        self.waiting_list[node_id] = details
###########################################################################
# General Helper functions #
###########################################################################
"""
We want to change to the next block. Decrease the remaining
time of all background loads with 1 and check if we should disconnect
a scheduled task for now. The scheduled task might be finished, otherwise
it will be started another block.
"""
def decrease_time(self):
    """Advance all running loads by one block.

    Running background loads get their remaining time decreased by one and
    are disconnected when it reaches zero. Active deadline tasks scheduled in
    the current block are disconnected unless the same entry is also
    scheduled in the next block.
    """
    stop_message = json.dumps({'action':'disconnect'}).encode('utf-8')

    # Background loads: count down and stop the ones that are done
    finished = []
    for node_id, details in self.background_load.items():
        details['time'] -= 1
        if details['time'] != 0:
            continue
        print(f"{node_id} is done for this hour")
        self.sockets[node_id].send(stop_message)
        self.current_power -= details['power']
        self.active_list.pop(node_id)
        # Removal is deferred, the dict cannot shrink while being iterated
        finished.append(node_id)
    for node_id in finished:
        self.background_load.pop(node_id)

    # Deadline tasks: nothing to do when none is running
    if not self.deadline_load:
        return
    blocks_per_day = self.blocks_per_hour * 24
    following = self.block_schedule[(self.clock + 1) % blocks_per_day]
    for entry in self.block_schedule[self.clock]:
        node_id = entry['id']
        # Keep it running if it is inactive anyway or continues in the next block
        if node_id not in self.active_list or entry in following:
            continue
        self.sockets[node_id].send(stop_message)
        self.deadline_power -= self.node_list[node_id]['power']
        self.active_list.pop(node_id)
        self.deadline_load.pop(node_id)
###########################################################################
# Communication and handle helpers #
###########################################################################
"""
Register a node. Save necessary information about it.
"""
def handle_register(self, payload):
    """Store a newly registered node and return its id.

    payload: dict with the node's 'id' and a 'details' dict that contains at
        least 'flexible'.

    A copy of the details is kept in node_list. Nodes with ``flexible == 1``
    (background loads) are also added to background_list and waiting_list,
    which share the original details dict.
    """
    node_id = payload['id']
    print('Register from node: ' + str(node_id))
    details = payload['details']
    # Pristine copy; the shared details dict is modified while loads run
    self.node_list[node_id] = details.copy()

    flexibility = details['flexible']
    if flexibility == 1:
        self.background_list[node_id] = details
        self.waiting_list[node_id] = details
    elif flexibility == 2:
        print("Scheduable task")
    return node_id
"""
Handle a request from a node. This might be a request
to schedule something with a deadline or an interactive
task that needs to be started right away.
"""
def handle_request(self, payload):
    """Handle a start request from a registered node.

    payload: dict with the requesting node's 'id'.

    * ``flexible == 0`` (interactive load): the load is approved right away
      and its power is added to current_power. If that pushes the total
      above the threshold, running background loads are paused (moved back
      to the waiting list), least remaining time first, until the total is
      at or below the threshold or no load is left that may be paused.
    * ``flexible == 2`` (deadline task): handed to schedule_deadline_task.

    Raises:
        KeyError: if the node never registered.
        ValueError: for any other flexibility value.
    """
    node_id = payload['id']
    print('Request from node: ' + str(node_id))
    # Details saved when the node registered
    details = self.node_list[node_id]
    flexibility = details['flexible']

    if flexibility == 0:
        print('Interactive load')
        # Interactive loads always start: account for them and approve
        self.current_power += details['power']
        self.active_list[node_id] = {'id': node_id}
        approval = json.dumps({'action':'approved'}).encode('utf-8')
        self.sockets[node_id].send(approval)

        # Pause background loads for as long as the limit is exceeded
        candidates = self.background_load.copy()
        while self.current_power > self.threshold and candidates:
            bg_id, bg_details = self.find_least_slack(candidates)
            # Compare with None explicitly: 0 is a legitimate node id
            if bg_id is None:
                break
            # A load that needs every remaining block of this hour must keep
            # running. The block index within the hour follows
            # blocks_per_hour rather than a fixed 6.
            time_left = self.blocks_per_hour - (self.clock % self.blocks_per_hour) - 1
            if bg_details['time'] == time_left:
                candidates.pop(bg_id)
                continue
            # Send disconnect msg to the background node
            disconnect = json.dumps({'action':'disconnect'}).encode('utf-8')
            self.sockets[bg_id].send(disconnect)
            # Move it from the running lists back to the waiting list
            self.active_list.pop(bg_id)
            self.background_load.pop(bg_id)
            candidates.pop(bg_id)
            self.waiting_list[bg_id] = bg_details
            self.current_power -= self.node_list[bg_id]['power']
    elif flexibility == 2:
        print('Schedulable')
        deadline = details['deadline']
        duration = details['time']
        self.schedule_deadline_task(node_id, deadline, duration)
    else:
        raise ValueError(
            "Node " + str(node_id) + " sent a request with unsupported flexibility "
            + repr(flexibility)
        )
"""
Disconnect a load. This should not happen except
for interactive loads.
"""
def handle_disconnect(self, payload):
    """Stop an active node on its own request.

    The node is removed from the active list (KeyError if it is not active),
    a 'disconnect' message is sent back to it and its registered power is
    subtracted from current_power.
    """
    node_id = payload['id']
    print(f"Disconnect from node: {node_id}")
    self.active_list.pop(node_id)
    confirmation = json.dumps({'action':'disconnect'}).encode('utf-8')
    self.sockets[node_id].send(confirmation)
    self.current_power -= self.node_list[node_id]['power']
"""
Update from a node. This is currently not used
by the node / load, since we assume that loads
do not change during runtime.
"""
def handle_update(self, payload):
    """Log an update message from a node; nothing else is done with it."""
    sender = payload['id']
    print(f"Update from node: {sender}")
"""
Helper function for receive
"""
def handle_recv(self, s):
    """Read one message from a node socket and return it as decoded JSON.

    s: a connected socket (the main loop sets them non-blocking).

    Returns the parsed JSON value, or None when nothing could be read, the
    peer sent no data, or the bytes are not valid UTF-8 encoded JSON. Bad
    input from a node is printed and dropped, so it never propagates into
    the scheduling loop.

    NOTE(review): a single recv of up to 1024 bytes is assumed to hold
    exactly one JSON document - confirm that nodes never send larger or
    back-to-back messages.
    """
    try:
        raw = s.recv(1024)
    except OSError:
        # Includes BlockingIOError: no data available on a non-blocking socket
        return None
    if not raw:
        return None
    try:
        # Decoding happens inside the try as well: invalid UTF-8 is just as
        # much "not a message" as invalid JSON (both raise ValueError subclasses)
        return json.loads(raw.decode('utf-8'))
    except (ValueError, RecursionError) as error:
        # RecursionError: absurdly deep nesting in the received document
        print(error)
        return None
"""
Helper function to handle different incoming
actions. Send to the correct helper function for
that action.
"""
def handle_action(self, data):
    """Route an incoming message to the handler that matches its 'action'.

    data: dict with 'action' and 'payload'. Unknown actions are reported and
    dropped.
    """
    action = data['action']
    payload = data['payload']
    routes = (
        ('request', self.handle_request),
        ('update', self.handle_update),
        ('disconnect', self.handle_disconnect),
    )
    for name, handler in routes:
        if action == name:
            handler(payload)
            return
    # Invalid, drop it
    print('Invalid action received')
###########################################################################
# Main #
###########################################################################
def main(self, plot):
    """Run the simulation loop until a full day has passed.

    plot: when true, total power (current_power + deadline_power) is drawn
        live with matplotlib, one point per block.

    Each pass of the outer loop is one block. It starts scheduled deadline
    tasks, counts down running loads, schedules background loads and then
    serves the sockets until the wall-clock second changes. Every
    blocks_per_hour blocks the hour advances; the loop ends when the hour is
    12 again (the hour the meter starts at), after printing the price summary.

    NOTE(review): the indentation of this file was lost; the nesting below
    (e.g. ``time.sleep(0.2)`` inside the polling loop and the final
    ``time.sleep(30)`` after the main loop) is a reconstruction - confirm
    against the original source.
    """
    # Set up the graph
    if (plot):
        plt.ion()
        self.figure, self.axis = plt.subplots()
        self.lines, = self.axis.plot([],[], 'r-', label="Watt")
        self.axis.set_autoscaley_on(True)
        self.axis.set_xlim(0, 144)
        self.axis.set_ylim(0, 5000)
        self.axis.set_xlabel('Blocks')
        self.axis.set_ylabel('Watt')
        self.axis.grid()
        plt.legend()
    # Lists that keep track of the power usage per block
    block_usage = []
    blocks = []
    while True:
        if (plot):
            # Shift the block index by 72 (12 hours of 6 blocks) so that the
            # x axis starts at 0 for hour 12, where the simulation begins
            if self.current_hour >= 0 and self.current_hour < 12:
                blocks.append(self.clock + 72)
            else:
                blocks.append(self.clock - 72)
            block_usage.append(self.current_power + self.deadline_power)
            plt.pause(0.05)
            #plt.plot(blocks, block_usage, zorder=1)
            self.lines.set_xdata(blocks)
            self.lines.set_ydata(block_usage)
            self.axis.relim()
            self.axis.autoscale_view()
            self.figure.canvas.draw()
            self.figure.canvas.flush_events()
        # Print useful debugging information
        print("======== New block ========")
        print("Current power: " + str(self.current_power))
        print("Deadline power: " + str (self.deadline_power))
        print("Active list: " + str(self.active_list))
        print("Background load: " + str(self.background_load))
        print("Deadline load: " + str(self.deadline_load))
        print("Waiting list: " + str(self.waiting_list))
        print("Clock: " + str(self.clock))
        print("Hour: " + str(self.current_hour))
        # The scheduler for already scheduled tasks, check if some should be turned on
        self.check_scheduled_tasks()
        # Always decrease time when we executed one turn in the loop
        self.decrease_time()
        # Fetch current second
        self.current_second = int(time.strftime('%S', time.gmtime()))
        # The scheduler for the background loads
        self.schedule_background((self.clock%self.blocks_per_hour))
        # Wait here until next second
        while(self.current_second == int(time.strftime('%S', time.gmtime()))):
            # Check if the main socket has connection
            readable, writable, errored = select.select([self.server_socket], [], [], 0)
            for s in readable:
                if s is self.server_socket:
                    client_socket, address = self.server_socket.accept()
                    # The first message on a new connection is read with a
                    # blocking recv and treated as the registration
                    data = client_socket.recv(1024)
                    if not data:
                        continue
                    data = data.decode('utf-8')
                    try:
                        data = json.loads(data)
                    except Exception as e:
                        print (e)
                        continue
                    # Set it up
                    # Might need to set up a much higher timeout here as well, AND in node.py sockets
                    client_socket.setblocking(0)
                    # Fetch the id and add it to the socket list
                    id = self.handle_register(data['payload'])
                    self.sockets[id] = client_socket
            # Check if the other sockets have sent something to us
            for s in self.sockets.values():
                data = self.handle_recv(s)
                if data:
                    self.handle_action(data)
                else:
                    continue
            time.sleep(0.2)
        # Increase time
        self.clock += 1
        if (self.clock % self.blocks_per_hour == 0):
            print("================== NEW HOUR =================")
            # Increase to new hour and keep it between 0 and 23
            self.current_hour += 1
            self.current_hour = self.current_hour % 24
            # Turn off the algorithm after 24 hours
            if (self.current_hour == 12):
                print("################################# Simulation Done #################################")
                print("You saved: " + str(self.worst_case_price - self.scheduled_price) + " kr")
                print("Price if no scheduling: " + str(self.worst_case_price) + " kr")
                print("Price with scheduling: " + str(self.scheduled_price) + " kr")
                self.worst_case_price = self.scheduled_price = 0
                print("Blocks: ", blocks)
                print("Block consumption: ", block_usage)
                break
            # Reset the internal time of all background devices at the start of every hour
            self.reset_backgrounds()
            # Update pricelist with the last hour
            self.update_pricelist(self.current_hour)
        if (self.clock % (self.blocks_per_hour*24) == 0):
            print("!!!!!!!!!!!!!!!!!! New day! !!!!!!!!!!!!!!!!!!")
            self.clock = 0 # Reset the clock since it is a new day
    # Pause for 30 seconds once the simulation loop has ended
    time.sleep(30)
if __name__ == "__main__":
    # Plotting is enabled only when the first command line argument is "plot"
    plot = len(sys.argv) > 1 and str(sys.argv[1]) == "plot"
    smart_meter = SmartMeter()
    smart_meter.main(plot)