This repository has been archived by the owner on Jul 16, 2021. It is now read-only.

support sample weights
Tomas Laube committed Oct 12, 2018
1 parent 4661c8a commit 9f81e58
Showing 2 changed files with 42 additions and 15 deletions.
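In short, train_part(), _train() and train() gain a sample_weight argument that is threaded through to xgb.DMatrix, and the XGBRegressor/XGBClassifier wrappers forward sample_weight from fit(). A minimal usage sketch of the functional API after this commit, assuming a dask.distributed client is available; the data, chunk sizes and parameters below are illustrative and not part of the change:

    import numpy as np
    import dask.array as da
    import dask_xgboost as dxgb
    from dask.distributed import Client

    client = Client()  # connect to (or start) a distributed scheduler

    # Toy data; data, labels and weights are chunked alike
    X = da.from_array(np.random.rand(100, 4), chunks=(20, 4))
    y = da.from_array(np.random.randint(0, 2, 100), chunks=20)
    w = da.from_array(np.random.rand(100), chunks=20)  # one weight per row

    params = {'objective': 'binary:logistic', 'max_depth': 3}
    bst = dxgb.train(client, params, X, y, sample_weight=w, num_boost_round=10)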
40 changes: 30 additions & 10 deletions dask_xgboost/core.py
@@ -73,14 +73,21 @@ def train_part(env, param, list_of_parts, dmatrix_kwargs=None, **kwargs):
    -------
    model if rank zero, None otherwise
    """
-    data, labels = zip(*list_of_parts)  # Prepare data
+    # Prepare data
+    if len(list_of_parts[0]) == 3:
+        data, labels, weight = zip(*list_of_parts)
+        weight = concat(weight)
+    else:
+        data, labels = zip(*list_of_parts)
+        weight = None
+
    data = concat(data)  # Concatenate many parts into one
    labels = concat(labels)
    if dmatrix_kwargs is None:
        dmatrix_kwargs = {}

    dmatrix_kwargs["feature_names"] = getattr(data, 'columns', None)
-    dtrain = xgb.DMatrix(data, labels, **dmatrix_kwargs)
+    dtrain = xgb.DMatrix(data, labels, weight=weight, **dmatrix_kwargs)

    args = [('%s=%s' % item).encode() for item in env.items()]
    xgb.rabit.init(args)
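On each worker, train_part() now unpacks an optional third element from every scattered part, concatenates the weights, and hands them to the DMatrix. The local equivalent, sketched with made-up arrays (xgboost's DMatrix accepts per-row weights via its weight argument):

    import numpy as np
    import xgboost as xgb

    data = np.random.rand(10, 3)
    labels = np.random.randint(0, 2, 10)
    weight = np.random.rand(10)  # one weight per training row

    dtrain = xgb.DMatrix(data, labels, weight=weight)  # what train_part builds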
@@ -99,7 +106,7 @@ def train_part(env, param, list_of_parts, dmatrix_kwargs=None, **kwargs):


@gen.coroutine
-def _train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
+def _train(client, params, data, labels, sample_weight, dmatrix_kwargs={}, **kwargs):
    """
    Asynchronous version of train
@@ -117,8 +124,18 @@ def _train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
    assert label_parts.ndim == 1 or label_parts.shape[1] == 1
    label_parts = label_parts.flatten().tolist()

-    # Arrange parts into pairs.  This enforces co-locality
-    parts = list(map(delayed, zip(data_parts, label_parts)))
+    if sample_weight is not None:
+        sample_weight_parts = sample_weight.to_delayed()
+        if isinstance(sample_weight_parts, np.ndarray):
+            assert sample_weight_parts.ndim == 1 or sample_weight_parts.shape[1] == 1
+            sample_weight_parts = sample_weight_parts.flatten().tolist()
+
+        # Arrange parts into pairs.  This enforces co-locality
+        parts = list(map(delayed, zip(data_parts, label_parts, sample_weight_parts)))
+    else:
+        # Arrange parts into pairs.  This enforces co-locality
+        parts = list(map(delayed, zip(data_parts, label_parts)))

    parts = client.compute(parts)  # Start computation in the background
    yield wait(parts)
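_train() zips matching partitions of data, labels and (when given) weights into one delayed tuple, so each worker receives aligned chunks. This presumes the weight collection is partitioned like the labels; a small sketch of that assumption, with the chunk size of 5 borrowed from the tests below:

    import numpy as np
    import dask.array as da

    y = np.random.randint(0, 2, 10)
    w = np.random.rand(10)

    y2 = da.from_array(y, chunks=5)
    w2 = da.from_array(w, chunks=5)  # chunked like the labels

    assert y2.numblocks == w2.numblocks  # one weight block per label block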

@@ -158,7 +175,7 @@ def _train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
    raise gen.Return(result)


-def train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
+def train(client, params, data, labels, sample_weight=None, dmatrix_kwargs={}, **kwargs):
    """ Train an XGBoost model on a Dask Cluster
    This starts XGBoost on all Dask workers, moves input data to those workers,
@@ -188,7 +205,7 @@ def train(client, params, data, labels, dmatrix_kwargs={}, **kwargs):
    predict
    """
    return client.sync(_train, client, params, data,
-                      labels, dmatrix_kwargs, **kwargs)
+                      labels, sample_weight, dmatrix_kwargs, **kwargs)


def _predict_part(part, model=None):
@@ -258,7 +275,7 @@ def predict(client, model, data):

class XGBRegressor(xgb.XGBRegressor):

-    def fit(self, X, y=None):
+    def fit(self, X, y=None, sample_weight=None):
        """Fit the gradient boosting model
        Parameters
@@ -279,6 +296,7 @@ def fit(self, X, y=None):
        client = default_client()
        xgb_options = self.get_xgb_params()
        self._Booster = train(client, xgb_options, X, y,
+                             sample_weight,
                              num_boost_round=self.n_estimators)
        return self

@@ -289,7 +307,7 @@ def predict(self, X):

class XGBClassifier(xgb.XGBClassifier):

-    def fit(self, X, y=None, classes=None):
+    def fit(self, X, y=None, classes=None, sample_weight=None):
        """Fit a gradient boosting classifier
        Parameters
@@ -301,6 +319,8 @@ def fit(self, X, y=None, classes=None):
        classes : sequence, optional
            The unique values in `y`. If not specified, this will be
            eagerly computed from `y` before training.
+       sample_weight : array-like [n_samples]
+           Weights for each training sample
        Returns
        -------
@@ -345,9 +365,9 @@ def fit(self, X, y=None, classes=None):

        # TODO: auto label-encode y
        # that will require a dependency on dask-ml
-       # TODO: sample weight

        self._Booster = train(client, xgb_options, X, y,
+                             sample_weight,
                              num_boost_round=self.n_estimators)
        return self

17 changes: 12 additions & 5 deletions dask_xgboost/tests/test_core.py
@@ -16,6 +16,7 @@

import dask_xgboost as dxgb


# Workaround for conflict with distributed 1.23.0
# https://github.com/dask/dask-xgboost/pull/27#issuecomment-417474734
from concurrent.futures import ThreadPoolExecutor
@@ -32,6 +33,7 @@

X = df.values
y = labels.values
+weight = np.random.rand(10)


def test_classifier(loop): # noqa
@@ -40,11 +42,12 @@ def test_classifier(loop): # noqa
            a = dxgb.XGBClassifier()
            X2 = da.from_array(X, 5)
            y2 = da.from_array(y, 5)
-            a.fit(X2, y2)
+            weight1 = da.from_array(weight, 5)
+            a.fit(X2, y2, sample_weight=weight1)
            p1 = a.predict(X2)

    b = xgb.XGBClassifier()
-    b.fit(X, y)
+    b.fit(X, y, sample_weight=weight)
    np.testing.assert_array_almost_equal(a.feature_importances_,
                                         b.feature_importances_)
    assert_eq(p1, b.predict(X))
@@ -125,11 +128,15 @@ def test_regressor(loop): # noqa
            a = dxgb.XGBRegressor()
            X2 = da.from_array(X, 5)
            y2 = da.from_array(y, 5)
-            a.fit(X2, y2)
+            weight1 = da.from_array(weight, 5)
+            a.fit(X2, y2, sample_weight=weight1)
            p1 = a.predict(X2)

    b = xgb.XGBRegressor()
-    b.fit(X, y)
+    b.fit(X, y, sample_weight=weight)
+
+    np.testing.assert_array_almost_equal(a.feature_importances_,
+                                         b.feature_importances_)
    assert_eq(p1, b.predict(X))


@@ -163,7 +170,7 @@ def test_dmatrix_kwargs(c, s, a, b):
    xgb.rabit.init()  # workaround for "Doing rabit call after Finalize"
    dX = da.from_array(X, chunks=(2, 2))
    dy = da.from_array(y, chunks=(2,))
-    dbst = yield dxgb.train(c, param, dX, dy, {"missing": 0.0})
+    dbst = yield dxgb.train(c, param, dX, dy, dmatrix_kwargs={"missing": 0.0})

    # Distributed model matches local model with dmatrix kwargs
    dtrain = xgb.DMatrix(X, label=y, missing=0.0)
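Note the keyword in the updated call: since sample_weight now occupies the fifth positional slot of train(), a bare dict passed positionally would be bound to sample_weight, so the DMatrix options have to be given as dmatrix_kwargs={"missing": 0.0}.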
