
Low performance with dask #5785

Closed
MajorCarrot opened this issue Jun 12, 2020 · 7 comments

@MajorCarrot

Hi,

I am trying to integrate our xgboost-based model with dask here, and am using the following function for training:

# Imports assumed by this snippet (they are not shown in the original):
import dask.dataframe as dd
import numpy as np
import pandas as pd
import xgboost as xgb
from dask_ml.model_selection import train_test_split  # assumed: the dask-aware split
from dask_ml.metrics import mean_squared_error        # assumed: the dask-aware metric
# log_metric (e.g. an MLflow-style metric logger) and LOGGER are assumed
# to be defined elsewhere.

def regression(self, df_in, target_series, model_params):
    """ Fit a model to predict target_series with df_in features/columns
        and retain the feature importances in the dependency matrix.

        :param df_in: input dataframe representing the context, predictors.
        :param target_series: pandas series of the target variable. Shares
        the same index as the df_in dataframe.
    """
    assert isinstance(df_in, dd.DataFrame)

    # Split df_in and target to train and test dataset
    df_in_train, df_in_test, target_train, target_test = train_test_split(
        df_in,
        target_series,
        test_size=0.2,
        random_state=self.xcorr_params['random_state'],
        shuffle=True)

    dtrain = xgb.dask.DaskDMatrix(client=self.client,
                                  data=df_in_train,
                                  label=target_train)

    # Create and train a XGBoost booster
    bst = xgb.dask.train(client=self.client,
                         params=model_params,
                         dtrain=dtrain,
                         evals=[(dtrain, 'train')])

    # Make predictions
    dtest = xgb.dask.DaskDMatrix(client=self.client, data=df_in_test)
    target_series_predict = xgb.dask.predict(client=self.client,
                                             model=bst,
                                             data=dtest)

    target_test = target_test.astype('float32').to_dask_array(
        lengths=True).rechunk('auto')
    target_series_predict = target_series_predict.rechunk('auto')

    rmse = np.sqrt(mean_squared_error(target_test, target_series_predict))

    log_metric(target_series.name, rmse)

    LOGGER.info('Making predictions for : %s', target_series.name)
    LOGGER.info('Root Mean Square Error : %s', str(rmse))

    new_row = {}

    score = bst['booster'].get_score(importance_type="gain")
    all_features = [score.get(f, 0.) for f in bst['booster'].feature_names]
    all_features = np.array(all_features, dtype=np.float32)
    feature_importances_ = all_features / all_features.sum()

    for column, feat_imp in zip(df_in.columns, feature_importances_):
        new_row[column] = [feat_imp]

    new_row[target_series.name] = [0.0]

    # Sorting new_row to avoid concatenation warnings
    new_row = dict(sorted(new_row.items()))

    # Concatenating new information about feature importances
    if self._importances_map is not None:
        self._importances_map = pd.concat([
            self._importances_map,
            pd.DataFrame(index=[target_series.name], data=new_row)
        ])

    return bst['booster']

I am not getting high CPU usage (only 30-40% on each core), and the training time is longer than with the implementation without Dask!

Am I doing something wrong or is there a bug which can be fixed?

Also, the logs are populated with messages like these:

task [xgboost.dask]:tcp://127.0.0.1:37731 connected to the tracker
task [xgboost.dask]:tcp://127.0.0.1:42431 connected to the tracker
task [xgboost.dask]:tcp://127.0.0.1:40345 connected to the tracker
task [xgboost.dask]:tcp://127.0.0.1:37515 connected to the tracker
task [xgboost.dask]:tcp://127.0.0.1:37731 got new rank 0
task [xgboost.dask]:tcp://127.0.0.1:42431 got new rank 1
task [xgboost.dask]:tcp://127.0.0.1:40345 got new rank 2
task [xgboost.dask]:tcp://127.0.0.1:37515 got new rank 3

Is there a way to turn this off?

xgboost version: 1.1.0

@trivialfis
Member

We should document the scaling behaviour in practice. Also, we should add logging options for rabit.
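
In the meantime, a rough sketch of a partial workaround: lowering the general 'verbosity' parameter. This controls booster messages and may not silence the tracker prints above, which is exactly why a dedicated rabit logging option is needed:

    model_params = {
        # 0 = silent, 1 = warning (default), 2 = info, 3 = debug
        'verbosity': 0,
        # ... your other training parameters ...
    }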

@SmirnovEgorRu
Contributor

@MajorCarrot, we added two PRs on top of the 1.1 release (available only in master for now) which improve the performance of distributed mode with the 'hist' tree method.

Could you please check the latest code from master? I suppose you should see some improvement.

P.S. XGBoost 1.1.1 has been released; it fixes the issue with the slower CPU build in the pip package (#5720).
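
For example:

    pip install --upgrade xgboost==1.1.1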

@MajorCarrot
Author

Thanks @SmirnovEgorRu, I will try it out and give you an update.

@MajorCarrot
Author

System Details:

  • CPU: Core i7 4600U (Mobile)
  • OS: Ubuntu 20.04.1 LTS

XGBoost:

  • v1.1.0: Time: 478.46 s (tree_method: "auto")
  • v1.1.1: Time: 451.42 s (tree_method: "auto")
  • v1.2.0 (master): Time: 505.92 s (tree_method: "hist") // I will try this again, as well as with tree_method: "auto" to confirm

@SmirnovEgorRu
Contributor

@MajorCarrot, it would be nice to try the 'hist' method explicitly, because 'auto' doesn't use it.
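
For example:

    model_params = {
        'tree_method': 'hist',  # request the histogram algorithm explicitly
        # ... other parameters unchanged ...
    }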

@MajorCarrot
Author

Sorry for the extremely late reply. I have benchmarked all the versions (with and without dask) using tree_method="hist", and these are the results:

version    with dask    w/o dask
1.2.0      319 s        85 s
1.1.1      305 s        85 s
1.1.0      309 s        85 s

1.2.0 is the nightly build from AWS as of 9 July 2020.

System Details:

  • CPU: Core i7 4600U (Mobile)
  • OS: Ubuntu 20.04.1 LTS
  • RAM: 8 GB available (1 GB used by system during the benchmark)

@trivialfis
Member

On a single node, using dask is slower than using the local interface due to the overhead of TCP sockets.
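
For reference, a minimal single-process sketch (assuming df_in_train and target_train are in-memory pandas objects rather than dask collections):

    import xgboost as xgb

    # Everything runs in-process: no scheduler, no TCP sockets.
    dtrain = xgb.DMatrix(df_in_train, label=target_train)
    bst = xgb.train(model_params, dtrain, evals=[(dtrain, 'train')])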
