forked from mathoudebine/turing-smart-screen-python
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscheduler.py
109 lines (80 loc) · 3.01 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import sched
import threading
import time
from datetime import timedelta
from functools import wraps
import library.config as config
import library.stats as stats
# Module-level alias for library.config's CONFIG_DATA; the @schedule(...)
# decorator arguments below read their refresh intervals from it when this
# module is imported.
CONFIG_DATA = config.CONFIG_DATA
def async_job(threadname=None):
    """Decorator factory that runs the decorated callable on a new thread.

    Args:
        threadname: Name given to every thread spawned for the callable
            (``None`` lets ``threading`` pick a default name).

    Returns:
        A decorator. Calling the decorated function starts a fresh
        ``threading.Thread`` executing the original callable with the same
        arguments and returns that already-started thread.
    """

    def decorator(func):
        """Replace ``func`` with a launcher that executes it in a thread."""

        @wraps(func)
        def spawn(*call_args, **call_kwargs):
            """Start ``func`` in its own thread and hand the thread back."""
            worker = threading.Thread(
                name=threadname,
                target=func,
                args=call_args,
                kwargs=call_kwargs,
            )
            worker.start()
            return worker

        return spawn

    return decorator
def schedule(interval):
    """Decorator factory that turns a function into a blocking periodic job.

    Calling the decorated function runs it once immediately and then again
    every ``interval`` seconds. The call blocks for as long as the job keeps
    re-arming itself; it only ends when the function raises, in which case
    the exception propagates to the caller.

    Args:
        interval: Delay in seconds between the start of consecutive runs.

    Returns:
        A decorator. Positional and keyword arguments given to the decorated
        function are forwarded to every periodic invocation.
    """

    def decorator(func):
        """Wrap ``func`` so that calling it starts the periodic loop."""

        def periodic(scheduler, periodic_interval, action, actionargs=(), actionkwargs=None):
            """Queue the next run, then execute the current one."""
            if actionkwargs is None:
                actionkwargs = {}
            # Re-arm before running the action so its own run time does not
            # stretch the period.
            scheduler.enter(
                periodic_interval, 1, periodic,
                (scheduler, periodic_interval, action, actionargs, actionkwargs),
            )
            action(*actionargs, **actionkwargs)

        @wraps(func)
        def wrap(*args, **kwargs):
            """Create a private scheduler and run ``func`` on it periodically."""
            # A monotonic clock keeps the period stable even if the system
            # wall clock is adjusted while the job is running.
            scheduler = sched.scheduler(time.monotonic, time.sleep)
            # Forward the caller's arguments instead of silently dropping them.
            periodic(scheduler, interval, func, args, kwargs)
            scheduler.run()

        return wrap

    return decorator
@async_job("CPU_Percentage")
@schedule(
    timedelta(
        # NOTE(review): a missing INTERVAL yields None, which timedelta()
        # rejects with TypeError at import time - confirm the config always sets it.
        seconds=CONFIG_DATA['STATS']['CPU']['PERCENTAGE'].get("INTERVAL"),
    ).total_seconds()
)
def CPUPercentage():
    """Periodic job that refreshes the CPU percentage.

    Calls ``stats.CPU.percentage()`` on the "CPU_Percentage" thread, repeating
    at the INTERVAL (seconds) configured under STATS / CPU / PERCENTAGE.
    """
    stats.CPU.percentage()
@async_job("CPU_Frequency")
@schedule(
    timedelta(
        # NOTE(review): a missing INTERVAL yields None, which timedelta()
        # rejects with TypeError at import time - confirm the config always sets it.
        seconds=CONFIG_DATA['STATS']['CPU']['FREQUENCY'].get("INTERVAL"),
    ).total_seconds()
)
def CPUFrequency():
    """Periodic job that refreshes the CPU frequency.

    Calls ``stats.CPU.frequency()`` on the "CPU_Frequency" thread, repeating
    at the INTERVAL (seconds) configured under STATS / CPU / FREQUENCY.
    """
    stats.CPU.frequency()
@async_job("CPU_Load")
@schedule(
    timedelta(
        # NOTE(review): a missing INTERVAL yields None, which timedelta()
        # rejects with TypeError at import time - confirm the config always sets it.
        seconds=CONFIG_DATA['STATS']['CPU']['LOAD'].get("INTERVAL"),
    ).total_seconds()
)
def CPULoad():
    """Periodic job that refreshes the CPU load.

    Calls ``stats.CPU.load()`` on the "CPU_Load" thread, repeating at the
    INTERVAL (seconds) configured under STATS / CPU / LOAD.
    """
    stats.CPU.load()
@async_job("GPU_Stats")
@schedule(
    timedelta(
        # NOTE(review): a missing INTERVAL yields None, which timedelta()
        # rejects with TypeError at import time - confirm the config always sets it.
        seconds=CONFIG_DATA['STATS']['GPU'].get("INTERVAL"),
    ).total_seconds()
)
def GPUStats():
    """Periodic job that refreshes the GPU stats.

    Calls ``stats.GPU.stats()`` on the "GPU_Stats" thread, repeating at the
    INTERVAL (seconds) configured under STATS / GPU.
    """
    stats.GPU.stats()
@async_job("Memory_Stats")
@schedule(
    timedelta(
        # NOTE(review): a missing INTERVAL yields None, which timedelta()
        # rejects with TypeError at import time - confirm the config always sets it.
        seconds=CONFIG_DATA['STATS']['MEMORY'].get("INTERVAL"),
    ).total_seconds()
)
def MemoryStats():
    """Periodic job that refreshes the memory stats.

    Calls ``stats.Memory.stats()`` on the "Memory_Stats" thread, repeating at
    the INTERVAL (seconds) configured under STATS / MEMORY.
    """
    stats.Memory.stats()
@async_job("Disk_Stats")
@schedule(
    timedelta(
        # NOTE(review): a missing INTERVAL yields None, which timedelta()
        # rejects with TypeError at import time - confirm the config always sets it.
        seconds=CONFIG_DATA['STATS']['DISK'].get("INTERVAL"),
    ).total_seconds()
)
def DiskStats():
    """Periodic job that refreshes the disk stats.

    Calls ``stats.Disk.stats()`` on the "Disk_Stats" thread, repeating at the
    INTERVAL (seconds) configured under STATS / DISK.
    """
    stats.Disk.stats()
@async_job("Queue_Handler")
@schedule(timedelta(milliseconds=1).total_seconds())
def QueueHandler():
    """Periodic job that takes one entry from ``config.update_queue`` and runs it.

    Each entry unpacks into a callable and its positional arguments; an entry
    whose callable is falsy is skipped. Re-armed every millisecond on the
    "Queue_Handler" thread.
    """
    # presumably update_queue is a queue.Queue, so get() waits for an entry -
    # verify against library.config
    callback, callback_args = config.update_queue.get()
    if not callback:
        return
    callback(*callback_args)