
use_dask and joblib don't handle Exceptions #779

Open
louisabraham opened this issue Oct 6, 2018 · 8 comments

louisabraham commented Oct 6, 2018

My issue is described at dask/dask-jobqueue#169

A huge thanks to @guillaumeeb, who identified the problem!

Context of the issue

Basically, the dask backend raises an error when a computation encounters an exception, and after a few attempts to relaunch the task, this makes the fit function fail.
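
For context, an ordinary exception raised inside a dask task is serialized and re-raised on the client, so it can in principle be caught; it is a worker dying outright (e.g. killed by the OS when it runs out of memory) that triggers the relaunch cycle described above. A minimal sketch of the catchable case (bad_task and the local cluster are made up for illustration):

from dask.distributed import Client

def bad_task():
    # an ordinary exception inside a task is serialized and re-raised
    # on the client when the result is requested
    raise ValueError("simulated failure")

client = Client(processes=False)  # in-process cluster, for illustration
future = client.submit(bad_task)
try:
    future.result()
except ValueError as exc:
    print("caught on the client side:", exc)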

Process to reproduce the issue

Set use_dask=True with a dataset on which some pipelines crash (for example, PolynomialFeatures on 800 columns).
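
A hypothetical reproduction sketch (the dataset and TPOT settings are illustrative, not the ones from the original run):

import numpy as np
from dask.distributed import Client
from tpot import TPOTRegressor

client = Client()  # LocalCluster

# with 800 columns, PolynomialFeatures(degree=2) expands the data to
# roughly 320,000 features, which can exhaust a worker's memory
X = np.random.rand(100, 800)
y = np.random.rand(100)

tpot = TPOTRegressor(generations=2, population_size=10,
                     use_dask=True, verbosity=2, random_state=42)
tpot.fit(X, y)  # fails once a pipeline repeatedly kills its workers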

Expected result

Without dask, _wrapped_cross_val_score catches the exceptions and returns a -inf score. (The dask branch instead returns a Delayed object, so its code never gets a chance to catch worker-side errors; they only surface later, when the graph is actually computed.) The relevant code is here:

tpot/tpot/gp_deap.py

Lines 434 to 480 in 507b45d

if use_dask:
    try:
        import dask_ml.model_selection  # noqa
        import dask  # noqa
        from dask.delayed import Delayed
    except ImportError:
        msg = "'use_dask' requires the optional dask and dask-ml depedencies."
        raise ImportError(msg)

    dsk, keys, n_splits = dask_ml.model_selection._search.build_graph(
        estimator=sklearn_pipeline,
        cv=cv,
        scorer=scorer,
        candidate_params=[{}],
        X=features,
        y=target,
        groups=groups,
        fit_params=sample_weight_dict,
        refit=False,
        error_score=float('-inf'),
    )

    cv_results = Delayed(keys[0], dsk)
    scores = [cv_results['split{}_test_score'.format(i)]
              for i in range(n_splits)]
    CV_score = dask.delayed(np.array)(scores)[:, 0]
    return dask.delayed(np.nanmean)(CV_score)
else:
    try:
        with warnings.catch_warnings():
            warnings.simplefilter('ignore')
            scores = [_fit_and_score(estimator=clone(sklearn_pipeline),
                                      X=features,
                                      y=target,
                                      scorer=scorer,
                                      train=train,
                                      test=test,
                                      verbose=0,
                                      parameters=None,
                                      fit_params=sample_weight_dict)
                      for train, test in cv_iter]
        CV_score = np.array(scores)[:, 0]
        return np.nanmean(CV_score)
    except TimeoutException:
        return "Timeout"
    except Exception as e:
        return -float('inf')

Current result

The current result is a confusing KeyError, but on the master branch of Dask the error message should indicate that a worker restarted (I think).

Possible fix

There should be a way to catch the memory error (or worker restart) and return a -inf score.

When I try with an LSFCluster backend, it raises an exception.

When I try with a LocalCluster (Client()), the whole notebook crashes after a few attempts.
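
A sketch of the kind of handler I have in mind, assuming the delayed score goes through the distributed scheduler (safe_compute is a hypothetical helper; TPOT actually computes the Delayed objects in a batch elsewhere, so a real fix would have to live there):

import dask
from distributed.scheduler import KilledWorker

def safe_compute(delayed_score):
    # map worker deaths and out-of-memory failures to -inf, mirroring
    # the except clause of the non-dask code path above
    try:
        return dask.compute(delayed_score)[0]
    except (KilledWorker, MemoryError):
        return -float('inf')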

Immediate fix

I think using the joblib backend provided by Dask should be fine? I'll try tomorrow!

Edit: it doesn't work, see below.

Screenshots

[Two screenshots from the dask web interface]

Logs

with one worker

traceback: https://pastebin.com/raw/QvtVXkBD
dask-worker.err: https://ptpb.pw/SwbX

Notice that the dask worker restarted 3 times; this limit appears to be a constant.
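
The constant is presumably the scheduler's allowed-failures setting, which defaults to 3: once a task has been involved in that many worker deaths, the scheduler marks it as erred. A sketch of raising the limit, using the configuration key from recent dask versions (this only postpones the failure, it doesn't handle it):

import dask

# let a task survive more worker deaths before the scheduler gives up
dask.config.set({"distributed.scheduler.allowed-failures": 10})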

@louisabraham (Author)

I am not sure, but the timeout handling with the dask backend is worth testing as well.
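
For reference, a self-contained sketch of how a timeout could be enforced on the dask side by going through a future, instead of relying on TPOT's signal-based TimeoutException (slow_score and the 5-second limit are made up for illustration):

import time
import dask
from dask.distributed import Client

@dask.delayed
def slow_score():
    time.sleep(120)  # stands in for an expensive pipeline evaluation
    return 0.5

client = Client()
future = client.compute(slow_score())
try:
    score = future.result(timeout=5)  # seconds
except Exception:  # distributed raises a TimeoutError variant here
    future.cancel()
    score = "Timeout"
print(score)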

@louisabraham (Author)

Unfortunately, it doesn't work with the joblib backend either.

I launched the following code:

import pandas as pd
import numpy as np

X = pd.read_csv('X_train.csv').drop(columns=['id']).values
y = pd.read_csv('y_train.csv').drop(columns=['id']).values[:, 0]

from dask_jobqueue import LSFCluster
cluster = LSFCluster(cores=2, memory='2GB',
                     job_extra=['-R rusage[mem=2048,scratch=16000]'],
                     local_directory='$TMPDIR',
                     walltime='12:00')

from sklearn.externals import joblib
import distributed.joblib
from dask.distributed import Client
client = Client(cluster)

cluster.scale(6)

from tpot import TPOTRegressor

reg = TPOTRegressor(max_time_mins=60*8, generations=50, population_size=30,
                    cv=5,
                    scoring='r2',
                    memory='auto', random_state=42, verbosity=10, n_jobs=-1)

with joblib.parallel_backend("dask.distributed"):
    reg.fit(X, y)

After some time, I got the same errors:

Skipped pipeline #29 due to time out. Continuing to the next pipeline.
_pre_test decorator: _random_mutation_operator: num_test=0 l2 was provided as affinity. Ward can only work with euclidean distances.
_pre_test decorator: _random_mutation_operator: num_test=0 __init__() got an unexpected keyword argument 'max_depth'
_pre_test decorator: _random_mutation_operator: num_test=0 Unsupported set of arguments: The combination of penalty='l2' and loss='epsilon_insensitive' are not supported when dual=False, Parameters: penalty='l2', loss='epsilon_insensitive', dual=False
_pre_test decorator: _random_mutation_operator: num_test=0 Found array with 0 feature(s) (shape=(50, 0)) while a minimum of 1 is required.
Skipped pipeline #49 due to time out. Continuing to the next pipeline.

distributed.scheduler - ERROR - '74911161'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1306, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 62, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '74911161'
distributed.scheduler - ERROR - '74911158'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1306, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 62, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '74911158'
distributed.scheduler - ERROR - '74911160'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1306, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 62, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '74911160'
distributed.scheduler - ERROR - '74911161'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1712, in remove_worker
    plugin.remove_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 74, in remove_worker
    del self.running_jobs[job_id][name]
KeyError: '74911161'
distributed.scheduler - ERROR - '74911161'
Traceback (most recent call last):
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/distributed/scheduler.py", line 1306, in add_worker
    plugin.add_worker(scheduler=self, worker=address)
File "/cluster/home/abrahalo/.local/lib64/python3.6/site-packages/dask_jobqueue/core.py", line 62, in add_worker
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
KeyError: '74911161'

The cell then failed without raising an error; the optimization simply stopped early.
The progress bar was displaying 63 iterations, but len(reg.evaluated_individuals_) returned 57.
The errors appeared as the memory usage of some workers exploded.

louisabraham changed the title from "use_dask doesn't handle Exceptions" to "use_dask and joblib don't handle Exceptions" on Oct 7, 2018
@louisabraham (Author)

For use_dask, the problem probably comes from dask_ml.model_selection._search.build_graph: the argument error_score=float('-inf') was passed but didn't seem to have any effect.

tpot/tpot/gp_deap.py

Lines 443 to 454 in 507b45d

dsk, keys, n_splits = dask_ml.model_selection._search.build_graph(
    estimator=sklearn_pipeline,
    cv=cv,
    scorer=scorer,
    candidate_params=[{}],
    X=features,
    y=target,
    groups=groups,
    fit_params=sample_weight_dict,
    refit=False,
    error_score=float('-inf'),
)

If the problem is solved for use_dask, it will probably solve the dask joblib backend issue as well.
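
A quick way to check the error_score behaviour in isolation (a sketch; Failing is a made-up estimator). Note that error_score can only catch exceptions raised inside fit on a live worker; a worker killed by the OS never reaches that handler, which may be why the argument has no visible effect here:

import numpy as np
from sklearn.base import BaseEstimator
from dask_ml.model_selection import GridSearchCV

class Failing(BaseEstimator):
    # estimator whose fit always fails, like a crashing pipeline
    def fit(self, X, y=None):
        raise ValueError("simulated fit failure")

    def score(self, X, y=None):
        return 0.0

X, y = np.random.rand(30, 3), np.random.rand(30)
search = GridSearchCV(Failing(), {}, cv=3, error_score=-np.inf)
search.fit(X, y)
print(search.cv_results_['mean_test_score'])  # expected: [-inf]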

weixuanfu (Contributor) commented Oct 8, 2018

@louisabraham which version of dask are you using right now? I remember we had this kind of error-handling issue before, but I thought @TomAugspurger fixed it.

Edit: I think that MemoryError or other out-of-resource errors may need another handler in dask.

@louisabraham (Author)

I am using dask 0.19.3 and tpot 0.9.5.

@TomAugspurger (Contributor)

It's quite likely that timeouts sent by TPOT are not handled correctly on the dask backend right now. Fixing that should be possible with a little effort.

It'll be a bit before I can look closely at the other errors.

@louisabraham (Author)

I think it is about MemoryError, not timeouts. I reported the issue at dask/distributed#2297.


dreavjr commented Apr 18, 2020

I'm facing the same issue. Apparently an exception in a worker leads to a KilledWorker exception being raised, which in turn leads to a mistimed "RuntimeError: A pipeline has not yet been optimized. Please call fit() first.", and then the whole dask machinery is shut down. I'm attaching the relevant portion of the logs and the list of installed packages (I'm using anaconda inside docker).
log-excerpt.txt
conda3.list.txt
