Merge pull request #2 from seemingwang/accessor_merge
Accessor merge
zhaocaibei123 authored Aug 25, 2021
2 parents 587235c + a88ed02 commit 91c7536
Showing 11 changed files with 1,131 additions and 1,023 deletions.
74 changes: 35 additions & 39 deletions paddle/fluid/distributed/fleet.cc
@@ -297,7 +297,7 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim,
sleep(sleep_seconds_before_fail_exit_);
}

//check zcb
// check zcb
/*
std::cout << "pull sparse check zcb\n";
for (int i = 0; i < fea_keys.size(); ++ i) {
@@ -307,7 +307,6 @@ void FleetWrapper::PullSparseToTensorSync(const uint64_t table_id, int fea_dim,
}
std::cout << "\n";
}*/

}

void FleetWrapper::PullDenseVarsAsync(
@@ -391,17 +390,18 @@ void FleetWrapper::PushDenseVarsAsync(
float* g = tensor->mutable_data<float>(place);
paddle::distributed::Region reg(g, tensor->numel());
regions.emplace_back(std::move(reg));
VLOG(1) << "FleetWrapper::PushDenseVarsAsync Var " << t
<< " talbe_id " << table_id << " Temp_data[0] " << g[0]
<< " Temp_data[-1] " << g[tensor->numel() - 1];
VLOG(1) << "FleetWrapper::PushDenseVarsAsync Var " << t << " talbe_id "
<< table_id << " Temp_data[0] " << g[0] << " Temp_data[-1] "
<< g[tensor->numel() - 1];
}

auto* communicator = dynamic_cast<AsyncCommunicator*>(Communicator::GetInstance());
//PADDLE_ENFORCE_EQ(
auto* communicator =
dynamic_cast<AsyncCommunicator*>(Communicator::GetInstance());
// PADDLE_ENFORCE_EQ(
// communicator->Check(table_id), true,
// platform::errors::InvalidArgument(
// "can not find table: %s, please check your config", table_id));
//communicator->Send(var_names, scope);
// communicator->Send(var_names, scope);
auto push_status = communicator->_worker_ptr->push_dense(
regions.data(), regions.size(), table_id);

@@ -448,14 +448,10 @@ void FleetWrapper::PushSparseFromTensorWithLabelAsync(
}

void FleetWrapper::PushSparseFromTensorAsync(
const uint64_t table_id, int fea_dim,
uint64_t padding_id,
platform::Place place,
std::vector<const LoDTensor*>* inputs,
const LoDTensor* shows,
const LoDTensor* clks,
const uint64_t table_id, int fea_dim, uint64_t padding_id,
platform::Place place, std::vector<const LoDTensor*>* inputs,
const LoDTensor* shows, const LoDTensor* clks,
std::vector<LoDTensor*>* outputs) {

int batch_size = -1;
for (auto* input : *inputs) {
int cur_batch_size =
@@ -467,11 +463,13 @@ void FleetWrapper::PushSparseFromTensorAsync(
}
}
CHECK(batch_size > 0); // NOLINT

//TODO: check batch_size of show clks
int show_size = shows->lod().size() ? shows->lod()[0].size() - 1 : shows->dims()[0];

// TODO: check batch_size of show clks
int show_size =
shows->lod().size() ? shows->lod()[0].size() - 1 : shows->dims()[0];
CHECK(show_size == batch_size || show_size == 1);
int clk_size = clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0];
int clk_size =
clks->lod().size() ? clks->lod()[0].size() - 1 : clks->dims()[0];
CHECK(clk_size == batch_size || clk_size == 1);

std::vector<float> g;
@@ -491,7 +489,7 @@ void FleetWrapper::PushSparseFromTensorAsync(

VLOG(0) << "yxf::fleet.cc::emb_dim: " << fea_dim;

//TODO: type of show/clk is int? float? uint64?
// TODO: type of show/clk is int? float? uint64?
const long int* show_tensor = shows->data<int64_t>();
const long int* clk_tensor = clks->data<int64_t>();

@@ -500,25 +498,25 @@ void FleetWrapper::PushSparseFromTensorAsync(
const int64_t* ids = tensor->data<int64_t>();
size_t len = tensor->numel();

if (tensor->lod().size() > 0 ) {
if (tensor->lod().size() > 0) {
for (size_t i = 0; i < tensor->lod()[0].size() - 1; ++i) {
for (int j = tensor->lod()[0][i]; j < tensor->lod()[0][i + 1]; ++ j, output_len += fea_dim) {
for (int j = tensor->lod()[0][i]; j < tensor->lod()[0][i + 1];
++j, output_len += fea_dim) {
if (static_cast<uint64_t>(ids[j]) == padding_id) {
continue;
}
push_keys.emplace_back(ids[j]);
///fake by zcb
/// fake by zcb
push_values.emplace_back(fea_dim + 3);
push_values.back()[0] = 2; //TODO: slot
push_values.back()[1] = (i >= show_size? 1: (float)show_tensor[i]);
push_values.back()[2] = (i >= clk_size? 0: (float)clk_tensor[i]);
push_values.back()[0] = 2; // TODO: slot
push_values.back()[1] = (i >= show_size ? 1 : (float)show_tensor[i]);
push_values.back()[2] = (i >= clk_size ? 0 : (float)clk_tensor[i]);

float* data = push_values.back().data() + 3;

memcpy(data, g.data() + output_len,
sizeof(float) * fea_dim);

++ input_idx;
memcpy(data, g.data() + output_len, sizeof(float) * fea_dim);

++input_idx;
}
}
} else {
@@ -527,17 +525,16 @@ void FleetWrapper::PushSparseFromTensorAsync(
continue;
}
push_keys.emplace_back(ids[i]);
///fake by zcb
/// fake by zcb
push_values.emplace_back(fea_dim + 3);
push_values.back()[0] = 2; //TODO: slot
push_values.back()[1] = (i >= show_size? 1: (float)show_tensor[i]);
push_values.back()[2] = (i >= clk_size? 0: (float)clk_tensor[i]);
push_values.back()[0] = 2; // TODO: slot
push_values.back()[1] = (i >= show_size ? 1 : (float)show_tensor[i]);
push_values.back()[2] = (i >= clk_size ? 0 : (float)clk_tensor[i]);

float* data = push_values.back().data() + 3;

memcpy(data, g.data() + output_len,
sizeof(float) * fea_dim);


memcpy(data, g.data() + output_len, sizeof(float) * fea_dim);

++input_idx;
}
}
@@ -564,7 +561,6 @@ void FleetWrapper::PushSparseFromTensorAsync(
auto status = communicator->_worker_ptr->push_sparse(
table_id, push_keys.data(), (const float**)push_g_vec.data(),
push_keys.size());

}

void FleetWrapper::LoadModel(const std::string& path, const std::string& mode) {
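For context on the new push path in this file: each sparse key handled by PushSparseFromTensorAsync carries a record of fea_dim + 3 floats, with the slot id, show count, and click count in the first three positions and the gradient copied in after them. The snippet below is a minimal standalone sketch of that layout only (plain C++, not code from this commit; fea_dim and the sample values are illustrative).

#include <cstdio>
#include <cstring>
#include <vector>

int main() {
  const int fea_dim = 4;  // embedding/gradient width (illustrative value)
  std::vector<float> grad = {0.1f, 0.2f, 0.3f, 0.4f};  // gradient for one key

  // Per-key record layout built in the diff: [slot, show, clk, grad...]
  std::vector<float> record(fea_dim + 3);
  record[0] = 2.0f;  // slot id (the diff hard-codes 2 with a TODO)
  record[1] = 1.0f;  // show
  record[2] = 0.0f;  // click
  std::memcpy(record.data() + 3, grad.data(), sizeof(float) * fea_dim);

  for (float v : record) std::printf("%g ", v);  // prints: 2 1 0 0.1 0.2 0.3 0.4
  std::printf("\n");
  return 0;
}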
