diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu
index f94062fb244c7..f23534c1cd557 100644
--- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu
+++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.cu
@@ -610,10 +610,45 @@ int GraphGpuWrapper::load_node_file(std::string ntype2files,
       ntype2files, graph_data_local_path, part_num);
 }
 
+void GraphGpuWrapper::shuffle_start_nodes_for_training() {
+  size_t thread_num = device_id_mapping.size();
+
+  int shuffle_seed = 0;
+  std::random_device rd;
+  std::mt19937 rng{rd()};
+  std::uniform_int_distribution<int> dice_distribution(
+      0, std::numeric_limits<int>::max());
+
+  for (size_t i = 0; i < d_node_iter_graph_all_type_keys_.size(); i++) {
+    for (size_t j = 0; j < d_node_iter_graph_all_type_keys_[i].size(); j++) {
+      auto stream = get_local_stream(j);
+      int gpuid = device_id_mapping[j];
+      auto place = platform::CUDAPlace(gpuid);
+      platform::CUDADeviceGuard guard(gpuid);
+      paddle::memory::ThrustAllocator<cudaStream_t> allocator(place, stream);
+      const auto &exec_policy = thrust::cuda::par(allocator).on(stream);
+      shuffle_seed = dice_distribution(rng);
+      thrust::random::default_random_engine engine(shuffle_seed);
+      uint64_t *cur_node_iter_ptr = reinterpret_cast<uint64_t *>(
+          d_node_iter_graph_all_type_keys_[i][j]->ptr());
+      VLOG(2) << "node type: " << i << ", card num: " << j
+              << ", len: " << h_node_iter_graph_all_type_keys_len_[i][j];
+
+      thrust::shuffle(exec_policy,
+                      thrust::device_pointer_cast(cur_node_iter_ptr),
+                      thrust::device_pointer_cast(cur_node_iter_ptr) +
+                          h_node_iter_graph_all_type_keys_len_[i][j],
+                      engine);
+    }
+  }
+  for (size_t i = 0; i < thread_num; i++) {
+    auto stream = get_local_stream(i);
+    cudaStreamSynchronize(stream);
+  }
+}
+
 int GraphGpuWrapper::set_node_iter_from_file(std::string ntype2files,
                                              std::string node_types_file_path,
                                              int part_num,
-                                             bool training) {
+                                             bool training,
+                                             bool shuffle) {
   // 1. load cpu node
   ((GpuPsGraphTable *)graph_table)
       ->cpu_graph_table_->parse_node_and_load(
@@ -647,12 +682,16 @@ int GraphGpuWrapper::set_node_iter_from_file(std::string ntype2files,
     } else {
       init_type_keys(d_node_iter_graph_all_type_keys_,
                      h_node_iter_graph_all_type_keys_len_);
+
+      if (shuffle) {
+        shuffle_start_nodes_for_training();
+      }
     }
   }
   return 0;
 }
 
-int GraphGpuWrapper::set_node_iter_from_graph(bool training) {
+int GraphGpuWrapper::set_node_iter_from_graph(bool training, bool shuffle) {
   // 1. init type keys
   if (!type_keys_initialized_) {
     init_type_keys(d_graph_all_type_total_keys_, h_graph_all_type_keys_len_);
@@ -670,36 +709,8 @@ int GraphGpuWrapper::set_node_iter_from_graph(bool training) {
     d_node_iter_graph_all_type_keys_ = d_graph_all_type_total_keys_;
     h_node_iter_graph_all_type_keys_len_ = h_graph_all_type_keys_len_;
 
-    size_t thread_num = device_id_mapping.size();
-
-    int shuffle_seed = 0;
-    std::random_device rd;
-    std::mt19937 rng{rd()};
-    std::uniform_int_distribution<int> dice_distribution(
-        0, std::numeric_limits<int>::max());
-
-    for (size_t i = 0; i < d_node_iter_graph_all_type_keys_.size(); i++) {
-      for (size_t j = 0; j < d_node_iter_graph_all_type_keys_[i].size(); j++) {
-        auto stream = get_local_stream(j);
-        int gpuid = device_id_mapping[j];
-        auto place = platform::CUDAPlace(gpuid);
-        platform::CUDADeviceGuard guard(gpuid);
-        paddle::memory::ThrustAllocator<cudaStream_t> allocator(place, stream);
-        const auto &exec_policy = thrust::cuda::par(allocator).on(stream);
-        shuffle_seed = dice_distribution(rng);
-        thrust::random::default_random_engine engine(shuffle_seed);
-        uint64_t *cur_node_iter_ptr = reinterpret_cast<uint64_t *>(
-            d_node_iter_graph_all_type_keys_[i][j]->ptr());
-        VLOG(2) << "node type: " << i << ", card num: " << j
-                << ", len: " << h_node_iter_graph_all_type_keys_len_[i][j];
-
-        thrust::shuffle(exec_policy,
-                        thrust::device_pointer_cast(cur_node_iter_ptr),
-                        thrust::device_pointer_cast(cur_node_iter_ptr) +
-                            h_node_iter_graph_all_type_keys_len_[i][j],
-                        engine);
-      }
-    }
-    for (size_t i = 0; i < thread_num; i++) {
-      auto stream = get_local_stream(i);
-      cudaStreamSynchronize(stream);
+    if (shuffle) {
+      shuffle_start_nodes_for_training();
     }
   }
 }
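The .cu change above factors the Thrust-based device shuffle out of set_node_iter_from_graph into a reusable shuffle_start_nodes_for_training(), now gated by the new shuffle flag. For reference, the core idiom is shuffling a raw device buffer on an explicit CUDA stream with a host-seeded engine. Below is a minimal standalone sketch of that pattern only; the buffer length, its contents, and the variable names are illustrative and do not come from the Paddle code, and the Paddle-specific ThrustAllocator (which routes Thrust's temporary allocations through Paddle's memory pool) is replaced by Thrust's default allocator for brevity.

```cpp
#include <cuda_runtime.h>
#include <thrust/device_ptr.h>
#include <thrust/random.h>
#include <thrust/sequence.h>
#include <thrust/shuffle.h>
#include <thrust/system/cuda/execution_policy.h>

#include <cstdint>
#include <cstdio>
#include <limits>
#include <random>

int main() {
  // Illustrative stand-in for one card's start-node buffer
  // (d_node_iter_graph_all_type_keys_[i][j] in the patch).
  constexpr size_t len = 16;
  uint64_t *d_keys = nullptr;
  cudaMalloc(&d_keys, len * sizeof(uint64_t));

  cudaStream_t stream;
  cudaStreamCreate(&stream);

  // Fill with 0..len-1 so the effect of the shuffle is visible.
  thrust::device_ptr<uint64_t> keys = thrust::device_pointer_cast(d_keys);
  thrust::sequence(thrust::cuda::par.on(stream), keys, keys + len);

  // A host RNG draws the seed, a device-side engine performs the shuffle,
  // mirroring the per-card seeding in shuffle_start_nodes_for_training().
  std::random_device rd;
  std::mt19937 rng{rd()};
  std::uniform_int_distribution<int> dice(0, std::numeric_limits<int>::max());
  thrust::default_random_engine engine(dice(rng));

  // Enqueue the shuffle on the stream, then synchronize before reading back.
  thrust::shuffle(thrust::cuda::par.on(stream), keys, keys + len, engine);
  cudaStreamSynchronize(stream);

  uint64_t h_keys[len];
  cudaMemcpy(h_keys, d_keys, sizeof(h_keys), cudaMemcpyDeviceToHost);
  for (size_t i = 0; i < len; ++i) {
    printf("%llu ", static_cast<unsigned long long>(h_keys[i]));
  }
  printf("\n");

  cudaStreamDestroy(stream);
  cudaFree(d_keys);
  return 0;
}
```

As in the patch, all work is issued on a per-device stream and only synchronized once at the end, which is what lets the helper launch one shuffle per card before waiting on any of them.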
diff --git a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h
index 664a93249a49c..b29979c45eca0 100644
--- a/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h
+++ b/paddle/fluid/framework/fleet/heter_ps/graph_gpu_wrapper.h
@@ -111,8 +111,10 @@ class GraphGpuWrapper {
   int set_node_iter_from_file(std::string ntype2files,
                               std::string nodes_file_path,
                               int part_num,
-                              bool training);
-  int set_node_iter_from_graph(bool training);
+                              bool training,
+                              bool shuffle);
+  int set_node_iter_from_graph(bool training, bool shuffle);
+  void shuffle_start_nodes_for_training();
   int32_t load_next_partition(int idx);
   int32_t get_partition_num(int idx);
   void load_node_weight(int type_id, int idx, std::string path);
diff --git a/paddle/fluid/pybind/fleet_py.cc b/paddle/fluid/pybind/fleet_py.cc
index 980349e145c41..7f7994243d840 100644
--- a/paddle/fluid/pybind/fleet_py.cc
+++ b/paddle/fluid/pybind/fleet_py.cc
@@ -428,10 +428,10 @@ void BindGraphGpuWrapper(py::module* m) {
       .def("release_graph_node", &GraphGpuWrapper::release_graph_node)
       .def("finalize", &GraphGpuWrapper::finalize)
       .def("set_node_iter_from_file",
-           py::overload_cast<std::string, std::string, int, bool>(
+           py::overload_cast<std::string, std::string, int, bool, bool>(
               &GraphGpuWrapper::set_node_iter_from_file))
      .def("set_node_iter_from_graph",
-           py::overload_cast<bool>(&GraphGpuWrapper::set_node_iter_from_graph));
+           py::overload_cast<bool, bool>(&GraphGpuWrapper::set_node_iter_from_graph));
 }
 #endif
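Because both entry points now take an extra bool, the pybind layer only needs the py::overload_cast signatures widened to match. The snippet below is a minimal, self-contained sketch of that binding pattern; the Demo struct and module name are hypothetical stand-ins, not the actual GraphGpuWrapper binding in fleet_py.cc.

```cpp
#include <pybind11/pybind11.h>

namespace py = pybind11;

// Hypothetical stand-in for GraphGpuWrapper, used only to show how
// py::overload_cast selects the widened (bool training, bool shuffle)
// signature when an older single-bool overload also exists.
struct Demo {
  int set_node_iter_from_graph(bool training, bool shuffle) {
    return static_cast<int>(training) + static_cast<int>(shuffle);
  }
  int set_node_iter_from_graph(bool training) {
    return set_node_iter_from_graph(training, /*shuffle=*/false);
  }
};

PYBIND11_MODULE(demo, m) {
  py::class_<Demo>(m, "Demo")
      .def(py::init<>())
      // <bool, bool> picks the two-argument overload, matching the shape of
      // the updated set_node_iter_from_graph binding above.
      .def("set_node_iter_from_graph",
           py::overload_cast<bool, bool>(&Demo::set_node_iter_from_graph));
}
```

With this hypothetical module, Python-side code would call demo.Demo().set_node_iter_from_graph(True, True) to request training-mode iteration with the start nodes shuffled.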