-
Notifications
You must be signed in to change notification settings - Fork 2
/
poll.py
169 lines (144 loc) · 6.05 KB
/
poll.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
import time
from threading import Thread
from queue import Empty
import gi
gi.require_version("Gdk", "3.0")
from gi.repository import Gdk, GObject
from util import Util
from environment import Environment
from gui.common import CommonActions as ca
class PollsFactory(object):
""" Returns a polling object geared towards either shell use or the gui. The
reason for having separate poll objects is because gui dialogs cannot be
produced from a separate thread; we must use gobject's add_timeout to
allow this, and so the poll semantics are slightly different.
"""
def __new__(cls, ProcessHandler):
if Environment.interface == 'shell':
return ShellPolls(ProcessHandler)
elif Environment.interface == 'gui':
return GUIPolls(ProcessHandler)
class BasePolls(object):
""" Polls base class
"""
def __init__(self, ProcessHandler):
self._should_poll = True
self._is_polling_threads = False
self._is_polling_exceptions = False
self.ProcessHandler = ProcessHandler
def start_polls(self):
if self._should_poll:
self._start_thread_poll()
self._start_exception_poll()
def stop_polls(self):
self._should_poll = False
def _start_thread_poll(self):
pass
def _thread_poll(self):
pass
def _start_exception_poll(self):
pass
def _exception_poll(self):
pass
#TODO add some progress indicator for command line users
class ShellPolls(BasePolls):
""" Monitors the CPU bound threads, submitting waiting processes when others
finish, and will abort a process when an exception arises. In the event
of an exception, all processes running in conjunction to the failed
process will also be shutdown, i.e., tasks distributed across multple
cores will see all their associated threads terminate. The exception will
also be printed to the screen and logged.
"""
def __init__(self, ProcessHandler):
super(ShellPolls, self).__init__(ProcessHandler)
def _start_thread_poll(self):
if not self._is_polling_threads:
self._tpoll = Thread(target=self._thread_poll)
self._tpoll.start()
self._is_polling_threads = True
def _thread_poll(self):
while True:
time.sleep(1.0)
if (not self._should_poll or
not self.ProcessHandler._are_active_processes()):
self._is_polling_threads = False
break
self.ProcessHandler._clear_inactive()
self.ProcessHandler._submit_waiting()
def _start_exception_poll(self):
if not self._is_polling_exceptions:
self._epoll = Thread(target=self._exception_poll)
self._epoll.start()
self._is_polling_exceptions = True
def _exception_poll(self):
while True:
time.sleep(1.0)
if (not self._should_poll or
not self.ProcessHandler._are_active_processes()):
self._is_polling_exceptions = False
break
try:
pid, exc_info = self.ProcessHandler._exception_queue.get_nowait()
except Empty:
pass
else:
exception, traceback = exc_info
for _pid, _exception in self.ProcessHandler._handled_exceptions:
identifier, cls = pid.split('.')[:2]
if identifier+'.'+cls in _pid:
return True
self.ProcessHandler._handled_exceptions.append((pid, exception))
msg = 'Exception in ' + pid + ':\n' + traceback
identifier = pid.split('.')[0]
self.ProcessHandler.abort(identifier)
print (msg)
class GUIPolls(BasePolls):
""" Monitors the CPU bound threads, submitting waiting processes when others
finish, and will abort a process when an exception arises. In the event
of an exception, all processes running in conjunction to the failed
process will also be shutdown, i.e., tasks distributed across multple
cores will see all their associated threads terminate. The exception will
also be displayed in a dialog box and logged.
"""
def __init__(self, ProcessHandler):
super(GUIPolls, self).__init__(ProcessHandler)
def _start_thread_poll(self):
if not self._is_polling_threads:
GObject.timeout_add(1000, self._thread_poll)
self._is_polling_threads = True
def _thread_poll(self):
#if (not self._should_poll or
# not self.ProcessHandler._are_active_processes()):
# self._is_polling_threads = False
# self._should_poll = False
# return False
self.ProcessHandler._clear_inactive()
self.ProcessHandler._submit_waiting()
return True
def _start_exception_poll(self):
if not self._is_polling_exceptions:
GObject.timeout_add(500, self._exception_poll)
self._is_polling_exceptions = True
def _exception_poll(self):
#if (not self._should_poll or
# not self.ProcessHandler._are_active_processes()):
# self._is_polling_exceptions = False
# self._should_poll = False
# return False
try:
#print ('epoll')
pid, exc_info = self.ProcessHandler._exception_queue.get_nowait()
except Empty:
return True
else:
exception, traceback = exc_info
for _pid, _exception in self.ProcessHandler._handled_exceptions:
identifier, cls = pid.split('.')[:2]
if identifier+'.'+cls in _pid:
return True
self.ProcessHandler._handled_exceptions.append((pid, exception))
msg = 'Exception in ' + pid + ':\n' + traceback
identifier = pid.split('.')[0]
self.ProcessHandler.abort(identifier)
ca.dialog(message=msg)
return True