From 2997cf72d1e2ae234d9cfb891c6ad289ec41af22 Mon Sep 17 00:00:00 2001 From: Ziyue Xu Date: Mon, 1 Apr 2024 10:51:58 -0400 Subject: [PATCH] remove processor from another PR --- src/processing/plugins/dummy_processor.cc | 134 ---------------------- src/processing/plugins/dummy_processor.h | 44 ------- src/processing/processor.h | 111 ------------------ src/processing/processor_loader.cc | 62 ---------- src/tree/hist/histogram.h | 4 +- 5 files changed, 3 insertions(+), 352 deletions(-) delete mode 100644 src/processing/plugins/dummy_processor.cc delete mode 100644 src/processing/plugins/dummy_processor.h delete mode 100644 src/processing/processor.h delete mode 100644 src/processing/processor_loader.cc diff --git a/src/processing/plugins/dummy_processor.cc b/src/processing/plugins/dummy_processor.cc deleted file mode 100644 index 1699ca4515ca..000000000000 --- a/src/processing/plugins/dummy_processor.cc +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Copyright 2014-2024 by XGBoost Contributors - */ -#include "./dummy_processor.h" - -using std::vector; -using std::cout; -using std::endl; - -const char kSignature[] = "NVDADAM1"; // DAM (Direct Accessible Marshalling) V1 -const int kPrefixLen = 24; - -bool ValidDam(std::int8_t *buffer) { - return memcmp(buffer, kSignature, strlen(kSignature)) == 0; -} - -xgboost::common::Span DummyProcessor::ProcessGHPairs(vector &pairs) { - cout << "ProcessGHPairs called with pairs size: " << pairs.size() << endl; - - auto buf_size = kPrefixLen + pairs.size()*10*8; // Assume encrypted size is 10x - - // This memory needs to be freed - char *buf = static_cast(calloc(buf_size, 1)); - memcpy(buf, kSignature, strlen(kSignature)); - memcpy(buf + 8, &buf_size, 8); - memcpy(buf + 16, &xgboost::processing::kDataTypeGHPairs, 8); - - // Simulate encryption by duplicating value 10 times - int index = kPrefixLen; - for (auto value : pairs) { - for (int i = 0; i < 10; i++) { - memcpy(buf+index, &value, 8); - index += 8; - } - } - - // Save pairs for future operations - this->gh_pairs_ = new vector(pairs); - - return xgboost::common::Span(reinterpret_cast(buf), buf_size); -} - -xgboost::common::Span DummyProcessor::HandleGHPairs(xgboost::common::Span buffer) { - cout << "HandleGHPairs called with buffer size: " << buffer.size() << " Active: " << active_ << endl; - - if (!ValidDam(buffer.data())) { - cout << "Invalid buffer received" << endl; - return buffer; - } - - // For dummy, this call is used to set gh_pairs for passive sites - if (!active_) { - int8_t *ptr = buffer.data() + kPrefixLen; - double *pairs = reinterpret_cast(ptr); - size_t num = (buffer.size() - kPrefixLen) / 8; - gh_pairs_ = new vector(); - for (int i = 0; i < num; i += 10) { - gh_pairs_->push_back(pairs[i]); - } - cout << "GH Pairs saved. Size: " << gh_pairs_->size() << endl; - } - - return buffer; -} - -xgboost::common::Span DummyProcessor::ProcessAggregation( - std::vector const &nodes_to_build, xgboost::common::RowSetCollection const &row_set) { - auto total_bin_size = gidx_->Cuts().Values().size(); - auto histo_size = total_bin_size*2; - auto buf_size = kPrefixLen + 8*histo_size*nodes_to_build.size(); - cout << "ProcessAggregation called with bin size: " << total_bin_size << " Buffer Size: " << buf_size << endl; - std::int8_t *buf = static_cast(calloc(buf_size, 1)); - memcpy(buf, kSignature, strlen(kSignature)); - memcpy(buf + 8, &buf_size, 8); - memcpy(buf + 16, &xgboost::processing::kDataTypeHisto, 8); - - double *histo = reinterpret_cast(buf + kPrefixLen); - for (auto &node_id : nodes_to_build) { - auto elem = row_set[node_id]; - for (auto it = elem.begin; it != elem.end; ++it) { - auto row_id = *it; - for (std::size_t f = 0; f < gidx_->Cuts().Ptrs().size()-1; f++) { - auto slot = gidx_->GetGindex(row_id, f); - if (slot < 0) { - continue; - } - - if (slot >= total_bin_size) { - cout << "Slot too big, ignored: " << slot << endl; - continue; - } - - if (row_id >= gh_pairs_->size()/2) { - cout << "Row ID too big: " << row_id << endl; - } - - auto g = (*gh_pairs_)[row_id*2]; - auto h = (*gh_pairs_)[row_id*2+1]; - histo[slot*2] += g; - histo[slot*2+1] += h; - } - } - histo += histo_size; - } - - return xgboost::common::Span(reinterpret_cast(buf), buf_size); -} - -std::vector DummyProcessor::HandleAggregation(xgboost::common::Span buffer) { - cout << "HandleAggregation called with buffer size: " << buffer.size() << endl; - std::vector result = std::vector(); - - int8_t* ptr = buffer.data(); - auto rest_size = buffer.size(); - - while (rest_size > kPrefixLen) { - if (!ValidDam(ptr)) { - cout << "Invalid buffer at offset " << buffer.size() - rest_size << endl; - continue; - } - std::int64_t *size_ptr = reinterpret_cast(ptr + 8); - double *array_start = reinterpret_cast(ptr + kPrefixLen); - auto array_size = (*size_ptr - kPrefixLen)/8; - cout << "Histo size for buffer: " << array_size << endl; - result.insert(result.end(), array_start, array_start + array_size); - cout << "Result size: " << result.size() << endl; - rest_size -= *size_ptr; - ptr = ptr + *size_ptr; - } - - cout << "Total histo size: " << result.size() << endl; - - return result; -} diff --git a/src/processing/plugins/dummy_processor.h b/src/processing/plugins/dummy_processor.h deleted file mode 100644 index 9511cf7f56f6..000000000000 --- a/src/processing/plugins/dummy_processor.h +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Copyright 2014-2024 by XGBoost Contributors - */ -#pragma once -#include -#include -#include -#include "../processor.h" - -class DummyProcessor: public xgboost::processing::Processor { - private: - bool active_ = false; - const std::map *params_; - std::vector *gh_pairs_{nullptr}; - const xgboost::GHistIndexMatrix *gidx_; - - public: - void Initialize(bool active, std::map params) override { - this->active_ = active; - this->params_ = ¶ms; - } - - void Shutdown() override { - this->gh_pairs_ = nullptr; - this->gidx_ = nullptr; - } - - void FreeBuffer(xgboost::common::Span buffer) override { - free(buffer.data()); - } - - xgboost::common::Span ProcessGHPairs(std::vector &pairs) override; - - xgboost::common::Span HandleGHPairs(xgboost::common::Span buffer) override; - - void InitAggregationContext(xgboost::GHistIndexMatrix const &gidx) override { - this->gidx_ = &gidx; - } - - xgboost::common::Span ProcessAggregation(std::vector const &nodes_to_build, - xgboost::common::RowSetCollection const &row_set) override; - - std::vector HandleAggregation(xgboost::common::Span buffer) override; -}; diff --git a/src/processing/processor.h b/src/processing/processor.h deleted file mode 100644 index 952acfbe60b6..000000000000 --- a/src/processing/processor.h +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Copyright 2014-2024 by XGBoost Contributors - */ -#pragma once - -#include -#include -#include -#include -#include -#include "../data/gradient_index.h" - -namespace xgboost::processing { - -const char kLibraryPath[] = "LIBRARY_PATH"; -const char kDummyProcessor[] = "dummy"; -const char kLoadFunc[] = "LoadProcessor"; - -// Data type definition -const int kDataTypeGHPairs = 1; -const int kDataTypeHisto = 2; - -/*! \brief An processor interface to handle tasks that require external library through plugins */ -class Processor { - public: - /*! - * \brief Initialize the processor - * - * \param active If true, this is the active node - * \param params Optional parameters - */ - virtual void Initialize(bool active, std::map params) = 0; - - /*! - * \brief Shutdown the processor and free all the resources - * - */ - virtual void Shutdown() = 0; - - /*! - * \brief Free buffer - * - * \param buffer Any buffer returned by the calls from the plugin - */ - virtual void FreeBuffer(common::Span buffer) = 0; - - /*! - * \brief Preparing g & h pairs to be sent to other clients by active client - * - * \param pairs g&h pairs in a vector (g1, h1, g2, h2 ...) for every sample - * - * \return The encoded buffer to be sent - */ - virtual common::Span ProcessGHPairs(std::vector& pairs) = 0; - - /*! - * \brief Handle buffers with encoded pairs received from broadcast - * - * \param The encoded buffer - * - * \return The encoded buffer - */ - virtual common::Span HandleGHPairs(common::Span buffer) = 0; - - /*! - * \brief Initialize aggregation context by providing global GHistIndexMatrix - * - * \param gidx The matrix for every sample with its feature and slot assignment - */ - virtual void InitAggregationContext(GHistIndexMatrix const &gidx) = 0; - - /*! - * \brief Prepare row set for aggregation - * - * \param row_set Information for node IDs and its sample IDs - * - * \return The encoded buffer to be sent via AllGather - */ - virtual common::Span ProcessAggregation(std::vector const &nodes_to_build, - common::RowSetCollection const &row_set) = 0; - - /*! - * \brief Handle all gather result - * - * \param buffer Buffer from all gather, only buffer from active site is needed - * - * \return A flattened vector of histograms for each site, each node in the form of - * site1_node1, site1_node2 site1_node3, site2_node1, site2_node2, site2_node3 - */ - virtual std::vector HandleAggregation(xgboost::common::Span buffer) = 0; -}; - -class ProcessorLoader { - private: - std::map params; - void *handle = NULL; - - - public: - ProcessorLoader(): params{} {} - - ProcessorLoader(std::map& params): params(params) {} - - Processor* load(const std::string& plugin_name); - - void unload(); -}; - -} // namespace xgboost::processing - -extern xgboost::processing::Processor *processor_instance; \ No newline at end of file diff --git a/src/processing/processor_loader.cc b/src/processing/processor_loader.cc deleted file mode 100644 index 47a31f482d46..000000000000 --- a/src/processing/processor_loader.cc +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Copyright 2014-2024 by XGBoost Contributors - */ -#include -#include - -#include "./processor.h" -#include "plugins/dummy_processor.h" - -namespace xgboost::processing { - using LoadFunc = Processor *(const char *); - - Processor* ProcessorLoader::load(const std::string& plugin_name) { - // Dummy processor for unit testing without loading a shared library - if (plugin_name == kDummyProcessor) { - return new DummyProcessor(); - } - - auto lib_name = "libproc_" + plugin_name; - - auto extension = -#if defined(__APPLE__) || defined(__MACH__) - ".dylib"; -#else - ".so"; -#endif - auto lib_file_name = lib_name + extension; - - std::string lib_path; - - if (params.find(kLibraryPath) == params.end()) { - lib_path = lib_file_name; - } else { - auto p = params[kLibraryPath]; - if (p.back() != '/') { - p += '/'; - } - lib_path = p + lib_file_name; - } - - handle = dlopen(lib_path.c_str(), RTLD_LAZY); - if (!handle) { - std::cerr << "Failed to load the dynamic library: " << dlerror() << std::endl; - return NULL; - } - - void* func_ptr = dlsym(handle, kLoadFunc); - - if (!func_ptr) { - std::cerr << "Failed to find loader function: " << dlerror() << std::endl; - return NULL; - } - - auto func = reinterpret_cast(func_ptr); - - return (*func)(plugin_name.c_str()); - } - - void ProcessorLoader::unload() { - dlclose(handle); - } -} // namespace xgboost::processing diff --git a/src/tree/hist/histogram.h b/src/tree/hist/histogram.h index 9b024e56c220..32d5d8d5a3a7 100644 --- a/src/tree/hist/histogram.h +++ b/src/tree/hist/histogram.h @@ -53,6 +53,7 @@ class HistogramBuilder { bool is_col_split_{false}; bool is_secure_{false}; xgboost::common::Span hist_data; + public: /** * @brief Reset the builder, should be called before growing a new tree. @@ -213,7 +214,8 @@ class HistogramBuilder { std::size_t n = n_total_bins * nodes_to_build.size() * 2; // Perform AllGather - auto hist_vec = std::vector(hist_data.data(), hist_data.data() + hist_data.size()); + auto hist_vec = std::vector(hist_data.data(), + hist_data.data() + hist_data.size()); auto hist_entries = collective::Allgather(hist_vec); // Call interface here to post-process the messages auto hist_span = common::Span(hist_entries.data(), hist_entries.size());