[Pglbox2.0] merge gpugraph to develop (#49946)
* add set slot_num for psgpuwraper (#177)

* add set slot_num_for_pull_feature for psgpuwarper

* Add get_epoch_finish python interface (#182)

* add get_epoch_finish interface

* add return

* delete return

* add unzip op (#183)

* fix miss key for error dataset (#186)

* fix miss key for error dataset

* fix miss key for error dataset

Co-authored-by: yangjunchao <yangjunchao@baidu.com>

* add excluded_train_pair and infer_node_type (#187)

* support return of degree (#188)

* fix task stuck in barrier (#189)

Co-authored-by: yangjunchao <yangjunchao@baidu.com>

* check node/feature format when loading (#190)

* check node&feature format when loading

* check node&feature format when loading (2)

* degrade log (#191)

* [PGLBOX]fix conflict

* [PGLBOX]fix conflict

* [PGLBOX]replace LodTensor with phi::DenseTensor

* [PGLBOX]fix gpu_primitives.h include path

* [PGLBOX]from platform::PADDLE_CUDA_NUM_THREADS to phi::PADDLE_CUDA_NUM_THREADS

* [PGLBOX]fix unzip example code

* [PGLBOX]fix unzip example code

* [PGLBOX]fix unzip example code

* [PGLBOX]fix unzip example code

* [PGLBOX]fix unzip ut

* [PGLBOX]fix unzip ut

* [PGLBOX]fix code style

* [PGLBOX]fix code style

* [PGLBOX]fix code style

* fix code style

* fix code style

* fix unzip ut

* fix unzip ut

* fix unzip ut

* fix unzip

* fix code style

* add ut

* add c++ ut & fix train_mode_ set

* fix load into memory

* fix c++ ut

* fix c++ ut

* fix c++ ut

* fix c++ ut

* fix code style

* fix collective

* fix unzip_op.cc

* fix barrier

* fix code style

* fix barrier

* fix barrier

* fix code style

* fix unzip

* add unzip.py

* add unzip.py

* fix unzip.py

---------

Co-authored-by: chao9527 <33347532+chao9527@users.noreply.github.com>
Co-authored-by: Siming Dai <908660116@qq.com>
Co-authored-by: huwei02 <53012141+huwei02@users.noreply.github.com>
Co-authored-by: yangjunchao <yangjunchao@baidu.com>
5 people authored Jan 30, 2023
1 parent 382e9a0 commit cb525d4
Showing 34 changed files with 1,261 additions and 189 deletions.
4 changes: 2 additions & 2 deletions paddle/fluid/distributed/ps/service/ps_local_client.cc
@@ -306,10 +306,10 @@ ::std::future<int32_t> PsLocalClient::SaveCacheTable(uint32_t table_id,
size_t threshold) {
auto* table_ptr = GetTable(table_id);
std::pair<int64_t, int64_t> ret = table_ptr->PrintTableStat();
VLOG(0) << "table id: " << table_id << ", feasign size: " << ret.first
VLOG(1) << "table id: " << table_id << ", feasign size: " << ret.first
<< ", mf size: " << ret.second;
if (ret.first > (int64_t)threshold) {
VLOG(0) << "run cache table";
VLOG(1) << "run cache table";
table_ptr->CacheTable(pass_id);
}
return done();
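The two edits above demote per-pass cache-table statistics from VLOG(0) to VLOG(1). In glog, VLOG(n) prints only when the process verbosity is at least n, so these messages now appear only when verbose logging is requested. A minimal standalone sketch of that behavior, assuming glog as the only dependency:

#include <glog/logging.h>

// VLOG(0) prints by default (verbosity FLAGS_v defaults to 0);
// VLOG(1) prints only when verbosity is raised, e.g. GLOG_v=1 or --v=1.
int main(int argc, char* argv[]) {
  google::InitGoogleLogging(argv[0]);
  FLAGS_logtostderr = true;
  VLOG(0) << "always printed";
  VLOG(1) << "printed only with verbosity >= 1";
  return 0;
}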
115 changes: 96 additions & 19 deletions paddle/fluid/distributed/ps/table/common_graph_table.cc
@@ -124,11 +124,13 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
}
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();

- std::stringstream ss;
- for (int k = 0; k < slot_num; ++k) {
- ss << slot_feature_num_map_[k] << " ";
+ if (FLAGS_v > 0) {
+ std::stringstream ss;
+ for (int k = 0; k < slot_num; ++k) {
+ ss << slot_feature_num_map_[k] << " ";
+ }
+ VLOG(1) << "slot_feature_num_map: " << ss.str();
}
- VLOG(0) << "slot_feature_num_map: " << ss.str();

tasks.clear();

@@ -137,7 +139,7 @@ paddle::framework::GpuPsCommGraphFea GraphTable::make_gpu_ps_graph_fea(
for (size_t i = 0; i < shard_num; i++) {
tot_len += feature_array[i].size();
}
VLOG(0) << "Loaded feature table on cpu, feature_list_size[" << tot_len
VLOG(1) << "Loaded feature table on cpu, feature_list_size[" << tot_len
<< "] node_ids_size[" << node_ids.size() << "]";
res.init_on_cpu(tot_len, (unsigned int)node_ids.size(), slot_num);
unsigned int offset = 0, ind = 0;
@@ -494,6 +496,8 @@ void GraphTable::export_partition_files(int idx, std::string file_path) {

for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
}
+ #endif

void GraphTable::clear_graph(int idx) {
for (auto p : edge_shards[idx]) {
p->clear();
@@ -506,6 +510,7 @@ void GraphTable::clear_graph(int idx) {
}
}

+ #ifdef PADDLE_WITH_HETERPS
void GraphTable::release_graph() {
// Before releasing graph, prepare for sampling ids and embedding keys.
build_graph_type_keys();
@@ -545,6 +550,7 @@ void GraphTable::release_graph_node() {
feature_shrink_to_fit();
}
}
+ #endif

void GraphTable::clear_edge_shard() {
VLOG(0) << "begin clear edge shard";
@@ -590,6 +596,7 @@ void GraphTable::clear_feature_shard() {
VLOG(0) << "finish clear feature shard";
}

+ #ifdef PADDLE_WITH_HETERPS
void GraphTable::feature_shrink_to_fit() {
std::vector<std::future<int>> tasks;
for (auto &type_shards : feature_shards) {
@@ -619,13 +626,16 @@ void GraphTable::merge_feature_shard() {
feature_shards.resize(1);
}

+ #endif

void GraphTable::clear_graph() {
VLOG(0) << "begin clear_graph";
clear_edge_shard();
clear_feature_shard();
VLOG(0) << "finish clear_graph";
}

+ #ifdef PADDLE_WITH_HETERPS
int32_t GraphTable::load_next_partition(int idx) {
if (next_partition >= static_cast<int>(partitions[idx].size())) {
VLOG(0) << "partition iteration is done";
@@ -1203,11 +1213,21 @@ int32_t GraphTable::Load(const std::string &path, const std::string &param) {
if (load_edge) {
bool reverse_edge = (param[1] == '<');
std::string edge_type = param.substr(2);
- return this->load_edges(path, reverse_edge, edge_type);
+ int ret = this->load_edges(path, reverse_edge, edge_type);
+ if (ret != 0) {
+ VLOG(0) << "Fail to load edges, path[" << path << "] edge_type["
+ << edge_type << "]";
+ return -1;
+ }
}
if (load_node) {
std::string node_type = param.substr(1);
- return this->load_nodes(path, node_type);
+ int ret = this->load_nodes(path, node_type);
+ if (ret != 0) {
+ VLOG(0) << "Fail to load nodes, path[" << path << "] node_type["
+ << node_type << "]";
+ return -1;
+ }
}
return 0;
}
@@ -1319,10 +1339,19 @@ int32_t GraphTable::parse_node_and_load(std::string ntype2files,
return 0;
}
if (FLAGS_graph_load_in_parallel) {
- this->load_nodes(npath_str, "");
+ int ret = this->load_nodes(npath_str, "");
+ if (ret != 0) {
+ VLOG(0) << "Fail to load nodes, path[" << npath << "]";
+ return -1;
+ }
} else {
for (size_t j = 0; j < ntypes.size(); j++) {
- this->load_nodes(npath_str, ntypes[j]);
+ int ret = this->load_nodes(npath_str, ntypes[j]);
+ if (ret != 0) {
+ VLOG(0) << "Fail to load nodes, path[" << npath << "], ntypes["
+ << ntypes[j] << "]";
+ return -1;
+ }
}
}
return 0;
@@ -1397,17 +1426,30 @@ int32_t GraphTable::load_node_and_edge_file(std::string etype2files,
return 0;
}
if (FLAGS_graph_load_in_parallel) {
- this->load_nodes(npath_str, "");
+ int ret = this->load_nodes(npath_str, "");
+ if (ret != 0) {
+ VLOG(0) << "Fail to load nodes, path[" << npath_str << "]";
+ return -1;
+ }
} else {
for (size_t j = 0; j < ntypes.size(); j++) {
- this->load_nodes(npath_str, ntypes[j]);
+ int ret = this->load_nodes(npath_str, ntypes[j]);
+ if (ret != 0) {
+ VLOG(0) << "Fail to load nodes, path[" << npath_str
+ << "], ntypes[" << ntypes[j] << "]";
+ return -1;
+ }
}
}
}
return 0;
}));
}
for (size_t i = 0; i < tasks.size(); i++) tasks[i].get();
+ if (is_parse_node_fail_) {
+ VLOG(0) << "Fail to load node_and_edge_file";
+ return -1;
+ }
return 0;
}

@@ -1499,7 +1541,12 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
node->set_feature_size(feat_name[idx].size());
for (int i = 1; i < num; ++i) {
auto &v = vals[i];
- parse_feature(idx, v.ptr, v.len, node);
+ int ret = parse_feature(idx, v.ptr, v.len, node);
+ if (ret != 0) {
+ VLOG(0) << "Fail to parse feature, node_id[" << id << "]";
+ is_parse_node_fail_ = true;
+ return {0, 0};
+ }
}
}
local_valid_count++;
@@ -1551,7 +1598,12 @@ std::pair<uint64_t, uint64_t> GraphTable::parse_node_file(
if (node != NULL) {
for (int i = 2; i < num; ++i) {
auto &v = vals[i];
- parse_feature(idx, v.ptr, v.len, node);
+ int ret = parse_feature(idx, v.ptr, v.len, node);
+ if (ret != 0) {
+ VLOG(0) << "Fail to parse feature, node_id[" << id << "]";
+ is_parse_node_fail_ = true;
+ return {0, 0};
+ }
}
}
local_valid_count++;
@@ -1603,6 +1655,11 @@ int32_t GraphTable::load_nodes(const std::string &path, std::string node_type) {
valid_count += res.second;
}
}
+ if (is_parse_node_fail_) {
+ VLOG(0) << "Fail to load nodes, path[" << paths[0] << ".."
+ << paths[paths.size() - 1] << "] node_type[" << node_type << "]";
+ return -1;
+ }

VLOG(0) << valid_count << "/" << count << " nodes in node_type[ " << node_type
<< "] are loaded successfully!";
@@ -2103,36 +2160,56 @@ int GraphTable::parse_feature(int idx,
if (dtype == "feasign") {
// string_vector_2_string(fields.begin() + 1, fields.end(), ' ',
// fea_ptr);
- FeatureNode::parse_value_to_bytes<uint64_t>(
+ int ret = FeatureNode::parse_value_to_bytes<uint64_t>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
+ if (ret != 0) {
+ VLOG(0) << "Fail to parse value";
+ return -1;
+ }
return 0;
} else if (dtype == "string") {
string_vector_2_string(
fea_fields.begin(), fea_fields.end(), ' ', fea_ptr);
return 0;
} else if (dtype == "float32") {
- FeatureNode::parse_value_to_bytes<float>(
+ int ret = FeatureNode::parse_value_to_bytes<float>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
+ if (ret != 0) {
+ VLOG(0) << "Fail to parse value";
+ return -1;
+ }
return 0;
} else if (dtype == "float64") {
- FeatureNode::parse_value_to_bytes<double>(
+ int ret = FeatureNode::parse_value_to_bytes<double>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
+ if (ret != 0) {
+ VLOG(0) << "Fail to parse value";
+ return -1;
+ }
return 0;
} else if (dtype == "int32") {
- FeatureNode::parse_value_to_bytes<int32_t>(
+ int ret = FeatureNode::parse_value_to_bytes<int32_t>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
+ if (ret != 0) {
+ VLOG(0) << "Fail to parse value";
+ return -1;
+ }
return 0;
} else if (dtype == "int64") {
- FeatureNode::parse_value_to_bytes<uint64_t>(
+ int ret = FeatureNode::parse_value_to_bytes<uint64_t>(
fea_fields.begin(), fea_fields.end(), fea_ptr);
+ if (ret != 0) {
+ VLOG(0) << "Fail to parse value";
+ return -1;
+ }
return 0;
}
} else {
VLOG(2) << "feature_name[" << name << "] is not in feat_id_map, ntype_id["
<< idx << "] feat_id_map_size[" << feat_id_map.size() << "]";
}

- return -1;
+ return 0;
}
// thread safe shard vector merge
class MergeShardVector {
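The recurring change in this file is one error-propagation pattern: parsers return 0 on success and -1 on failure, every call site checks the result, logs enough context to locate the bad input, and returns -1 instead of continuing with partial data; parallel shard tasks additionally record failure in is_parse_node_fail_ so the coordinator can fail the whole load after joining. A self-contained sketch of the pattern, with hypothetical names rather than the actual GraphTable API:

#include <cstdio>
#include <string>
#include <vector>

// Low-level parser: 0 on success, -1 on malformed input.
static int parse_feature_sketch(const std::string& field) {
  return field.empty() ? -1 : 0;
}

// Caller checks every return value and bubbles the failure up
// instead of silently skipping the bad record.
static int load_nodes_sketch(const std::vector<std::string>& fields) {
  for (const auto& f : fields) {
    if (parse_feature_sketch(f) != 0) {
      std::fprintf(stderr, "Fail to parse feature [%s]\n", f.c_str());
      return -1;
    }
  }
  return 0;
}

int main() {
  std::vector<std::string> good = {"a", "b"};
  std::vector<std::string> bad = {"a", ""};
  std::printf("good: %d, bad: %d\n",
              load_nodes_sketch(good), load_nodes_sketch(bad));
  return 0;
}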
1 change: 1 addition & 0 deletions paddle/fluid/distributed/ps/table/common_graph_table.h
@@ -789,6 +789,7 @@ class GraphTable : public Table {
std::string slot_feature_separator_ = std::string(" ");
std::string feature_separator_ = std::string(" ");
std::vector<int> slot_feature_num_map_;
+ bool is_parse_node_fail_ = false;
};

/*
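The new is_parse_node_fail_ member is written by the parsing tasks and read by the coordinator once all tasks have been joined (see the checks added in common_graph_table.cc above). A minimal sketch of that rendezvous; it uses std::atomic<bool> because its workers run concurrently, whereas the member above is a plain bool and relies on the surrounding task structure for safety:

#include <atomic>
#include <cstdio>
#include <future>
#include <vector>

int main() {
  std::atomic<bool> parse_failed{false};
  std::vector<std::future<void>> tasks;
  for (int shard = 0; shard < 4; ++shard) {
    tasks.push_back(std::async(std::launch::async, [shard, &parse_failed] {
      if (shard == 2) parse_failed = true;  // simulate one shard failing
    }));
  }
  for (auto& t : tasks) t.get();  // join all workers first
  std::printf("load %s\n", parse_failed ? "failed" : "succeeded");
  return 0;
}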
8 changes: 7 additions & 1 deletion paddle/fluid/distributed/ps/table/graph/graph_node.h
@@ -255,7 +255,7 @@ class FeatureNode : public Node {
}

template <typename T>
- static void parse_value_to_bytes(
+ static int parse_value_to_bytes(
std::vector<paddle::string::str_ptr>::iterator feat_str_begin,
std::vector<paddle::string::str_ptr>::iterator feat_str_end,
std::string *output) {
@@ -269,8 +269,14 @@
thread_local paddle::string::str_ptr_stream ss;
for (size_t i = 0; i < feat_str_size; i++) {
ss.reset(*(feat_str_begin + i));
+ int len = ss.end - ss.ptr;
+ char *old_ptr = ss.ptr;
ss >> fea_ptrs[i];
+ if (ss.ptr - old_ptr != len) {
+ return -1;
+ }
}
+ return 0;
}

protected:
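parse_value_to_bytes now reports a partial parse: it records the token length before extraction and returns -1 unless the stream consumed exactly that many bytes, so a token like "12x45" is rejected rather than silently truncated at the first non-digit. A standalone sketch of the same full-consumption check built on strtoull (a hypothetical helper, not the Paddle API):

#include <cstdio>
#include <cstdlib>
#include <cstring>

// Returns 0 only if the entire token is a valid base-10 number.
static int parse_u64_strict(const char* token, unsigned long long* out) {
  if (*token == '\0') return -1;  // reject empty tokens
  char* end = nullptr;
  *out = std::strtoull(token, &end, 10);
  if (end != token + std::strlen(token)) {
    return -1;  // partial parse: trailing non-numeric bytes
  }
  return 0;
}

int main() {
  unsigned long long v = 0;
  std::printf("\"12345\" -> %d\n", parse_u64_strict("12345", &v));  // 0
  std::printf("\"12x45\" -> %d\n", parse_u64_strict("12x45", &v));  // -1
  return 0;
}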
3 changes: 2 additions & 1 deletion paddle/fluid/framework/CMakeLists.txt
@@ -866,7 +866,8 @@ if(WITH_DISTRIBUTE)
fleet
heter_server
brpc
- fleet_executor)
+ fleet_executor
+ flags)
set(DISTRIBUTE_COMPILE_FLAGS
"-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor -Wno-error=parentheses"
)
7 changes: 6 additions & 1 deletion paddle/fluid/framework/barrier.h
@@ -14,6 +14,11 @@

#pragma once

+ #if defined _WIN32 || defined __APPLE__
+ #else
+ #define __LINUX__
+ #endif

#ifdef __LINUX__
#include <pthread.h>
#include <semaphore.h>
@@ -48,7 +53,7 @@ class Barrier {
void wait() {
#ifdef __LINUX__
int err = pthread_barrier_wait(&_barrier);
- if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD)) {
+ if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD) {
CHECK_EQ(1, 0);
}
#endif
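The one-line fix above removes a stray parenthesis from a check that is easy to get wrong: pthread_barrier_wait returns 0 to every waiter except exactly one, which receives PTHREAD_BARRIER_SERIAL_THREAD; any other value is a genuine error. A minimal Linux-only sketch of the correct usage (compile with -lpthread):

#include <pthread.h>
#include <cstdio>
#include <vector>

static pthread_barrier_t g_barrier;

static void* worker(void*) {
  int err = pthread_barrier_wait(&g_barrier);
  // Exactly one thread gets PTHREAD_BARRIER_SERIAL_THREAD; not an error.
  if (err != 0 && err != PTHREAD_BARRIER_SERIAL_THREAD) {
    std::fprintf(stderr, "pthread_barrier_wait failed: %d\n", err);
  }
  return nullptr;
}

int main() {
  const int kThreads = 4;
  pthread_barrier_init(&g_barrier, nullptr, kThreads);
  std::vector<pthread_t> threads(kThreads);
  for (auto& t : threads) pthread_create(&t, nullptr, worker, nullptr);
  for (auto& t : threads) pthread_join(t, nullptr);
  pthread_barrier_destroy(&g_barrier);
  return 0;
}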
9 changes: 9 additions & 0 deletions paddle/fluid/framework/data_feed.cc
@@ -2112,15 +2112,24 @@ void SlotRecordInMemoryDataFeed::Init(const DataFeedDesc& data_feed_desc) {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
gpu_graph_data_generator_.SetConfig(data_feed_desc);
#endif
+ if (gpu_graph_mode_) {
+ train_mode_ = true;
+ } else {
+ train_mode_ = data_feed_desc.graph_config().gpu_graph_training();
+ }
}

#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
void SlotRecordInMemoryDataFeed::InitGraphResource() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
gpu_graph_data_generator_.AllocResource(thread_id_, feed_vec_);
#endif
}

void SlotRecordInMemoryDataFeed::InitGraphTrainResource() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
gpu_graph_data_generator_.AllocTrainResource(thread_id_);
#endif
}
#endif

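A compilable sketch of the guard pattern used in this hunk: the GPU-graph-only body is compiled away unless both feature macros are defined, so callers never need their own #ifdefs. Class and method names here are illustrative, not the actual DataFeed types:

#include <cstdio>

class FeedSketch {
 public:
  void InitGraphResource() {
#if defined(PADDLE_WITH_GPU_GRAPH) && defined(PADDLE_WITH_HETERPS)
    std::printf("allocating GPU graph resources\n");
#endif
    // Compiles to a no-op on builds without both flags.
  }
};

int main() {
  FeedSketch feed;
  feed.InitGraphResource();
  return 0;
}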
