# FlexPS Programming Guide

This page teaches you how to write a machine learning program in FlexPS, using Logistic Regression as a running example.
One can load data easily from HDFS by using `HDFSManager`:

```cpp
HDFSManager hdfs_manager(my_node, nodes, config, zmq_context);
hdfs_manager.Start();

std::vector<DataObj> data;
std::mutex mylock;
hdfs_manager.Run([my_node, &data, &mylock](HDFSManager::InputFormat* input_format, int local_tid) {
  DataObj this_obj;
  while (input_format->HasRecord()) {
    auto record = input_format->GetNextRecord();
    if (record.empty()) return;
    this_obj = libsvm_parser(record);
    mylock.lock();
    data.push_back(std::move(this_obj));
    mylock.unlock();
  }
});
hdfs_manager.Stop();
```
Here `DataObj` is a self-defined data object type, and `libsvm_parser` is a parser for the libsvm data format.
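Their exact definitions are up to your application. Below is a minimal sketch, assuming a `DataObj` is a sparse feature vector paired with a label; `FeatureVec` and `parse_libsvm_line` are hypothetical names standing in for your own type and for `libsvm_parser`:

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

// A sparse sample: (feature_id, value) pairs plus a label.
// Illustrative stand-in for the guide's DataObj.
using FeatureVec = std::vector<std::pair<int, float>>;
using DataObj = std::pair<FeatureVec, float>;

// Hypothetical parser for one libsvm line: "label id1:v1 id2:v2 ..."
DataObj parse_libsvm_line(const std::string& line) {
  std::istringstream iss(line);
  DataObj obj;
  iss >> obj.second;  // the label comes first
  std::string tok;
  while (iss >> tok) {
    auto colon = tok.find(':');
    int id = std::stoi(tok.substr(0, colon));
    float val = std::stof(tok.substr(colon + 1));
    obj.first.emplace_back(id, val);
  }
  return obj;
}
```

Any type works as long as your parser and your task lambda agree on it; a `std::pair` of sparse features and label keeps the training loop below simple.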
`Engine` is a driver program that manages all the background programs of the system.

```cpp
Engine engine(my_node, nodes);
engine.StartEverything();
```
Set up the key range for each server you use:

```cpp
std::vector<third_party::Range> range;
int num_total_servers = nodes.size() * FLAGS_num_servers_per_node;
for (int i = 0; i < num_total_servers - 1; ++i) {
  range.push_back({FLAGS_num_dims / num_total_servers * i,
                   FLAGS_num_dims / num_total_servers * (i + 1)});
}
range.push_back({FLAGS_num_dims / num_total_servers * (num_total_servers - 1),
                 (uint64_t)FLAGS_num_dims});
```
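The loop splits the key space `[0, FLAGS_num_dims)` evenly, with the last server absorbing the remainder of the integer division. A standalone illustration of the arithmetic (`ComputeRanges` is a hypothetical helper, not a FlexPS API):

```cpp
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

// Split [0, num_dims) into num_servers contiguous half-open ranges;
// the last server takes whatever the integer division leaves over.
std::vector<std::pair<uint64_t, uint64_t>> ComputeRanges(uint64_t num_dims,
                                                         int num_servers) {
  std::vector<std::pair<uint64_t, uint64_t>> range;
  uint64_t per_server = num_dims / num_servers;
  for (int i = 0; i < num_servers - 1; ++i)
    range.push_back({per_server * i, per_server * (i + 1)});
  range.push_back({per_server * (num_servers - 1), num_dims});
  return range;
}

// E.g. 10 keys over 4 servers -> [0,2) [2,4) [4,6) [6,10):
// each server gets 10/4 = 2 keys, and the last one also takes the remainder.
```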
- `ModelType`: ASP, BSP, SSP, (SparseSSP)
- `StorageType`: Map, Vector

```cpp
ModelType model_type = ModelType::SSP;
StorageType storage_type = StorageType::Vector;
engine.CreateTable<float>(kTableId, range, model_type, storage_type,
                          FLAGS_kStaleness, FLAGS_kSpeculation, sparse_ssp_recorder_type);
engine.Barrier();
```
- `kTableId`: id of the table
- `FLAGS_kStaleness`: staleness of the SSP model
- `FLAGS_kSpeculation`: for the SparseSSP model (not used here)
- `sparse_ssp_recorder_type`: for the SparseSSP model (not used here)
Construct your own machine learning task and allocate workers for each node:

```cpp
MLTask task;
std::vector<WorkerAlloc> worker_alloc;
for (auto& node : nodes) {  // each node has num_workers_per_node workers
  worker_alloc.push_back({node.id, FLAGS_num_workers_per_node});
}
task.SetWorkerAlloc(worker_alloc);
task.SetTables({kTableId});  // Use table 0
```
Put your task's logic inside the lambda, and pass the variables it needs into it:

```cpp
task.SetLambda([kTableId, &data](const Info& info) {
  // put your machine learning task here
});
```
We offer five kinds of kv-table: `KVTable`, `KVClientTable`, `SparseKVClientTable`, `KVChunkClientTable`, and `SimpleKVChunkTable`. Each of them serves as the interface for fetching or processing parameters on the server side. Here we use the simplest one (`KVTable`):

```cpp
auto table = info.CreateKVTable<float>(kTableId);
```
Note that the returned `table` is a `std::unique_ptr` pointing to the client table.
```cpp
third_party::SArray<float> params;
third_party::SArray<float> deltas;
BatchDataSampler<DataObj> batch_data_sampler(data, FLAGS_batch_size);
for (int i = 0; i < FLAGS_num_iters; ++i) {
  batch_data_sampler.random_start_point();
  auto& keys = batch_data_sampler.prepare_next_batch();
  auto& data_ptrs = batch_data_sampler.get_data_ptrs();
  table->Get(keys, &params);  // issue Pull
  deltas.resize(keys.size(), 0.0);
  for (auto data : data_ptrs) {  // iterate over the data in the batch
    auto& x = data->first;
    float y = data->second;
    if (y < 0)
      y = 0;
    float pred_y = 0.0;
    int j = 0;
    for (auto field : x) {
      while (keys[j] < field.first)
        j += 1;
      pred_y += params[j] * field.second;
    }
    pred_y = 1. / (1. + exp(-1 * pred_y));
    j = 0;
    for (auto field : x) {
      while (keys[j] < field.first) {
        j += 1;
      }
      deltas[j] += FLAGS_alpha * field.second * (y - pred_y);
    }
  }
  table->Add(keys, deltas);  // issue Push
  table->Clock();
}
```

`batch_data_sampler` is a sampler that helps sample the data in batches; it returns the samples' keys and pointers to the data.
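The inner loop is one stochastic gradient step on the logistic loss: it computes `pred_y = sigmoid(w · x)` over the sample's non-zero features, then accumulates `alpha * x_j * (y - pred_y)` into the deltas. A standalone sketch of that per-sample update on dense vectors, with illustrative names rather than FlexPS APIs:

```cpp
#include <cassert>
#include <cmath>
#include <vector>

// Logistic function: maps the raw score w . x into a probability in (0, 1).
float Sigmoid(float z) { return 1.f / (1.f + std::exp(-z)); }

// One SGD step for logistic regression with labels y in {0, 1}:
// delta_j = alpha * x_j * (y - sigmoid(w . x)).
std::vector<float> GradientStep(const std::vector<float>& w,
                                const std::vector<float>& x,
                                float y, float alpha) {
  float dot = 0.f;
  for (size_t j = 0; j < w.size(); ++j) dot += w[j] * x[j];
  float pred = Sigmoid(dot);
  std::vector<float> delta(w.size());
  for (size_t j = 0; j < w.size(); ++j) delta[j] = alpha * x[j] * (y - pred);
  return delta;
}
```

For example, with zero weights, `x = {1, 2}`, `y = 1`, and `alpha = 0.1`, the prediction is `sigmoid(0) = 0.5` and the deltas are `{0.05, 0.1}`. In the training loop above the same quantities are computed sparsely (only over the sample's non-zero features) and pushed to the servers with `table->Add`.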
You can now write your own machine learning application in FlexPS :)