[dask] Support Dask dataframes with 'category' columns (fixes #3861) #3908

Merged · 13 commits · Feb 6, 2021
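For context, the user-facing behavior this PR enables: a Dask DataFrame containing pandas 'category' columns can now be passed directly to the Dask estimators for both training and prediction. A minimal sketch, assuming a local distributed cluster (the column names and data are illustrative, not from the PR):

import dask.dataframe as dd
import lightgbm as lgb
import pandas as pd
from distributed import Client, LocalCluster

# toy data: one numeric column, one pandas 'category' column
df = pd.DataFrame({
    'num_col': [float(i) for i in range(100)],
    'cat_col': pd.Series(['a', 'b'] * 50, dtype='category'),
    'label': [0, 1] * 50,
})
ddf = dd.from_pandas(df, npartitions=2)

client = Client(LocalCluster(n_workers=2))
dask_classifier = lgb.DaskLGBMClassifier(client=client, n_estimators=5)
dask_classifier.fit(ddf[['num_col', 'cat_col']], ddf['label'])
preds = dask_classifier.predict(ddf[['num_col', 'cat_col']]).compute()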
8 changes: 4 additions & 4 deletions python-package/lightgbm/dask.py
@@ -350,27 +350,27 @@ def _predict_part(
pred_contrib: bool,
**kwargs: Any
) -> _DaskPart:
-    data = part.values if isinstance(part, pd_DataFrame) else part

-    if data.shape[0] == 0:
+    if part.shape[0] == 0:
result = np.array([])
elif pred_proba:
result = model.predict_proba(
-            data,
+            part,
raw_score=raw_score,
pred_leaf=pred_leaf,
pred_contrib=pred_contrib,
**kwargs
)
else:
result = model.predict(
-            data,
+            part,
raw_score=raw_score,
pred_leaf=pred_leaf,
pred_contrib=pred_contrib,
**kwargs
)

+    # dask.DataFrame.map_partitions() expects each call to return a pandas DataFrame or Series
if isinstance(part, pd_DataFrame):
if pred_proba or pred_contrib:
result = pd_DataFrame(result, index=part.index)
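The substance of the change above: _predict_part() previously converted pandas partitions to numpy with .values before predicting, which coerces a mixed-dtype partition to a single dtype and silently drops the 'category' dtype; passing the partition through unchanged lets LightGBM see the categorical columns. A standalone illustration of what .values loses (not from the PR):

import pandas as pd

df = pd.DataFrame({
    'num_col': [1.0, 2.0],
    'cat_col': pd.Series(['a', 'b'], dtype='category'),
})
print(df.dtypes['cat_col'])  # category
print(df.values.dtype)       # object -- the categorical dtype is gone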
176 changes: 163 additions & 13 deletions tests/python_package_test/test_dask.py
@@ -35,7 +35,7 @@
# see https://distributed.dask.org/en/latest/api.html#distributed.Client.close
CLIENT_CLOSE_TIMEOUT = 120

-data_output = ['array', 'scipy_csr_matrix', 'dataframe']
+data_output = ['array', 'scipy_csr_matrix', 'dataframe', 'dataframe-with-categorical']
data_centers = [[[-4, -4], [4, 4]], [[-4, -4], [4, 4], [-4, 4]]]
group_sizes = [5, 5, 5, 10, 10, 10, 20, 20, 20, 50, 50]

@@ -60,9 +60,18 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs)
w = rnd.rand(X.shape[0]) * 0.01
g_rle = np.array([len(list(grp)) for _, grp in groupby(g)])

-    if output == 'dataframe':
+    if output.startswith('dataframe'):
# add target, weight, and group to DataFrame so that partitions abide by group boundaries.
X_df = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(X.shape[1])])
if output == 'dataframe-with-categorical':
for i in range(5):
col_name = "cat_col" + str(i)
cat_values = rnd.choice(['a', 'b'], X.shape[0])
cat_series = pd.Series(
cat_values,
dtype='category'
)
X_df[col_name] = cat_series
X = X_df.copy()
X_df = X_df.assign(y=y, g=g, w=w)

@@ -115,8 +124,27 @@ def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size
dX = da.from_array(X, (chunk_size, X.shape[1]))
dy = da.from_array(y, chunk_size)
dw = da.from_array(weights, chunk_size)
-    elif output == 'dataframe':
+    elif output.startswith('dataframe'):
X_df = pd.DataFrame(X, columns=['feature_%d' % i for i in range(X.shape[1])])
if output == 'dataframe-with-categorical':
num_cat_cols = 5
for i in range(num_cat_cols):
col_name = "cat_col" + str(i)
cat_values = rnd.choice(['a', 'b'], X.shape[0])
cat_series = pd.Series(
cat_values,
dtype='category'
)
X_df[col_name] = cat_series
X = np.hstack((X, cat_series.cat.codes.values.reshape(-1, 1)))

# for the small data sizes used in tests, it's hard to get LGBMRegressor to choose
# categorical features for splits. So for regression tests with categorical features,
# _create_data() returns a DataFrame with ONLY categorical features
if objective == 'regression':
cat_cols = [col for col in X_df.columns if col.startswith('cat_col')]
X_df = X_df[cat_cols]
X = X[:, -num_cat_cols:]
y_df = pd.Series(y, name='target')
dX = dd.from_pandas(X_df, chunksize=chunk_size)
dy = dd.from_pandas(y_df, chunksize=chunk_size)
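A note on the bookkeeping in _create_data() above: the numpy copy X is kept in step with the DataFrame by stacking on cat.codes, the integer encoding pandas stores for a 'category' column. A standalone illustration (not from the PR):

import numpy as np
import pandas as pd

cat_series = pd.Series(['a', 'b', 'a'], dtype='category')
print(cat_series.cat.codes.values)  # [0 1 0], one integer code per row
X = np.zeros((3, 2))
X = np.hstack((X, cat_series.cat.codes.values.reshape(-1, 1)))
print(X.shape)  # (3, 3)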
@@ -180,6 +208,12 @@ def test_classifier(output, centers, client, listen_port):
"n_estimators": 10,
"num_leaves": 10
}

if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]

dask_classifier = lgb.DaskLGBMClassifier(
client=client,
time_out=5,
@@ -207,6 +241,18 @@ def test_classifier(output, centers, client, listen_port):
assert_eq(p1_local, p2)
assert_eq(y, p1_local)

# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_classifier.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='

client.close(timeout=CLIENT_CLOSE_TIMEOUT)
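The trees_to_dataframe() assertion pattern used here (and repeated in the tests below) flattens the fitted booster into one row per tree node: split_feature names the feature used at each split, and decision_type is '==' for categorical splits versus '<=' for numeric ones. A minimal non-Dask sketch of the same check, on synthetic data (illustrative only, not from the PR):

import lightgbm as lgb
import numpy as np
import pandas as pd

rng = np.random.RandomState(42)
X = pd.DataFrame({'cat_col0': pd.Series(rng.choice(['a', 'b'], 500), dtype='category')})
y = (X['cat_col0'] == 'a').astype(float) + rng.normal(scale=0.1, size=500)

# pandas 'category' columns are treated as categorical features by default
model = lgb.LGBMRegressor(n_estimators=5, min_child_samples=1).fit(X, y)
tree_df = model.booster_.trees_to_dataframe()

node_uses_cat_col = tree_df['split_feature'].isin(['cat_col0'])
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, 'decision_type'].unique()[0] == '=='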


@@ -223,6 +269,12 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
"n_estimators": 10,
"num_leaves": 10
}

if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]

dask_classifier = lgb.DaskLGBMClassifier(
client=client,
time_out=5,
@@ -240,6 +292,18 @@ def test_classifier_pred_contrib(output, centers, client, listen_port):
if output == 'scipy_csr_matrix':
preds_with_contrib = np.array(preds_with_contrib.todense())

# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_classifier.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='

# shape depends on whether it is binary or multiclass classification
num_features = dask_classifier.n_features_
num_classes = dask_classifier.n_classes_
@@ -301,6 +365,12 @@ def test_regressor(output, client, listen_port):
"random_state": 42,
"num_leaves": 10
}

if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]

dask_regressor = lgb.DaskLGBMRegressor(
client=client,
time_out=5,
@@ -310,7 +380,7 @@ def test_regressor(output, client, listen_port):
)
dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw)
p1 = dask_regressor.predict(dX)
-    if output != 'dataframe':
+    if not output.startswith('dataframe'):
s1 = _r2_score(dy, p1)
p1 = p1.compute()
p1_local = dask_regressor.to_local().predict(X)
@@ -322,15 +392,33 @@ def test_regressor(output, client, listen_port):
p2 = local_regressor.predict(X)

# Scores should be the same
-    if output != 'dataframe':
+    if not output.startswith('dataframe'):
assert_eq(s1, s2, atol=.01)
assert_eq(s1, s1_local, atol=.003)

-    # Predictions should be roughly the same
-    assert_eq(y, p1, rtol=1., atol=100.)
-    assert_eq(y, p2, rtol=1., atol=50.)
+    # Predictions should be roughly the same.
+    assert_eq(p1, p1_local)

    # The checks below are skipped for the categorical data case because
    # it's difficult to get a good fit from just categorical features for
    # a regression problem with small data.
if output != 'dataframe-with-categorical':
assert_eq(y, p1, rtol=1., atol=100.)
assert_eq(y, p2, rtol=1., atol=50.)

# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_regressor.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='

client.close(timeout=CLIENT_CLOSE_TIMEOUT)


@@ -345,6 +433,12 @@ def test_regressor_pred_contrib(output, client, listen_port):
"n_estimators": 10,
"num_leaves": 10
}

if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]

dask_regressor = lgb.DaskLGBMRegressor(
client=client,
time_out=5,
@@ -368,6 +462,18 @@ def test_regressor_pred_contrib(output, client, listen_port):
assert preds_with_contrib.shape[1] == num_features + 1
assert preds_with_contrib.shape == local_preds_with_contrib.shape

# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_regressor.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='

client.close(timeout=CLIENT_CLOSE_TIMEOUT)


@@ -386,6 +492,12 @@ def test_regressor_quantile(output, client, listen_port, alpha):
"n_estimators": 10,
"num_leaves": 10
}

if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]

dask_regressor = lgb.DaskLGBMRegressor(
client=client,
local_listen_port=listen_port,
@@ -405,17 +517,37 @@ def test_regressor_quantile(output, client, listen_port, alpha):
np.testing.assert_allclose(q1, alpha, atol=0.2)
np.testing.assert_allclose(q2, alpha, atol=0.2)

# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_regressor.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='

client.close(timeout=CLIENT_CLOSE_TIMEOUT)


-@pytest.mark.parametrize('output', ['array', 'dataframe'])
+@pytest.mark.parametrize('output', ['array', 'dataframe', 'dataframe-with-categorical'])
@pytest.mark.parametrize('group', [None, group_sizes])
def test_ranker(output, client, listen_port, group):

-    X, y, w, g, dX, dy, dw, dg = _create_ranking_data(
-        output=output,
-        group=group
-    )
+    if output == 'dataframe-with-categorical':
+        X, y, w, g, dX, dy, dw, dg = _create_ranking_data(
+            output=output,
+            group=group,
+            n_features=1,
+            n_informative=1
+        )
+    else:
+        X, y, w, g, dX, dy, dw, dg = _create_ranking_data(
+            output=output,
+            group=group,
+        )

# rebalance small dask.array dataset for better performance.
if output == 'array':
@@ -434,6 +566,12 @@ def test_ranker(output, client, listen_port, group):
"num_leaves": 20,
"min_child_samples": 1
}

if output == 'dataframe-with-categorical':
params["categorical_feature"] = [
i for i, col in enumerate(dX.columns) if col.startswith('cat_')
]

dask_ranker = lgb.DaskLGBMRanker(
client=client,
time_out=5,
@@ -457,6 +595,18 @@ def test_ranker(output, client, listen_port, group):
assert spearmanr(rnkvec_dask, rnkvec_local).correlation > 0.8
assert_eq(rnkvec_dask, rnkvec_dask_local)

# be sure LightGBM actually used at least one categorical column,
# and that it was correctly treated as a categorical feature
if output == 'dataframe-with-categorical':
cat_cols = [
col for col in dX.columns
if dX.dtypes[col].name == 'category'
]
tree_df = dask_ranker.booster_.trees_to_dataframe()
node_uses_cat_col = tree_df['split_feature'].isin(cat_cols)
assert node_uses_cat_col.sum() > 0
assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '=='

client.close(timeout=CLIENT_CLOSE_TIMEOUT)

