add adam/sharedadam optimizer for gpups (PaddlePaddle#32)
* add adam/sharedadam optimizer for gpups;edit optimizer struct;test=develop

* remove useless code;test=develop

* remove useless code;test=develop

* remove useless code;test=develop

* remove useless code;test=develop

* remove useless code;test=develop
danleifeng authored Jun 16, 2022
1 parent f19ca37 commit e5b7978
Showing 29 changed files with 1,373 additions and 611 deletions.
12 changes: 6 additions & 6 deletions paddle/fluid/distributed/ps/table/ctr_dymf_accessor.cc
@@ -40,7 +40,9 @@ int CtrDymfAccessor::Initialize() {
if (_config.ctr_accessor_param().show_scale()) {
_show_scale = true;
}
VLOG(0) << " INTO CtrDymfAccessor::Initialize()";
VLOG(0) << " INTO CtrDymfAccessor::Initialize(); embed_sgd_dim:" << common_feature_value.embed_sgd_dim
<< " embedx_dim:" << common_feature_value.embedx_dim
<< " embedx_sgd_dim:" << common_feature_value.embedx_sgd_dim;
InitAccessorInfo();
return 0;
}
@@ -51,9 +53,9 @@ void CtrDymfAccessor::InitAccessorInfo() {

auto embedx_dim = _config.embedx_dim();
VLOG(0) << "InitAccessorInfo embedx_dim:" << embedx_dim;
_accessor_info.select_dim = 3 + embedx_dim;
_accessor_info.select_dim = 4 + embedx_dim;
_accessor_info.select_size = _accessor_info.select_dim * sizeof(float);
_accessor_info.update_dim = 4 + embedx_dim;
_accessor_info.update_dim = 5 + embedx_dim;
_accessor_info.update_size = _accessor_info.update_dim * sizeof(float);
_accessor_info.mf_size =
(embedx_dim + common_feature_value.embedx_sgd_dim) * sizeof(float);
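
Note: the hunk above widens the per-feature select/update layouts by one float each and sizes the MF block from embedx_dim plus the optimizer state width. A minimal sketch of that arithmetic, using assumed values (embedx_dim = 8 is the proto default; embedx_sgd_dim = 4 matches the new shared-Adam rule's Dim()):

// Standalone illustration of the accessor sizing above; the concrete dims
// are assumptions for the example, not read from a real config.
#include <cstdio>

int main() {
  const size_t embedx_dim = 8;      // embedding width (proto default)
  const size_t embedx_sgd_dim = 4;  // shared Adam: gsum, g2sum, beta1_pow, beta2_pow

  size_t select_dim = 4 + embedx_dim;   // floats pulled per feature
  size_t update_dim = 5 + embedx_dim;   // floats pushed per feature
  size_t select_size = select_dim * sizeof(float);
  size_t update_size = update_dim * sizeof(float);
  size_t mf_size = (embedx_dim + embedx_sgd_dim) * sizeof(float);

  std::printf("select=%zuB update=%zuB mf=%zuB\n", select_size, update_size, mf_size);
  return 0;
}
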
@@ -286,7 +288,7 @@ std::string CtrDymfAccessor::ParseToString(const float* v, int param) {
os << v[0] << " " << v[1] << " " << v[2] << " " << v[3] << " " << v[4];
// << v[5] << " " << v[6];
for (int i = common_feature_value.EmbedG2SumIndex();
i < common_feature_value.EmbedxWIndex(); i++) {
i < common_feature_value.SlotIndex(); i++) {
os << " " << v[i];
}
os << " " << common_feature_value.Slot(const_cast<float*>(v)) << " "
auto score = ShowClickScore(show, click);
if (score >= _config.embedx_threshold() &&
param > common_feature_value.EmbedxG2SumIndex()) {
VLOG(3) << "common_feature_value.EmbedxG2SumIndex():"
<< common_feature_value.EmbedxG2SumIndex();
for (auto i = common_feature_value.EmbedxG2SumIndex();
i < common_feature_value.Dim(); ++i) {
os << " " << v[i];
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/ps/table/ctr_dymf_accessor.h
@@ -52,10 +52,10 @@ class CtrDymfAccessor : public ValueAccessor {
int ClickIndex() { return ShowIndex() + 1; }
int EmbedWIndex() { return ClickIndex() + 1; }
int EmbedG2SumIndex() { return EmbedWIndex() + 1; }
int SlotIndex() { return EmbedG2SumIndex() + 1; }
int SlotIndex() { return EmbedG2SumIndex() + embed_sgd_dim; }
int MfDimIndex() { return SlotIndex() + 1; }
int EmbedxG2SumIndex() { return MfDimIndex() + 1; }
int EmbedxWIndex() { return EmbedxG2SumIndex() + 1; }
int EmbedxWIndex() { return EmbedxG2SumIndex() + embedx_sgd_dim; }

float& UnseenDays(float* val) { return val[UnseenDaysIndex()]; }
float& DeltaScore(float* val) { return val[DeltaScoreIndex()]; }
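
Note: with this change the field offsets skip a variable-width optimizer block — SlotIndex sits embed_sgd_dim floats past EmbedG2SumIndex, and EmbedxWIndex sits embedx_sgd_dim floats past EmbedxG2SumIndex — so the same value layout serves whichever SGD rule is configured. A standalone sketch of the resulting offsets; the head order and the dims passed in are illustrative assumptions, not the accessor's real construction path:

// Re-derivation of the CtrDymfAccessor field offsets for illustration only.
// Head order assumed: unseen_days, delta_score, show, click, embed_w
// (matching the five values ParseToString prints first).
#include <cstdio>

struct DymfLayout {
  int embed_sgd_dim;   // width of the embed optimizer block
  int embedx_sgd_dim;  // width of the embedx optimizer block

  int UnseenDaysIndex() const { return 0; }
  int DeltaScoreIndex() const { return UnseenDaysIndex() + 1; }
  int ShowIndex() const { return DeltaScoreIndex() + 1; }
  int ClickIndex() const { return ShowIndex() + 1; }
  int EmbedWIndex() const { return ClickIndex() + 1; }
  int EmbedG2SumIndex() const { return EmbedWIndex() + 1; }
  // Variable-width blocks introduced in this commit.
  int SlotIndex() const { return EmbedG2SumIndex() + embed_sgd_dim; }
  int MfDimIndex() const { return SlotIndex() + 1; }
  int EmbedxG2SumIndex() const { return MfDimIndex() + 1; }
  int EmbedxWIndex() const { return EmbedxG2SumIndex() + embedx_sgd_dim; }
};

int main() {
  DymfLayout layout{4, 4};  // assumed 4-float state per block, as in shared Adam
  std::printf("slot=%d mf_dim=%d embedx_w=%d\n",
              layout.SlotIndex(), layout.MfDimIndex(), layout.EmbedxWIndex());
  return 0;
}
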
80 changes: 79 additions & 1 deletion paddle/fluid/distributed/ps/table/sparse_sgd_rule.cc
@@ -200,7 +200,6 @@ void SparseAdamSGDRule::UpdateValueWork(float* w, float* sgd, const float* grad,
float beta1_pow_ = *beta1_pow;
float beta2_pow_ = *beta2_pow;

// lr not change in one update
lr *= sqrt(1 - beta2_pow_) / (1 - beta1_pow_);
for (int i = 0; i < _embedding_dim; i++) {
// Calculation
@@ -238,5 +237,84 @@ void SparseAdamSGDRule::InitValueWork(float* value, float* sgd,
*(sgd + Beta1PowIndex()) = _beta1_decay_rate;
*(sgd + Beta2PowIndex()) = _beta2_decay_rate;
}

void SparseSharedAdamSGDRule::LoadConfig(const SparseCommonSGDRuleParameter& param,
size_t emb_dim) {
_embedding_dim = emb_dim;
auto adam_param = param.adam();
learning_rate_ = adam_param.learning_rate();
_initial_range = adam_param.initial_range();
_beta1_decay_rate = adam_param.beta1_decay_rate();
_beta2_decay_rate = adam_param.beta2_decay_rate();
_ada_epsilon = adam_param.ada_epsilon();
if (adam_param.weight_bounds_size() == 0) {
_min_bound = -std::numeric_limits<float>::max();
_max_bound = std::numeric_limits<float>::max();
} else {
CHECK(adam_param.weight_bounds_size() >= 2)
<< "invalid repeated size for weight_bounds:"
<< adam_param.weight_bounds_size();
_min_bound = adam_param.weight_bounds(0);
_max_bound = adam_param.weight_bounds(1);
}
}

void SparseSharedAdamSGDRule::UpdateValueWork(float* w, float* sgd, const float* grad,
float scale) {
float* gsum = sgd + GSumIndex();
float* g2sum = sgd + G2SumIndex();
float* beta1_pow = sgd + Beta1PowIndex();
float* beta2_pow = sgd + Beta2PowIndex();
const float* g = grad;

float lr = learning_rate_;
float beta1_pow_ = *beta1_pow;
float beta2_pow_ = *beta2_pow;
float gsum_ = *gsum;
float g2sum_ = *g2sum;

lr *= sqrt(1 - beta2_pow_) / (1 - beta1_pow_);
double sum_gsum = 0.0;
double sum_g2sum = 0.0;
for (int i = 0; i < _embedding_dim; i++) {
// Calculation
double new_gsum = _beta1_decay_rate * gsum_ + (1 - _beta1_decay_rate) * g[i];
double new_g2sum =
_beta2_decay_rate * g2sum_ + (1 - _beta2_decay_rate) * g[i] * g[i];
w[i] = w[i] - lr * (new_gsum / (sqrt(new_g2sum) + _ada_epsilon));
BoundValue(w[i]);
sum_gsum += new_gsum;
sum_g2sum += new_g2sum;
}
// update beta_pow_decay
(*gsum) = sum_gsum / _embedding_dim;
(*g2sum) = sum_g2sum / _embedding_dim;
(*beta1_pow) *= _beta1_decay_rate;
(*beta2_pow) *= _beta2_decay_rate;
}

void SparseSharedAdamSGDRule::InitValueWork(float* value, float* sgd,
bool zero_init) {
for (int i = 0; i < _embedding_dim; ++i) {
if (zero_init) {
value[i] = 0.0;
BoundValue(value[i]);
} else {
value[i] =
(local_uniform_real_distribution<double>()(local_random_engine()) *
2 -
1) *
_initial_range;
BoundValue(value[i]);
}
}
// init rule gsum and g2sum
for (int i = GSumIndex(); i < Beta1PowIndex(); i++) {
sgd[i] = 0.0;
}
// init beta1_pow and beta2_pow
*(sgd + Beta1PowIndex()) = _beta1_decay_rate;
*(sgd + Beta2PowIndex()) = _beta2_decay_rate;
}
} // namespace distributed
} // namespace paddle
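
Note: unlike SparseAdamSGDRule, which keeps per-dimension moments, the shared variant above updates every dimension against a single gsum/g2sum pair and then stores the average of the per-dimension moments back into that pair, so the optimizer state stays at four floats per feature. A self-contained sketch of one such update; the hyper-parameters are assumptions (0.001 and 0.9 are the proto defaults shown later in this commit, 0.999 and 1e-8 are just the usual Adam choices):

// Minimal sketch of one SparseSharedAdamSGDRule-style update for a single
// feature value; not the table's actual configuration or call path.
#include <cmath>
#include <cstdio>
#include <vector>

int main() {
  const int dim = 8;
  const float lr0 = 0.001f, beta1 = 0.9f, beta2 = 0.999f, eps = 1e-8f;

  std::vector<float> w(dim, 0.01f), g(dim, 0.1f);
  // Shared state: one gsum/g2sum for all dims; beta pows start at the decay
  // rates, mirroring InitValueWork above.
  float gsum = 0.f, g2sum = 0.f, beta1_pow = beta1, beta2_pow = beta2;

  // Bias-corrected step size; with the first-step pow values this is
  // 0.001 * sqrt(1 - 0.999) / (1 - 0.9) ≈ 3.16e-4.
  float lr = lr0 * std::sqrt(1 - beta2_pow) / (1 - beta1_pow);

  double sum_gsum = 0.0, sum_g2sum = 0.0;
  for (int i = 0; i < dim; ++i) {
    double new_gsum = beta1 * gsum + (1 - beta1) * g[i];
    double new_g2sum = beta2 * g2sum + (1 - beta2) * g[i] * g[i];
    w[i] -= lr * static_cast<float>(new_gsum / (std::sqrt(new_g2sum) + eps));
    sum_gsum += new_gsum;
    sum_g2sum += new_g2sum;
  }
  // Fold the per-dim moments back into the shared pair and advance the pows.
  gsum = static_cast<float>(sum_gsum / dim);
  g2sum = static_cast<float>(sum_g2sum / dim);
  beta1_pow *= beta1;
  beta2_pow *= beta2;

  std::printf("w[0]=%.6f gsum=%.6f g2sum=%.6f\n", w[0], gsum, g2sum);
  return 0;
}
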
21 changes: 21 additions & 0 deletions paddle/fluid/distributed/ps/table/sparse_sgd_rule.h
@@ -130,5 +130,26 @@ class SparseAdamSGDRule : public SparseValueSGDRule {
float _beta2_decay_rate;
float _ada_epsilon;
};

class SparseSharedAdamSGDRule : public SparseValueSGDRule {
public:
virtual void LoadConfig(const SparseCommonSGDRuleParameter& param,
size_t emb_dim);
virtual void UpdateValueWork(float* w, float* sgd, const float* push_value,
float scale);
virtual void InitValueWork(float* value, float* sgd, bool zero_init);
virtual size_t Dim() { return 4; }
size_t GSumIndex() { return 0; }
size_t G2SumIndex() { return GSumIndex() + 1; }
size_t Beta1PowIndex() { return G2SumIndex() + 1; }
size_t Beta2PowIndex() { return Beta1PowIndex() + 1; }

protected:
float learning_rate_;
float _beta1_decay_rate;
float _beta2_decay_rate;
float _ada_epsilon;
};

} // namespace distributed
} // namespace paddle
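
Note: the four floats declared above lay out as [gsum, g2sum, beta1_pow, beta2_pow]. For comparison, a per-dimension Adam rule's state grows with the embedding width; the 2 * dim + 2 figure below is inferred from SparseAdamSGDRule's update loop, not stated anywhere in this diff:

// Back-of-the-envelope optimizer-state size per sparse feature value.
#include <cstdio>

int main() {
  const int embedx_dim = 8;                            // proto default width
  const int shared_adam_floats = 4;                    // SparseSharedAdamSGDRule::Dim()
  const int per_dim_adam_floats = 2 * embedx_dim + 2;  // assumed for SparseAdamSGDRule

  std::printf("shared adam : %zu bytes\n", shared_adam_floats * sizeof(float));
  std::printf("per-dim adam: %zu bytes\n", per_dim_adam_floats * sizeof(float));
  return 0;
}
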
1 change: 1 addition & 0 deletions paddle/fluid/distributed/ps/table/table.cc
@@ -51,6 +51,7 @@ REGISTER_PSCORE_CLASS(SparseValueSGDRule, StdAdaGradSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseAdamSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseNaiveSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseAdaGradSGDRule);
REGISTER_PSCORE_CLASS(SparseValueSGDRule, SparseSharedAdamSGDRule);

int32_t TableManager::Initialize() {
static bool initialized = false;
9 changes: 8 additions & 1 deletion paddle/fluid/distributed/ps/wrapper/CMakeLists.txt
@@ -1,9 +1,16 @@

get_property(RPC_DEPS GLOBAL PROPERTY RPC_DEPS)

set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=parentheses")
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(DISTRIBUTE_COMPILE_FLAGS
"${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
endif()

set_source_files_properties(fleet.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
cc_library(fleet
SRCS fleet.cc
DEPS framework_proto ps_framework_proto ps_service variable_helper scope op_registry fs shell ${RPC_DEPS})
DEPS framework_proto ps_framework_proto ps_service variable_helper scope
ps_gpu_wrapper op_registry fs shell ${RPC_DEPS})

target_link_libraries(fleet z)
28 changes: 19 additions & 9 deletions paddle/fluid/distributed/ps/wrapper/fleet.cc
@@ -17,6 +17,9 @@ limitations under the License. */
#include "paddle/fluid/distributed/ps/service/communicator/communicator.h"
#include "paddle/fluid/distributed/ps/table/table.h"
#include "paddle/fluid/distributed/ps/wrapper/fleet.h"
#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE
#include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h"
#endif

namespace paddle {
namespace distributed {
@@ -121,6 +124,13 @@ void FleetWrapper::InitWorker(const std::string& dist_desc,
worker_ptr_ = std::shared_ptr<paddle::distributed::PSClient>(
paddle::distributed::PSClientFactory::Create(ps_param));
worker_ptr_->Configure(ps_param, dense_pull_regions, ps_env_, index);
#if defined PADDLE_WITH_HETERPS && defined PADDLE_WITH_PSCORE
VLOG(0) << "FleetWrapper::InitWorker InitializeGPUServer";
auto* accessor = worker_ptr_->GetTableAccessor(0);
auto ps_gpu_wrapper = paddle::framework::PSGPUWrapper::GetInstance();
ps_gpu_wrapper->InitializeGPUServer(ps_param);
ps_gpu_wrapper->SetTableAccessor(accessor);
#endif
}
} else {
VLOG(3) << "Client can be initialized only once";
@@ -478,24 +488,24 @@ void FleetWrapper::PushSparseFromTensorAsync(
int batch_size = -1;
bool batch_size_consist = true;
for (auto* input : *inputs) {
int cur_batch_size =
size_t cur_batch_size =
input->lod().size() ? input->lod()[0].size() - 1 : input->dims()[0];
if (batch_size == -1) {
batch_size = cur_batch_size;
} else if (batch_size != cur_batch_size) {
batch_size = int(cur_batch_size);
} else if (batch_size != int(cur_batch_size)) {
// CHECK(batch_size == cur_batch_size); // NOLINT
batch_size_consist = false;
break;
}
}
CHECK(batch_size > 0); // NOLINT

int show_size =
size_t show_size =
shows->lod().size() ? shows->lod()[0].size() - 1 : shows->dims()[0];
CHECK(show_size == batch_size || show_size == 1);
int clk_size =
CHECK(show_size == size_t(batch_size) || show_size == 1);
size_t clk_size =
clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0];
CHECK(clk_size == batch_size || clk_size == 1);
CHECK(clk_size == size_t(batch_size) || clk_size == 1);

CHECK(outputs->size() == inputs->size());
std::vector<uint64_t> push_keys;
@@ -536,7 +546,7 @@ void FleetWrapper::PushSparseFromTensorAsync(

if (tensor->lod().size() > 0) {
for (size_t i = 0; i < tensor->lod()[0].size() - 1; ++i) {
for (int j = tensor->lod()[0][i]; j < tensor->lod()[0][i + 1];
for (size_t j = tensor->lod()[0][i]; j < tensor->lod()[0][i + 1];
++j, output_len += fea_dim) {
uint64_t real_id = static_cast<uint64_t>(ids[j]);
if (real_id == padding_id) {
@@ -591,7 +601,7 @@ void FleetWrapper::PushSparseFromTensorAsync(
++input_idx;
}
}
CHECK(output_len == g_tensor->numel());
CHECK(int64_t(output_len) == g_tensor->numel());
}

std::vector<float*> push_g_vec(input_idx, nullptr);
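
Note: most edits in this file replace int with size_t for LoD-derived sizes. For a LoD tensor, level 0 stores sequence offsets, so the batch size is lod()[0].size() - 1; a dense tensor falls back to dims()[0]. A small illustration of that rule, with a plain std::vector standing in for the LoD structure (no Paddle types involved):

// lod[0] holds sequence offsets, e.g. {0, 3, 5, 9} means 3 sequences of
// lengths 3, 2 and 4 over 9 flattened rows.
#include <cstdio>
#include <vector>

int main() {
  std::vector<std::vector<size_t>> lod = {{0, 3, 5, 9}};
  size_t dims0 = 9;  // what dims()[0] would report for the flattened rows

  size_t batch_size = !lod.empty() ? lod[0].size() - 1 : dims0;
  std::printf("batch_size = %zu\n", batch_size);  // 3

  // Per-sequence row ranges, matching the j-loop over lod[0][i]..lod[0][i + 1].
  for (size_t i = 0; i + 1 < lod[0].size(); ++i) {
    std::printf("seq %zu: rows [%zu, %zu)\n", i, lod[0][i], lod[0][i + 1]);
  }
  return 0;
}
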
6 changes: 3 additions & 3 deletions paddle/fluid/framework/distributed_strategy.proto
@@ -196,14 +196,14 @@ message TableParameter {

message TableAccessorParameter {
optional string accessor_class = 1;
optional SGDParameter embed_sgd_param = 2;
optional SGDParameter embedx_sgd_param = 3;
optional uint32 fea_dim = 4 [ default = 11 ]; // field size of one value
optional uint32 embedx_dim = 5 [ default = 8 ]; // embedx feature size
optional uint32 embedx_threshold = 6
[ default = 10 ]; // embedx feature create threshold
optional CtrAccessorParameter ctr_accessor_param = 7;
repeated TableAccessorSaveParameter table_accessor_save_param = 8;
optional SGDParameter embed_sgd_param = 10;
optional SGDParameter embedx_sgd_param = 11;
}

message SGDParameter {
@@ -227,7 +227,7 @@ message
repeated float weight_bounds = 4;
}

message SparseAdamSGDParameter { // SparseAdamSGDRule
message SparseAdamSGDParameter { // SparseAdamSGDRule | SparseSharedAdamSGDRule
optional double learning_rate = 1 [ default = 0.001 ];
optional double initial_range = 2 [ default = 0.0001 ];
optional double beta1_decay_rate = 3 [ default = 0.9 ];
2 changes: 0 additions & 2 deletions paddle/fluid/framework/fleet/heter_context.h
@@ -80,7 +80,6 @@ class HeterContext {
std::vector<std::vector<FeatureValue>> device_values_;
std::vector<std::vector<FeatureKey>> device_keys_;
std::vector<std::vector<std::vector<FeatureKey>>> device_dim_keys_;
std::vector<std::vector<std::vector<FeatureValue>>> device_dim_values_;
std::vector<std::mutex*> mutex_;
std::vector<std::vector<std::mutex*>> dim_mutex_;
int multi_mf_dim_ = 0;
@@ -113,7 +112,6 @@ class HeterContext {
value_dim_ptr_[i].resize(dim_num);
}
device_values_.resize(device_num);
device_dim_values_.resize(device_num);
device_keys_.resize(device_num);

device_dim_keys_.resize(device_num);