From 9ff293595a26dba1cdf09e85afb6521b10614197 Mon Sep 17 00:00:00 2001 From: Zhihong Zhang Date: Thu, 25 Apr 2024 13:20:18 -0400 Subject: [PATCH] Added support for horizontal secure XGBoost --- .../{dummy_processor.cc => mock_processor.cc} | 73 ++++++++- .../{dummy_processor.h => mock_processor.h} | 7 +- src/processing/processor.h | 28 +++- src/processing/processor_loader.cc | 18 +-- tests/cpp/processing/test_processor.cc | 146 +++++++++++------- 5 files changed, 192 insertions(+), 80 deletions(-) rename src/processing/plugins/{dummy_processor.cc => mock_processor.cc} (55%) rename src/processing/plugins/{dummy_processor.h => mock_processor.h} (84%) diff --git a/src/processing/plugins/dummy_processor.cc b/src/processing/plugins/mock_processor.cc similarity index 55% rename from src/processing/plugins/dummy_processor.cc rename to src/processing/plugins/mock_processor.cc index f04d70a3338d..2943c2aea284 100644 --- a/src/processing/plugins/dummy_processor.cc +++ b/src/processing/plugins/mock_processor.cc @@ -4,7 +4,7 @@ #include #include #include -#include "./dummy_processor.h" +#include "./mock_processor.h" const char kSignature[] = "NVDADAM1"; // DAM (Direct Accessible Marshalling) V1 const int64_t kPrefixLen = 24; @@ -13,7 +13,7 @@ bool ValidDam(void *buffer, std::size_t size) { return size >= kPrefixLen && memcmp(buffer, kSignature, strlen(kSignature)) == 0; } -void* DummyProcessor::ProcessGHPairs(std::size_t *size, const std::vector& pairs) { +void* MockProcessor::ProcessGHPairs(std::size_t *size, const std::vector& pairs) { *size = kPrefixLen + pairs.size()*10*8; // Assume encrypted size is 10x int64_t buf_size = *size; @@ -39,13 +39,13 @@ void* DummyProcessor::ProcessGHPairs(std::size_t *size, const std::vector(buffer); ptr += kPrefixLen; @@ -60,7 +60,7 @@ void* DummyProcessor::HandleGHPairs(std::size_t *size, void *buffer, std::size_t return buffer; } -void *DummyProcessor::ProcessAggregation(std::size_t *size, std::map> nodes) { +void *MockProcessor::ProcessAggregation(std::size_t *size, std::map> nodes) { int total_bin_size = cuts_.back(); int histo_size = total_bin_size*2; *size = kPrefixLen + 8*histo_size*nodes.size(); @@ -93,7 +93,7 @@ void *DummyProcessor::ProcessAggregation(std::size_t *size, std::map DummyProcessor::HandleAggregation(void *buffer, std::size_t buf_size) { +std::vector MockProcessor::HandleAggregation(void *buffer, std::size_t buf_size) { std::vector result = std::vector(); int8_t* ptr = static_cast(buffer); @@ -101,7 +101,7 @@ std::vector DummyProcessor::HandleAggregation(void *buffer, std::size_t while (rest_size > kPrefixLen) { if (!ValidDam(ptr, rest_size)) { - continue; + break; } int64_t *size_ptr = reinterpret_cast(ptr + 8); double *array_start = reinterpret_cast(ptr + kPrefixLen); @@ -113,3 +113,62 @@ std::vector DummyProcessor::HandleAggregation(void *buffer, std::size_t return result; } + +void* MockProcessor::ProcessHistograms(std::size_t *size, const std::vector& histograms) { + *size = kPrefixLen + histograms.size()*10*8; // Assume encrypted size is 10x + + int64_t buf_size = *size; + // This memory needs to be freed + char *buf = static_cast(malloc(buf_size)); + memcpy(buf, kSignature, strlen(kSignature)); + memcpy(buf + 8, &buf_size, 8); + memcpy(buf + 16, &kDataTypeAggregatedHisto, 8); + + // Simulate encryption by duplicating value 10 times + int index = kPrefixLen; + for (auto value : histograms) { + for (std::size_t i = 0; i < 10; i++) { + memcpy(buf+index, &value, 8); + index += 8; + } + } + + return buf; +} + +std::vector MockProcessor::HandleHistograms(void *buffer, std::size_t buf_size) { + std::vector result = std::vector(); + + int8_t* ptr = static_cast(buffer); + auto rest_size = buf_size; + + while (rest_size > kPrefixLen) { + if (!ValidDam(ptr, rest_size)) { + break; + } + int64_t *size_ptr = reinterpret_cast(ptr + 8); + double *array_start = reinterpret_cast(ptr + kPrefixLen); + auto array_size = (*size_ptr - kPrefixLen)/8; + auto empty = result.empty(); + if (!empty) { + if (result.size() != array_size / 10) { + std::cout << "Histogram size doesn't match " << result.size() << " != " << array_size << std::endl; + return result; + } + } + + for (std::size_t i = 0; i < array_size/10; i++) { + auto value = array_start[i*10]; + if (empty) { + result.push_back(value); + } else { + result[i] += value; + } + } + + rest_size -= *size_ptr; + ptr = ptr + *size_ptr; + } + + return result; +} diff --git a/src/processing/plugins/dummy_processor.h b/src/processing/plugins/mock_processor.h similarity index 84% rename from src/processing/plugins/dummy_processor.h rename to src/processing/plugins/mock_processor.h index c1690e61e20d..4045280edf83 100644 --- a/src/processing/plugins/dummy_processor.h +++ b/src/processing/plugins/mock_processor.h @@ -10,8 +10,9 @@ // Data type definition const int64_t kDataTypeGHPairs = 1; const int64_t kDataTypeHisto = 2; +const int64_t kDataTypeAggregatedHisto = 3; -class DummyProcessor: public processing::Processor { +class MockProcessor: public processing::Processor { private: bool active_ = false; const std::map *params_{nullptr}; @@ -50,4 +51,8 @@ class DummyProcessor: public processing::Processor { void *ProcessAggregation(size_t *size, std::map> nodes) override; std::vector HandleAggregation(void *buffer, size_t buf_size) override; + + void *ProcessHistograms(size_t *size, const std::vector& histograms) override; + + std::vector HandleHistograms(void *buffer, size_t buf_size) override; }; diff --git a/src/processing/processor.h b/src/processing/processor.h index 14cce65b087e..a6994ef9f5b7 100644 --- a/src/processing/processor.h +++ b/src/processing/processor.h @@ -12,7 +12,7 @@ namespace processing { const char kLibraryPath[] = "LIBRARY_PATH"; -const char kDummyProcessor[] = "dummy"; +const char kMockProcessor[] = "mock"; const char kLoadFunc[] = "LoadProcessor"; /*! \brief An processor interface to handle tasks that require external library through plugins */ @@ -76,7 +76,7 @@ class Processor { * \param size The output buffer size * \param nodes Map of node and the rows belong to this node * - * \return The encoded buffer to be sent via AllGather + * \return The encoded buffer to be sent via AllGatherV */ virtual void *ProcessAggregation(size_t *size, std::map> nodes) = 0; @@ -90,16 +90,32 @@ class Processor { * site1_node1, site1_node2 site1_node3, site2_node1, site2_node2, site2_node3 */ virtual std::vector HandleAggregation(void *buffer, size_t buf_size) = 0; + + /*! + * \brief Prepare histograms for further processing + * + * \param size The output buffer size + * \param histograms Flattened array of histograms for all features + * + * \return The encoded buffer to be sent via AllGatherV + */ + virtual void *ProcessHistograms(size_t *size, const std::vector& histograms) = 0; + + /*! + * \brief Handle processed histograms + * + * \param buffer Buffer from allgatherV + * \param buf_size The size of the buffer + * + * \return A flattened vector of histograms for all features + */ + virtual std::vector HandleHistograms(void *buffer, size_t buf_size) = 0; }; class ProcessorLoader { private: std::map params; -#if defined(_WIN32) - HMODULE handle_ = NULL; -#else void *handle_ = NULL; -#endif public: ProcessorLoader(): params{} {} diff --git a/src/processing/processor_loader.cc b/src/processing/processor_loader.cc index 6bb0480cb2b7..00bb49d03cbe 100644 --- a/src/processing/processor_loader.cc +++ b/src/processing/processor_loader.cc @@ -4,28 +4,28 @@ #include -#if defined(_WIN32) +#if defined(_WIN32) || defined(_WIN64) #include #else #include #endif #include "./processor.h" -#include "plugins/dummy_processor.h" +#include "plugins/mock_processor.h" namespace processing { using LoadFunc = Processor *(const char *); Processor* ProcessorLoader::load(const std::string& plugin_name) { // Dummy processor for unit testing without loading a shared library - if (plugin_name == kDummyProcessor) { - return new DummyProcessor(); + if (plugin_name == kMockProcessor) { + return new MockProcessor(); } auto lib_name = "libproc_" + plugin_name; auto extension = -#if defined(_WIN32) +#if defined(_WIN32) || defined(_WIN64) ".dll"; #elif defined(__APPLE__) || defined(__MACH__) ".dylib"; @@ -46,19 +46,18 @@ namespace processing { lib_path = p + lib_file_name; } -#if defined(_WIN32) - HMODULE handle_ = LoadLibrary(lib_path.c_str()); +#if defined(_WIN32) || defined(_WIN64) + handle_ = reinterpret_cast(LoadLibrary(lib_path.c_str())); if (!handle_) { std::cerr << "Failed to load the dynamic library" << std::endl; return NULL; } - void* func_ptr = GetProcAddress(handle_, kLoadFunc); + void* func_ptr = reinterpret_cast(GetProcAddress((HMODULE)handle_, kLoadFunc)); if (!func_ptr) { std::cerr << "Failed to find loader function." << std::endl; return NULL; } - #else handle_ = dlopen(lib_path.c_str(), RTLD_LAZY); if (!handle_) { @@ -70,7 +69,6 @@ namespace processing { std::cerr << "Failed to find loader function: " << dlerror() << std::endl; return NULL; } - #endif auto func = reinterpret_cast(func_ptr); diff --git a/tests/cpp/processing/test_processor.cc b/tests/cpp/processing/test_processor.cc index 0cefcc65bd9f..58c575821cdc 100644 --- a/tests/cpp/processing/test_processor.cc +++ b/tests/cpp/processing/test_processor.cc @@ -5,75 +5,109 @@ #include "../../../src/processing/processor.h" +const double kError = 1E-10; + class ProcessorTest : public testing::Test { public: - void SetUp() override { - auto loader = processing::ProcessorLoader(); - processor_ = loader.load("dummy"); - processor_->Initialize(true, {}); - } + void SetUp() override { + auto loader = processing::ProcessorLoader(); + processor_ = loader.load(processing::kMockProcessor); + processor_->Initialize(true, {}); + } - void TearDown() override { - processor_->Shutdown(); - processor_ = nullptr; - } + void TearDown() override { + processor_->Shutdown(); + processor_ = nullptr; + } protected: - processing::Processor *processor_ = nullptr; - - // Test data, 4 Rows, 2 Features - std::vector gh_pairs_ = {1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1}; // 4 Rows, 8 GH Pairs - std::vector cuts_ = {0, 4, 10}; // 2 features, one has 4 bins, another 6 - std::vector slots_ = { - 0, 4, - 1, 9, - 3, 7, - 0, 4 - }; - - std::vector node0_ = {0, 2}; - std::vector node1_ = {1, 3}; - - std::map> nodes_ = {{0, node0_}, {1, node1_}}; + processing::Processor *processor_ = nullptr; + + // Test data, 4 Rows, 2 Features + std::vector gh_pairs_ = {1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1}; // 4 Rows, 8 GH Pairs + std::vector cuts_ = {0, 4, 10}; // 2 features, one has 4 bins, another 6 + std::vector slots_ = { + 0, 4, + 1, 9, + 3, 7, + 0, 4 + }; + + std::vector node0_ = {0, 2}; + std::vector node1_ = {1, 3}; + + std::map> nodes_ = {{0, node0_}, + {1, node1_}}; + + std::vector histo1_ = {1.0, 2.0, 3.0, 4.0}; + std::vector histo2_ = {5.0, 6.0, 7.0, 8.0}; }; TEST_F(ProcessorTest, TestLoading) { - auto base_class = dynamic_cast(processor_); - ASSERT_NE(base_class, nullptr); + auto base_class = dynamic_cast(processor_); + ASSERT_NE(base_class, nullptr); } TEST_F(ProcessorTest, TestGHEncoding) { - size_t buf_size; - auto buffer = processor_->ProcessGHPairs(&buf_size, gh_pairs_); - size_t expected_size = 24; // DAM header size - expected_size += gh_pairs_.size()*10*8; // Dummy plugin duplicate each number 10x to simulate encryption - ASSERT_EQ(buf_size, expected_size); - - size_t new_size; - auto new_buffer = processor_->HandleGHPairs(&new_size, buffer, buf_size); - // Dummy plugin doesn't change buffer - ASSERT_EQ(new_size, buf_size); - ASSERT_EQ(0, memcmp(buffer, new_buffer, buf_size)); + size_t buf_size; + auto buffer = processor_->ProcessGHPairs(&buf_size, gh_pairs_); + size_t expected_size = 24; // DAM header size + expected_size += gh_pairs_.size() * 10 * 8; // Dummy plugin duplicate each number 10x to simulate encryption + ASSERT_EQ(buf_size, expected_size); + + size_t new_size; + auto new_buffer = processor_->HandleGHPairs(&new_size, buffer, buf_size); + // Dummy plugin doesn't change buffer + ASSERT_EQ(new_size, buf_size); + ASSERT_EQ(0, memcmp(buffer, new_buffer, buf_size)); } TEST_F(ProcessorTest, TestAggregation) { - size_t buf_size; - processor_->ProcessGHPairs(&buf_size, gh_pairs_); // Pass the GH pairs to the plugin - - processor_->InitAggregationContext(cuts_, slots_); - auto buffer = processor_->ProcessAggregation(&buf_size, nodes_); - auto histos = processor_->HandleAggregation(buffer, buf_size); - std::vector expected_histos = { - 1.1, 2.1, 0, 0, 0, 0, 5.1, 6.1, 1.1, 2.1, - 0, 0, 0, 0, 5.1, 6.1, 0, 0, 0, 0, - 7.1, 8.1, 3.1, 4.1, 0, 0, 0, 0, 7.1, 8.1, - 0, 0, 0, 0, 0, 0, 0, 0, 3.1, 4.1 - }; - - ASSERT_EQ(expected_histos.size(), histos.size()) << "Histograms have different sizes"; - - for (size_t i = 0; i < histos.size(); ++i) { - EXPECT_EQ(expected_histos[i], histos[i]) << "Histogram differs at index " << i; - } + size_t buf_size; + processor_->ProcessGHPairs(&buf_size, gh_pairs_); // Pass the GH pairs to the plugin + + processor_->InitAggregationContext(cuts_, slots_); + auto buffer = processor_->ProcessAggregation(&buf_size, nodes_); + auto histos = processor_->HandleAggregation(buffer, buf_size); + double expected_result[] = { + 1.1, 2.1, 0, 0, 0, 0, 5.1, 6.1, 1.1, 2.1, + 0, 0, 0, 0, 5.1, 6.1, 0, 0, 0, 0, + 7.1, 8.1, 3.1, 4.1, 0, 0, 0, 0, 7.1, 8.1, + 0, 0, 0, 0, 0, 0, 0, 0, 3.1, 4.1 + }; + + auto expected_size = sizeof(expected_result)/sizeof(expected_result[0]); + + ASSERT_EQ(expected_size, histos.size()) << "Histograms have different sizes"; + + for (size_t i = 0; i < histos.size(); ++i) { + EXPECT_NEAR(expected_result[i], histos[i], kError) << "Histogram differs at index " << i; + } +} + +TEST_F(ProcessorTest, TestHistogramSum) { + + size_t buf1_size, buf2_size; + + auto buf1 = processor_->ProcessHistograms(&buf1_size, histo1_); + auto buf2 = processor_->ProcessHistograms(&buf2_size, histo2_); + + // Simulate allgatherV + auto buf_size = buf1_size + buf2_size; + auto buf = malloc(buf_size); + memcpy(buf, buf1, buf1_size); + memcpy(static_cast(buf) + buf1_size, buf2, buf2_size); + + auto result = processor_->HandleHistograms(buf, buf_size); + + double expected_result[] = {6.0, 8.0, 10.0, 12.0}; + auto expected_size = sizeof(expected_result)/sizeof(expected_result[0]); + ASSERT_EQ(expected_size, result.size()) << "Histograms have different sizes"; + + for (size_t i = 0; i < result.size(); ++i) { + EXPECT_NEAR(expected_result[i], result[i], kError) << "Histogram differs at index " << i; + } + + free(buf); }