-
Notifications
You must be signed in to change notification settings - Fork 3
/
tasks.py
341 lines (293 loc) · 11.6 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
"""Contains classes for running functions/commands asynchronously.
"""
import logging
import pstats
import subprocess
import threading
import time
import traceback
from cProfile import Profile
# try:
from .outputs import CaptureOutput
# except SystemError:
# import sys
# from os.path import dirname
# aries_parent = dirname(dirname(__file__))
# if aries_parent not in sys.path:
# sys.path.append(aries_parent)
# from Aries.outputs import CaptureOutput
logger = logging.getLogger(__name__)
class Task:
    """A base class for representing a task like running a function or a command.

    This class should not be initialized directly.
    A subclass should implement the run() method,
    and the run() method should handle the capturing of outputs.

    Attributes:
        thread: The thread running the task, if the task is running asynchronously.
            The thread value is set by run_async().
        exception: Initialized to None; intended for a subclass to store an
            exception raised while running the task.

        The following attributes are designed to capture the output of running the task.
        std_out (str): Captured standard outputs.
        std_err (str): Captured standard errors.
        log_out (str): Captured log messages.
        exc_out (str): Captured exception outputs.
        returns: Return value of the task.
        pid (int): The PID of the process running the task.
    """

    def __init__(self):
        self.pid = None
        self.thread = None
        self.returns = None
        self.exception = None
        self.std_out = ""
        self.std_err = ""
        self.log_out = ""
        self.exc_out = ""

    @property
    def log_list(self):
        """Log messages as a list, one item per line of log_out.

        Leading and trailing newlines are stripped before splitting.
        When log_out is empty, the result is a list with one empty string.
        """
        return self.log_out.strip("\n").split("\n")

    def print_outputs(self):
        """Prints the PID, return value, stdout, stderr and logs.
        """
        print("=" * 80)
        print("PID: %s" % self.pid)
        print("RETURNS: %s" % self.returns)
        print("STD OUT:")
        for line in self.std_out.split("\n"):
            print(line)
        print("STD ERR:")
        for line in self.std_err.split("\n"):
            print(line)
        print("LOGS:")
        for line in self.log_out.split("\n"):
            print(line)

    def run(self):
        """Runs the task and captures the outputs.

        This method should be implemented by a subclass.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(
            "A run() method should be implemented to run the task and capture the outputs."
        )

    def run_async(self):
        """Runs the task asynchronously by calling the run() method in a daemon thread.

        The thread is also stored in the "thread" attribute so that join() can wait for it.

        Returns: The daemon thread running the task.
        """
        thread = threading.Thread(
            target=self.run,
        )
        thread.daemon = True
        thread.start()
        self.thread = thread
        return self.thread

    def join(self):
        """Blocks the calling thread until the daemon thread running the task terminates.

        This method returns immediately if the task was never started with run_async()
        or if the thread has already finished.

        Returns: None.
        """
        # Thread.isAlive() was removed in Python 3.9; is_alive() is the supported name.
        if self.thread is not None and self.thread.is_alive():
            return self.thread.join()
        return None
class FunctionTask(Task):
    """Represents a task of running a function.

    The return value of the function to be executed should be serializable.
    The logging will be captured by identifying the thread ID of the thread running the function.

    Attributes:
        thread: The thread running the function, if the function is running asynchronously.
            The thread value is set by run_async().
        exception: The exception raised by the last run, or None if the last run succeeded.

        The following attributes are designed to capture the output of running the function.
        std_out (str): Captured standard outputs.
        std_err (str): Captured standard errors.
        log_out (str): Captured log messages.
        exc_out (str): Captured exception outputs.
        returns: Return value of the task.
        pid (int): The PID of the process running the task.

        func: The function to be executed.
        args: The arguments for executing the function.
        kwargs: The keyword arguments for executing the function.
        log_filters (list): Filters passed to CaptureOutput when running the function.
        out: The object yielded by the CaptureOutput context of the last run.

    Remarks:
        std_out and std_err will contain the outputs from all threads running in the same process.
    """
    # Stores a list of attribute names to be captured from the process running the function
    __output_attributes = [
        "std_out",
        "std_err",
        "log_out",
        "exc_out",
        "returns",
    ]

    def __init__(self, func, *args, **kwargs):
        """Initializes a task to run a function.

        Args:
            func: The function to be executed.
            *args: A list of arguments for the function to be executed.
            **kwargs: A dictionary of keyword arguments for the function to be executed.
        """
        super(FunctionTask, self).__init__()
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.log_filters = []
        self.out = None

    def add_log_filter(self, log_filter):
        """Adds a filter to the list passed to CaptureOutput when the function runs.

        Args:
            log_filter: The filter to be appended to log_filters.

        Returns: The task itself, so that calls can be chained.
        """
        self.log_filters.append(log_filter)
        return self

    def __unpack_outputs(self, out):
        """Sets a list of attributes (self.__output_attributes) by copying values from a dictionary.

        Attributes whose names are missing from the dictionary are set to None.

        Args:
            out (dict): The dictionary containing the values for attributes.
                The keys in the dictionary must be the same as the attribute names.
        """
        for k in self.__output_attributes:
            setattr(self, k, out.get(k))

    def __pack_outputs(self, out):
        """Saves a list of attributes (self.__output_attributes) to a dictionary.

        Args:
            out: An object with all attributes listed in self.__output_attributes.

        Returns: A dictionary mapping each attribute name to its value on "out".
        """
        return {
            k: getattr(out, k) for k in self.__output_attributes
        }

    def __run(self):
        """Runs the function inside a CaptureOutput context and packs the captured outputs.

        Any exception raised while running is stored in self.exception instead of being raised.

        Returns: A dictionary of outputs keyed by the names in self.__output_attributes.
            If the outputs cannot be packed, the dictionary contains only "exc_out",
            holding the traceback of the packing error.
        """
        try:
            with CaptureOutput(filters=self.log_filters) as out:
                self.out = out
                # Returns may not be serializable.
                out.returns = self.func(*self.args, **self.kwargs)
        except Exception as ex:
            # Catch and save the exception
            # run() determines whether to raise the exception
            # based on the "suppress_exception" argument.
            self.exception = ex
        else:
            # Reset self.exception if the run is successful.
            # This is for run_and_retry()
            self.exception = None
        try:
            # "out" is unbound here if the CaptureOutput context could not be entered;
            # the resulting error is handled by the except clause below.
            return self.__pack_outputs(out)
        except Exception as ex:
            print(ex)
            return {
                "exc_out": traceback.format_exc()
            }

    def exit_run(self):
        """Additional processing before exiting the task.

        This method is intended to be implemented by a subclass.
        """
        pass

    def run(self, suppress_exception=True):
        """Runs the function and captures the outputs.

        Args:
            suppress_exception (bool): If True, an exception raised by the function is
                only stored in self.exception. If False, the exception is re-raised
                after the outputs are captured and exit_run() is called.

        Returns: The return value of the function (self.returns).
        """
        self.__unpack_outputs(self.__run())
        if self.exc_out:
            print(self.exc_out)
        self.exit_run()
        if not suppress_exception and self.exception is not None:
            raise self.exception
        return self.returns

    def run_profiler(self):
        """Runs the function with profiler and prints the profiling results.

        The outputs of the function are not captured and its return value is discarded.
        """
        profile = Profile()
        profile.runcall(self.func, *self.args, **self.kwargs)
        stats = pstats.Stats(profile)
        stats.strip_dirs()
        # Display profiling results
        stats.sort_stats('cumulative', 'time').print_stats(0.1)

    def run_and_retry(self, max_retry=10, exceptions=Exception,
                      base_interval=2, retry_pattern='exponential', capture_output=True):
        """Runs the function and retries a few times if certain exceptions occur.

        The function is attempted at most max_retry times.
        There is no waiting after the last failed attempt; the last exception is raised immediately.

        Args:
            max_retry (int): The maximum number of times to try running the function.
                Must be at least 1.
            exceptions: An exception class or a tuple of exception classes.
                Only these exceptions trigger a retry; other exceptions propagate immediately.
            base_interval (int): The interval before the first retry in seconds.
            retry_pattern (str): The pattern of the retry interval.
                "exponential": The time between two retries will increase exponentially.
                    i.e., the interval will be "base_interval ** i" after the ith try.
                Any other value (e.g. "linear"): The time between two retries will increase linearly.
                    i.e., the interval will be "base_interval * i" after the ith try.
            capture_output: Indicate if the outputs and logs of the function should be captured.
                Outputs and logs will be captured to std_out, std_err and log_out attributes.
                Setting capture_output to False will improve the performance.

        Returns: The return value of the function.

        Raises:
            ValueError: If max_retry is less than 1.
            Exception: The exception from the last attempt, if all attempts failed.
        """
        if max_retry < 1:
            # Without this check nothing would run and there would be no exception to raise.
            raise ValueError("max_retry must be at least 1, got %r." % (max_retry,))

        error = None
        for i in range(max_retry):
            try:
                if capture_output:
                    results = self.run(suppress_exception=False)
                else:
                    results = self.func(*self.args, **self.kwargs)
            except exceptions as ex:
                error = ex
                traceback.print_exc()
            else:
                return results

            # Do not wait after the final attempt; there is nothing left to retry.
            if i + 1 >= max_retry:
                break
            if retry_pattern == "exponential":
                time.sleep(base_interval ** (i + 1))
            else:
                time.sleep(base_interval * (i + 1))

        # Reached only when every attempt raised one of the expected exceptions.
        raise error
class ShellCommand(Task):
    """Represents a task of running a shell command.

    Attributes:
        thread: The thread running the task, if the task is running asynchronously.
            The thread value is set by run_async().
        cmd: The command to be executed by the shell.
        process: The subprocess.Popen object of the last run, or None before the first run.

        The following attributes are designed to capture the output of running the task.
        std_out (str): Captured standard outputs.
        std_err (str): Captured standard errors.
        log_out (str): Captured log messages.
        exc_out (str): Captured exception outputs.
        returns: Return value of the task. For a shell command, this is the exit code.
        pid (int): The PID of the process running the task.

    This class can be used to run a shell command and capture the outputs.
    For example, the command "ls -a ~" displays all files in the user's home directory.
    The following code runs this command:
        cmd = "ls -a ~"
        task = ShellCommand(cmd)
        task.run()
        print(task.std_out)
    The outputs are stored in "task.std_out" as a string.

    Run the command asynchronously, if the command takes a long time to complete:
        cmd = "YOUR_AWESOME_COMMAND"
        task = ShellCommand(cmd)
        task.run_async()
        # Feel free to do something else here
        # ...
        # Get the outputs
        task.join()
        print(task.std_out)
    """

    def __init__(self, cmd):
        """Initializes a task to run a shell command.

        Args:
            cmd: The command to be passed to the shell.
        """
        super(ShellCommand, self).__init__()
        self.cmd = cmd
        self.process = None

    def run(self):
        """Runs the command with Popen() and waits for it to finish.

        The standard output and standard error are decoded to strings.
        Bytes that cannot be decoded are replaced instead of raising an error,
        so that the outputs and the exit code are always recorded.

        Returns: The task itself. The exit code is stored in the "returns" attribute.
        """
        # NOTE(security): shell=True passes cmd to the shell as is.
        # Do not build cmd from untrusted input without proper quoting.
        self.process = subprocess.Popen(
            self.cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=True
        )
        self.pid = self.process.pid
        out, err = self.process.communicate()
        # A command may emit bytes that are not valid in the default encoding.
        self.std_out = out.decode(errors="replace")
        self.std_err = err.decode(errors="replace")
        self.returns = self.process.returncode
        return self