Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LoadNode; Change add_node; #3

Merged
merged 10 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 58 additions & 10 deletions paddle/fluid/distributed/table/common_graph_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,22 @@ size_t GraphShard::get_size() {
return res;
}

std::list<GraphNode *>::iterator GraphShard::add_node(GraphNode *node) {
if (node_location.find(node->get_id()) != node_location.end())
return node_location.find(node->get_id())->second;
std::list<GraphNode *>::iterator GraphShard::add_node(uint64_t id, std::string feature) {
if (node_location.find(id) != node_location.end())
return node_location.find(id)->second;

int index = node->get_id() % shard_num % bucket_size;
int index = id % shard_num % bucket_size;
GraphNode *node = new GraphNode(id, feature);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already change into feature as second augments.


std::list<GraphNode *>::iterator iter =
bucket[index].insert(bucket[index].end(), node);

node_location[node->get_id()] = iter;
node_location[id] = iter;
return iter;
}

void GraphShard::add_neighboor(uint64_t id, GraphEdge *edge) {
(*add_node(new GraphNode(id, std::string(""))))->add_edge(edge);
(*add_node(id, std::string("")))->add_edge(edge);
}

GraphNode *GraphShard::find_node(uint64_t id) {
Expand All @@ -88,13 +89,55 @@ GraphNode *GraphShard::find_node(uint64_t id) {

int32_t GraphTable::load(const std::string &path, const std::string &param) {
auto cmd = paddle::string::split_string<std::string>(param, "|");
std::set<std::string> cmd_set(cmd.begin(), cmd.end());
bool load_edge = cmd_set.count(std::string("edge"));
std::set<std::string> cmd_set(cmd.begin(), cmd.end());
bool reverse_edge = cmd_set.count(std::string("reverse"));
VLOG(0) << "Reverse Edge " << reverse_edge;
bool load_edge = cmd_set.count(std::string("edge"));
if(load_edge) {
return this -> load_edges(path, reverse_edge);
}
else {
return this -> load_nodes(path);
}
}

int32_t GraphTable::load_nodes(const std::string &path) {
auto paths = paddle::string::split_string<std::string>(path, ";");
for (auto path : paths) {
std::ifstream file(path);
std::string line;
while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
if (values.size() < 2) continue;
auto id = std::stoull(values[1]);


size_t shard_id = id % shard_num;
if (shard_id >= shard_end || shard_id < shard_start) {
VLOG(0) << "will not load " << id << " from " << path
<< ", please check id distribution";
continue;

}

std::string node_type = values[0];
std::vector<std::string > feature;
feature.push_back(node_type);
for(size_t slice = 2; slice < values.size(); slice ++) {
feature.push_back(values[slice]);
}
auto feat = paddle::string::join_strings(feature, '\t');
size_t index = shard_id - shard_start;
shards[index].add_node(id, feat);

}
}
return 0;
}


int32_t GraphTable::load_edges(const std::string &path, bool reverse_edge) {

auto paths = paddle::string::split_string<std::string>(path, ";");
VLOG(0) << paths.size();
int count = 0;

for (auto path : paths) {
Expand All @@ -113,13 +156,15 @@ int32_t GraphTable::load(const std::string &path, const std::string &param) {
if (values.size() == 3) {
weight = std::stof(values[2]);
}

size_t src_shard_id = src_id % shard_num;

if (src_shard_id >= shard_end || src_shard_id < shard_start) {
VLOG(0) << "will not load " << src_id << " from " << path
<< ", please check id distribution";
continue;
}

size_t index = src_shard_id - shard_start;
GraphEdge *edge = new GraphEdge(dst_id, weight);
shards[index].add_neighboor(src_id, edge);
Expand All @@ -128,6 +173,7 @@ int32_t GraphTable::load(const std::string &path, const std::string &param) {
VLOG(0) << "Load Finished Total Edge Count " << count;

// Build Sampler j

for (auto &shard : shards) {
auto bucket = shard.get_bucket();
for (int i = 0; i < bucket.size(); i++) {
Expand All @@ -141,6 +187,7 @@ int32_t GraphTable::load(const std::string &path, const std::string &param) {
}
return 0;
}

GraphNode *GraphTable::find_node(uint64_t id) {
size_t shard_id = id % shard_num;
if (shard_id >= shard_end || shard_id < shard_start) {
Expand Down Expand Up @@ -264,3 +311,4 @@ int32_t GraphTable::initialize() {
}
}
};

8 changes: 7 additions & 1 deletion paddle/fluid/distributed/table/common_graph_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class GraphShard {
}
return -1;
}
std::list<GraphNode *>::iterator add_node(GraphNode *node);
std::list<GraphNode *>::iterator add_node(uint64_t id, std::string feature);
GraphNode *find_node(uint64_t id);
void add_neighboor(uint64_t id, GraphEdge *edge);
std::unordered_map<uint64_t, std::list<GraphNode *>::iterator>
Expand All @@ -74,7 +74,13 @@ class GraphTable : public SparseTable {
virtual int32_t random_sample(uint64_t node_id, int sampe_size, char *&buffer,
int &actual_size);
virtual int32_t initialize();

int32_t load(const std::string &path, const std::string &param);

int32_t load_edges(const std::string &path, bool reverse);

int32_t load_nodes(const std::string &path);

GraphNode *find_node(uint64_t id);

virtual int32_t pull_sparse(float *values, const uint64_t *keys, size_t num) {
Expand Down