Skip to content

Commit

Permalink
Thread safe, inplace prediction. (#5389)
Browse files Browse the repository at this point in the history
Normal prediction with DMatrix is now thread safe with locks.  Added inplace prediction is lock free thread safe.

When data is on device (cupy, cudf), the returned data is also on device.

* Implementation for numpy, csr, cudf and cupy.

* Implementation for dask.

* Remove sync in simple dmatrix.
  • Loading branch information
trivialfis authored Mar 30, 2020
1 parent 7f980e9 commit 6601a64
Show file tree
Hide file tree
Showing 25 changed files with 1,214 additions and 164 deletions.
17 changes: 17 additions & 0 deletions include/xgboost/gbm.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#define XGBOOST_GBM_H_

#include <dmlc/registry.h>
#include <dmlc/any.h>
#include <xgboost/base.h>
#include <xgboost/data.h>
#include <xgboost/host_device_vector.h>
Expand Down Expand Up @@ -92,6 +93,22 @@ class GradientBooster : public Model, public Configurable {
PredictionCacheEntry* out_preds,
bool training,
unsigned ntree_limit = 0) = 0;

/*!
* \brief Inplace prediction.
*
* \param x A type erased data adapter.
* \param missing Missing value in the data.
* \param [in,out] out_preds The output preds.
* \param layer_begin (Optional) Begining of boosted tree layer used for prediction.
* \param layer_end (Optional) End of booster layer. 0 means do not limit trees.
*/
virtual void InplacePredict(dmlc::any const &x, float missing,
PredictionCacheEntry *out_preds,
uint32_t layer_begin = 0,
uint32_t layer_end = 0) const {
LOG(FATAL) << "Inplace predict is not supported by current booster.";
}
/*!
* \brief online prediction function, predict score for one instance at a time
* NOTE: use the batch prediction interface if possible, batch prediction is usually
Expand Down
16 changes: 16 additions & 0 deletions include/xgboost/learner.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#ifndef XGBOOST_LEARNER_H_
#define XGBOOST_LEARNER_H_

#include <dmlc/any.h>
#include <rabit/rabit.h>
#include <xgboost/base.h>
#include <xgboost/feature_map.h>
Expand Down Expand Up @@ -120,6 +121,21 @@ class Learner : public Model, public Configurable, public rabit::Serializable {
bool approx_contribs = false,
bool pred_interactions = false) = 0;

/*!
* \brief Inplace prediction.
*
* \param x A type erased data adapter.
* \param type Prediction type.
* \param missing Missing value in the data.
* \param [in,out] out_preds Pointer to output prediction vector.
* \param layer_begin (Optional) Begining of boosted tree layer used for prediction.
* \param layer_end (Optional) End of booster layer. 0 means do not limit trees.
*/
virtual void InplacePredict(dmlc::any const& x, std::string const& type,
float missing,
HostDeviceVector<bst_float> **out_preds,
uint32_t layer_begin = 0, uint32_t layer_end = 0) = 0;

void LoadModel(Json const& in) override = 0;
void SaveModel(Json* out) const override = 0;

Expand Down
14 changes: 14 additions & 0 deletions include/xgboost/predictor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <unordered_map>
#include <utility>
#include <vector>
#include <mutex>

// Forward declarations
namespace xgboost {
Expand Down Expand Up @@ -54,6 +55,7 @@ struct PredictionCacheEntry {
class PredictionContainer {
std::unordered_map<DMatrix *, PredictionCacheEntry> container_;
void ClearExpiredEntries();
std::mutex cache_lock_;

public:
PredictionContainer() = default;
Expand Down Expand Up @@ -133,6 +135,18 @@ class Predictor {
const gbm::GBTreeModel& model, int tree_begin,
uint32_t const ntree_limit = 0) = 0;

/**
* \brief Inplace prediction.
* \param x Type erased data adapter.
* \param model The model to predict from.
* \param missing Missing value in the data.
* \param [in,out] out_preds The output preds.
* \param tree_begin (Optional) Begining of boosted trees used for prediction.
* \param tree_end (Optional) End of booster trees. 0 means do not limit trees.
*/
virtual void InplacePredict(dmlc::any const &x, const gbm::GBTreeModel &model,
float missing, PredictionCacheEntry *out_preds,
uint32_t tree_begin = 0, uint32_t tree_end = 0) const = 0;
/**
* \brief online prediction function, predict score for one instance at a time
* NOTE: use the batch prediction interface if possible, batch prediction is
Expand Down
2 changes: 1 addition & 1 deletion python-package/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def __init__(self, name):
super().__init__(name=name, sources=[])


class BuildExt(build_ext.build_ext):
class BuildExt(build_ext.build_ext): # pylint: disable=too-many-ancestors
'''Custom build_ext command using CMake.'''

logger = logging.getLogger('XGBoost build_ext')
Expand Down
169 changes: 164 additions & 5 deletions python-package/xgboost/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,19 @@ def ctypes2numpy(cptr, length, dtype):
return res


def ctypes2cupy(cptr, length, dtype):
"""Convert a ctypes pointer array to a cupy array."""
import cupy # pylint: disable=import-error
mem = cupy.zeros(length.value, dtype=dtype, order='C')
addr = ctypes.cast(cptr, ctypes.c_void_p).value
# pylint: disable=c-extension-no-member,no-member
cupy.cuda.runtime.memcpy(
mem.__cuda_array_interface__['data'][0], addr,
length.value * ctypes.sizeof(ctypes.c_float),
cupy.cuda.runtime.memcpyDeviceToDevice)
return mem


def ctypes2buffer(cptr, length):
"""Convert ctypes pointer to buffer type."""
if not isinstance(cptr, ctypes.POINTER(ctypes.c_char)):
Expand Down Expand Up @@ -474,6 +487,7 @@ def __init__(self, data, label=None, weight=None, base_margin=None,
data, feature_names, feature_types = _convert_dataframes(
data, feature_names, feature_types
)
missing = np.nan if missing is None else missing

if isinstance(data, (STRING_TYPES, os_PathLike)):
handle = ctypes.c_void_p()
Expand Down Expand Up @@ -1428,12 +1442,17 @@ def predict(self,
training=False):
"""Predict with data.
.. note:: This function is not thread safe.
.. note:: This function is not thread safe except for ``gbtree``
booster.
For ``gbtree`` booster, the thread safety is guaranteed by locks.
For lock free prediction use ``inplace_predict`` instead. Also, the
safety does not hold when used in conjunction with other methods.
For each booster object, predict can only be called from one thread.
If you want to run prediction using multiple thread, call
``bst.copy()`` to make copies of model object and then call
``predict()``.
When using booster other than ``gbtree``, predict can only be called
from one thread. If you want to run prediction using multiple
thread, call ``bst.copy()`` to make copies of model object and then
call ``predict()``.
Parameters
----------
Expand Down Expand Up @@ -1547,6 +1566,146 @@ def predict(self,
preds = preds.reshape(nrow, chunk_size)
return preds

def inplace_predict(self, data, iteration_range=(0, 0),
predict_type='value', missing=np.nan):
'''Run prediction in-place, Unlike ``predict`` method, inplace prediction does
not cache the prediction result.
Calling only ``inplace_predict`` in multiple threads is safe and lock
free. But the safety does not hold when used in conjunction with other
methods. E.g. you can't train the booster in one thread and perform
prediction in the other.
.. code-block:: python
booster.set_param({'predictor': 'gpu_predictor'})
booster.inplace_predict(cupy_array)
booster.set_param({'predictor': 'cpu_predictor})
booster.inplace_predict(numpy_array)
Parameters
----------
data : numpy.ndarray/scipy.sparse.csr_matrix/cupy.ndarray/
cudf.DataFrame/pd.DataFrame
The input data, must not be a view for numpy array. Set
``predictor`` to ``gpu_predictor`` for running prediction on CuPy
array or CuDF DataFrame.
iteration_range : tuple
Specifies which layer of trees are used in prediction. For
example, if a random forest is trained with 100 rounds. Specifying
`iteration_range=(10, 20)`, then only the forests built during [10,
20) (open set) rounds are used in this prediction.
predict_type : str
* `value` Output model prediction values.
* `margin` Output the raw untransformed margin value.
missing : float
Value in the input data which needs to be present as a missing
value.
Returns
-------
prediction : numpy.ndarray/cupy.ndarray
The prediction result. When input data is on GPU, prediction
result is stored in a cupy array.
'''

def reshape_output(predt, rows):
'''Reshape for multi-output prediction.'''
if predt.size != rows and predt.size % rows == 0:
cols = int(predt.size / rows)
predt = predt.reshape(rows, cols)
return predt
return predt

length = c_bst_ulong()
preds = ctypes.POINTER(ctypes.c_float)()
iteration_range = (ctypes.c_uint(iteration_range[0]),
ctypes.c_uint(iteration_range[1]))

# once caching is supported, we can pass id(data) as cache id.
if isinstance(data, DataFrame):
data = data.values
if isinstance(data, np.ndarray):
assert data.flags.c_contiguous
arr = np.array(data.reshape(data.size), copy=False,
dtype=np.float32)
_check_call(_LIB.XGBoosterPredictFromDense(
self.handle,
arr.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
c_bst_ulong(data.shape[0]),
c_bst_ulong(data.shape[1]),
ctypes.c_float(missing),
iteration_range[0],
iteration_range[1],
c_str(predict_type),
c_bst_ulong(0),
ctypes.byref(length),
ctypes.byref(preds)
))
preds = ctypes2numpy(preds, length.value, np.float32)
rows = data.shape[0]
return reshape_output(preds, rows)
if isinstance(data, scipy.sparse.csr_matrix):
csr = data
_check_call(_LIB.XGBoosterPredictFromCSR(
self.handle,
c_array(ctypes.c_size_t, csr.indptr),
c_array(ctypes.c_uint, csr.indices),
c_array(ctypes.c_float, csr.data),
ctypes.c_size_t(len(csr.indptr)),
ctypes.c_size_t(len(csr.data)),
ctypes.c_size_t(csr.shape[1]),
ctypes.c_float(missing),
iteration_range[0],
iteration_range[1],
c_str(predict_type),
c_bst_ulong(0),
ctypes.byref(length),
ctypes.byref(preds)))
preds = ctypes2numpy(preds, length.value, np.float32)
rows = data.shape[0]
return reshape_output(preds, rows)
if lazy_isinstance(data, 'cupy.core.core', 'ndarray'):
assert data.flags.c_contiguous
interface = data.__cuda_array_interface__
if 'mask' in interface:
interface['mask'] = interface['mask'].__cuda_array_interface__
interface_str = bytes(json.dumps(interface, indent=2), 'utf-8')
_check_call(_LIB.XGBoosterPredictFromArrayInterface(
self.handle,
interface_str,
ctypes.c_float(missing),
iteration_range[0],
iteration_range[1],
c_str(predict_type),
c_bst_ulong(0),
ctypes.byref(length),
ctypes.byref(preds)))
mem = ctypes2cupy(preds, length, np.float32)
rows = data.shape[0]
return reshape_output(mem, rows)
if lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame'):
interfaces_str = _cudf_array_interfaces(data)
_check_call(_LIB.XGBoosterPredictFromArrayInterfaceColumns(
self.handle,
interfaces_str,
ctypes.c_float(missing),
iteration_range[0],
iteration_range[1],
c_str(predict_type),
c_bst_ulong(0),
ctypes.byref(length),
ctypes.byref(preds)))
mem = ctypes2cupy(preds, length, np.float32)
rows = data.shape[0]
predt = reshape_output(mem, rows)
return predt

raise TypeError('Data type:' + str(type(data)) +
' not supported by inplace prediction.')

def save_model(self, fname):
"""Save the model to a file.
Expand Down
Loading

0 comments on commit 6601a64

Please sign in to comment.