
Commit

add switch for shuffle (PaddlePaddle#316)
Co-authored-by: root <root@yq01-inf-hic-k8s-a100-ab2-0009.yq01.baidu.com>
huwei02 and root authored Jun 22, 2023
1 parent effbad5 commit a450ac1
Showing 3 changed files with 49 additions and 36 deletions.
75 changes: 43 additions & 32 deletions paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu
@@ -610,10 +610,45 @@ int GraphGpuWrapper::load_node_file(std::string ntype2files,
ntype2files, graph_data_local_path, part_num);
}

+ void GraphGpuWrapper::shuffle_start_nodes_for_training() {
+ size_t thread_num = device_id_mapping.size();
+
+ int shuffle_seed = 0;
+ std::random_device rd;
+ std::mt19937 rng{rd()};
+ std::uniform_int_distribution<int> dice_distribution(0, std::numeric_limits<int>::max());
+
+ for (size_t i = 0; i < d_node_iter_graph_all_type_keys_.size(); i++) {
+ for (size_t j = 0; j < d_node_iter_graph_all_type_keys_[i].size(); j++) {
+ auto stream = get_local_stream(j);
+ int gpuid = device_id_mapping[j];
+ auto place = platform::CUDAPlace(gpuid);
+ platform::CUDADeviceGuard guard(gpuid);
+ paddle::memory::ThrustAllocator<cudaStream_t> allocator(place, stream);
+ const auto &exec_policy = thrust::cuda::par(allocator).on(stream);
+ shuffle_seed = dice_distribution(rng);
+ thrust::random::default_random_engine engine(shuffle_seed);
+ uint64_t *cur_node_iter_ptr =
+ reinterpret_cast<uint64_t *>(d_node_iter_graph_all_type_keys_[i][j]->ptr());
+ VLOG(2) << "node type: " << i << ", card num: " << j << ", len: " << h_node_iter_graph_all_type_keys_len_[i][j];
+
+ thrust::shuffle(exec_policy,
+ thrust::device_pointer_cast(cur_node_iter_ptr),
+ thrust::device_pointer_cast(cur_node_iter_ptr) + h_node_iter_graph_all_type_keys_len_[i][j],
+ engine);
+ }
+ }
+ for (size_t i = 0; i < thread_num; i++) {
+ auto stream = get_local_stream(i);
+ cudaStreamSynchronize(stream);
+ }
+ }

int GraphGpuWrapper::set_node_iter_from_file(std::string ntype2files,
std::string node_types_file_path,
int part_num,
- bool training) {
+ bool training,
+ bool shuffle) {
// 1. load cpu node
((GpuPsGraphTable *)graph_table)
->cpu_graph_table_->parse_node_and_load(
@@ -647,12 +682,16 @@ int GraphGpuWrapper::set_node_iter_from_file(std::string ntype2files,
} else {
init_type_keys(d_node_iter_graph_all_type_keys_,
h_node_iter_graph_all_type_keys_len_);

+ if (shuffle) {
+ shuffle_start_nodes_for_training();
+ }
}
}
return 0;
}

- int GraphGpuWrapper::set_node_iter_from_graph(bool training) {
+ int GraphGpuWrapper::set_node_iter_from_graph(bool training, bool shuffle) {
// 1. init type keys
if (!type_keys_initialized_) {
init_type_keys(d_graph_all_type_total_keys_, h_graph_all_type_keys_len_);
@@ -670,36 +709,8 @@ int GraphGpuWrapper::set_node_iter_from_graph(bool training) {
d_node_iter_graph_all_type_keys_ = d_graph_all_type_total_keys_;
h_node_iter_graph_all_type_keys_len_ = h_graph_all_type_keys_len_;

- size_t thread_num = device_id_mapping.size();
-
- int shuffle_seed = 0;
- std::random_device rd;
- std::mt19937 rng{rd()};
- std::uniform_int_distribution<int> dice_distribution(0, std::numeric_limits<int>::max());
-
- for (size_t i = 0; i < d_node_iter_graph_all_type_keys_.size(); i++) {
- for (size_t j = 0; j < d_node_iter_graph_all_type_keys_[i].size(); j++) {
- auto stream = get_local_stream(j);
- int gpuid = device_id_mapping[j];
- auto place = platform::CUDAPlace(gpuid);
- platform::CUDADeviceGuard guard(gpuid);
- paddle::memory::ThrustAllocator<cudaStream_t> allocator(place, stream);
- const auto &exec_policy = thrust::cuda::par(allocator).on(stream);
- shuffle_seed = dice_distribution(rng);
- thrust::random::default_random_engine engine(shuffle_seed);
- uint64_t *cur_node_iter_ptr =
- reinterpret_cast<uint64_t *>(d_node_iter_graph_all_type_keys_[i][j]->ptr());
- VLOG(2) << "node type: " << i << ", card num: " << j << ", len: " << h_node_iter_graph_all_type_keys_len_[i][j];
-
- thrust::shuffle(exec_policy,
- thrust::device_pointer_cast(cur_node_iter_ptr),
- thrust::device_pointer_cast(cur_node_iter_ptr) + h_node_iter_graph_all_type_keys_len_[i][j],
- engine);
- }
- }
- for (size_t i = 0; i < thread_num; i++) {
- auto stream = get_local_stream(i);
- cudaStreamSynchronize(stream);
+ if (shuffle) {
+ shuffle_start_nodes_for_training();
}
}
}
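The new helper above can be read in isolation as a standard per-card Thrust shuffle. The following is a minimal standalone sketch of that pattern (not the Paddle implementation): a host-side std::mt19937 draws an independent seed for every GPU card, thrust::shuffle permutes that card's key range on its own stream, and all streams are synchronized before the keys are consumed. The parameter names (device_ids, d_keys, lens, streams) are illustrative stand-ins for the wrapper's member state.

// Minimal sketch of the per-card shuffle pattern; not Paddle code.
#include <cuda_runtime.h>
#include <thrust/device_ptr.h>
#include <thrust/random.h>
#include <thrust/shuffle.h>
#include <thrust/system/cuda/execution_policy.h>

#include <cstdint>
#include <limits>
#include <random>
#include <vector>

void ShuffleStartNodesSketch(const std::vector<int>& device_ids,
                             const std::vector<uint64_t*>& d_keys,  // device pointers
                             const std::vector<size_t>& lens,
                             const std::vector<cudaStream_t>& streams) {
  // Host-side RNG that produces an independent seed for each card.
  std::random_device rd;
  std::mt19937 rng{rd()};
  std::uniform_int_distribution<int> dice(0, std::numeric_limits<int>::max());

  for (size_t j = 0; j < d_keys.size(); ++j) {
    // The real code switches devices with platform::CUDADeviceGuard and routes
    // Thrust's temporary allocations through a stream-aware allocator; the
    // sketch only sets the current device.
    cudaSetDevice(device_ids[j]);
    thrust::random::default_random_engine engine(dice(rng));
    thrust::shuffle(thrust::cuda::par.on(streams[j]),
                    thrust::device_pointer_cast(d_keys[j]),
                    thrust::device_pointer_cast(d_keys[j]) + lens[j],
                    engine);
  }
  // The shuffles run asynchronously; wait for every stream before the keys are used.
  for (auto stream : streams) {
    cudaStreamSynchronize(stream);
  }
}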
6 changes: 4 additions & 2 deletions paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h
@@ -111,8 +111,10 @@ class GraphGpuWrapper {
int set_node_iter_from_file(std::string ntype2files,
std::string nodes_file_path,
int part_num,
- bool training);
- int set_node_iter_from_graph(bool training);
+ bool training,
+ bool shuffle);
+ int set_node_iter_from_graph(bool training, bool shuffle);
+ void shuffle_start_nodes_for_training();
int32_t load_next_partition(int idx);
int32_t get_partition_num(int idx);
void load_node_weight(int type_id, int idx, std::string path);
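With the extra flag in both declarations, callers decide per pass whether the start nodes get shuffled. A hedged sketch of the intended call pattern, assuming the wrapper's usual GetInstance() singleton accessor; the flag values shown are illustrative.

// Hypothetical call sites; GetInstance() is assumed from the wrapper's
// existing singleton usage, and the flag values are illustrative.
auto wrapper = paddle::framework::GraphGpuWrapper::GetInstance();

// Training pass: build the node iterator from the in-memory graph and
// shuffle the start nodes.
wrapper->set_node_iter_from_graph(/*training=*/true, /*shuffle=*/true);

// Evaluation pass: keep the original node order.
wrapper->set_node_iter_from_graph(/*training=*/false, /*shuffle=*/false);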
4 changes: 2 additions & 2 deletions paddle/fluid/pybind/fleet_py.cc
@@ -428,10 +428,10 @@ void BindGraphGpuWrapper(py::module* m) {
.def("release_graph_node", &GraphGpuWrapper::release_graph_node)
.def("finalize", &GraphGpuWrapper::finalize)
.def("set_node_iter_from_file",
- py::overload_cast<std::string, std::string, int, bool>(
+ py::overload_cast<std::string, std::string, int, bool, bool>(
&GraphGpuWrapper::set_node_iter_from_file))
.def("set_node_iter_from_graph",
- py::overload_cast<bool>(&GraphGpuWrapper::set_node_iter_from_graph));
+ py::overload_cast<bool, bool>(&GraphGpuWrapper::set_node_iter_from_graph));
}
#endif
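Since both member functions gained a parameter, the py::overload_cast template arguments must now spell out the full argument lists, as the updated bindings above do. Below is a stripped-down pybind11 module, independent of Paddle, showing the same pattern; Wrapper and example are made-up names used only for illustration.

// Self-contained illustration only; not Paddle code.
#include <pybind11/pybind11.h>
namespace py = pybind11;

struct Wrapper {
  int set_node_iter_from_graph(bool training, bool shuffle) {
    return static_cast<int>(training && shuffle);
  }
};

PYBIND11_MODULE(example, m) {
  py::class_<Wrapper>(m, "Wrapper")
      .def(py::init<>())
      // The template argument list must match the member's parameters exactly;
      // a stale py::overload_cast<bool> would no longer compile against the
      // two-bool signature.
      .def("set_node_iter_from_graph",
           py::overload_cast<bool, bool>(&Wrapper::set_node_iter_from_graph));
}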

