merge gpugraph_new #81

Merged
merged 17 commits on May 26, 2023
Changes from all commits
3 changes: 3 additions & 0 deletions paddle/fluid/distributed/ps/service/ps_local_client.cc
@@ -33,6 +33,9 @@ int32_t PsLocalClient::Initialize() {

::std::future<int32_t> PsLocalClient::Shrink(uint32_t table_id,
const std::string threshold) {
+ // threshold is not used
+ auto* table_ptr = GetTable(table_id);
+ table_ptr->Shrink("");
return done();
}

188 changes: 156 additions & 32 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
@@ -177,6 +177,96 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
return res;
}

+ paddle::framework::GpuPsCommGraphFloatFea GraphTable::make_gpu_ps_graph_float_fea(
+     int gpu_id, std::vector<uint64_t> &node_ids, int float_slot_num) {
+   constexpr size_t shard_num = 64;
+   std::vector<std::vector<uint64_t>> bags(shard_num);
+   std::vector<float> feature_array[shard_num];
+   std::vector<uint8_t> slot_id_array[shard_num];
+   std::vector<uint64_t> node_id_array[shard_num];
+   std::vector<paddle::framework::GpuPsFeaInfo> node_fea_info_array[shard_num];
+   for (size_t i = 0; i < shard_num; i++) {
+     auto predsize = node_ids.size() / shard_num;
+     bags[i].reserve(predsize * 1.2);
+     feature_array[i].reserve(predsize * 1.2 * float_slot_num);
+     slot_id_array[i].reserve(predsize * 1.2 * float_slot_num);
+     node_id_array[i].reserve(predsize * 1.2);
+     node_fea_info_array[i].reserve(predsize * 1.2);
+   }
+
+   for (auto x : node_ids) {
+     int location = x % shard_num;
+     bags[location].push_back(x);
+   }
+
+   std::vector<std::future<int>> tasks;
+
+   for (size_t i = 0; i < bags.size(); i++) {
+     if (bags[i].size() > 0) {
+       tasks.push_back(_cpu_worker_pool[gpu_id]->enqueue([&, i, this]() -> int {
+         uint64_t node_id;
+         paddle::framework::GpuPsFeaInfo x;
+         for (size_t j = 0; j < bags[i].size(); j++) {
+           Node *v = find_node(GraphTableType::FEATURE_TABLE, bags[i][j]);
+           node_id = bags[i][j];
+           if (v == NULL) {
+             x.feature_size = 0;
+             x.feature_offset = 0;
+             node_fea_info_array[i].push_back(x);
+           } else {
+             // x <- v
+             x.feature_offset = feature_array[i].size();
+             int total_feature_size = 0;
+             for (int k = 0; k < float_slot_num; ++k) {
+               auto float_feature_size =
+                   v->get_float_feature(k, feature_array[i], slot_id_array[i]);
+               total_feature_size += float_feature_size;
+             }
+             x.feature_size = total_feature_size;
+             node_fea_info_array[i].push_back(x);
+           }
+           node_id_array[i].push_back(node_id);
+         }
+         return 0;
+       }));
+     }
+   }
+   for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
+
+   tasks.clear();
+
+   paddle::framework::GpuPsCommGraphFloatFea res;
+   uint64_t tot_len = 0;
+   for (size_t i = 0; i < shard_num; i++) {
+     tot_len += feature_array[i].size();
+   }
+   VLOG(1) << "Loaded float feature table on cpu, float feature_list_size["
+           << tot_len << "] node_ids_size[" << node_ids.size() << "]";
+   res.init_on_cpu(tot_len, (unsigned int)node_ids.size(), float_slot_num);
+   unsigned int offset = 0, ind = 0;
+   for (size_t i = 0; i < shard_num; i++) {
+     tasks.push_back(
+         _cpu_worker_pool[gpu_id]->enqueue([&, i, ind, offset, this]() -> int {
+           auto start = ind;
+           for (size_t j = 0; j < node_id_array[i].size(); j++) {
+             res.node_list[start] = node_id_array[i][j];
+             res.fea_info_list[start] = node_fea_info_array[i][j];
+             res.fea_info_list[start++].feature_offset += offset;
+           }
+           for (size_t j = 0; j < feature_array[i].size(); j++) {
+             res.feature_list[offset + j] = feature_array[i][j];
+             res.slot_id_list[offset + j] = slot_id_array[i][j];
+           }
+           return 0;
+         }));
+     offset += feature_array[i].size();
+     ind += node_id_array[i].size();
+   }
+   for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
+   return res;
+ }
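Note: the new function follows a shard-then-merge pattern. Node IDs are bucketed by id % shard_num, each non-empty bucket's float features are pulled in parallel on the per-GPU CPU worker pool, and the shard-local feature offsets are then shifted by a running prefix while copying into one flat buffer. Below is a minimal standalone sketch of that prefix-offset merge, run serially, with a hypothetical FeaInfo struct standing in for paddle::framework::GpuPsFeaInfo:

#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical stand-in for paddle::framework::GpuPsFeaInfo.
struct FeaInfo {
  uint32_t feature_offset;
  uint32_t feature_size;
};

int main() {
  constexpr size_t shard_num = 4;
  std::vector<uint64_t> node_ids = {11, 22, 33, 44, 55};

  // Per-shard staging buffers, filled here serially where the real code
  // uses one _cpu_worker_pool task per non-empty bag.
  std::vector<float> feature_array[shard_num];
  std::vector<uint64_t> node_id_array[shard_num];
  std::vector<FeaInfo> fea_info_array[shard_num];
  for (uint64_t id : node_ids) {
    size_t s = id % shard_num;
    FeaInfo info{static_cast<uint32_t>(feature_array[s].size()), 2};
    feature_array[s].push_back(id * 0.1f);  // two fake float features per node
    feature_array[s].push_back(id * 0.2f);
    node_id_array[s].push_back(id);
    fea_info_array[s].push_back(info);
  }

  // Merge: shard-local offsets become global ones via a running prefix.
  std::vector<uint64_t> node_list;
  std::vector<FeaInfo> fea_info_list;
  std::vector<float> feature_list;
  uint32_t offset = 0;
  for (size_t s = 0; s < shard_num; ++s) {
    for (size_t j = 0; j < node_id_array[s].size(); ++j) {
      node_list.push_back(node_id_array[s][j]);
      FeaInfo info = fea_info_array[s][j];
      info.feature_offset += offset;  // shift into the flat buffer
      fea_info_list.push_back(info);
    }
    feature_list.insert(feature_list.end(), feature_array[s].begin(),
                        feature_array[s].end());
    offset += static_cast<uint32_t>(feature_array[s].size());
  }
  for (size_t j = 0; j < node_list.size(); ++j) {
    std::cout << node_list[j] << " -> offset " << fea_info_list[j].feature_offset
              << " size " << fea_info_list[j].feature_size << "\n";
  }
  return 0;
}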

paddle::framework::GpuPsCommGraph GraphTable::make_gpu_ps_graph(
int idx, const std::vector<uint64_t> &ids) {
std::vector<std::vector<uint64_t>> bags(task_pool_size_);
@@ -1231,16 +1321,19 @@ GraphNode *GraphShard::add_graph_node(Node *node) {
return reinterpret_cast<GraphNode *>(bucket[node_location[id]]);
}

- FeatureNode *GraphShard::add_feature_node(uint64_t id, bool is_overlap) {
+ FeatureNode *GraphShard::add_feature_node(uint64_t id, bool is_overlap, int float_fea_num) {
if (node_location.find(id) == node_location.end()) {
node_location[id] = bucket.size();
- bucket.push_back(new FeatureNode(id));
+ if (float_fea_num > 0) {
+   bucket.push_back(new FloatFeatureNode(id));
+ } else {
+   bucket.push_back(new FeatureNode(id));
+ }
return reinterpret_cast<FeatureNode *>(bucket[node_location[id]]);
}
if (is_overlap) {
return reinterpret_cast<FeatureNode *>(bucket[node_location[id]]);
}

return NULL;
}
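The allocation branch above implies a node hierarchy in which only float-capable nodes store float slots, so the later v->get_float_feature(...) calls return data only where it exists. A rough, hypothetical mirror of that design (the real classes live in graph_node.h and their signatures may differ):

#include <cstdint>
#include <memory>
#include <vector>

struct FeatureNode {
  explicit FeatureNode(uint64_t id) : id_(id) {}
  virtual ~FeatureNode() = default;
  // Base nodes expose no float features.
  virtual int get_float_feature(int /*slot*/, std::vector<float> & /*out*/,
                                std::vector<uint8_t> & /*slot_ids*/) const {
    return 0;
  }
  uint64_t id_;
};

struct FloatFeatureNode : FeatureNode {
  using FeatureNode::FeatureNode;
  std::vector<std::vector<float>> float_slots_;  // assumes slots filled beforehand
  int get_float_feature(int slot, std::vector<float> &out,
                        std::vector<uint8_t> &slot_ids) const override {
    const auto &vals = float_slots_.at(slot);
    out.insert(out.end(), vals.begin(), vals.end());
    slot_ids.insert(slot_ids.end(), vals.size(), static_cast<uint8_t>(slot));
    return static_cast<int>(vals.size());
  }
};

// Mirrors add_feature_node: pay for float storage only when the node
// type actually declares float slots.
std::unique_ptr<FeatureNode> make_node(uint64_t id, int float_fea_num) {
  if (float_fea_num > 0) return std::make_unique<FloatFeatureNode>(id);
  return std::make_unique<FeatureNode>(id);
}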

@@ -1851,10 +1944,15 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
local_count++;

size_t index = shard_id - shard_start;
+ int slot_fea_num = 0;
+ if (feat_name.size() > 0) slot_fea_num = feat_name[idx].size();
+ int float_fea_num = 0;
+ if (float_feat_id_map.size() > 0) float_fea_num = float_feat_id_map[idx].size();
if (load_slot) {
- auto node = feature_shards[idx][index]->add_feature_node(id, false);
+ auto node = feature_shards[idx][index]->add_feature_node(id, false, float_fea_num);
if (node != NULL) {
- node->set_feature_size(feat_name[idx].size());
+ if (slot_fea_num > 0) node->set_feature_size(slot_fea_num);
+ if (float_fea_num > 0) node->set_float_feature_size(float_fea_num);
for (int i = 1; i < num; ++i) {
auto &v = vals[i];
int ret = parse_feature(idx, v.ptr, v.len, node);
@@ -1866,7 +1964,7 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
}
}
} else {
- node_shards[idx][index]->add_feature_node(id, false);
+ node_shards[idx][index]->add_feature_node(id, false, float_fea_num);
}
local_valid_count++;
}
@@ -1920,8 +2018,10 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
}
}
size_t index = shard_id - shard_start;
+ int float_fea_num = 0;
+ if (float_feat_id_map.size() > 0) float_fea_num = float_feat_id_map[idx].size();
if (load_slot) {
- auto node = feature_shards[idx][index]->add_feature_node(id, false);
+ auto node = feature_shards[idx][index]->add_feature_node(id, false, float_fea_num);
if (node != NULL) {
for (int i = 2; i < num; ++i) {
auto &v = vals[i];
@@ -1934,7 +2034,7 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
}
}
} else {
- node_shards[idx][index]->add_feature_node(id, false);
+ node_shards[idx][index]->add_feature_node(id, false, float_fea_num);
}
local_valid_count++;
}
@@ -2529,22 +2629,6 @@ int GraphTable::parse_feature(int idx,
string_vector_2_string(
fea_fields.begin(), fea_fields.end(), ' ', fea_ptr);
return 0;
- } else if (dtype == "float32") {
-   int ret = FeatureNode::parse_value_to_bytes<float>(
-       fea_fields.begin(), fea_fields.end(), fea_ptr);
-   if (ret != 0) {
-     VLOG(0) << "Fail to parse value";
-     return -1;
-   }
-   return 0;
- } else if (dtype == "float64") {
-   int ret = FeatureNode::parse_value_to_bytes<double>(
-       fea_fields.begin(), fea_fields.end(), fea_ptr);
-   if (ret != 0) {
-     VLOG(0) << "Fail to parse value";
-     return -1;
-   }
-   return 0;
} else if (dtype == "int32") {
int ret = FeatureNode::parse_value_to_bytes<int32_t>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
@@ -2563,10 +2647,36 @@ int GraphTable::parse_feature(int idx,
return 0;
}
} else {
- VLOG(10) << "feature_name[" << name << "] is not in feat_id_map, ntype_id["
-          << idx << "] feat_id_map_size[" << feat_id_map.size() << "]";
+ if (float_feat_id_map.size() > (size_t)idx) {
+   auto float_it = float_feat_id_map[idx].find(name);
+   if (float_it != float_feat_id_map[idx].end()) {
+     int32_t id = float_it->second;
+     std::string *fea_ptr = node->mutable_float_feature(id);
+     std::string dtype = this->float_feat_dtype[idx][id];
+     if (dtype == "float32") {
+       int ret = FeatureNode::parse_value_to_bytes<float>(
+           fea_fields.begin(), fea_fields.end(), fea_ptr);
+       if (ret != 0) {
+         VLOG(0) << "Fail to parse value";
+         return -1;
+       }
+       return 0;
+     }
+     // else if (dtype == "float64") {  // not used
+     //   int ret = FeatureNode::parse_value_to_bytes<double>(
+     //       fea_fields.begin(), fea_fields.end(), fea_ptr);
+     //   if (ret != 0) {
+     //     VLOG(0) << "Fail to parse value";
+     //     return -1;
+     //   }
+     //   return 0;
+     // }
+   } else {
+     VLOG(4) << "feature_name[" << name << "] is not in feat_id_map, ntype_id["
+             << idx << "] feat_id_map_size[" << feat_id_map.size() << "]";
+   }
+ }
}

return 0;
}
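With this change, parse_feature resolves a feature name against two id spaces: feat_id_map for feasign/int64/string slots, then float_feat_id_map for float32 slots (the dedicated float32/float64 branches in the sparse path are removed). A simplified sketch of the byte-packing that parse_value_to_bytes<float> presumably performs, appending each value's raw bytes to the node-side buffer — an assumption for illustration; the real helper is defined on FeatureNode:

#include <exception>
#include <string>
#include <vector>

// Simplified stand-in for FeatureNode::parse_value_to_bytes<float>:
// parse each text field as a float and append its raw bytes to *out.
// Error handling mirrors the "Fail to parse value" path above.
static int parse_float_fields(const std::vector<std::string> &fields,
                              std::string *out) {
  for (const auto &f : fields) {
    try {
      float v = std::stof(f);
      out->append(reinterpret_cast<const char *>(&v), sizeof(v));
    } catch (const std::exception &) {
      return -1;
    }
  }
  return 0;
}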
// thread safe shard vector merge
@@ -2930,18 +3040,32 @@ int32_t GraphTable::Initialize(const GraphParameter &graph) {
auto feature = graph_feature[k];
id_to_feature.push_back(node_type);
int feat_conf_size = static_cast<int>(feature.name().size());

+ int feasign_idx = 0, float_idx = 0;
for (int i = 0; i < feat_conf_size; i++) {
// auto &f_name = common.attributes()[i];
// auto &f_shape = common.dims()[i];
// auto &f_dtype = common.params()[i];
auto &f_name = feature.name()[i];
auto &f_shape = feature.shape()[i];
auto &f_dtype = feature.dtype()[i];
- feat_name[k].push_back(f_name);
- feat_shape[k].push_back(f_shape);
- feat_dtype[k].push_back(f_dtype);
- feat_id_map[k][f_name] = i;
+ if (f_dtype == "feasign" || f_dtype == "int64") {
+   feat_name[k].push_back(f_name);
+   feat_shape[k].push_back(f_shape);
+   feat_dtype[k].push_back(f_dtype);
+   feat_id_map[k][f_name] = feasign_idx++;
+ } else if (f_dtype == "float32") {
+   if (float_feat_id_map.size() < (size_t)node_types.size()) {
+     float_feat_name.resize(node_types.size());
+     float_feat_shape.resize(node_types.size());
+     float_feat_dtype.resize(node_types.size());
+     float_feat_id_map.resize(node_types.size());
+   }
+   float_feat_name[k].push_back(f_name);
+   float_feat_shape[k].push_back(f_shape);
+   float_feat_dtype[k].push_back(f_dtype);
+   float_feat_id_map[k][f_name] = float_idx++;
+ }
VLOG(0) << "init graph table feat conf name:" << f_name
<< " shape:" << f_shape << " dtype:" << f_dtype;
}
10 changes: 9 additions & 1 deletion paddle/fluid/distributed/ps/table/common_graph_table.h
@@ -122,7 +122,7 @@ class GraphShard {
}
GraphNode *add_graph_node(uint64_t id);
GraphNode *add_graph_node(Node *node);
- FeatureNode *add_feature_node(uint64_t id, bool is_overlap = true);
+ FeatureNode *add_feature_node(uint64_t id, bool is_overlap = true, int float_fea_num = 0);
Node *find_node(uint64_t id);
void delete_node(uint64_t id);
void clear();
@@ -725,6 +725,8 @@ class GraphTable : public Table {
int idx, const std::vector<uint64_t> &ids);
virtual paddle::framework::GpuPsCommGraphFea make_gpu_ps_graph_fea(
int gpu_id, std::vector<uint64_t> &node_ids, int slot_num); // NOLINT
+ virtual paddle::framework::GpuPsCommGraphFloatFea make_gpu_ps_graph_float_fea(
+     int gpu_id, std::vector<uint64_t> &node_ids, int float_slot_num);  // NOLINT
int32_t Load_to_ssd(const std::string &path, const std::string &param);
int64_t load_graph_to_memory_from_ssd(int idx,
std::vector<uint64_t> &ids); // NOLINT
@@ -779,7 +781,13 @@ class GraphTable : public Table {
std::vector<std::vector<std::string>> feat_name;
std::vector<std::vector<std::string>> feat_dtype;
std::vector<std::vector<int32_t>> feat_shape;
+ std::vector<std::vector<std::string>> float_feat_name;
+ std::vector<std::vector<std::string>> float_feat_dtype;
+ std::vector<std::vector<int32_t>> float_feat_shape;
+ // int slot_fea_num_{-1};
+ // int float_fea_num_{-1};
std::vector<std::unordered_map<std::string, int32_t>> feat_id_map;
+ std::vector<std::unordered_map<std::string, int32_t>> float_feat_id_map;
std::unordered_map<std::string, int> feature_to_id, edge_to_id;
std::vector<std::string> id_to_feature, id_to_edge;
std::string table_name;