[dask] Support pred_contrib in Dask predict() methods (fixes #3713) #3774

Merged · 16 commits · Jan 22, 2021
Changes from 9 commits
42 changes: 31 additions & 11 deletions python-package/lightgbm/dask.py
@@ -228,47 +228,67 @@ def _train(client, data, label, params, model_factory, weight=None, **kwargs):
     return results[0]


-def _predict_part(part, model, proba, **kwargs):
+def _predict_part(part, model, pred_proba, pred_leaf, pred_contrib, **kwargs):
     data = part.values if isinstance(part, pd.DataFrame) else part

     if data.shape[0] == 0:
         result = np.array([])
-    elif proba:
-        result = model.predict_proba(data, **kwargs)
+    elif pred_proba:
+        result = model.predict_proba(data, pred_leaf=pred_leaf, pred_contrib=pred_contrib, **kwargs)
     else:
-        result = model.predict(data, **kwargs)
+        result = model.predict(data, pred_leaf=pred_leaf, pred_contrib=pred_contrib, **kwargs)

     if isinstance(part, pd.DataFrame):
-        if proba:
+        if pred_proba or pred_contrib:
             result = pd.DataFrame(result, index=part.index)
         else:
             result = pd.Series(result, index=part.index, name='predictions')

     return result


-def _predict(model, data, proba=False, dtype=np.float32, **kwargs):
+def _predict(model, data, pred_proba=False, pred_leaf=False, pred_contrib=False,
+             dtype=np.float32, **kwargs):
     """Inner predict routine.

     Parameters
     ----------
     model :
     data : dask array of shape = [n_samples, n_features]
         Input feature matrix.
-    proba : bool
-        Should method return results of predict_proba (proba == True) or predict (proba == False)
+    pred_proba : bool
+        Should method return results of predict_proba (pred_proba == True) or predict (pred_proba == False)
+    pred_leaf : bool, optional (default=False)
+        Whether to predict leaf index.
+    pred_contrib : bool, optional (default=False)
+        Whether to predict feature contributions.
     dtype : np.dtype
         Dtype of the output
     kwargs : other parameters passed to predict or predict_proba method
     """
     if isinstance(data, dd._Frame):
-        return data.map_partitions(_predict_part, model=model, proba=proba, **kwargs).values
+        return data.map_partitions(
+            _predict_part,
+            model=model,
+            pred_proba=pred_proba,
+            pred_leaf=pred_leaf,
+            pred_contrib=pred_contrib,
+            **kwargs
+        ).values
     elif isinstance(data, da.Array):
-        if proba:
+        if pred_proba:
             kwargs['chunks'] = (data.chunks[0], (model.n_classes_,))
         else:
             kwargs['drop_axis'] = 1
-        return data.map_blocks(_predict_part, model=model, proba=proba, dtype=dtype, **kwargs)
+        return data.map_blocks(
+            _predict_part,
+            model=model,
+            pred_proba=pred_proba,
+            pred_leaf=pred_leaf,
+            pred_contrib=pred_contrib,
+            dtype=dtype,
+            **kwargs
+        )
     else:
         raise TypeError('Data must be either Dask array or dataframe. Got %s.' % str(type(data)))
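The `chunks` / `drop_axis` handling is the subtle part of the `da.Array` branch: `map_blocks` has to be told the output chunk structure whenever the block function changes the array's dimensionality. A minimal standalone sketch of those two patterns, using a toy block function rather than a LightGBM model:

import dask.array as da
import numpy as np

X = da.random.random((100, 10), chunks=(25, 10))

# predict-style output: each 2-D block maps to a 1-D vector, so axis 1 is dropped
preds_1d = X.map_blocks(lambda b: b.sum(axis=1), drop_axis=1, dtype=np.float64)
assert preds_1d.compute().shape == (100,)

# predict_proba-style output: each block maps to (n_rows, n_classes), so the
# new column chunking must be declared explicitly via `chunks`
n_classes = 3
proba_2d = X.map_blocks(
    lambda b: np.tile(b.mean(axis=1, keepdims=True), (1, n_classes)),
    chunks=(X.chunks[0], (n_classes,)),
    dtype=np.float64,
)
assert proba_2d.compute().shape == (100, n_classes)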

@@ -317,7 +337,7 @@ def predict(self, X, **kwargs):

     def predict_proba(self, X, **kwargs):
         """Docstring is inherited from the lightgbm.LGBMClassifier.predict_proba."""
-        return _predict(self.to_local(), X, proba=True, **kwargs)
+        return _predict(self.to_local(), X, pred_proba=True, **kwargs)
     predict_proba.__doc__ = LGBMClassifier.predict_proba.__doc__

     def to_local(self):
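For context, a minimal end-to-end sketch of what this diff enables, assuming a local Dask cluster; the cluster setup, data, and parameter values here are illustrative only, mirroring the import style used in the tests below:

import dask.array as da
import lightgbm.dask as dlgbm
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2)
client = Client(cluster)

X = da.random.random((1000, 10), chunks=(100, 10))
y = da.random.random((1000,), chunks=(100,))

dask_regressor = dlgbm.DaskLGBMRegressor(n_estimators=10, num_leaves=10)
dask_regressor = dask_regressor.fit(X, y, client=client)

# with pred_contrib=True, each row is
# [contrib_feature_0, ..., contrib_feature_9, base_value]
contribs = dask_regressor.predict(X, pred_contrib=True).compute()
assert contribs.shape == (1000, 10 + 1)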
78 changes: 78 additions & 0 deletions tests/python_package_test/test_dask.py
@@ -7,6 +7,7 @@
 if not sys.platform.startswith("linux"):
     pytest.skip("lightgbm.dask is currently supported in Linux environments", allow_module_level=True)

+import dask
 import dask.array as da
 import dask.dataframe as dd
 import numpy as np
@@ -97,6 +98,57 @@ def test_classifier(output, centers, client, listen_port):
     assert_eq(p1_proba, p2_proba, atol=0.3)


+@pytest.mark.parametrize('output', data_output)
+@pytest.mark.parametrize('centers', data_centers)
+def test_classifier_pred_contrib(output, centers, client, listen_port):
+    X, y, w, dX, dy, dw = _create_data('classification', output=output, centers=centers)
+
+    dask_classifier = dlgbm.DaskLGBMClassifier(
+        time_out=5,
+        local_listen_port=listen_port,
+        tree_learner='data'
+    )
Collaborator: Add n_estimators=10 and num_leaves=10? #3786.

Collaborator Author: oh good idea

Collaborator Author: added in 6428589

Collaborator Author: ok I just added this again (was lost because of a bad merge conflict resolution, sorry)
+    dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw, client=client)
+    preds_with_contrib = dask_classifier.predict(dX, pred_contrib=True).compute()
+
+    local_classifier = lightgbm.LGBMClassifier()
+    local_classifier.fit(X, y, sample_weight=w)
+    local_preds_with_contrib = local_classifier.predict(X, pred_contrib=True)
+
+    if output == 'scipy_csr_matrix':
+        preds_with_contrib = np.array(preds_with_contrib.todense())
+
+    # shape depends on whether it is binary or multiclass classification
+    num_classes = len(centers)
+    if num_classes == 2:
+        expected_num_cols = dX.shape[1] + 1
+    else:
+        expected_num_cols = (dX.shape[1] + 1) * num_classes
+
+    if isinstance(dX, dask.dataframe.core.DataFrame):
Collaborator: I might be wrong, but according to the docs and sources, we can use it without the core part so as not to depend on the inner implementation.
https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame
https://github.com/dask/dask/blob/72304a94c98ace592f01df91e3d9e89febda307c/dask/dataframe/__init__.py#L3

Suggested change:
-    if isinstance(dX, dask.dataframe.core.DataFrame):
+    if isinstance(dX, dask.dataframe.DataFrame):

Collaborator Author: ooo that's a good idea, let me try that

Collaborator Author: added in 6428589 and it worked ok

Collaborator Author: ok added this back again (now as dd.DataFrame)
+        assert preds_with_contrib.shape == (dX.shape[0].compute(), expected_num_cols)
+    else:
+        assert preds_with_contrib.shape == (dX.shape[0], expected_num_cols)
+
+    assert preds_with_contrib.shape == local_preds_with_contrib.shape

+    # * shape depends on whether it is binary or multiclass classification
+    # * the matrix for binary classification is of the form [feature_contrib, base_value],
+    #   for multi-class it's [feat_contrib_class1, base_value_class1, feat_contrib_class2, base_value_class2, etc.]
+    # * contrib outputs for distributed training are different from those of local training, so we can just test
+    #   that the output has the right shape and that base values are in the right position
+    num_features = dX.shape[1]
+    num_classes = len(centers)
+    if num_classes == 2:
+        assert preds_with_contrib.shape[1] == num_features + 1
+        assert len(np.unique(preds_with_contrib[:, num_features])) == 1
+    else:
+        assert preds_with_contrib.shape[1] == (num_features + 1) * num_classes
+        for i in range(num_classes):
+            base_value_col = num_features * (i + 1) + i
+            assert len(np.unique(preds_with_contrib[:, base_value_col])) == 1
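To make the column arithmetic above concrete, take an assumed toy size of 4 features and 3 classes (numbers chosen for illustration, not from the test): the contrib matrix then has (4 + 1) * 3 = 15 columns, and base_value_col lands on the last column of each per-class block:

num_features, num_classes = 4, 3
base_value_cols = [num_features * (i + 1) + i for i in range(num_classes)]
print(base_value_cols)  # [4, 9, 14]: columns 0-3, 5-8, and 10-13 hold the per-class feature contributions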


 def test_training_does_not_fail_on_port_conflicts(client):
     _, _, _, dX, dy, dw = _create_data('classification', output='array')

@@ -170,6 +222,32 @@ def test_regressor(output, client, listen_port):
     assert_eq(y, p2, rtol=1., atol=50.)


+@pytest.mark.parametrize('output', data_output)
+def test_regressor_pred_contrib(output, client, listen_port):
+    X, y, w, dX, dy, dw = _create_data('regression', output=output)
+
+    dask_regressor = dlgbm.DaskLGBMRegressor(
+        time_out=5,
+        local_listen_port=listen_port,
+        tree_learner='data'
+    )
Collaborator: Add n_estimators=10 and num_leaves=10? #3786.

Collaborator Author: added in 6428589

Collaborator Author: added back again

+    dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw, client=client)
+    preds_with_contrib = dask_regressor.predict(dX, pred_contrib=True).compute()
+
+    local_regressor = lightgbm.LGBMRegressor()
+    local_regressor.fit(X, y, sample_weight=w)
+    local_preds_with_contrib = local_regressor.predict(X, pred_contrib=True)
+
+    if output == "scipy_csr_matrix":
+        preds_with_contrib = np.array(preds_with_contrib.todense())
+
+    # contrib outputs for distributed training are different from those of local training, so we can just test
+    # that the output has the right shape and that base values are in the right position
+    num_features = dX.shape[1]
+    assert preds_with_contrib.shape[1] == num_features + 1
+    assert preds_with_contrib.shape == local_preds_with_contrib.shape
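The regression layout is simply the binary-classification layout without the class dimension: num_features contribution columns plus one final base-value column. A hypothetical extra assertion, not part of this diff, that would mirror the classifier test's base-value check:

# the last column holds the base value, which should be constant across rows
assert len(np.unique(preds_with_contrib[:, num_features])) == 1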


 @pytest.mark.parametrize('output', data_output)
 @pytest.mark.parametrize('alpha', [.1, .5, .9])
 def test_regressor_quantile(output, client, listen_port, alpha):