Following the discussion on dask/dask-jobqueue#169, I produced an SSCCE. Thank you @guillaumeeb and @mrocklin!
Description
The issue is that a MemoryError cannot be caught (except when it is raised explicitly) because the worker is restarted before the error can propagate.
Motivations
It seems trivial, but there are situations when you want to catch those errors.
The bug originates from EpistasisLab/tpot#779 where an evolutionary algorithm tests scikit-learn pipelines. Some pipelines will produce a MemoryError (like PolynomialFeatures with a large number of columns)
Code
Note that the same behavior occurs with the delayed interface; a minimal sketch using delayed follows the script below.
from dask.distributed import Client
import numpy as np


def f(n):
    a = np.arange(n)
    return sum(a)


def errf(n):
    raise MemoryError


def sep():
    print()
    print('-' * 42)
    print()


if __name__ == '__main__':
    # small memory budget per worker so that the third call exceeds it
    client = Client(ncores=1, memory_limit='200M')

    sep()

    # 1) small computation: succeeds and prints 861
    try:
        r = client.submit(f, 42)
        print(r.result())
    except MemoryError as e:
        print('Just triggered a MemoryError', e)

    sep()

    # 2) explicitly raised MemoryError: it propagates and is caught
    try:
        r = client.submit(errf, 42)
        print(r.result())
    except MemoryError as e:
        print('Just triggered a MemoryError', e)

    sep()

    # 3) np.arange(10**8) needs ~800 MB, far above the 200 MB budget:
    #    the worker is restarted and a KilledWorker error surfaces instead
    #    of a MemoryError, so the except branch is never reached
    try:
        r = client.submit(f, 10**8)
        print(r.result())
    except MemoryError as e:
        print('Just triggered a MemoryError', e)
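For reference, here is a minimal sketch of the third case with the delayed interface (it mirrors the script above and was not part of the run that produced the output below):

import dask
from dask.distributed import Client
import numpy as np


def f(n):
    a = np.arange(n)
    return sum(a)


if __name__ == '__main__':
    client = Client(ncores=1, memory_limit='200M')
    try:
        # same oversized computation, built lazily and run on the cluster
        result = dask.delayed(f)(10**8).compute()
        print(result)
    except MemoryError as e:
        # as with submit, the worker is killed before a MemoryError can propagate
        print('Just triggered a MemoryError', e)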
Output
------------------------------------------
861
------------------------------------------
distributed.worker - WARNING - Compute Failed
Function: errf
args: (42)
kwargs: {}
Exception: MemoryError()
distributed.scheduler - ERROR - error from worker tcp://127.0.0.1:57166:
Just triggered a MemoryError
------------------------------------------
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 22433 was killed by unknown signal
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 22432 was killed by unknown signal
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 22430 was killed by unknown signal
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Worker process 22431 was killed by unknown signal
Traceback (most recent call last):
  File "dask_version.py", line 45, in <module>
    print(r.result())
  File "/Users/louisabraham/miniconda3/lib/python3.6/site-packages/distributed/client.py", line 171, in result
    six.reraise(*result)
  File "/Users/louisabraham/miniconda3/lib/python3.6/site-packages/six.py", line 693, in reraise
    raise value
distributed.scheduler.KilledWorker: ('f-53758efeacb65d99d2d76fb863336f1b', 'tcp://127.0.0.1:57177')
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-7, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-6, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-5, started daemon)>
distributed.nanny - WARNING - Worker process 22436 was killed by unknown signal
distributed.nanny - WARNING - Worker process 22435 was killed by unknown signal
distributed.nanny - WARNING - Worker process 22434 was killed by unknown signal
Also, the "killed by unknown signal" warnings are strange: distributed.nanny should know what killed the workers, since it restarted them itself after they exceeded the memory budget.
Discussion
Since I only started looking at the internals of Dask yesterday, my opinion is very subjective; but I see two options:
1. When a task's memory usage is too high and it causes more than n worker restarts, raise a MemoryError for that task.
2. Wrap the worker's system calls so that it believes it only has 95% of its memory budget, so that it naturally raises a MemoryError (a rough POSIX sketch follows this list).
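As a rough illustration of what option 2 could look like from the outside (this is not an existing Dask feature), one could lower the address-space limit of each worker process with resource.setrlimit via Client.run; the limit_memory helper and the 95% factor are assumptions for the sketch:

import resource

from dask.distributed import Client


def limit_memory(limit_bytes):
    # POSIX only: cap the address space of the current (worker) process so
    # that oversized allocations fail with a MemoryError inside the task
    # instead of the nanny killing the whole worker first.
    resource.setrlimit(resource.RLIMIT_AS, (limit_bytes, limit_bytes))


if __name__ == '__main__':
    client = Client(ncores=1, memory_limit='200M')
    # hypothetical value: 95% of the 200 MB budget, mirroring the nanny's threshold
    client.run(limit_memory, int(0.95 * 200e6))

In practice RLIMIT_AS also counts shared libraries and the interpreter itself, so the value would likely need to be larger than the nanny's budget; this only illustrates the idea.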
The default behavior is to try executing the task 3 times, which makes sense: if a worker hits a MemoryError because of a temporary problem on its node, then Dask should retry the task.
I don't know whether option 1 is possible, but if it is, it offers the most flexibility: it would just be a variation on the default behavior, raising a MemoryError once a task has failed 3 times because its worker restarted. The number of allowed restarts could be made configurable for this particular failure mode.
Option 2 could be system-dependent and would probably need a mechanism to retry the tasks. Such a mechanism would be simple to handle from the application code (see the sketch below), but I really think Dask should provide it as well.
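For completeness, here is a sketch of that application-side workaround, assuming KilledWorker can be imported from distributed.scheduler (as the traceback above suggests); the helper name, the n_tries value, and the mapping from repeated worker deaths to MemoryError are all assumptions:

from dask.distributed import Client
from distributed.scheduler import KilledWorker


def submit_with_memory_check(client, func, *args, n_tries=3):
    """Resubmit a task a few times, treating repeated worker deaths as a MemoryError."""
    for attempt in range(n_tries):
        # pure=False forces a new key, so the task is really resubmitted
        future = client.submit(func, *args, pure=False)
        try:
            return future.result()
        except KilledWorker:
            if attempt == n_tries - 1:
                raise MemoryError(
                    'task killed its worker %d times; assuming it ran out of memory'
                    % n_tries)


# usage, with f and the client from the script above:
# submit_with_memory_check(client, f, 10**8)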