diff --git a/examples/linear_regression.cpp b/examples/linear_regression.cpp index 9cf7203..d3b0d43 100644 --- a/examples/linear_regression.cpp +++ b/examples/linear_regression.cpp @@ -33,11 +33,21 @@ // info: (a.k.a number of epoch) // How many time the entire training data will be went through // +// is_sparse +// type: string +// info: whether the data is dense or sparse +// +// format +// type: string +// info: the data format of input file: libsvm/tsv +// // Configuration example: -// train:/datasets/regression/MSD/train -// test:/datasets/regression/MSD/test -// n_iter:10 -// alpha:0.1 +// train=hdfs:///datasets/regression/MSD/train +// test=hdfs:///datasets/regression/MSD/test +// is_sparse=false +// format=tsv +// n_iter=10 +// alpha=0.1 #include #include @@ -48,34 +58,32 @@ #include "lib/ml/scaler.hpp" #include "lib/ml/sgd.hpp" -using husky::lib::ml::SparseFeatureLabel; using husky::lib::ml::ParameterBucket; void report(std::string msg) { if (husky::Context::get_global_tid() == 0) husky::base::log_msg(msg); } -void linear_regression() { - auto & train_set = husky::ObjListStore::create_objlist("train_set"); - auto & test_set = husky::ObjListStore::create_objlist("test_set"); + +template +void linear_regression(double alpha, int num_iter, husky::lib::ml::DataFormat format) { + typedef husky::lib::ml::LabeledPointHObj LabeledPointHObj; + auto & train_set = husky::ObjListStore::create_objlist("train_set"); + auto & test_set = husky::ObjListStore::create_objlist("test_set"); // load data - husky::lib::ml::DataLoader data_loader(husky::lib::ml::kTSVFormat); - data_loader.load_info(husky::Context::get_param("train"), train_set); - data_loader.load_info(husky::Context::get_param("test"), test_set); - int num_features = data_loader.get_num_feature(); + int num_features = husky::lib::ml::load_data(husky::Context::get_param("train"), train_set, format); + husky::lib::ml::load_data(husky::Context::get_param("test"), test_set, format, num_features); // scale values to [-1, 1] - // TODO(Tatiana): inconsistent scaling for train and test set may be problematic - husky::lib::ml::LinearScaler<> scaler_train(num_features); - husky::lib::ml::LinearScaler<> scaler_test(num_features); + // TODO(Tatiana): applying the same scaling results in large error? + husky::lib::ml::LinearScaler scaler_train(num_features); + husky::lib::ml::LinearScaler scaler_test(num_features); scaler_train.fit_transform(train_set); scaler_test.fit_transform(test_set); - double alpha = std::stod(husky::Context::get_param("alpha")); - int num_iter = std::stoi(husky::Context::get_param("n_iter")); - // initialize linear regression model - husky::lib::ml::LinearRegression> lr(num_features); + husky::lib::ml::LinearRegression> lr(num_features); + lr.report_per_round = true; // report training error per round - lr.train>>(train_set, num_iter, alpha); + lr.template train(train_set, num_iter, alpha); report("The error on training set = " + std::to_string(lr.avg_error(train_set))); report("The score on training set = " + std::to_string(lr.score(train_set))); @@ -84,16 +92,29 @@ void linear_regression() { report("The score on testing set = " + std::to_string(lr.score(test_set))); } +void initialize() { + double alpha = std::stod(husky::Context::get_param("alpha")); + int num_iter = std::stoi(husky::Context::get_param("n_iter")); + auto format = husky::Context::get_param("format"); + husky::lib::ml::DataFormat data_format; + if (format == "libsvm") { + data_format = husky::lib::ml::kLIBSVMFormat; + } else if (format == "tsv") { + data_format = husky::lib::ml::kTSVFormat; + } + if (husky::Context::get_param("is_sparse") == "true") { + linear_regression(alpha, num_iter, data_format); + } else { + linear_regression(alpha, num_iter, data_format); + } +} + int main(int argc, char** argv) { - std::vector args; - args.push_back("hdfs_namenode"); - args.push_back("hdfs_namenode_port"); - args.push_back("train"); - args.push_back("test"); - args.push_back("n_iter"); - args.push_back("alpha"); + std::vector args({ + "hdfs_namenode", "hdfs_namenode_port", "train", "test", "n_iter", "alpha", "format", "is_sparse" + }); if (husky::init_with_args(argc, argv, args)) { - husky::run_job(linear_regression); + husky::run_job(initialize); return 0; } return 1; diff --git a/examples/logistic_regression.cpp b/examples/logistic_regression.cpp index 4fbc4cd..d2a7aa4 100644 --- a/examples/logistic_regression.cpp +++ b/examples/logistic_regression.cpp @@ -28,16 +28,26 @@ // type: string // info: The path of testing data in hadoop // +// is_sparse +// type: string +// info: whether the data is dense or sparse +// +// format +// type: string +// info: the data format of input file: libsvm/tsv +// // n_iter // type: int // info: (a.k.a number of epoch) // How many time the entire training data will be went through // // Configuration example: -// train:/datasets/classification/a9 -// test:/datasets/classification/a9t -// n_iter:50 -// alpha:0.5 +// train=hdfs:///datasets/classification/a9 +// test=hdfs:///datasets/classification/a9t +// is_sparse=true +// format=libsvm +// n_iter=50 +// alpha=0.5 #include #include @@ -47,47 +57,72 @@ #include "lib/ml/fgd.hpp" #include "lib/ml/logistic_regression.hpp" -using husky::lib::ml::SparseFeatureLabel; using husky::lib::ml::ParameterBucket; + +template void logistic_regression() { - auto & train_set = husky::ObjListStore::create_objlist("train_set"); - auto & test_set = husky::ObjListStore::create_objlist("test_set"); + using LabeledPointHObj = husky::lib::ml::LabeledPointHObj; + auto & train_set = husky::ObjListStore::create_objlist("train_set"); + auto & test_set = husky::ObjListStore::create_objlist("test_set"); // load data - husky::lib::ml::DataLoader data_loader(husky::lib::ml::kLIBSVMFormat); - data_loader.load_info(husky::Context::get_param("train"), train_set); - data_loader.load_info(husky::Context::get_param("test"), test_set); - int num_features = data_loader.get_num_feature(); + auto format_str = husky::Context::get_param("format"); + husky::lib::ml::DataFormat format; + if (format_str == "libsvm") { + format = husky::lib::ml::kLIBSVMFormat; + } else if (format_str == "tsv") { + format = husky::lib::ml::kTSVFormat; + } + + int num_features = husky::lib::ml::load_data(husky::Context::get_param("train"), train_set, format); + husky::lib::ml::load_data(husky::Context::get_param("test"), test_set, format, num_features); + + // processing labels + husky::list_execute(train_set, [](auto& this_obj) { + if (this_obj.y < 0) this_obj.y = 0; + }); + husky::list_execute(test_set, [](auto& this_obj) { + if (this_obj.y < 0) this_obj.y = 0; + }); double alpha = std::stod(husky::Context::get_param("alpha")); int num_iter = std::stoi(husky::Context::get_param("n_iter")); // initialize logistic regression model - husky::lib::ml::LogisticRegression> lr(num_features); + husky::lib::ml::LogisticRegression> lr(num_features); lr.report_per_round = true; // report training error per round // train the model - lr.train>>(train_set, num_iter, alpha); + lr.template train(train_set, num_iter, alpha); // estimate generalization error - double test_error = lr.avg_error(test_set); + auto test_error = lr.avg_error(test_set); if (husky::Context::get_global_tid() == 0) { - // lr.present_param(); // validation husky::base::log_msg("Error on testing set: " + std::to_string(test_error)); } } +void init() { + if (husky::Context::get_param("is_sparse") == "true") { + logistic_regression(); + } else { + logistic_regression(); + } +} + int main(int argc, char** argv) { - std::vector args; - args.push_back("hdfs_namenode"); - args.push_back("hdfs_namenode_port"); - args.push_back("train"); - args.push_back("test"); - args.push_back("n_iter"); - args.push_back("alpha"); + std::vector args(8); + args[0] = "hdfs_namenode"; + args[1] = "hdfs_namenode_port"; + args[2] = "train"; + args[3] = "test"; + args[4] = "n_iter"; + args[5] = "alpha"; + args[6] = "format"; + args[7] = "is_sparse"; if (husky::init_with_args(argc, argv, args)) { - husky::run_job(logistic_regression); + husky::run_job(init); return 0; } return 1; diff --git a/examples/svm.cpp b/examples/svm.cpp index fb68e70..6930298 100644 --- a/examples/svm.cpp +++ b/examples/svm.cpp @@ -21,21 +21,31 @@ // // train // type: string -// info: the path of training data in hadoop +// info: the path of training data in hadoop, in LibSVM format // // test // type: string -// info: the path of testing data in hadoop +// info: the path of testing data in hadoop, in LibSVM format // // n_iter // type: int // info: number of epochs the entire training data will be went through // +// is_sparse +// type: string +// info: whether the data is dense or sparse +// +// format +// type: string +// info: the data format of input file: libsvm/tsv +// // configuration example: -// train:/datasets/classification/a9 -// test:/datasets/classification/a9t -// n_iter:50 -// lambda:0.01 +// train=hdfs:///datasets/classification/a9 +// test=hdfs:///datasets/classification/a9t +// is_sparse=true +// format=libsvm +// n_iter=50 +// lambda=0.01 #include #include @@ -49,41 +59,39 @@ #include "lib/ml/data_loader.hpp" #include "lib/ml/feature_label.hpp" #include "lib/ml/parameter.hpp" -#include "lib/ml/vector_linalg.hpp" using husky::lib::Aggregator; using husky::lib::AggregatorFactory; -namespace husky { -namespace lib { -namespace ml { - -typedef SparseFeatureLabel ObjT; - -// how to get label and feature from data object -double get_y_(ObjT& this_obj) { return this_obj.get_label(); } -std::vector> get_X_(ObjT& this_obj) { return this_obj.get_feature(); } - +template void svm() { - auto& train_set = husky::ObjListStore::create_objlist("train_set"); - auto& test_set = husky::ObjListStore::create_objlist("test_set"); + using ObjT = husky::lib::ml::LabeledPointHObj; + auto& train_set = husky::ObjListStore::create_objlist("train_set"); + auto& test_set = husky::ObjListStore::create_objlist("test_set"); + + auto format_str = husky::Context::get_param("format"); + husky::lib::ml::DataFormat format; + if (format_str == "libsvm") { + format = husky::lib::ml::kLIBSVMFormat; + } else if (format_str == "tsv") { + format = husky::lib::ml::kTSVFormat; + } // load data - DataLoader data_loader(kLIBSVMFormat); - data_loader.load_info(husky::Context::get_param("train"), train_set); - data_loader.load_info(husky::Context::get_param("test"), test_set); - int num_features = data_loader.get_num_feature(); + int num_features = husky::lib::ml::load_data(husky::Context::get_param("train"), train_set, format); + husky::lib::ml::load_data(husky::Context::get_param("test"), test_set, format); // get model config parameters double lambda = std::stod(husky::Context::get_param("lambda")); int num_iter = std::stoi(husky::Context::get_param("n_iter")); // initialize parameters - ParameterBucket param_list(num_features + 1); // scalar b and vector w + husky::lib::ml::ParameterBucket param_list(num_features + 1); // scalar b and vector w if (husky::Context::get_global_tid() == 0) { husky::base::log_msg("num of params: " + std::to_string(param_list.get_num_param())); } + // get the number of global records Aggregator num_samples_agg(0, [](int& a, const int& b) { a += b; }); num_samples_agg.update(train_set.get_size()); @@ -107,13 +115,13 @@ void svm() { double regulator = 0.0; // prevent overfitting // calculate w square - for (int idx = 1; idx <= num_features; idx++) { + for (int idx = 0; idx < num_features; idx++) { double w = param_list.param_at(idx); sqr_w += w * w; } // get local copy of parameters - std::vector bweight = param_list.get_all_param(); + auto bweight = param_list.get_all_param(); // calculate regulator regulator = (sqr_w == 0) ? 1.0 : std::min(1.0, 1.0 / sqrt(sqr_w * lambda)); @@ -126,7 +134,7 @@ void svm() { // regularize w in param_list if (husky::Context::get_global_tid() == 0) { - for (int idx = 1; idx < bweight.size(); idx++) { + for (int idx = 0; idx < num_features; idx++) { double w = bweight[idx]; param_list.update(idx, (w - w / regulator - eta * w)); } @@ -136,21 +144,22 @@ void svm() { // calculate gradient list_execute(train_set, {}, {&ac}, [&](ObjT& this_obj) { double prod = 0; // prod = WX * y - double y = get_y_(this_obj); - std::vector> X = get_X_(this_obj); - for (auto& x : X) - prod += bweight[x.first] * x.second; + double y = this_obj.y; + auto X = this_obj.x; + for (auto it = X.begin_feaval(); it != X.end_feaval(); ++it) + prod += bweight[(*it).fea] * (*it).val; // bias - prod += bweight[0]; + prod += bweight[num_features]; prod *= y; if (prod < 1) { // the data point falls within the margin - for (auto& x : X) { - x.second *= y; // calculate the gradient for each parameter - param_list.update(x.first, eta * x.second / num_samples / lambda); + for (auto it = X.begin_feaval(); it != X.end_feaval(); ++it) { + auto x = *it; + x.val *= y; // calculate the gradient for each parameter + param_list.update(x.fea, eta * x.val / num_samples / lambda); } // update bias - param_list.update(0, eta * y / num_samples); + param_list.update(num_features, eta * y / num_samples); loss_agg.update(1 - prod); } sqr_w_agg.update(sqr_w); @@ -172,8 +181,7 @@ void svm() { // Show result if (husky::Context::get_global_tid() == 0) { param_list.present(); - husky::base::log_msg( - "Time per iter: " + + husky::base::log_msg("Time per iter: " + std::to_string(std::chrono::duration_cast>(end - start).count() / num_iter)); } @@ -184,12 +192,12 @@ void svm() { auto bweight = param_list.get_all_param(); list_execute(test_set, {}, {&ac}, [&](ObjT& this_obj) { double indicator = 0; - double y = get_y_(this_obj); - std::vector> X = get_X_(this_obj); - for (auto& x : X) - indicator += bweight[x.first] * x.second; + auto y = this_obj.y; + auto X = this_obj.x; + for (auto it = X.begin_feaval(); it != X.end_feaval(); it++) + indicator += bweight[(*it).fea] * (*it).val; // bias - indicator += bweight[0]; + indicator += bweight[num_features]; indicator *= y; // right prediction if positive (Wx+b and y have the same sign) if (indicator < 0) error_agg.update(1); num_test_agg.update(1); @@ -201,20 +209,20 @@ void svm() { } } -} // namespace ml -} // namespace lib -} // namespace husky +void init() { + if (husky::Context::get_param("is_sparse") == "true") { + svm(); + } else { + svm(); + } +} int main(int argc, char** argv) { - std::vector args; - args.push_back("hdfs_namenode"); - args.push_back("hdfs_namenode_port"); - args.push_back("train"); - args.push_back("test"); - args.push_back("n_iter"); - args.push_back("lambda"); + std::vector args({ + "hdfs_namenode", "hdfs_namenode_port", "train", "test", "n_iter", "lambda", "format", "is_sparse" + }); if (husky::init_with_args(argc, argv, args)) { - husky::run_job(husky::lib::ml::svm); + husky::run_job(init); return 0; } return 1; diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 343ede4..2af172e 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -27,6 +27,4 @@ set(lib-objs $) # Visible to parent directory set(lib-objs ${lib-objs} PARENT_SCOPE) -add_subdirectory(ml) - add_library(husky-lib STATIC ${lib-objs} ${ml-objs}) diff --git a/lib/ml/CMakeLists.txt b/lib/ml/CMakeLists.txt deleted file mode 100644 index 59fa22a..0000000 --- a/lib/ml/CMakeLists.txt +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2016 Husky Team -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -include_directories(${PROJECT_SOURCE_DIR} ${Boost_INCLUDE_DIRS}) - -add_library(gradient-descent-objs OBJECT vector_linalg.cpp) -set_property(TARGET gradient-descent-objs PROPERTY CXX_STANDARD 14) - -# Combine lib-objs -set(ml-objs $) - -# Visible to parent directory -set(ml-objs ${ml-objs} PARENT_SCOPE) diff --git a/lib/ml/data_loader.hpp b/lib/ml/data_loader.hpp index 1aaa8da..ebef390 100644 --- a/lib/ml/data_loader.hpp +++ b/lib/ml/data_loader.hpp @@ -15,186 +15,172 @@ #pragma once #include -#include #include #include -#include -#include #include "boost/tokenizer.hpp" -#include "core/engine.hpp" -#include "io/input/line_inputformat.hpp" + +#include "core/executor.hpp" +#include "core/objlist.hpp" +#include "core/utils.hpp" +#include "io/input/inputformat_store.hpp" #include "lib/aggregator_factory.hpp" #include "lib/ml/feature_label.hpp" +#include "lib/vector.hpp" namespace husky { namespace lib { namespace ml { -typedef std::vector vec_double; -typedef std::vector> vec_sp; // indicate format -const int kLIBSVMFormat = 0; -const int kTSVFormat = 1; - -template -class DataLoader { - typedef ObjList ObjL; - - public: - DataLoader() {} - explicit DataLoader(int _format) { this->format_ = _format; } - explicit DataLoader(std::function _load_func) { load_func_ = _load_func; } - - void load_info(std::string url, ObjL& data) { - ASSERT_MSG(load_func_ != nullptr, "Load function is not specified."); - load_func_(url, data); +enum DataFormat { kLIBSVMFormat, kTSVFormat }; + +// load data without knowing the number of features +template +int load_data(std::string url, ObjList>& data, DataFormat format) { + using DataObj = LabeledPointHObj; + + auto& infmt = husky::io::InputFormatStore::create_line_inputformat(); + infmt.set_input(url); + + husky::lib::Aggregator num_features_agg(0, [](int& a, const int& b) { a = std::max(a, b); }); + auto& ac = AggregatorFactory::get_channel(); + + std::function parser; + if (format == kLIBSVMFormat) { + parser = [&](boost::string_ref chunk) { + if (chunk.empty()) + return; + boost::char_separator sep(" \t"); + boost::tokenizer> tok(chunk, sep); + + // get the largest index of features for this record + int sz = 0; + if (!is_sparse) { + auto last_colon = chunk.find_last_of(':'); + if (last_colon != -1) { + auto last_space = chunk.substr(0, last_colon).find_last_of(' '); + sz = std::stoi(chunk.substr(last_space + 1, last_colon).data()); + } + ASSERT_MSG(sz > 0, "The input file does not conform to LibSVM format."); + } + DataObj this_obj(sz); // create a data object + + bool is_y = true; + for (auto& w : tok) { + if (!is_y) { + boost::char_separator sep2(":"); + boost::tokenizer> tok2(w, sep2); + auto it = tok2.begin(); + int idx = std::stoi(*it++); + double val = std::stod(*it++); + num_features_agg.update(idx); + this_obj.x.set(idx - 1, val); + } else { + this_obj.y = std::stod(w); + is_y = false; + } + } + data.add_object(this_obj); + }; + } else if (format == kTSVFormat) { + parser = [&](boost::string_ref chunk) { + if (chunk.empty()) + return; + + boost::char_separator sep(" \t"); + boost::tokenizer> tok(chunk, sep); + boost::tokenizer> tok1(chunk, sep); + + // get the number of features + int i = 0, num_features = -1; + for (auto& w : tok) { + num_features++; + } + + DataObj this_obj(num_features); // create a data object + for (auto& w : tok1) { + if (i < num_features) { + this_obj.x.set(i++, std::stod(w)); + } else { + this_obj.y = std::stod(w); + } + } + data.add_object(this_obj); + num_features_agg.update(num_features); + }; + } else { + ASSERT_MSG(false, "Unsupported data format"); } - - inline int get_num_feature() const { return this->num_feature_; } - - protected: - int num_feature_ = -1; - int format_; - std::function load_func_ = nullptr; -}; - -template <> -DataLoader::DataLoader(int _format) - : DataLoader([this](std::string url, DataLoader::ObjL& data) { - - husky::io::LineInputFormat infmt; - infmt.set_input(url); - - husky::lib::Aggregator num_features_agg(0, [](int& a, const int& b) { a = b; }); - auto& ac = husky::lib::AggregatorFactory::get_channel(); - - std::function parser; - if (this->format_ == kLIBSVMFormat) { - // LIBSVM format -- [label] [featureIndex]:[featureValue] [featureIndex]:[featureValue] - parser = [&](boost::string_ref chunk) { - if (chunk.empty()) - return; - boost::char_separator sep(" \t"); - boost::tokenizer> tok(chunk, sep); - - FeatureLabel this_obj; - double& label = this_obj.use_label(); - vec_double& feature = this_obj.use_feature(); - - int i = 0; - for (auto& w : tok) { - if (i++) { - boost::char_separator sep2(":"); - boost::tokenizer> tok2(w, sep2); - auto it = tok2.begin(); - int idx = std::stoi(*it++); - double val = std::stod(*it++); - num_features_agg.update(idx); - feature.push_back(val); - } else { - label = std::stod(w); - } - } - data.add_object(this_obj); - }; - } else if (this->format_ == kTSVFormat) { - // TSV format -- [feature]\t[feature]\t[label] - parser = [&](boost::string_ref chunk) { - if (chunk.empty()) - return; - boost::char_separator sep(" \t"); - boost::tokenizer> tok(chunk, sep); - - FeatureLabel this_obj; - double& label = this_obj.use_label(); - vec_double& feature = this_obj.use_feature(); - - int i = 0; - for (auto& w : tok) { - feature.push_back(std::stod(w)); - } - label = feature.back(); - feature.pop_back(); - data.add_object(this_obj); - num_features_agg.update(feature.size()); - }; - } else { - ASSERT_MSG(false, "Unsupported data format"); - // parser = [&](boost::string_ref chunk) {}; - } - husky::load(infmt, {&ac}, parser); - this->num_feature_ = std::max(this->num_feature_, num_features_agg.get_value()); - // husky::globalize(data); - }) { - this->format_ = _format; + husky::load(infmt, {&ac}, parser); + + int num_features = num_features_agg.get_value(); + list_execute(data, [&](DataObj& this_obj) { + if (this_obj.x.get_feature_num() != num_features) { + this_obj.x.resize(num_features); + } + }); + return num_features; } -template <> -DataLoader::DataLoader(int _format) - : DataLoader([this](std::string url, DataLoader::ObjL& data) { - husky::io::LineInputFormat infmt; - infmt.set_input(url); - - husky::lib::Aggregator num_features_agg(0, [](int& a, const int& b) { a = std::max(a, b); }); - auto& ac = husky::lib::AggregatorFactory::get_channel(); - std::function parser; - if (this->format_ == kLIBSVMFormat) { - parser = [&](boost::string_ref chunk) { - if (chunk.empty()) - return; - boost::char_separator sep(" \t"); - boost::tokenizer> tok(chunk, sep); - - SparseFeatureLabel this_obj; - double& label = this_obj.use_label(); - vec_sp& feature = this_obj.use_feature(); - - int i = 0; - for (auto& w : tok) { - if (i++) { - boost::char_separator sep2(":"); - boost::tokenizer> tok2(w, sep2); - auto it = tok2.begin(); - int idx = std::stoi(*it++); - double val = std::stod(*it++); - num_features_agg.update(idx); - feature.push_back(std::make_pair(idx, val)); - } else { - label = std::stod(w); - } - } - data.add_object(this_obj); - }; - } else if (this->format_ == kTSVFormat) { - parser = [&](boost::string_ref chunk) { - if (chunk.empty()) - return; - boost::char_separator sep(" \t"); - boost::tokenizer> tok(chunk, sep); - - SparseFeatureLabel this_obj; - double& label = this_obj.use_label(); - vec_sp& feature = this_obj.use_feature(); - - int i = 0; - for (auto& w : tok) { - i++; - feature.push_back(std::make_pair(i, std::stod(w))); - } - label = feature.back().second; - feature.pop_back(); - data.add_object(this_obj); - num_features_agg.update(feature.size()); - }; - } else { - ASSERT_MSG(false, "Unsupported data format"); - // parser = [&](boost::string_ref chunk) {}; - } - husky::load(infmt, {&ac}, parser); - this->num_feature_ = std::max(this->num_feature_, num_features_agg.get_value()); - }) { - this->format_ = _format; +template +void load_data(std::string url, ObjList>& data, DataFormat format, + int num_features) { + ASSERT_MSG(num_features > 0, "the number of features is non-positive."); + using DataObj = LabeledPointHObj; + + husky::io::LineInputFormat infmt; + infmt.set_input(url); + + std::function parser; + if (format == kLIBSVMFormat) { + parser = [&](boost::string_ref chunk) { + if (chunk.empty()) + return; + boost::char_separator sep(" \t"); + boost::tokenizer> tok(chunk, sep); + + DataObj this_obj(num_features); + + bool is_y = true; + for (auto& w : tok) { + if (!is_y) { + boost::char_separator sep2(":"); + boost::tokenizer> tok2(w, sep2); + auto it = tok2.begin(); + int idx = std::stoi(*it++) - 1; + double val = std::stod(*it++); + this_obj.x.set(idx, val); + } else { + this_obj.y = std::stod(w); + is_y = false; + } + } + data.add_object(this_obj); + }; + } else if (format == kTSVFormat) { + parser = [&](boost::string_ref chunk) { + if (chunk.empty()) + return; + boost::char_separator sep(" \t"); + boost::tokenizer> tok(chunk, sep); + + DataObj this_obj(num_features); + + int i = 0; + for (auto& w : tok) { + if (i < num_features) { + this_obj.x.set(i++, std::stod(w)); + } else { + this_obj.y = std::stod(w); + } + } + data.add_object(this_obj); + }; + } else { + ASSERT_MSG(false, "Unsupported data format"); + } + husky::load(infmt, parser); } } // namespace ml diff --git a/lib/ml/feature_label.hpp b/lib/ml/feature_label.hpp index 3d4ca59..86172b6 100644 --- a/lib/ml/feature_label.hpp +++ b/lib/ml/feature_label.hpp @@ -14,55 +14,29 @@ #pragma once -#include -#include -#include -#include -#include - -#include "boost/tokenizer.hpp" -#include "core/engine.hpp" +#include "lib/vector.hpp" namespace husky { namespace lib { namespace ml { -using vec_double = std::vector; -template -class FeatureLabelBase { +// extends LabeledPoint, using Vector to represent features +template +class LabeledPointHObj : public LabeledPoint, LabelT> { public: + using FeatureV = Vector; // Object key using KeyT = int; KeyT key; const KeyT& id() const { return key; } // constructors - FeatureLabelBase() = default; - explicit FeatureLabelBase(FT& _feature, LT& _label, const KeyT& k) : feature_(_feature), label_(_label), key(k) {} - - // get feature and label by reference - inline FT& use_feature() { return this->feature_; } - inline LT& use_label() { return this->label_; } - - // get feature and label by value - inline FT get_feature() const { return this->feature_; } - inline LT get_label() const { return this->label_; } - - // serialization - friend BinStream& operator<<(BinStream& stream, const FeatureLabelBase& this_obj) { - return stream << this_obj.key << this_obj.feature_ << this_obj.label_; - } - friend BinStream& operator>>(BinStream& stream, FeatureLabelBase& this_obj) { - return stream >> this_obj.key >> this_obj.feature_ >> this_obj.label_; - } - - protected: - FT feature_; - LT label_; -}; // FeatureLabelBase + LabeledPointHObj() : LabeledPoint() {} + explicit LabeledPointHObj(int feature_num) : LabeledPoint() { this->x = FeatureV(feature_num); } + LabeledPointHObj(FeatureV& x, LabelT& y) : LabeledPoint(x, y) {} + LabeledPointHObj(FeatureV&& x, LabelT&& y) : LabeledPoint(x, y) {} +}; // LabeledPointHObj -typedef FeatureLabelBase, double> FeatureLabel; -typedef FeatureLabelBase>, double> SparseFeatureLabel; } // namespace ml } // namespace lib } // namespace husky diff --git a/lib/ml/fgd.hpp b/lib/ml/fgd.hpp index 8d485a3..96e8899 100644 --- a/lib/ml/fgd.hpp +++ b/lib/ml/fgd.hpp @@ -14,50 +14,46 @@ #pragma once -#include #include -#include -#include -#include "core/engine.hpp" +#include "core/executor.hpp" +#include "core/objlist.hpp" +#include "core/utils.hpp" #include "lib/ml/feature_label.hpp" #include "lib/ml/gradient_descent.hpp" #include "lib/ml/parameter.hpp" -#include "lib/ml/vector_linalg.hpp" +#include "lib/vector.hpp" namespace husky { namespace lib { namespace ml { -using lib::AggregatorFactory; -using vec_double = std::vector; -using vec_sp = std::vector>; - // base class for full(batch) gradient descent, extends GradientDescentBase -template > -class FGD : public GradientDescentBase { +template +class FGD : public GradientDescentBase { private: + using ObjT = LabeledPointHObj; using ObjL = ObjList; + using VecT = Vector; public: // constructors - FGD() : GradientDescentBase() {} - explicit FGD(std::function)> _gradient_func, double _learning_rate) - : GradientDescentBase(_gradient_func, _learning_rate) {} + FGD() : GradientDescentBase() {} + FGD(std::function&)> _gradient_func, double _learning_rate) + : GradientDescentBase(_gradient_func, _learning_rate) {} - void update_vec(ObjL& data, ParamT param_list, int num_global_samples) override { + template + void update_vec(ObjL& data, ParamT& param_list, int num_global_samples) { ASSERT_MSG(this->learning_rate_ != 0, "Learning rate is set to 0."); ASSERT_MSG(this->gradient_func_ != nullptr, "Gradient function is not specified."); - vec_double current_vec = param_list.get_all_param(); // local copy of parameter + auto current_vec = param_list.get_all_param(); // local copy of parameter auto& ac = AggregatorFactory::get_channel(); - list_execute(data, {}, {&ac}, [&grad_func = this->gradient_func_, &rate = this->learning_rate_, - ¤t_vec, ¶m_list, &num_global_samples](ObjT& this_obj) { - vec_sp grad = grad_func(this_obj, - [¤t_vec](int idx){ return current_vec[idx]; }); // calculate gradient - for (auto& w : grad) { - double delta = w.second * rate; - param_list.update(w.first, delta / num_global_samples); + list_execute(data, {}, {&ac}, [&, this](ObjT& this_obj) { + auto grad = this->gradient_func_(this_obj, current_vec); // calculate gradient + for (auto it = grad.begin_feaval(); it != grad.end_feaval(); ++it) { + const auto& w = *it; + param_list.update(w.fea, w.val * this->learning_rate_ / num_global_samples); } }); } diff --git a/lib/ml/gradient_descent.hpp b/lib/ml/gradient_descent.hpp index 5ffea94..0a23058 100644 --- a/lib/ml/gradient_descent.hpp +++ b/lib/ml/gradient_descent.hpp @@ -14,52 +14,43 @@ #pragma once -#include #include -#include -#include -#include "core/engine.hpp" +#include "core/objlist.hpp" #include "lib/ml/feature_label.hpp" #include "lib/ml/parameter.hpp" -#include "lib/ml/vector_linalg.hpp" +#include "lib/vector.hpp" namespace husky { namespace lib { namespace ml { -using lib::AggregatorFactory; -using vec_double = std::vector; -using vec_sp = std::vector>; - // base class for gradient descent -template > +template class GradientDescentBase { private: + using ObjT = LabeledPointHObj; using ObjL = ObjList; + using VecT = Vector; public: // constructors GradientDescentBase() = default; - GradientDescentBase( // standard constructor accepting 3 arguments: - std::function)> _gradient_func, // function to calculate gradient - double _learning_rate // learning rate + GradientDescentBase( // standard constructor accepting 2 arguments: + std::function&)> _gradient_func, // gradient function + double _learning_rate // learning rate ) : gradient_func_(_gradient_func), learning_rate_(_learning_rate) {} - // get update vector for parameters - virtual void update_vec(ObjL& data, ParamT param_list, int num_global_samples) = 0; - // manipulate gradient function and learning rate - void set_gradient_func(std::function)> _gradient_func) { + inline void set_gradient_func(std::function&)> _gradient_func) { this->gradient_func_ = _gradient_func; } - void set_learning_rate(double _learning_rate) { this->learning_rate_ = _learning_rate; } + inline void set_learning_rate(FeatureT _learning_rate) { this->learning_rate_ = _learning_rate; } protected: - std::function)> gradient_func_ = - nullptr; // function to calculate gradient - double learning_rate_; + std::function&)> gradient_func_ = nullptr; + FeatureT learning_rate_; }; // GradientDescentBase } // namespace ml diff --git a/lib/ml/linear_regression.hpp b/lib/ml/linear_regression.hpp index 0ec684d..8bb6524 100644 --- a/lib/ml/linear_regression.hpp +++ b/lib/ml/linear_regression.hpp @@ -15,70 +15,65 @@ #pragma once #include -#include -#include -#include "core/engine.hpp" +#include "core/executor.hpp" +#include "core/objlist.hpp" #include "lib/aggregator_factory.hpp" #include "lib/ml/feature_label.hpp" -#include "lib/ml/gradient_descent.hpp" #include "lib/ml/parameter.hpp" #include "lib/ml/regression.hpp" -#include "lib/ml/vector_linalg.hpp" namespace husky { namespace lib { namespace ml { -using vec_double = std::vector; -using husky::lib::AggregatorFactory; -using husky::lib::Aggregator; - -template > -class LinearRegression : public Regression { +template +class LinearRegression : public Regression { + public: + using ObjT = LabeledPointHObj; using ObjL = ObjList; - public: // constructors - LinearRegression() : Regression() {} - explicit LinearRegression(int _num_feature) : Regression(_num_feature + 1) {} - LinearRegression( // standard constructor accepting 3 arguments: - std::function>(ObjT&)> _getX, // get feature vector (idx,val) - std::function _gety, // get label - int _num_feature // number of features - ) - : Regression(_num_feature + 1) { // add intercept parameter - // gradient function: X * (true_y - innerproduct(xv, pv)) - this->gradient_func_ = [this](ObjT& this_obj, std::function param_at) { - std::vector> vec_X = this->get_X_(this_obj); - vec_X.push_back(std::make_pair(0, 1.0)); - double pred_y = 0; - for (auto& x : vec_X) { - pred_y += param_at(x.first) * x.second; - } - double delta = this->get_y_(this_obj) - pred_y; - for (auto& w : vec_X) { - w.second *= delta; - } + LinearRegression() : Regression() {} + explicit LinearRegression(int _num_feature) : Regression(_num_feature + 1) { + this->num_feature_ = _num_feature; - return vec_X; // gradient vector + // gradient function: X * (true_y - innerproduct(xv, pv)) + this->gradient_func_ = [](ObjT& this_obj, Vector& param) { + auto vec_X = this_obj.x; + auto pred_y = param.dot_with_intcpt(vec_X); + auto delta = static_cast(this_obj.y) - pred_y; + vec_X *= delta; + int num_param = param.get_feature_num(); + vec_X.resize(num_param); + vec_X.set(num_param - 1, delta); // intercept factor + return vec_X; // gradient vector }; // error function: (true_y - pred_y)^2 - this->error_func_ = [this](ObjT& this_obj, ParamT& param_list) { - std::vector> vec_X = this->get_X_(this_obj); - vec_X.push_back(std::make_pair(0, 1.0)); - double pred_y = 0; - for (auto& x : vec_X) { - pred_y += param_list.param_at(x.first) * x.second; + this->error_func_ = [](ObjT& this_obj, ParamT& param_list) { + const auto& vec_X = this_obj.x; + FeatureT pred_y = 0; + for (auto it = vec_X.begin_feaval(); it != vec_X.end_feaval(); ++it) { + const auto& x = *it; + pred_y += param_list.param_at(x.fea) * x.val; } - double delta = this->get_y_(this_obj) - pred_y; + pred_y += param_list.param_at(param_list.get_num_param() - 1); // intercept factor + auto delta = static_cast(this_obj.y) - pred_y; return delta * delta; }; - this->num_feature_ = _num_feature; - this->get_X_ = _getX; - this->get_y_ = _gety; + // predict function + this->predict_func_ = [](ObjT& this_obj, ParamT& param_list) -> LabelT { + auto vec_X = this_obj.x; + this_obj.y = 0; + for (auto it = vec_X.begin_feaval(); it != vec_X.end_feaval(); it++) { + const auto& x = *it; + this_obj.y += param_list.param_at(x.fea) * x.val; + } + this_obj.y += param_list.param_at(param_list.get_num_param() - 1); // intercept factor + return this_obj.y; + }; } void set_num_feature(int _num_feature) { @@ -98,17 +93,17 @@ class LinearRegression : public Regression { Aggregator sum_var_agg(0.0, sum_double); auto& ch = AggregatorFactory::get_channel(); - list_execute(data, {}, {&ch}, [&, this](ObjT& this_obj) { - num_samples_agg.update(1); // number of records - sum_y_agg.update(this->get_y_(this_obj)); // sum of y + list_execute(data, {}, {&ch}, [&](ObjT& this_obj) { + num_samples_agg.update(1); // number of records + sum_y_agg.update(static_cast(this_obj.y)); // sum of y }); double y_mean = sum_y_agg.get_value() / num_samples_agg.get_value(); // y mean list_execute(data, {}, {&ch}, [&, this](ObjT& this_obj) { - double y_true = this->get_y_(this_obj); - double y_pred = this->predict_y(this_obj); - double error = y_true - y_pred; - double variance = y_true - y_mean; + auto y_true = this_obj.y; + auto y_pred = this->predict_y(this_obj); + double error = static_cast(y_true - y_pred); + double variance = static_cast(y_true - y_mean); sum_error_agg.update(error * error); sum_var_agg.update(variance * variance); }); @@ -117,27 +112,20 @@ class LinearRegression : public Regression { } protected: - std::function>(ObjT&)> get_X_ = nullptr; // get feature vector (idx,val) - std::function get_y_ = nullptr; // get label - - double predict_y(ObjT& this_obj) { - std::vector> vec_X = get_X_(this_obj); - double pred_y = 0; - for (auto& x : vec_X) { - pred_y += this->param_list_.param_at(x.first) * x.second; + LabelT predict_y(ObjT& this_obj) { + auto vec_X = this_obj.x; + LabelT pred_y = 0; + for (auto it = vec_X.begin_feaval(); it != vec_X.end_feaval(); it++) { + const auto& x = *it; + pred_y += this->param_list_.param_at(x.fea) * x.val; } pred_y += this->param_list_.param_at(this->num_feature_); // intercept factor return pred_y; } - double delta_func(ObjT& this_obj) { return this->get_y_(this_obj) - predict_y(this_obj); } + LabelT delta_func(ObjT& this_obj) { return this_obj.y - predict_y(this_obj); } }; -template <> -LinearRegression>::LinearRegression(int _num_feature) - : LinearRegression([](SparseFeatureLabel& this_obj) { return this_obj.get_feature(); }, - [](SparseFeatureLabel& this_obj) { return this_obj.get_label(); }, _num_feature) {} - } // namespace ml } // namespace lib } // namespace husky diff --git a/lib/ml/logistic_regression.hpp b/lib/ml/logistic_regression.hpp index e9fad0d..6311f18 100644 --- a/lib/ml/logistic_regression.hpp +++ b/lib/ml/logistic_regression.hpp @@ -15,104 +15,88 @@ #pragma once #include -#include -#include -#include "core/engine.hpp" +#include "core/objlist.hpp" #include "lib/aggregator_factory.hpp" #include "lib/ml/feature_label.hpp" -#include "lib/ml/gradient_descent.hpp" #include "lib/ml/parameter.hpp" #include "lib/ml/regression.hpp" -#include "lib/ml/vector_linalg.hpp" namespace husky { namespace lib { namespace ml { -using vec_double = std::vector; -using husky::lib::AggregatorFactory; -using husky::lib::Aggregator; - -template > -class LogisticRegression : public Regression { +template +class LogisticRegression : public Regression { + public: + using ObjT = LabeledPointHObj; using ObjL = ObjList; - public: // constructors - LogisticRegression() : Regression() {} - explicit LogisticRegression(int _num_feature) { this->set_num_param(_num_feature + 1); } - LogisticRegression( - std::function>(ObjT&)> _getX, // function to get feature vector - std::function _gety, // function to get label - int _num_feature // number of features - ) - : Regression(_num_feature + 1) { - // gradient function: X * (true_y - innerproduct(xv, pv)) - this->gradient_func_ = [this](ObjT& this_obj, std::function param_at) { - std::vector> vec_X = this->get_X_(this_obj); - vec_X.push_back(std::make_pair(0, 1.0)); // intercept factor - double pred_y = 0; - - for (auto& x : vec_X) { - pred_y += param_at(x.first) * x.second; - } - - pred_y = 1. / (1. + exp(-pred_y)); - double delta = this->get_y_(this_obj) - pred_y; - for (auto& w : vec_X) { - w.second *= delta; - } + LogisticRegression() : Regression() {} + explicit LogisticRegression(int _num_feature) : Regression(_num_feature + 1) { + this->num_feature_ = _num_feature; - return vec_X; // gradient vector + // gradient function: X * (true_y - innerproduct(xv, pv)) + this->gradient_func_ = [](ObjT& this_obj, Vector& param) { + auto vec_X = this_obj.x; + auto pred_y = param.dot_with_intcpt(vec_X); + pred_y = static_cast(1. / (1. + exp(-pred_y))); + auto delta = static_cast(this_obj.y) - pred_y; + vec_X *= delta; + int num_param = param.get_feature_num(); + vec_X.resize(num_param); + vec_X.set(num_param - 1, delta); // intercept factor + return vec_X; // gradient vector }; // error function: (true_y - pred_y)^2 - this->error_func_ = [this](ObjT& this_obj, ParamT& param_list) { - std::vector> vec_X = this->get_X_(this_obj); - vec_X.push_back(std::make_pair(0, 1.0)); // intercept factor - double pred_y = 0; - for (auto& x : vec_X) { - pred_y += param_list.param_at(x.first) * x.second; + this->error_func_ = [](ObjT& this_obj, ParamT& param_list) { + const auto& vec_X = this_obj.x; + FeatureT pred_y = 0; + for (auto it = vec_X.begin_feaval(); it != vec_X.end_feaval(); ++it) { + const auto& x = *it; + pred_y += param_list.param_at(x.fea) * x.val; } + pred_y += param_list.param_at(param_list.get_num_param() - 1); pred_y = (pred_y > 0) ? 1 : 0; - return (pred_y == this->get_y_(this_obj)) ? 0 : 1; + return (pred_y == this_obj.y) ? 0 : 1; }; - this->num_feature_ = _num_feature; - this->get_X_ = _getX; - this->get_y_ = _gety; + // prediction function + this->predict_func_ = [](ObjT& this_obj, ParamT& param_list) -> LabelT { + const auto& vec_X = this_obj.x; + FeatureT pred_y = 0; + for (auto it = vec_X.begin_feaval(); it != vec_X.end_feaval(); ++it) { + const auto& x = *it; + pred_y += param_list.param_at(x.fea) * x.val; + } + pred_y += param_list.param_at(param_list.get_num_param() - 1); // intercept + this_obj.y = (pred_y > 0) ? 1 : 0; + return this_obj.y; + }; } void set_num_feature(int _n_feature) { - this->set_num_param(_n_feature + 1); // intercept term + this->set_num_param(_n_feature + 1); this->num_feature_ = _n_feature; } int get_num_feature() { return this->num_feature_; } protected: - std::function>(ObjT&)> get_X_ = nullptr; // get feature vector (idx,val) - std::function get_y_ = nullptr; // get label - - double predict_y(ObjT& this_obj) { - std::vector> vec_X = get_X_(this_obj); - double pred_y = 0; - for (auto& x : vec_X) - pred_y += this->param_list_.param_at(x.first) * x.second; - pred_y += this->param_list_.param_at(this->num_feature_) * 1.0; // intercept factor - return 1. / (1. + exp(-pred_y)); + LabelT predict_y(ObjT& this_obj) { + const auto& vec_X = this_obj.x; + FeatureT pred_y = 0; + for (auto it = vec_X.begin_feaval(); it != vec_X.end_feaval(); ++it) { + const auto& x = *it; + pred_y += this->param_list_.param_at(x.fea) * x.val; + } + pred_y += this->param_list_.param_at(this->num_feature_); // intercept + return static_cast((pred_y > 0) ? 1 : 0); } - - double delta_func(ObjT& this_obj) { return this->get_y_(this_obj) - predict_y(this_obj); } }; // LogisticRegression -template <> -LogisticRegression>::LogisticRegression(int _num_feature) - : LogisticRegression([](SparseFeatureLabel& this_obj) { return this_obj.get_feature(); }, - [](SparseFeatureLabel& this_obj) { return (this_obj.get_label() > 0) ? 1 : 0; }, - _num_feature) {} - } // namespace ml } // namespace lib } // namespace husky diff --git a/lib/ml/parameter.hpp b/lib/ml/parameter.hpp index f9e6b3c..9681e68 100644 --- a/lib/ml/parameter.hpp +++ b/lib/ml/parameter.hpp @@ -14,13 +14,14 @@ #pragma once -#include #include #include #include -#include "core/engine.hpp" +#include "core/utils.hpp" +#include "lib/aggregator.hpp" #include "lib/aggregator_factory.hpp" +#include "lib/vector.hpp" namespace husky { namespace lib { @@ -31,55 +32,24 @@ class ParameterBase { public: // constructors ParameterBase() = default; - ParameterBase(std::function()> _get_all_param_func, std::function _param_at_func, - std::function _update_func, int _num_param) - : get_all_param_func(_get_all_param_func), - param_at_func(_param_at_func), - update_func(_update_func), - num_param_(_num_param) {} + explicit ParameterBase(int _num_param) : num_param_(_num_param) {} // get all parameters - std::vector get_all_param() { - assert(this->get_all_param_func != nullptr); - return get_all_param_func(); - } + virtual Vector get_all_param() = 0; // get one parameter by index - T param_at(int idx) { - assert(param_at_func != nullptr); - return param_at_func(idx); - } + virtual T param_at(int idx) = 0; // update parameter - void update(int idx, T val) { - assert(update_func != nullptr); - update_func(idx, val); - } + virtual void update(int idx, T val) = 0; // regularize parameter - void regularize(double regulator) { - ASSERT_MSG(regularize_func_ != nullptr, "regularize function not specified"); - regularize_func_(regulator); - } - - // set get_all_param_func - void set_get_all_param(std::function()> _get_all_param_func) { - this->get_all_param_func = _get_all_param_func; - } - - // set param_at_func - void set_param_at(std::function _param_at_func) { this->param_at_func = _param_at_func; } - - // set update_func - void set_update(std::function _update_func) { this->update_func = _update_func; } - - // set regularize function - void set_regularize(std::function _regularize_func) { this->regularize_func_ = _regularize_func; } + // TODO(Tatiana): implement as needed + virtual void regularize(double regulator) {} // present parameter void present() { - assert(this->get_all_param_func != nullptr); - std::vector param = get_all_param_func(); + Vector param = get_all_param(); int idx = 0; for (T val : param) { base::log_msg("Parameter " + std::to_string(++idx) + ": " + std::to_string(val)); @@ -89,50 +59,47 @@ class ParameterBase { int get_num_param() { return num_param_; } protected: - std::function()> get_all_param_func = nullptr; - std::function param_at_func = nullptr; - std::function update_func = nullptr; - std::function regularize_func_ = nullptr; int num_param_ = -1; }; template class ParameterBucket : public ParameterBase { - using VecT = std::vector; + using VecT = Vector; public: // constructors - ParameterBucket() : ParameterBase() { - auto& ac = husky::lib::AggregatorFactory::get_channel(); - this->get_all_param_func = [this]() { - assert(!this->vector_agg.empty()); - VecT param_vec; - int count = 0; - for (auto& bucket_agg : this->vector_agg) { - const VecT& bucket = bucket_agg.get_value(); - for (int i = 0; i < bucket.size() && count < this->num_param_; i++, count++) { - param_vec.push_back(bucket[i]); - } + ParameterBucket() : ParameterBase() {} + explicit ParameterBucket(int _num_param) : ParameterBucket() { init(_num_param); } + + Vector get_all_param() override { + assert(!this->vector_agg.empty()); + VecT param_vec(this->num_param_); + int count = 0; + for (auto& bucket_agg : this->vector_agg) { + const VecT& bucket = bucket_agg.get_value(); + for (int i = 0; i < bucket.get_feature_num() && count < this->num_param_; i++, count++) { + param_vec[count] = bucket[i]; } - return param_vec; - }; - this->param_at_func = [this](int idx) { - ASSERT_MSG(this->block_size > 0, "Parameter block size is 0."); - ASSERT_MSG(!this->vector_agg.empty(), "Parameter is not initialized."); - double result = this->vector_agg[idx / this->block_size].get_value()[idx % this->block_size]; - return this->vector_agg[idx / this->block_size].get_value()[idx % this->block_size]; - }; - - // the default update function - this->update_func = [this](int idx, T val) { - ASSERT_MSG(this->block_size > 0, "Parameter block size is 0."); - ASSERT_MSG(!this->vector_agg.empty(), "Parameter is not initialized."); - this->vector_agg[idx / this->block_size].update_any([&](VecT& a) { a[idx % this->block_size] += val; }); - }; + } + return param_vec; + } + + T param_at(int idx) override { + ASSERT_MSG(this->block_size > 0, "Parameter block size is 0."); + ASSERT_MSG(!this->vector_agg.empty(), "Parameter is not initialized."); + double result = this->vector_agg[idx / this->block_size].get_value()[idx % this->block_size]; + return this->vector_agg[idx / this->block_size].get_value()[idx % this->block_size]; + } + + // the default update function + void update(int idx, T val) override { + ASSERT_MSG(this->block_size > 0, "Parameter block size is 0."); + ASSERT_MSG(!this->vector_agg.empty(), "Parameter is not initialized."); + this->vector_agg[idx / this->block_size].update_any([&](VecT& a) { a[idx % this->block_size] += val; }); } - explicit ParameterBucket(int _num_param) : ParameterBucket() { init(_num_param); } // initialize aggregators + // TODO(Tatiana): implement random number initialization? void init(int _num_param, double initial_val = 0.0) { this->num_param_ = _num_param; // split aggregation of parameters @@ -144,10 +111,7 @@ class ParameterBucket : public ParameterBase { for (int i = 0; i < num_worker; i++) { if (i * block_size < _num_param) { vector_agg.push_back(lib::Aggregator(VecT(block_size, initial_val), - [](VecT& a, const VecT& b) { - for (int j = 0; j < a.size(); j++) - a[j] += b[j]; - }, + [](VecT& a, const VecT& b) { a += b; }, [this](VecT& a) { a = VecT(this->block_size, (T) 0.0); })); } } diff --git a/lib/ml/regression.hpp b/lib/ml/regression.hpp index 46b6571..e519f07 100644 --- a/lib/ml/regression.hpp +++ b/lib/ml/regression.hpp @@ -15,45 +15,42 @@ #pragma once #include -#include #include -#include -#include -#include "core/engine.hpp" +#include "core/context.hpp" +#include "core/objlist.hpp" +#include "core/utils.hpp" +#include "lib/aggregator.hpp" #include "lib/aggregator_factory.hpp" #include "lib/ml/feature_label.hpp" #include "lib/ml/parameter.hpp" -#include "lib/ml/sgd.hpp" -#include "lib/ml/vector_linalg.hpp" +#include "lib/vector.hpp" namespace husky { namespace lib { namespace ml { -using vec_double = std::vector; using husky::lib::AggregatorFactory; using husky::lib::Aggregator; // base class for regression -template > +template > class Regression { + typedef LabeledPointHObj ObjT; typedef ObjList ObjL; public: bool report_per_round = false; // whether report error per iteration // constructors - Regression() = default; + Regression() {} explicit Regression(int _num_param) { set_num_param(_num_param); } Regression( // standard (3 args): - std::function>(ObjT&, std::function)> - _gradient_func, // function to calculate gradient - std::function - _error_func, // error function - int _num_param // number of parameters - ) - : gradient_func_(_gradient_func), error_func_(_error_func) { + std::function(ObjT&, Vector&)> _gradient_func, // gradient func + std::function _error_func, // error function + int _num_param) // number of params + : gradient_func_(_gradient_func), + error_func_(_error_func) { set_num_param(_num_param); } @@ -66,39 +63,38 @@ class Regression { // query parameters int get_num_param() { return param_list_.get_num_param(); } // get parameter size - void present_param() { + void present_param() { // print each parameter to log if (this->trained_ == true) param_list_.present(); - } // print each parameter to log + } // predict and store in y - void set_predict_func(std::function _predict_func) { this->predict_func_ = _predict_func; } - void predict(ObjL& data, std::function use_y) { + void set_predict_func(std::function _predict_func) { this->predict_func_ = _predict_func; } + void predict(ObjL& data) { ASSERT_MSG(this->predict_func_ != nullptr, "Predict function is not specified."); list_execute(data, [&, this](ObjT& this_obj) { - double& y = use_y(this_obj); + auto& y = this_obj.y; y = this->predict_func_(this_obj, this->param_list_); }); } // calculate average error rate - double avg_error(ObjL& data) { + FeatureT avg_error(ObjL& data) { Aggregator num_samples_agg(0, [](int& a, const int& b) { a += b; }); - Aggregator error_agg(0.0, [](double& a, const double& b) { a += b; }); + Aggregator error_agg(0.0, [](FeatureT& a, const FeatureT& b) { a += b; }); auto& ac = AggregatorFactory::get_channel(); list_execute(data, {}, {&ac}, [&, this](ObjT& this_obj) { error_agg.update(this->error_func_(this_obj, this->param_list_)); num_samples_agg.update(1); }); int num_samples = num_samples_agg.get_value(); - // base::log_msg("Testing set size = " + std::to_string(data.get_size())); - double global_error = error_agg.get_value(); - double mse = global_error / num_samples; + auto global_error = error_agg.get_value(); + auto mse = global_error / num_samples; return mse; } // train model - template > + template