Skip to content

Commit

Permalink
reconstruct table,reduce hbm 20.8G (PaddlePaddle#46)
Browse files Browse the repository at this point in the history
* reconstruct table,reduce hbm 20.8G
  • Loading branch information
chao9527 authored Jun 23, 2022
1 parent 35a704f commit fdf59b6
Show file tree
Hide file tree
Showing 11 changed files with 373 additions and 308 deletions.
87 changes: 55 additions & 32 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,28 +54,36 @@ int32_t GraphTable::Load_to_ssd(const std::string &path,
paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
std::vector<uint64_t> &node_ids, int slot_num) {
std::vector<std::vector<uint64_t>> bags(task_pool_size_);
for (int i = 0; i < task_pool_size_; i++) {
auto predsize = node_ids.size() / task_pool_size_;
bags[i].reserve(predsize * 1.2);
}

for (auto x : node_ids) {
int location = x % shard_num % task_pool_size_;
bags[location].push_back(x);
}

std::vector<std::future<int>> tasks;
std::vector<uint64_t> feature_array[task_pool_size_];
std::vector<uint8_t> slot_id_array[task_pool_size_];
std::vector<paddle::framework::GpuPsGraphFeaNode>
node_fea_array[task_pool_size_];
std::vector<uint64_t> node_id_array[task_pool_size_];
std::vector<paddle::framework::GpuPsFeaInfo>
node_fea_info_array[task_pool_size_];
for (size_t i = 0; i < bags.size(); i++) {
if (bags[i].size() > 0) {
tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int {
paddle::framework::GpuPsGraphFeaNode x;
uint64_t node_id;
paddle::framework::GpuPsFeaInfo x;
std::vector<uint64_t> feature_ids;
for (size_t j = 0; j < bags[i].size(); j++) {
// TODO use FEATURE_TABLE instead
Node *v = find_node(1, bags[i][j]);
x.node_id = bags[i][j];
node_id = bags[i][j];
if (v == NULL) {
x.feature_size = 0;
x.feature_offset = 0;
node_fea_array[i].push_back(x);
node_fea_info_array[i].push_back(x);
} else {
// x <- v
x.feature_offset = feature_array[i].size();
Expand All @@ -91,8 +99,9 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
}
}
x.feature_size = total_feature_size;
node_fea_array[i].push_back(x);
node_fea_info_array[i].push_back(x);
}
node_id_array[i].push_back(node_id);
}
return 0;
}));
Expand All @@ -109,9 +118,10 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
res.init_on_cpu(tot_len, (unsigned int)node_ids.size(), slot_num);
unsigned int offset = 0, ind = 0;
for (int i = 0; i < task_pool_size_; i++) {
for (int j = 0; j < (int)node_fea_array[i].size(); j++) {
res.node_list[ind] = node_fea_array[i][j];
res.node_list[ind++].feature_offset += offset;
for (int j = 0; j < (int)node_id_array[i].size(); j++) {
res.node_list[ind] = node_id_array[i][j];
res.fea_info_list[ind] = node_fea_info_array[i][j];
res.fea_info_list[ind++].feature_offset += offset;
}
for (size_t j = 0; j < feature_array[i].size(); j++) {
res.feature_list[offset + j] = feature_array[i][j];
Expand All @@ -125,49 +135,62 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
int idx, std::vector<uint64_t> ids) {
std::vector<std::vector<uint64_t>> bags(task_pool_size_);
for (int i = 0; i < task_pool_size_; i++) {
auto predsize = ids.size() / task_pool_size_;
bags[i].reserve(predsize * 1.2);
}
for (auto x : ids) {
int location = x % shard_num % task_pool_size_;
bags[location].push_back(x);
}

std::vector<std::future<int>> tasks;
std::vector<uint64_t> edge_array[task_pool_size_];
std::vector<paddle::framework::GpuPsGraphNode> node_array[task_pool_size_];
std::vector<uint64_t> node_array[task_pool_size_]; // node id list
std::vector<paddle::framework::GpuPsNodeInfo> info_array[task_pool_size_];
std::vector<uint64_t> edge_array[task_pool_size_]; // edge id list

for (size_t i = 0; i < bags.size(); i++) {
if (bags[i].size() > 0) {
tasks.push_back(_shards_task_pool[i]->enqueue([&, i, this]() -> int {
paddle::framework::GpuPsGraphNode x;
node_array[i].resize(bags[i].size());
info_array[i].resize(bags[i].size());
edge_array[i].reserve(bags[i].size());

for (size_t j = 0; j < bags[i].size(); j++) {
Node *v = find_node(0, idx, bags[i][j]);
x.node_id = bags[i][j];
if (v == NULL) {
x.neighbor_size = 0;
x.neighbor_offset = 0;
node_array[i].push_back(x);
} else {
x.neighbor_size = v->get_neighbor_size();
x.neighbor_offset = edge_array[i].size();
node_array[i].push_back(x);
for (size_t k = 0; k < (size_t)x.neighbor_size; k++) {
edge_array[i].push_back(v->get_neighbor_id(k));
auto node_id = bags[i][j];
node_array[i][j] = node_id;
Node *v = find_node(0, idx, node_id);
if (v != nullptr) {
info_array[i][j].neighbor_offset = edge_array[i].size();
info_array[i][j].neighbor_size = v->get_neighbor_size();
for (size_t k = 0; k < v->get_neighbor_size(); k++) {
edge_array[i].push_back(v->get_neighbor_id(k));
}
}
else {
info_array[i][j].neighbor_offset = 0;
info_array[i][j].neighbor_size = 0;
}
}
}
return 0;
}));
}
}
for (int i = 0; i < (int)tasks.size(); i++) tasks[i].get();
paddle::framework::GpuPsCommGraph res;

int64_t tot_len = 0;
for (int i = 0; i < task_pool_size_; i++) {
tot_len += edge_array[i].size();
}

paddle::framework::GpuPsCommGraph res;
res.init_on_cpu(tot_len, ids.size());
int64_t offset = 0, ind = 0;
for (int i = 0; i < task_pool_size_; i++) {
for (int j = 0; j < (int)node_array[i].size(); j++) {
res.node_list[ind] = node_array[i][j];
res.node_list[ind++].neighbor_offset += offset;
res.node_info_list[ind] = info_array[i][j];
res.node_info_list[ind++].neighbor_offset += offset;
}
for (size_t j = 0; j < edge_array[i].size(); j++) {
res.neighbor_list[offset + j] = edge_array[i][j];
Expand Down Expand Up @@ -275,7 +298,7 @@ int64_t GraphTable::load_graph_to_memory_from_ssd(int idx,
std::string str;
if (_db->get(i, ch, sizeof(int) * 2 + sizeof(uint64_t), str) == 0) {
count[i] += (int64_t)str.size();
for (int j = 0; j < (int)str.size(); j += sizeof(uint64_t)) {
for (size_t j = 0; j < (int)str.size(); j += sizeof(uint64_t)) {
uint64_t id = *(uint64_t *)(str.c_str() + j);
add_comm_edge(idx, v, id);
}
Expand Down Expand Up @@ -345,7 +368,7 @@ void GraphTable::make_partitions(int idx, int64_t byte_size, int device_len) {
score[i] = 0;
}
}
for (int j = 0; j < (int)value.size(); j += sizeof(uint64_t)) {
for (size_t j = 0; j < (int)value.size(); j += sizeof(uint64_t)) {
uint64_t v = *((uint64_t *)(value.c_str() + j));
int index = -1;
if (id_map.find(v) != id_map.end()) {
Expand Down Expand Up @@ -438,7 +461,7 @@ void GraphTable::clear_graph(int idx) {
}
}
int32_t GraphTable::load_next_partition(int idx) {
if (next_partition >= partitions[idx].size()) {
if (next_partition >= (int)partitions[idx].size()) {
VLOG(0) << "partition iteration is done";
return -1;
}
Expand Down Expand Up @@ -500,7 +523,7 @@ int32_t GraphTable::dump_edges_to_ssd(int idx) {
std::vector<Node *> &v = shards[i]->get_bucket();
for (size_t j = 0; j < v.size(); j++) {
std::vector<uint64_t> s;
for (int k = 0; k < (int)v[j]->get_neighbor_size(); k++) {
for (size_t k = 0; k < (int)v[j]->get_neighbor_size(); k++) {
s.push_back(v[j]->get_neighbor_id(k));
}
cost += v[j]->get_neighbor_size() * sizeof(uint64_t);
Expand Down Expand Up @@ -1794,7 +1817,7 @@ std::vector<std::vector<uint64_t>> GraphTable::get_all_id(int type_id, int idx,
auto &search_shards = type_id == 0 ? edge_shards[idx] : feature_shards[idx];
std::vector<std::future<std::vector<uint64_t>>> tasks;
VLOG(0) << "begin task, task_pool_size_[" << task_pool_size_ << "]";
for (int i = 0; i < search_shards.size(); i++) {
for (size_t i = 0; i < search_shards.size(); i++) {
tasks.push_back(_shards_task_pool[i % task_pool_size_]->enqueue(
[&search_shards, i]() -> std::vector<uint64_t> {
return search_shards[i]->get_all_id();
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/framework/data_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ limitations under the License. */
#if defined(PADDLE_WITH_CUDA)
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/device/gpu/gpu_info.h"
#include "paddle/fluid/framework/fleet/heter_ps/gpu_graph_utils.h"
#endif

DECLARE_int32(record_pool_max_size);
Expand Down Expand Up @@ -422,7 +423,6 @@ struct UsedSlotGpuType {
};

#if defined(PADDLE_WITH_CUDA) && defined(PADDLE_WITH_HETERPS)
#define CUDA_CHECK(val) CHECK(val == gpuSuccess)
template <typename T>
struct CudaBuffer {
T* cu_buffer;
Expand Down
Loading

0 comments on commit fdf59b6

Please sign in to comment.