diff --git a/paddle/fluid/framework/data_feed.cu b/paddle/fluid/framework/data_feed.cu index 2a629b711bcb3..2ff8bf5937587 100644 --- a/paddle/fluid/framework/data_feed.cu +++ b/paddle/fluid/framework/data_feed.cu @@ -31,9 +31,9 @@ limitations under the License. */ #include "paddle/fluid/framework/fleet/heter_ps/hashtable.h" #include "paddle/fluid/framework/fleet/ps_gpu_wrapper.h" #include "paddle/fluid/framework/io/fs.h" +#include "paddle/fluid/platform/collective_helper.h" #include "paddle/phi/kernels/gpu/graph_reindex_funcs.h" #include "paddle/phi/kernels/graph_reindex_kernel.h" -#include "paddle/fluid/platform/collective_helper.h" DECLARE_bool(enable_opt_get_features); DECLARE_bool(graph_metapath_split_opt); @@ -217,25 +217,39 @@ int dedup_keys_and_fillidx(const uint64_t *d_keys, uint32_t *d_restore_idx, // inverse const paddle::platform::Place &place, cudaStream_t stream) { - auto d_sorted_keys = memory::AllocShared(place, total_nodes_num * sizeof(uint64_t), - phi::Stream(reinterpret_cast(stream))); - uint64_t* d_sorted_keys_ptr = reinterpret_cast(d_sorted_keys->ptr()); - auto d_sorted_idx = memory::AllocShared(place, total_nodes_num * sizeof(uint32_t), - phi::Stream(reinterpret_cast(stream))); - uint32_t* d_sorted_idx_ptr = reinterpret_cast(d_sorted_idx->ptr()); - auto d_offset = memory::AllocShared(place, total_nodes_num * sizeof(uint32_t), - phi::Stream(reinterpret_cast(stream))); - uint32_t* d_offset_ptr = reinterpret_cast(d_offset->ptr()); - auto d_merged_cnts = memory::AllocShared(place, total_nodes_num * sizeof(uint32_t), - phi::Stream(reinterpret_cast(stream))); - uint32_t* d_merged_cnts_ptr = reinterpret_cast(d_merged_cnts->ptr()); + auto d_sorted_keys = + memory::AllocShared(place, + total_nodes_num * sizeof(uint64_t), + phi::Stream(reinterpret_cast(stream))); + uint64_t *d_sorted_keys_ptr = + reinterpret_cast(d_sorted_keys->ptr()); + auto d_sorted_idx = + memory::AllocShared(place, + total_nodes_num * sizeof(uint32_t), + phi::Stream(reinterpret_cast(stream))); + uint32_t *d_sorted_idx_ptr = + reinterpret_cast(d_sorted_idx->ptr()); + auto d_offset = + memory::AllocShared(place, + total_nodes_num * sizeof(uint32_t), + phi::Stream(reinterpret_cast(stream))); + uint32_t *d_offset_ptr = reinterpret_cast(d_offset->ptr()); + auto d_merged_cnts = + memory::AllocShared(place, + total_nodes_num * sizeof(uint32_t), + phi::Stream(reinterpret_cast(stream))); + uint32_t *d_merged_cnts_ptr = + reinterpret_cast(d_merged_cnts->ptr()); std::shared_ptr d_buf = NULL; int merged_size = 0; // Final num - auto d_index_in = memory::Alloc(place, sizeof(uint32_t) * (total_nodes_num + 1), - phi::Stream(reinterpret_cast(stream))); - uint32_t* d_index_in_ptr = reinterpret_cast(d_index_in->ptr()); - int *d_merged_size = reinterpret_cast(&d_index_in_ptr[total_nodes_num]); + auto d_index_in = + memory::Alloc(place, + sizeof(uint32_t) * (total_nodes_num + 1), + phi::Stream(reinterpret_cast(stream))); + uint32_t *d_index_in_ptr = reinterpret_cast(d_index_in->ptr()); + int *d_merged_size = + reinterpret_cast(&d_index_in_ptr[total_nodes_num]); fill_idx<<>>( d_index_in_ptr, total_nodes_num); cub_sort_pairs(total_nodes_num, @@ -260,21 +274,28 @@ int dedup_keys_and_fillidx(const uint64_t *d_keys, cudaMemcpyDeviceToHost, stream)); CUDA_CHECK(cudaStreamSynchronize(stream)); - cub_exclusivesum(merged_size, d_merged_cnts_ptr, d_offset_ptr, stream, d_buf, place); + cub_exclusivesum( + merged_size, d_merged_cnts_ptr, d_offset_ptr, stream, d_buf, place); if (total_nodes_num < merged_size * 2) { 
kernel_fill_restore_idx<<>>( - merged_size, d_sorted_idx_ptr, d_offset_ptr, d_merged_cnts_ptr, d_restore_idx); + stream>>>(merged_size, + d_sorted_idx_ptr, + d_offset_ptr, + d_merged_cnts_ptr, + d_restore_idx); } else { // used mid search fill idx when high dedup rate kernel_fill_restore_idx_by_search<<>>( - total_nodes_num, d_sorted_idx_ptr, merged_size, d_offset_ptr, d_restore_idx); + stream>>>(total_nodes_num, + d_sorted_idx_ptr, + merged_size, + d_offset_ptr, + d_restore_idx); } CUDA_CHECK(cudaStreamSynchronize(stream)); return merged_size; @@ -318,25 +339,20 @@ __global__ void FillSlotValueOffsetKernel(const int ins_num, } } -struct RandInt -{ - int low, high; +struct RandInt { + int low, high; - __host__ __device__ - RandInt(int low, int high) : low(low), high(high) {}; + __host__ __device__ RandInt(int low, int high) : low(low), high(high){}; - __host__ __device__ - int operator()(const unsigned int n) const - { - thrust::default_random_engine rng; - thrust::uniform_int_distribution dist(low, high); - rng.discard(n); + __host__ __device__ int operator()(const unsigned int n) const { + thrust::default_random_engine rng; + thrust::uniform_int_distribution dist(low, high); + rng.discard(n); - return dist(rng); - } + return dist(rng); + } }; - void SlotRecordInMemoryDataFeed::FillSlotValueOffset( const int ins_num, const int used_slot_num, @@ -512,7 +528,8 @@ __global__ void GraphFillIdKernel(uint64_t *id_tensor, int last_row = row[idx] * col_num; int next_row = last_row + col_num; - if ((src + step) >= last_row && (src + step) < next_row && walk[src] != 0 && walk[src + step] != 0) { + if ((src + step) >= last_row && (src + step) < next_row && walk[src] != 0 && + walk[src + step] != 0) { for (int i = 0; i < excluded_train_pair_len; i += 2) { if (walk_ntype[src] == excluded_train_pair[i] && walk_ntype[src + step] == excluded_train_pair[i + 1]) { @@ -526,7 +543,8 @@ __global__ void GraphFillIdKernel(uint64_t *id_tensor, local_key[dst * 2] = walk[src]; local_key[dst * 2 + 1] = walk[src + step]; if (pair_label_conf != NULL) { - int pair_label_conf_index = walk_ntype[src] * node_type_num + walk_ntype[src + step]; + int pair_label_conf_index = + walk_ntype[src] * node_type_num + walk_ntype[src + step]; local_pair_label[dst] = pair_label_conf[pair_label_conf_index]; } } @@ -545,7 +563,8 @@ __global__ void GraphFillIdKernel(uint64_t *id_tensor, id_tensor[global_num * 2 + 2 * threadIdx.x + 1] = local_key[2 * threadIdx.x + 1]; if (pair_label_conf != NULL) { - pair_label_tensor[global_num + threadIdx.x] = local_pair_label[threadIdx.x]; + pair_label_tensor[global_num + threadIdx.x] = + local_pair_label[threadIdx.x]; } } } @@ -632,11 +651,17 @@ int GraphDataGenerator::FillIdShowClkTensor(int total_instance, train_stream_); if (conf_.enable_pair_label) { - pair_label_ptr_ = feed_vec_[3]->mutable_data({total_instance / 2}, this->place_); - int32_t *pair_label_buf = reinterpret_cast(d_pair_label_buf_->ptr()); - int32_t *pair_label_cursor = pair_label_buf + ins_buf_pair_len_ - total_instance / 2; - cudaMemcpyAsync(pair_label_ptr_, pair_label_cursor, sizeof(int32_t) * total_instance / 2, - cudaMemcpyDeviceToDevice, train_stream_); + pair_label_ptr_ = feed_vec_[3]->mutable_data( + {total_instance / 2}, this->place_); + int32_t *pair_label_buf = + reinterpret_cast(d_pair_label_buf_->ptr()); + int32_t *pair_label_cursor = + pair_label_buf + ins_buf_pair_len_ - total_instance / 2; + cudaMemcpyAsync(pair_label_ptr_, + pair_label_cursor, + sizeof(int32_t) * total_instance / 2, + cudaMemcpyDeviceToDevice, + 
train_stream_); } } else { // infer @@ -673,16 +698,24 @@ int GraphDataGenerator::FillGraphIdShowClkTensor(int uniq_instance, feed_vec_[2]->mutable_data({uniq_instance}, this->place_); int index_offset = 0; if (conf_.enable_pair_label) { - pair_label_ptr_ = feed_vec_[3]->mutable_data({total_instance / 2}, this->place_); - int32_t *pair_label_buf = reinterpret_cast(d_pair_label_buf_->ptr()); - int32_t *pair_label_cursor = pair_label_buf + ins_buf_pair_len_ - total_instance / 2; - cudaMemcpyAsync(pair_label_ptr_, pair_label_vec_[index]->ptr(), sizeof(int32_t) * total_instance / 2, - cudaMemcpyDeviceToDevice, train_stream_); + pair_label_ptr_ = + feed_vec_[3]->mutable_data({total_instance / 2}, this->place_); + int32_t *pair_label_buf = + reinterpret_cast(d_pair_label_buf_->ptr()); + int32_t *pair_label_cursor = + pair_label_buf + ins_buf_pair_len_ - total_instance / 2; + cudaMemcpyAsync(pair_label_ptr_, + pair_label_vec_[index]->ptr(), + sizeof(int32_t) * total_instance / 2, + cudaMemcpyDeviceToDevice, + train_stream_); } if (!conf_.return_weight) { - index_offset = id_offset_of_feed_vec_ + conf_.slot_num * 2 + 5 * samples_.size(); + index_offset = + id_offset_of_feed_vec_ + conf_.slot_num * 2 + 5 * samples_.size(); } else { - index_offset = id_offset_of_feed_vec_ + conf_.slot_num * 2 + 6 * samples_.size(); // add edge weights + index_offset = id_offset_of_feed_vec_ + conf_.slot_num * 2 + + 6 * samples_.size(); // add edge weights } index_tensor_ptr_ = feed_vec_[index_offset]->mutable_data( {total_instance}, this->place_); @@ -705,8 +738,9 @@ int GraphDataGenerator::FillGraphIdShowClkTensor(int uniq_instance, graph_edges_vec_[index]; int graph_edges_index = 0; for (int i = 0; i < len_samples; i++) { - int offset = conf_.return_weight ? (id_offset_of_feed_vec_ + 2 * conf_.slot_num + 6 * i) : - (id_offset_of_feed_vec_ + 2 * conf_.slot_num + 5 * i); + int offset = conf_.return_weight + ? 
(id_offset_of_feed_vec_ + 2 * conf_.slot_num + 6 * i) + : (id_offset_of_feed_vec_ + 2 * conf_.slot_num + 5 * i); std::vector edges_split_num = edges_split_num_for_graph[i]; int neighbor_len = edges_split_num[edge_to_id_len_ + 2]; @@ -811,16 +845,16 @@ int GraphDataGenerator::FillGraphSlotFeature( } } -int MakeInsPair(const std::shared_ptr& d_walk, // input - const std::shared_ptr& d_walk_ntype, // input - const GraphDataGeneratorConfig& conf, - const std::shared_ptr& d_random_row, - const std::shared_ptr& d_random_row_col_shift, - BufState& buf_state, - std::shared_ptr& d_ins_buf, // output - std::shared_ptr& d_pair_label_buf, // output - std::shared_ptr& d_pair_num_ptr, // output - int& ins_buf_pair_len, +int MakeInsPair(const std::shared_ptr &d_walk, // input + const std::shared_ptr &d_walk_ntype, // input + const GraphDataGeneratorConfig &conf, + const std::shared_ptr &d_random_row, + const std::shared_ptr &d_random_row_col_shift, + BufState &buf_state, + std::shared_ptr &d_ins_buf, // output + std::shared_ptr &d_pair_label_buf, // output + std::shared_ptr &d_pair_num_ptr, // output + int &ins_buf_pair_len, cudaStream_t stream) { uint64_t *walk = reinterpret_cast(d_walk->ptr()); uint8_t *walk_ntype = NULL; @@ -837,10 +871,12 @@ int MakeInsPair(const std::shared_ptr& d_walk, int32_t *pair_label_conf = NULL; if (conf.enable_pair_label) { pair_label_buf = reinterpret_cast(d_pair_label_buf->ptr()); - pair_label_conf = reinterpret_cast(conf.d_pair_label_conf->ptr()); + pair_label_conf = + reinterpret_cast(conf.d_pair_label_conf->ptr()); } int *random_row = reinterpret_cast(d_random_row->ptr()); - int *random_row_col_shift = reinterpret_cast(d_random_row_col_shift->ptr()); + int *random_row_col_shift = + reinterpret_cast(d_random_row_col_shift->ptr()); int *d_pair_num = reinterpret_cast(d_pair_num_ptr->ptr()); cudaMemsetAsync(d_pair_num, 0, sizeof(int), stream); int len = buf_state.len; @@ -884,17 +920,17 @@ int MakeInsPair(const std::shared_ptr& d_walk, return ins_buf_pair_len; } -int FillInsBuf(const std::shared_ptr& d_walk, // input - const std::shared_ptr& d_walk_ntype, // input - const GraphDataGeneratorConfig& conf, - const std::shared_ptr& d_random_row, - const std::shared_ptr& d_random_row_col_shift, - BufState& buf_state, - std::shared_ptr& d_ins_buf, // output - std::shared_ptr& d_pair_label_buf, // output - std::shared_ptr& d_pair_num, // output - int& ins_buf_pair_len, - cudaStream_t stream) { +int FillInsBuf(const std::shared_ptr &d_walk, // input + const std::shared_ptr &d_walk_ntype, // input + const GraphDataGeneratorConfig &conf, + const std::shared_ptr &d_random_row, + const std::shared_ptr &d_random_row_col_shift, + BufState &buf_state, + std::shared_ptr &d_ins_buf, // output + std::shared_ptr &d_pair_label_buf, // output + std::shared_ptr &d_pair_num, // output + int &ins_buf_pair_len, + cudaStream_t stream) { if (ins_buf_pair_len >= conf.batch_size) { return conf.batch_size; } @@ -906,9 +942,17 @@ int FillInsBuf(const std::shared_ptr& d_walk, // input if (total_instance == 0) { return -1; } - return MakeInsPair(d_walk, d_walk_ntype, conf, d_random_row, - d_random_row_col_shift, buf_state, d_ins_buf, d_pair_label_buf, - d_pair_num, ins_buf_pair_len, stream); + return MakeInsPair(d_walk, + d_walk_ntype, + conf, + d_random_row, + d_random_row_col_shift, + buf_state, + d_ins_buf, + d_pair_label_buf, + d_pair_num, + ins_buf_pair_len, + stream); } int GraphDataGenerator::GenerateBatch() { @@ -940,16 +984,25 @@ int GraphDataGenerator::GenerateBatch() { // train if 
(!conf_.sage_mode) { while (ins_buf_pair_len_ < conf_.batch_size) { - res = FillInsBuf(d_walk_, d_walk_ntype_, conf_, d_random_row_, - d_random_row_col_shift_, buf_state_, d_ins_buf_, d_pair_label_buf_, - d_pair_num_, ins_buf_pair_len_, train_stream_); + res = FillInsBuf(d_walk_, + d_walk_ntype_, + conf_, + d_random_row_, + d_random_row_col_shift_, + buf_state_, + d_ins_buf_, + d_pair_label_buf_, + d_pair_num_, + ins_buf_pair_len_, + train_stream_); if (res == -1) { if (ins_buf_pair_len_ == 0) { if (is_multi_node_) { pass_end_ = 1; if (total_row_ != 0) { buf_state_.Reset(total_row_); - VLOG(1) << "reset buf state to make batch num equal in multi node"; + VLOG(1) + << "reset buf state to make batch num equal in multi node"; } } else { return 0; @@ -959,8 +1012,8 @@ int GraphDataGenerator::GenerateBatch() { } } } - total_instance = - ins_buf_pair_len_ < conf_.batch_size ? ins_buf_pair_len_ : conf_.batch_size; + total_instance = ins_buf_pair_len_ < conf_.batch_size ? ins_buf_pair_len_ + : conf_.batch_size; total_instance *= 2; VLOG(2) << "total_instance: " << total_instance << ", ins_buf_pair_len = " << ins_buf_pair_len_; @@ -1200,21 +1253,22 @@ __global__ void UniqueFeature(uint64_t *d_in, } } // Fill sample_res to the stepth column of walk -void FillOneStep(uint64_t *d_start_ids, - int etype_id, - uint64_t *walk, - uint8_t *walk_ntype, - int len, - NeighborSampleResult &sample_res, - int cur_degree, - int step, - const GraphDataGeneratorConfig& conf, - std::shared_ptr& d_sample_keys_ptr, - std::shared_ptr& d_prefix_sum_ptr, - std::vector>& d_sampleidx2rows, - int& cur_sampleidx2row, - const paddle::platform::Place& place, - cudaStream_t stream) { +void FillOneStep( + uint64_t *d_start_ids, + int etype_id, + uint64_t *walk, + uint8_t *walk_ntype, + int len, + NeighborSampleResult &sample_res, + int cur_degree, + int step, + const GraphDataGeneratorConfig &conf, + std::shared_ptr &d_sample_keys_ptr, + std::shared_ptr &d_prefix_sum_ptr, + std::vector> &d_sampleidx2rows, + int &cur_sampleidx2row, + const paddle::platform::Place &place, + cudaStream_t stream) { auto gpu_graph_ptr = GraphGpuWrapper::GetInstance(); uint64_t node_id = gpu_graph_ptr->edge_to_node_map_[etype_id]; uint8_t edge_src_id = node_id >> 32; @@ -1223,7 +1277,8 @@ void FillOneStep(uint64_t *d_start_ids, int *d_actual_sample_size = sample_res.actual_sample_size; uint64_t *d_neighbors = sample_res.val; int *d_prefix_sum = reinterpret_cast(d_prefix_sum_ptr->ptr()); - uint64_t *d_sample_keys = reinterpret_cast(d_sample_keys_ptr->ptr()); + uint64_t *d_sample_keys = + reinterpret_cast(d_sample_keys_ptr->ptr()); int *d_sampleidx2row = reinterpret_cast(d_sampleidx2rows[cur_sampleidx2row]->ptr()); int *d_tmp_sampleidx2row = @@ -1235,10 +1290,10 @@ void FillOneStep(uint64_t *d_start_ids, d_prefix_sum + 1, len, stream)); - auto d_temp_storage = memory::Alloc( - place, - temp_storage_bytes, - phi::Stream(reinterpret_cast(stream))); + auto d_temp_storage = + memory::Alloc(place, + temp_storage_bytes, + phi::Stream(reinterpret_cast(stream))); CUDA_CHECK(cub::DeviceScan::InclusiveSum(d_temp_storage->ptr(), temp_storage_bytes, @@ -1250,35 +1305,31 @@ void FillOneStep(uint64_t *d_start_ids, cudaStreamSynchronize(stream); if (step == 1) { - GraphFillFirstStepKernel<<>>(d_prefix_sum, - d_tmp_sampleidx2row, - walk, - walk_ntype, - d_start_ids, - edge_src_id, - edge_dst_id, - len, - conf.walk_degree, - conf.walk_len, - d_actual_sample_size, - d_neighbors, - d_sample_keys); + GraphFillFirstStepKernel<<>>( + d_prefix_sum, + d_tmp_sampleidx2row, + 
walk, + walk_ntype, + d_start_ids, + edge_src_id, + edge_dst_id, + len, + conf.walk_degree, + conf.walk_len, + d_actual_sample_size, + d_neighbors, + d_sample_keys); } else { - GraphFillSampleKeysKernel<<>>(d_neighbors, - d_sample_keys, - d_prefix_sum, - d_sampleidx2row, - d_tmp_sampleidx2row, - d_actual_sample_size, - cur_degree, - len); + GraphFillSampleKeysKernel<<>>( + d_neighbors, + d_sample_keys, + d_prefix_sum, + d_sampleidx2row, + d_tmp_sampleidx2row, + d_actual_sample_size, + cur_degree, + len); GraphDoWalkKernel<<>>( d_neighbors, @@ -1294,7 +1345,8 @@ void FillOneStep(uint64_t *d_start_ids, edge_dst_id); } if (conf.debug_mode) { - size_t once_max_sample_keynum = conf.walk_degree * conf.once_sample_startid_len; + size_t once_max_sample_keynum = + conf.walk_degree * conf.once_sample_startid_len; int *h_prefix_sum = new int[len + 1]; int *h_actual_size = new int[len]; int *h_offset2idx = new int[once_max_sample_keynum]; @@ -1358,10 +1410,12 @@ int GraphDataGenerator::FillSlotFeature(uint64_t *d_walk, size_t key_num) { if (fea_num == 0) { int64_t default_lod = 1; for (int i = 0; i < conf_.slot_num; ++i) { - slot_lod_tensor_ptr_[i] = feed_vec_[id_offset_of_feed_vec_ + 2 * i + 1]->mutable_data( - {(long)key_num + 1}, this->place_); // NOLINT + slot_lod_tensor_ptr_[i] = + feed_vec_[id_offset_of_feed_vec_ + 2 * i + 1]->mutable_data( + {(long)key_num + 1}, this->place_); // NOLINT slot_tensor_ptr_[i] = - feed_vec_[id_offset_of_feed_vec_ + 2 * i]->mutable_data({1, 1}, this->place_); + feed_vec_[id_offset_of_feed_vec_ + 2 * i]->mutable_data( + {1, 1}, this->place_); CUDA_CHECK(cudaMemsetAsync( slot_tensor_ptr_[i], 0, sizeof(int64_t), train_stream_)); CUDA_CHECK(cudaMemsetAsync(slot_lod_tensor_ptr_[i], @@ -1384,9 +1438,10 @@ int GraphDataGenerator::FillSlotFeature(uint64_t *d_walk, size_t key_num) { uint8_t *d_slot_list_ptr = reinterpret_cast(d_slot_list->ptr()); std::shared_ptr d_each_ins_slot_num_inner_prefix = - memory::AllocShared(place_, (conf_.slot_num * key_num) * sizeof(uint32_t)); - std::shared_ptr d_each_ins_slot_num = - memory::AllocShared(place_, (conf_.slot_num * key_num) * sizeof(uint32_t)); + memory::AllocShared(place_, + (conf_.slot_num * key_num) * sizeof(uint32_t)); + std::shared_ptr d_each_ins_slot_num = memory::AllocShared( + place_, (conf_.slot_num * key_num) * sizeof(uint32_t)); uint32_t *d_each_ins_slot_num_ptr = reinterpret_cast(d_each_ins_slot_num->ptr()); uint32_t *d_each_ins_slot_num_inner_prefix_ptr = @@ -1426,13 +1481,16 @@ int GraphDataGenerator::FillSlotFeature(uint64_t *d_walk, size_t key_num) { sizeof(uint64_t *) * conf_.slot_num, cudaMemcpyHostToDevice, train_stream_)); - fill_slot_num<<>>( - d_each_ins_slot_num_ptr, d_ins_slot_num_vector_ptr, key_num, conf_.slot_num); + fill_slot_num<<>>(d_each_ins_slot_num_ptr, + d_ins_slot_num_vector_ptr, + key_num, + conf_.slot_num); CUDA_CHECK(cudaStreamSynchronize(train_stream_)); for (int i = 0; i < conf_.slot_num; ++i) { - slot_lod_tensor_ptr_[i] = feed_vec_[id_offset_of_feed_vec_ + 2 * i + 1]->mutable_data( - {(long)key_num + 1}, this->place_); // NOLINT + slot_lod_tensor_ptr_[i] = + feed_vec_[id_offset_of_feed_vec_ + 2 * i + 1]->mutable_data( + {(long)key_num + 1}, this->place_); // NOLINT } size_t temp_storage_bytes = 0; CUDA_CHECK(cub::DeviceScan::InclusiveSum(NULL, @@ -1464,8 +1522,9 @@ int GraphDataGenerator::FillSlotFeature(uint64_t *d_walk, size_t key_num) { } CUDA_CHECK(cudaStreamSynchronize(train_stream_)); for (int i = 0; i < conf_.slot_num; ++i) { - slot_tensor_ptr_[i] = feed_vec_[id_offset_of_feed_vec_ + 
2 * i]->mutable_data( - {each_slot_fea_num[i], 1}, this->place_); + slot_tensor_ptr_[i] = + feed_vec_[id_offset_of_feed_vec_ + 2 * i]->mutable_data( + {each_slot_fea_num[i], 1}, this->place_); } int64_t default_lod = 1; for (int i = 0; i < conf_.slot_num; ++i) { @@ -1482,7 +1541,8 @@ int GraphDataGenerator::FillSlotFeature(uint64_t *d_walk, size_t key_num) { // trick for empty tensor if (each_slot_fea_num[i] == 0) { slot_tensor_ptr_[i] = - feed_vec_[id_offset_of_feed_vec_ + 2 * i]->mutable_data({1, 1}, this->place_); + feed_vec_[id_offset_of_feed_vec_ + 2 * i]->mutable_data( + {1, 1}, this->place_); CUDA_CHECK(cudaMemsetAsync( slot_tensor_ptr_[i], 0, sizeof(uint64_t), train_stream_)); CUDA_CHECK(cudaMemcpyAsync( @@ -1582,23 +1642,24 @@ int GraphDataGenerator::FillSlotFeature(uint64_t *d_walk, size_t key_num) { return 0; } -uint64_t CopyUniqueNodes(HashTable* table, - uint64_t copy_unique_len, - const paddle::platform::Place &place, - const std::shared_ptr& d_uniq_node_num_ptr, - std::vector& host_vec, // output - cudaStream_t stream); +uint64_t CopyUniqueNodes( + HashTable *table, + uint64_t copy_unique_len, + const paddle::platform::Place &place, + const std::shared_ptr &d_uniq_node_num_ptr, + std::vector &host_vec, // output + cudaStream_t stream); // 对于deepwalk模式,尝试插入table,0表示插入成功,1表示插入失败; // 对于sage模式,尝试插入table,table数量不够则清空table重新插入,返回值无影响。 -int InsertTable(const uint64_t *d_keys, // Input - uint64_t len, // Input - std::shared_ptr& d_uniq_node_num, - const GraphDataGeneratorConfig& conf, - uint64_t& copy_unique_len, +int InsertTable(const uint64_t *d_keys, // Input + uint64_t len, // Input + std::shared_ptr &d_uniq_node_num, + const GraphDataGeneratorConfig &conf, + uint64_t ©_unique_len, const paddle::platform::Place &place, - HashTable* table, - std::vector& host_vec, // Output + HashTable *table, + std::vector &host_vec, // Output cudaStream_t stream) { // Used under NOT WHOLE_HBM. uint64_t h_uniq_node_num = 0; @@ -1612,26 +1673,25 @@ int InsertTable(const uint64_t *d_keys, // Input cudaStreamSynchronize(stream); if (conf.gpu_graph_training) { - VLOG(2) << "table capacity: " << conf.train_table_cap << ", " << h_uniq_node_num - << " used"; + VLOG(2) << "table capacity: " << conf.train_table_cap << ", " + << h_uniq_node_num << " used"; if (h_uniq_node_num + len >= conf.train_table_cap) { if (!conf.sage_mode) { return 1; } else { // Copy unique nodes first. - uint64_t copy_len = CopyUniqueNodes(table, copy_unique_len, place, - d_uniq_node_num, host_vec, stream); + uint64_t copy_len = CopyUniqueNodes( + table, copy_unique_len, place, d_uniq_node_num, host_vec, stream); copy_unique_len += copy_len; table->clear(stream); - cudaMemsetAsync( - d_uniq_node_num_ptr, 0, sizeof(uint64_t), stream); + cudaMemsetAsync(d_uniq_node_num_ptr, 0, sizeof(uint64_t), stream); } } } else { // used only for sage_mode. 
if (h_uniq_node_num + len >= conf.infer_table_cap) { - uint64_t copy_len = CopyUniqueNodes(table, copy_unique_len, place, - d_uniq_node_num, host_vec, stream); + uint64_t copy_len = CopyUniqueNodes( + table, copy_unique_len, place, d_uniq_node_num, host_vec, stream); copy_unique_len += copy_len; table->clear(stream); cudaMemsetAsync(d_uniq_node_num_ptr, 0, sizeof(uint64_t), stream); @@ -1728,9 +1788,9 @@ GraphDataGenerator::SampleNeighbors(int64_t *uniq_nodes, all_sample_size * sizeof(float), phi::Stream(reinterpret_cast(sample_stream_))); final_sample_weight_ptr = - reinterpret_cast(final_sample_weight->ptr()); + reinterpret_cast(final_sample_weight->ptr()); all_sample_weight_ptr = - reinterpret_cast(sample_res.weight_mem->ptr()); + reinterpret_cast(sample_res.weight_mem->ptr()); } FillActualNeighbors<< FillReindexHashTable( input, num_input, len_hashtable, keys, key_index); // Get item index count. - auto item_count = memory::Alloc( - place, - (num_input + 1) * sizeof(int), - phi::Stream(reinterpret_cast(stream))); + auto item_count = + memory::Alloc(place, + (num_input + 1) * sizeof(int), + phi::Stream(reinterpret_cast(stream))); int *item_count_ptr = reinterpret_cast(item_count->ptr()); - cudaMemsetAsync( - item_count_ptr, 0, sizeof(int) * (num_input + 1), stream); + cudaMemsetAsync(item_count_ptr, 0, sizeof(int) * (num_input + 1), stream); phi::GetItemIndexCount <<>>( input, item_count_ptr, num_input, len_hashtable, keys, key_index); @@ -1791,10 +1850,10 @@ std::shared_ptr FillReindexHashTable( item_count_ptr, num_input + 1, stream); - auto d_temp_storage = memory::Alloc( - place, - temp_storage_bytes, - phi::Stream(reinterpret_cast(stream))); + auto d_temp_storage = + memory::Alloc(place, + temp_storage_bytes, + phi::Stream(reinterpret_cast(stream))); cub::DeviceScan::ExclusiveSum(d_temp_storage->ptr(), temp_storage_bytes, item_count_ptr, @@ -1810,24 +1869,23 @@ std::shared_ptr FillReindexHashTable( stream); cudaStreamSynchronize(stream); - auto unique_items = memory::AllocShared( - place, - total_unique_items * sizeof(int64_t), - phi::Stream(reinterpret_cast(stream))); + auto unique_items = + memory::AllocShared(place, + total_unique_items * sizeof(int64_t), + phi::Stream(reinterpret_cast(stream))); int64_t *unique_items_ptr = reinterpret_cast(unique_items->ptr()); *final_nodes_len = total_unique_items; // Get unique items phi::FillUniqueItems - <<>>( - input, - num_input, - len_hashtable, - unique_items_ptr, - item_count_ptr, - keys, - values, - key_index); + <<>>(input, + num_input, + len_hashtable, + unique_items_ptr, + item_count_ptr, + keys, + values, + key_index); cudaStreamSynchronize(stream); return unique_items; } @@ -1841,34 +1899,39 @@ std::shared_ptr GetReindexResult( int64_t neighbor_len, const paddle::platform::Place &place, cudaStream_t stream) { - auto d_reindex_table_key = memory::AllocShared(place, reindex_table_size * sizeof(int64_t), - phi::Stream(reinterpret_cast(stream))); - int64_t *d_reindex_table_key_ptr = reinterpret_cast(d_reindex_table_key->ptr()); - auto d_reindex_table_value = memory::AllocShared(place, reindex_table_size * sizeof(int), - phi::Stream(reinterpret_cast(stream))); - int *d_reindex_table_value_ptr = reinterpret_cast(d_reindex_table_value->ptr()); - auto d_reindex_table_index = memory::AllocShared(place, reindex_table_size * sizeof(int), - phi::Stream(reinterpret_cast(stream))); - int *d_reindex_table_index_ptr = reinterpret_cast(d_reindex_table_index->ptr()); + auto d_reindex_table_key = + memory::AllocShared(place, + reindex_table_size * 
sizeof(int64_t), + phi::Stream(reinterpret_cast(stream))); + int64_t *d_reindex_table_key_ptr = + reinterpret_cast(d_reindex_table_key->ptr()); + auto d_reindex_table_value = + memory::AllocShared(place, + reindex_table_size * sizeof(int), + phi::Stream(reinterpret_cast(stream))); + int *d_reindex_table_value_ptr = + reinterpret_cast(d_reindex_table_value->ptr()); + auto d_reindex_table_index = + memory::AllocShared(place, + reindex_table_size * sizeof(int), + phi::Stream(reinterpret_cast(stream))); + int *d_reindex_table_index_ptr = + reinterpret_cast(d_reindex_table_index->ptr()); // Fill table with -1. cudaMemsetAsync(d_reindex_table_key_ptr, -1, reindex_table_size * sizeof(int64_t), stream); - cudaMemsetAsync(d_reindex_table_value_ptr, - -1, - reindex_table_size * sizeof(int), - stream); - cudaMemsetAsync(d_reindex_table_index_ptr, - -1, - reindex_table_size * sizeof(int), - stream); + cudaMemsetAsync( + d_reindex_table_value_ptr, -1, reindex_table_size * sizeof(int), stream); + cudaMemsetAsync( + d_reindex_table_index_ptr, -1, reindex_table_size * sizeof(int), stream); - auto all_nodes = memory::AllocShared( - place, - (node_len + neighbor_len) * sizeof(int64_t), - phi::Stream(reinterpret_cast(stream))); + auto all_nodes = + memory::AllocShared(place, + (node_len + neighbor_len) * sizeof(int64_t), + phi::Stream(reinterpret_cast(stream))); int64_t *all_nodes_data = reinterpret_cast(all_nodes->ptr()); cudaMemcpyAsync(all_nodes_data, @@ -1917,13 +1980,13 @@ std::shared_ptr GraphDataGenerator::GenerateSampleGraph( phi::Stream(reinterpret_cast(sample_stream_))); int *inverse_ptr = reinterpret_cast(inverse->ptr()); int64_t *uniq_nodes_data = reinterpret_cast(uniq_nodes->ptr()); - int uniq_len = dedup_keys_and_fillidx( - node_ids, - len, - reinterpret_cast(uniq_nodes_data), - reinterpret_cast(inverse_ptr), - place_, - sample_stream_); + int uniq_len = + dedup_keys_and_fillidx(node_ids, + len, + reinterpret_cast(uniq_nodes_data), + reinterpret_cast(inverse_ptr), + place_, + sample_stream_); int len_samples = samples_.size(); VLOG(2) << "Sample Neighbors and Reindex"; @@ -2026,12 +2089,13 @@ std::shared_ptr GraphDataGenerator::GetNodeDegree( return node_degree; } -uint64_t CopyUniqueNodes(HashTable* table, - uint64_t copy_unique_len, - const paddle::platform::Place &place, - const std::shared_ptr& d_uniq_node_num_ptr, - std::vector& host_vec, // output - cudaStream_t stream) { +uint64_t CopyUniqueNodes( + HashTable *table, + uint64_t copy_unique_len, + const paddle::platform::Place &place, + const std::shared_ptr &d_uniq_node_num_ptr, + std::vector &host_vec, // output + cudaStream_t stream) { if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { uint64_t h_uniq_node_num = 0; uint64_t *d_uniq_node_num = @@ -2096,9 +2160,17 @@ void GraphDataGenerator::DoWalkandSage() { while (ins_pair_flag) { int res = 0; while (ins_buf_pair_len_ < conf_.batch_size) { - res = FillInsBuf(d_walk_, d_walk_ntype_, conf_, d_random_row_, - d_random_row_col_shift_, buf_state_, d_ins_buf_, d_pair_label_buf_, - d_pair_num_, ins_buf_pair_len_, sample_stream_); + res = FillInsBuf(d_walk_, + d_walk_ntype_, + conf_, + d_random_row_, + d_random_row_col_shift_, + buf_state_, + d_ins_buf_, + d_pair_label_buf_, + d_pair_num_, + ins_buf_pair_len_, + sample_stream_); if (res == -1) { if (ins_buf_pair_len_ == 0) { ins_pair_flag = false; @@ -2111,8 +2183,9 @@ void GraphDataGenerator::DoWalkandSage() { break; } - total_instance = - ins_buf_pair_len_ < conf_.batch_size ? 
ins_buf_pair_len_ : conf_.batch_size; + total_instance = ins_buf_pair_len_ < conf_.batch_size + ? ins_buf_pair_len_ + : conf_.batch_size; total_instance *= 2; ins_buf = reinterpret_cast(d_ins_buf_->ptr()); @@ -2132,12 +2205,19 @@ void GraphDataGenerator::DoWalkandSage() { } if (conf_.enable_pair_label) { - auto pair_label = memory::AllocShared(place_, total_instance / 2 * sizeof(int), - phi::Stream(reinterpret_cast(sample_stream_))); - int32_t *pair_label_buf = reinterpret_cast(d_pair_label_buf_->ptr()); - int32_t *pair_label_cursor = pair_label_buf + ins_buf_pair_len_ - total_instance / 2; - cudaMemcpyAsync(pair_label->ptr(), pair_label_cursor, sizeof(int32_t) * total_instance / 2, - cudaMemcpyDeviceToDevice, sample_stream_); + auto pair_label = memory::AllocShared( + place_, + total_instance / 2 * sizeof(int), + phi::Stream(reinterpret_cast(sample_stream_))); + int32_t *pair_label_buf = + reinterpret_cast(d_pair_label_buf_->ptr()); + int32_t *pair_label_cursor = + pair_label_buf + ins_buf_pair_len_ - total_instance / 2; + cudaMemcpyAsync(pair_label->ptr(), + pair_label_cursor, + sizeof(int32_t) * total_instance / 2, + cudaMemcpyDeviceToDevice, + sample_stream_); pair_label_vec_.emplace_back(pair_label); } @@ -2145,8 +2225,15 @@ void GraphDataGenerator::DoWalkandSage() { if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { uint64_t *final_sage_nodes_ptr = reinterpret_cast(final_sage_nodes->ptr()); - InsertTable(final_sage_nodes_ptr, uniq_instance, d_uniq_node_num_, conf_, - copy_unique_len_, place_, table_, host_vec_, sample_stream_); + InsertTable(final_sage_nodes_ptr, + uniq_instance, + d_uniq_node_num_, + conf_, + copy_unique_len_, + place_, + table_, + host_vec_, + sample_stream_); } final_sage_nodes_vec_.emplace_back(final_sage_nodes); inverse_vec_.emplace_back(inverse); @@ -2155,8 +2242,12 @@ void GraphDataGenerator::DoWalkandSage() { ins_buf_pair_len_ -= total_instance / 2; sage_batch_num_ += 1; } - uint64_t h_uniq_node_num = CopyUniqueNodes(table_, copy_unique_len_, place_, - d_uniq_node_num_, host_vec_, sample_stream_); + uint64_t h_uniq_node_num = CopyUniqueNodes(table_, + copy_unique_len_, + place_, + d_uniq_node_num_, + host_vec_, + sample_stream_); VLOG(1) << "train sage_batch_num: " << sage_batch_num_; } } @@ -2167,13 +2258,14 @@ void GraphDataGenerator::DoWalkandSage() { sage_batch_num_ = 0; if (infer_flag) { int total_instance = 0, uniq_instance = 0; - total_instance = (infer_node_start_ + conf_.batch_size <= infer_node_end_) - ? conf_.batch_size - : infer_node_end_ - infer_node_start_; + total_instance = + (infer_node_start_ + conf_.batch_size <= infer_node_end_) + ? 
conf_.batch_size + : infer_node_end_ - infer_node_start_; total_instance *= 2; while (total_instance != 0) { - uint64_t *d_type_keys = - reinterpret_cast(d_device_keys_[infer_cursor_]->ptr()); + uint64_t *d_type_keys = reinterpret_cast( + d_device_keys_[infer_cursor_]->ptr()); d_type_keys += infer_node_start_; infer_node_start_ += total_instance / 2; auto node_buf = memory::AllocShared( @@ -2205,8 +2297,15 @@ void GraphDataGenerator::DoWalkandSage() { if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { uint64_t *final_sage_nodes_ptr = reinterpret_cast(final_sage_nodes->ptr()); - InsertTable(final_sage_nodes_ptr, uniq_instance, d_uniq_node_num_, conf_, - copy_unique_len_, place_, table_, host_vec_, sample_stream_); + InsertTable(final_sage_nodes_ptr, + uniq_instance, + d_uniq_node_num_, + conf_, + copy_unique_len_, + place_, + table_, + host_vec_, + sample_stream_); } final_sage_nodes_vec_.emplace_back(final_sage_nodes); inverse_vec_.emplace_back(inverse); @@ -2214,14 +2313,19 @@ void GraphDataGenerator::DoWalkandSage() { total_instance_vec_.emplace_back(total_instance); sage_batch_num_ += 1; - total_instance = (infer_node_start_ + conf_.batch_size <= infer_node_end_) - ? conf_.batch_size - : infer_node_end_ - infer_node_start_; + total_instance = + (infer_node_start_ + conf_.batch_size <= infer_node_end_) + ? conf_.batch_size + : infer_node_end_ - infer_node_start_; total_instance *= 2; } - uint64_t h_uniq_node_num = CopyUniqueNodes(table_, copy_unique_len_, place_, - d_uniq_node_num_, host_vec_, sample_stream_); + uint64_t h_uniq_node_num = CopyUniqueNodes(table_, + copy_unique_len_, + place_, + d_uniq_node_num_, + host_vec_, + sample_stream_); VLOG(1) << "infer sage_batch_num: " << sage_batch_num_; } } @@ -2248,7 +2352,7 @@ int GraphDataGenerator::FillInferBuf() { total_row_ = 0; if (infer_cursor < h_device_keys_len_.size()) { while (global_infer_node_type_start[infer_cursor] >= - h_device_keys_len_[infer_cursor]) { + h_device_keys_len_[infer_cursor]) { infer_cursor++; if (infer_cursor >= h_device_keys_len_.size()) { return 0; @@ -2314,7 +2418,8 @@ void GraphDataGenerator::ClearSampleState() { int GraphDataGenerator::FillWalkBuf() { platform::CUDADeviceGuard guard(conf_.gpuid); - size_t once_max_sample_keynum = conf_.walk_degree * conf_.once_sample_startid_len; + size_t once_max_sample_keynum = + conf_.walk_degree * conf_.once_sample_startid_len; //////// uint64_t *h_walk; uint64_t *h_sample_keys; @@ -2350,21 +2455,28 @@ int GraphDataGenerator::FillWalkBuf() { auto &type_to_index = gpu_graph_ptr->get_graph_type_to_index(); auto &cursor = gpu_graph_ptr->cursor_[conf_.thread_id]; size_t node_type_len = first_node_type.size(); - int remain_size = - buf_size_ - conf_.walk_degree * conf_.once_sample_startid_len * conf_.walk_len; + int remain_size = buf_size_ - conf_.walk_degree * + conf_.once_sample_startid_len * + conf_.walk_len; int total_samples = 0; // Definition of variables related to multi machine sampling - int switch_flag = EVENT_NOT_SWTICH; // Mark whether the local machine needs to switch metapath - int switch_command = EVENT_NOT_SWTICH; // Mark whether to switch metapath, after multi node sync - int sample_flag = EVENT_CONTINUE_SAMPLE; // Mark whether the local machine needs to continue sampling - int sample_command = EVENT_CONTINUE_SAMPLE; // Mark whether to continue sampling, after multi node sync - - // In the case of a single machine, for scenarios where the d_walk buffer is full, - // epoch sampling ends, and metapath switching occurs, direct decisions are 
made - // to end the current card sampling or perform metapath switching. - // However, in the case of multiple machines, further decisions can only be made - // after waiting for the multiple machines to synchronize and exchange information. + int switch_flag = EVENT_NOT_SWTICH; // Mark whether the local machine needs + // to switch metapath + int switch_command = EVENT_NOT_SWTICH; // Mark whether to switch metapath, + // after multi node sync + int sample_flag = EVENT_CONTINUE_SAMPLE; // Mark whether the local machine + // needs to continue sampling + int sample_command = + EVENT_CONTINUE_SAMPLE; // Mark whether to continue sampling, after multi + // node sync + + // In the case of a single machine, for scenarios where the d_walk buffer is + // full, epoch sampling ends, and metapath switching occurs, direct decisions + // are made to end the current card sampling or perform metapath switching. + // However, in the case of multiple machines, further decisions can only be + // made after waiting for the multiple machines to synchronize and exchange + // information. while (1) { if (i > remain_size) { // scenarios 1: d_walk is full @@ -2387,11 +2499,10 @@ int GraphDataGenerator::FillWalkBuf() { ? device_key_size - start : conf_.once_sample_startid_len; VLOG(2) << "choose node_type: " << node_type - << " cur_node_idx: " << cur_node_idx - << " meta_path.size: " << meta_path.size() - << " key_size: " << device_key_size - << " start: " << start - << " tmp_len: " << tmp_len; + << " cur_node_idx: " << cur_node_idx + << " meta_path.size: " << meta_path.size() + << " key_size: " << device_key_size << " start: " << start + << " tmp_len: " << tmp_len; if (tmp_len == 0) { finish_node_type.insert(node_type); if (finish_node_type.size() == node_type_start.size()) { @@ -2420,7 +2531,7 @@ int GraphDataGenerator::FillWalkBuf() { if (is_multi_node_) { switch_command = multi_node_sync_sample(switch_flag, ncclProd); VLOG(2) << "gpuid:" << conf_.gpuid << " multi node sample sync" - << " switch_flag:" << switch_flag << "," << switch_command; + << " switch_flag:" << switch_flag << "," << switch_command; if (switch_command) { cursor += 1; switch_flag = EVENT_NOT_SWTICH; @@ -2429,7 +2540,7 @@ int GraphDataGenerator::FillWalkBuf() { sample_command = multi_node_sync_sample(sample_flag, ncclMax); VLOG(2) << "gpuid:" << conf_.gpuid << " multi node sample sync" - << " sample_flag:" << sample_flag << "," << sample_command; + << " sample_flag:" << sample_flag << "," << sample_command; if (sample_command == EVENT_FINISH_EPOCH) { // end sampling current epoch cursor = 0; @@ -2437,7 +2548,7 @@ int GraphDataGenerator::FillWalkBuf() { VLOG(0) << "sample epoch finish!"; break; } else if (sample_command == EVENT_WALKBUF_FULL) { - // end sampling current pass + // end sampling current pass VLOG(0) << "sample pass finish!"; break; } else if (sample_command == EVENT_CONTINUE_SAMPLE) { @@ -2464,8 +2575,8 @@ int GraphDataGenerator::FillWalkBuf() { conf_.walk_degree, tmp_len, step); - auto sample_res = gpu_graph_ptr->graph_neighbor_sample_v3(q, false, true, - conf_.weighted_sample); + auto sample_res = gpu_graph_ptr->graph_neighbor_sample_v3( + q, false, true, conf_.weighted_sample); jump_rows_ = sample_res.total_sample_size; total_samples += sample_res.total_sample_size; @@ -2480,8 +2591,7 @@ int GraphDataGenerator::FillWalkBuf() { int flag = jump_rows_ > 0 ? 
1 : 0; int command = multi_node_sync_sample(flag, ncclMax); VLOG(2) << "gpuid:" << conf_.gpuid << " multi node step sync" - << " step:" << step - << " step_sample:" << flag << "," << command; + << " step:" << step << " step_sample:" << flag << "," << command; if (command <= 0) { node_type_start[node_type] = tmp_len + start; cursor += 1; @@ -2491,18 +2601,32 @@ int GraphDataGenerator::FillWalkBuf() { if (!conf_.sage_mode) { if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { - if (InsertTable(d_type_keys + start, tmp_len, d_uniq_node_num_, conf_, - copy_unique_len_, place_, table_, host_vec_, sample_stream_) != 0) { - VLOG(2) << "gpu:" << conf_.gpuid << " in step 0, insert key stage, table is full"; + if (InsertTable(d_type_keys + start, + tmp_len, + d_uniq_node_num_, + conf_, + copy_unique_len_, + place_, + table_, + host_vec_, + sample_stream_) != 0) { + VLOG(2) << "gpu:" << conf_.gpuid + << " in step 0, insert key stage, table is full"; update = false; assert(false); break; } if (InsertTable(sample_res.actual_val, sample_res.total_sample_size, - d_uniq_node_num_, conf_, - copy_unique_len_, place_, table_, host_vec_, sample_stream_) != 0) { - VLOG(2) << "gpu:" << conf_.gpuid << " in step 0, insert sample res, table is full"; + d_uniq_node_num_, + conf_, + copy_unique_len_, + place_, + table_, + host_vec_, + sample_stream_) != 0) { + VLOG(2) << "gpu:" << conf_.gpuid + << " in step 0, insert sample res, table is full"; update = false; assert(false); break; @@ -2548,8 +2672,8 @@ int GraphDataGenerator::FillWalkBuf() { int flag = sample_res.total_sample_size > 0 ? 1 : 0; int command = multi_node_sync_sample(flag, ncclMax); VLOG(2) << "gpuid:" << conf_.gpuid << " multi node step sync" - << " step:" << step - << " step_sample:" << flag << "," << command; + << " step:" << step << " step_sample:" << flag << "," + << command; if (command <= 0) { break; } @@ -2566,18 +2690,25 @@ int GraphDataGenerator::FillWalkBuf() { edge_type_id, (uint64_t)sample_keys_ptr, 1, - sample_res.total_sample_size, step); + sample_res.total_sample_size, + step); int sample_key_len = sample_res.total_sample_size; - sample_res = gpu_graph_ptr->graph_neighbor_sample_v3(q, false, true, - conf_.weighted_sample); + sample_res = gpu_graph_ptr->graph_neighbor_sample_v3( + q, false, true, conf_.weighted_sample); total_samples += sample_res.total_sample_size; if (!conf_.sage_mode) { if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { if (InsertTable(sample_res.actual_val, sample_res.total_sample_size, - d_uniq_node_num_, conf_, - copy_unique_len_, place_, table_, host_vec_, sample_stream_) != 0) { - VLOG(0) << "gpu:" << conf_.gpuid << " in step: " << step << ", table is full"; + d_uniq_node_num_, + conf_, + copy_unique_len_, + place_, + table_, + host_vec_, + sample_stream_) != 0) { + VLOG(0) << "gpu:" << conf_.gpuid << " in step: " << step + << ", table is full"; update = false; assert(false); break; @@ -2622,18 +2753,18 @@ int GraphDataGenerator::FillWalkBuf() { } VLOG(2) << "sample " << sample_times << " finish, node_type=" << node_type - << ", path:[" << path[0] << "," << path[1] << "]" - << ", start:" << start - << ", len:" << tmp_len - << ", row:" << jump_rows_ - << ", total_step:" << step - << ", device_key_size:" << device_key_size; + << ", path:[" << path[0] << "," << path[1] << "]" + << ", start:" << start << ", len:" << tmp_len + << ", row:" << jump_rows_ << ", total_step:" << step + << ", device_key_size:" << device_key_size; } buf_state_.Reset(total_row_); int *d_random_row = 
reinterpret_cast(d_random_row_->ptr()); - int *d_random_row_col_shift = reinterpret_cast(d_random_row_col_shift_->ptr()); + int *d_random_row_col_shift = + reinterpret_cast(d_random_row_col_shift_->ptr()); - paddle::memory::ThrustAllocator allocator(place_, sample_stream_); + paddle::memory::ThrustAllocator allocator(place_, + sample_stream_); thrust::random::default_random_engine engine(shuffle_seed_); const auto &exec_policy = thrust::cuda::par(allocator).on(sample_stream_); thrust::counting_iterator cnt_iter(0); @@ -2670,11 +2801,16 @@ int GraphDataGenerator::FillWalkBuf() { } if (!conf_.sage_mode) { - uint64_t h_uniq_node_num = CopyUniqueNodes(table_, copy_unique_len_, place_, - d_uniq_node_num_, host_vec_, sample_stream_); + uint64_t h_uniq_node_num = CopyUniqueNodes(table_, + copy_unique_len_, + place_, + d_uniq_node_num_, + host_vec_, + sample_stream_); VLOG(1) << "sample_times:" << sample_times << ", d_walk_size:" << buf_size_ << ", d_walk_offset:" << i << ", total_rows:" << total_row_ - << ", total_samples:" << total_samples << ", h_uniq_node_num: " << h_uniq_node_num; + << ", total_samples:" << total_samples + << ", h_uniq_node_num: " << h_uniq_node_num; } else { VLOG(1) << "sample_times:" << sample_times << ", d_walk_size:" << buf_size_ << ", d_walk_offset:" << i << ", total_rows:" << total_row_ @@ -2685,7 +2821,8 @@ int GraphDataGenerator::FillWalkBuf() { int GraphDataGenerator::FillWalkBufMultiPath() { platform::CUDADeviceGuard guard(conf_.gpuid); - size_t once_max_sample_keynum = conf_.walk_degree * conf_.once_sample_startid_len; + size_t once_max_sample_keynum = + conf_.walk_degree * conf_.once_sample_startid_len; //////// uint64_t *h_walk; uint64_t *h_sample_keys; @@ -2726,8 +2863,9 @@ int GraphDataGenerator::FillWalkBufMultiPath() { auto it = gpu_graph_ptr->node_to_id.find(first_node); auto node_type = it->second; - int remain_size = - buf_size_ - conf_.walk_degree * conf_.once_sample_startid_len * conf_.walk_len; + int remain_size = buf_size_ - conf_.walk_degree * + conf_.once_sample_startid_len * + conf_.walk_len; int total_samples = 0; while (i <= remain_size) { @@ -2761,9 +2899,10 @@ int GraphDataGenerator::FillWalkBufMultiPath() { path[0], (uint64_t)(d_type_keys + start), conf_.walk_degree, - tmp_len, step); - auto sample_res = gpu_graph_ptr->graph_neighbor_sample_v3(q, false, true, - conf_.weighted_sample); + tmp_len, + step); + auto sample_res = gpu_graph_ptr->graph_neighbor_sample_v3( + q, false, true, conf_.weighted_sample); jump_rows_ = sample_res.total_sample_size; total_samples += sample_res.total_sample_size; @@ -2776,16 +2915,28 @@ int GraphDataGenerator::FillWalkBufMultiPath() { if (!conf_.sage_mode) { if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { - if (InsertTable(d_type_keys + start, tmp_len, d_uniq_node_num_, conf_, - copy_unique_len_, place_, table_, host_vec_, sample_stream_) != 0) { + if (InsertTable(d_type_keys + start, + tmp_len, + d_uniq_node_num_, + conf_, + copy_unique_len_, + place_, + table_, + host_vec_, + sample_stream_) != 0) { VLOG(2) << "in step 0, insert key stage, table is full"; update = false; break; } if (InsertTable(sample_res.actual_val, sample_res.total_sample_size, - d_uniq_node_num_, conf_, - copy_unique_len_, place_, table_, host_vec_, sample_stream_) != 0) { + d_uniq_node_num_, + conf_, + copy_unique_len_, + place_, + table_, + host_vec_, + sample_stream_) != 0) { VLOG(2) << "in step 0, insert sample res stage, table is full"; update = false; break; @@ -2837,17 +2988,23 @@ int 
GraphDataGenerator::FillWalkBufMultiPath() { edge_type_id, (uint64_t)sample_keys_ptr, 1, - sample_res.total_sample_size, step); + sample_res.total_sample_size, + step); int sample_key_len = sample_res.total_sample_size; - sample_res = gpu_graph_ptr->graph_neighbor_sample_v3(q, false, true, - conf_.weighted_sample); + sample_res = gpu_graph_ptr->graph_neighbor_sample_v3( + q, false, true, conf_.weighted_sample); total_samples += sample_res.total_sample_size; if (!conf_.sage_mode) { if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { if (InsertTable(sample_res.actual_val, sample_res.total_sample_size, - d_uniq_node_num_, conf_, - copy_unique_len_, place_, table_, host_vec_, sample_stream_) != 0) { + d_uniq_node_num_, + conf_, + copy_unique_len_, + place_, + table_, + host_vec_, + sample_stream_) != 0) { VLOG(2) << "in step: " << step << ", table is full"; update = false; break; @@ -2893,9 +3050,11 @@ int GraphDataGenerator::FillWalkBufMultiPath() { } buf_state_.Reset(total_row_); int *d_random_row = reinterpret_cast(d_random_row_->ptr()); - int *d_random_row_col_shift = reinterpret_cast(d_random_row_col_shift_->ptr()); + int *d_random_row_col_shift = + reinterpret_cast(d_random_row_col_shift_->ptr()); - paddle::memory::ThrustAllocator allocator(place_, sample_stream_); + paddle::memory::ThrustAllocator allocator(place_, + sample_stream_); thrust::random::default_random_engine engine(shuffle_seed_); const auto &exec_policy = thrust::cuda::par(allocator).on(sample_stream_); thrust::counting_iterator cnt_iter(0); @@ -2932,8 +3091,12 @@ int GraphDataGenerator::FillWalkBufMultiPath() { } if (!conf_.sage_mode) { - uint64_t h_uniq_node_num = CopyUniqueNodes(table_, copy_unique_len_, place_, - d_uniq_node_num_, host_vec_, sample_stream_); + uint64_t h_uniq_node_num = CopyUniqueNodes(table_, + copy_unique_len_, + place_, + d_uniq_node_num_, + host_vec_, + sample_stream_); VLOG(1) << "sample_times:" << sample_times << ", d_walk_size:" << buf_size_ << ", d_walk_offset:" << i << ", total_rows:" << total_row_ << ", h_uniq_node_num:" << h_uniq_node_num @@ -2962,15 +3125,17 @@ void GraphDataGenerator::AllocResource( platform::CUDADeviceGuard guard(conf_.gpuid); sample_stream_ = gpu_graph_ptr->get_local_stream(conf_.gpuid); train_stream_ = dynamic_cast( - platform::DeviceContextPool::Instance().Get(place_)) - ->stream(); + platform::DeviceContextPool::Instance().Get(place_)) + ->stream(); if (FLAGS_gpugraph_storage_mode != GpuGraphStorageMode::WHOLE_HBM) { if (conf_.gpu_graph_training) { table_ = new HashTable( - conf_.train_table_cap / FLAGS_gpugraph_hbm_table_load_factor, sample_stream_); + conf_.train_table_cap / FLAGS_gpugraph_hbm_table_load_factor, + sample_stream_); } else { table_ = new HashTable( - conf_.infer_table_cap / FLAGS_gpugraph_hbm_table_load_factor, sample_stream_); + conf_.infer_table_cap / FLAGS_gpugraph_hbm_table_load_factor, + sample_stream_); } } VLOG(1) << "AllocResource gpuid " << conf_.gpuid @@ -3006,8 +3171,10 @@ void GraphDataGenerator::AllocResource( gpu_graph_ptr->h_node_iter_graph_metapath_keys_len_[thread_id]; VLOG(2) << "h train metapaths key len: " << h_train_metapath_keys_len_; } else { - auto &d_graph_all_type_keys = gpu_graph_ptr->d_node_iter_graph_all_type_keys_; - auto &h_graph_all_type_keys_len = gpu_graph_ptr->h_node_iter_graph_all_type_keys_len_; + auto &d_graph_all_type_keys = + gpu_graph_ptr->d_node_iter_graph_all_type_keys_; + auto &h_graph_all_type_keys_len = + gpu_graph_ptr->h_node_iter_graph_all_type_keys_len_; for (size_t i = 0; i < 
d_graph_all_type_keys.size(); i++) { d_device_keys_.push_back(d_graph_all_type_keys[i][thread_id]); @@ -3016,7 +3183,8 @@ void GraphDataGenerator::AllocResource( VLOG(2) << "h_device_keys size: " << h_device_keys_len_.size(); } - size_t once_max_sample_keynum = conf_.walk_degree * conf_.once_sample_startid_len; + size_t once_max_sample_keynum = + conf_.walk_degree * conf_.once_sample_startid_len; d_prefix_sum_ = memory::AllocShared( place_, (once_max_sample_keynum + 1) * sizeof(int), @@ -3078,59 +3246,79 @@ void GraphDataGenerator::AllocResource( buf_state_.Init(conf_.batch_size, conf_.walk_len, &conf_.window_step); d_random_row_ = memory::AllocShared( place_, - (conf_.once_sample_startid_len * conf_.walk_degree * repeat_time_) * sizeof(int), + (conf_.once_sample_startid_len * conf_.walk_degree * repeat_time_) * + sizeof(int), phi::Stream(reinterpret_cast(sample_stream_))); d_random_row_col_shift_ = memory::AllocShared( place_, - (conf_.once_sample_startid_len * conf_.walk_degree * repeat_time_) * sizeof(int), + (conf_.once_sample_startid_len * conf_.walk_degree * repeat_time_) * + sizeof(int), phi::Stream(reinterpret_cast(sample_stream_))); shuffle_seed_ = 0; ins_buf_pair_len_ = 0; - d_ins_buf_ = memory::AllocShared(place_, (conf_.batch_size * 2 * 2) * sizeof(uint64_t), - phi::Stream(reinterpret_cast(sample_stream_))); - d_pair_num_ = memory::AllocShared(place_, sizeof(int), - phi::Stream(reinterpret_cast(sample_stream_))); - conf_.enable_pair_label = conf_.gpu_graph_training && gpu_graph_ptr->pair_label_conf_.size() > 0; + d_ins_buf_ = memory::AllocShared( + place_, + (conf_.batch_size * 2 * 2) * sizeof(uint64_t), + phi::Stream(reinterpret_cast(sample_stream_))); + d_pair_num_ = memory::AllocShared( + place_, + sizeof(int), + phi::Stream(reinterpret_cast(sample_stream_))); + conf_.enable_pair_label = + conf_.gpu_graph_training && gpu_graph_ptr->pair_label_conf_.size() > 0; if (conf_.enable_pair_label) { conf_.node_type_num = gpu_graph_ptr->id_to_feature.size(); - d_pair_label_buf_ = memory::AllocShared(place_, (conf_.batch_size * 2) * sizeof(int32_t), - phi::Stream(reinterpret_cast(sample_stream_))); - conf_.d_pair_label_conf = memory::AllocShared(place_, - conf_.node_type_num * conf_.node_type_num * sizeof(int32_t), - phi::Stream(reinterpret_cast(sample_stream_))); - CUDA_CHECK(cudaMemcpyAsync(conf_.d_pair_label_conf->ptr(), - gpu_graph_ptr->pair_label_conf_.data(), - conf_.node_type_num * conf_.node_type_num * sizeof(int32_t), - cudaMemcpyHostToDevice, sample_stream_)); + d_pair_label_buf_ = memory::AllocShared( + place_, + (conf_.batch_size * 2) * sizeof(int32_t), + phi::Stream(reinterpret_cast(sample_stream_))); + conf_.d_pair_label_conf = memory::AllocShared( + place_, + conf_.node_type_num * conf_.node_type_num * sizeof(int32_t), + phi::Stream(reinterpret_cast(sample_stream_))); + CUDA_CHECK(cudaMemcpyAsync( + conf_.d_pair_label_conf->ptr(), + gpu_graph_ptr->pair_label_conf_.data(), + conf_.node_type_num * conf_.node_type_num * sizeof(int32_t), + cudaMemcpyHostToDevice, + sample_stream_)); id_offset_of_feed_vec_ = 4; } else { id_offset_of_feed_vec_ = 3; } - conf_.need_walk_ntype = conf_.excluded_train_pair_len > 0 || conf_.enable_pair_label; + conf_.need_walk_ntype = + conf_.excluded_train_pair_len > 0 || conf_.enable_pair_label; if (conf_.need_walk_ntype) { - d_walk_ntype_ = memory::AllocShared(place_, buf_size_ * sizeof(uint8_t), + d_walk_ntype_ = memory::AllocShared( + place_, + buf_size_ * sizeof(uint8_t), phi::Stream(reinterpret_cast(sample_stream_))); - 
cudaMemsetAsync(d_walk_ntype_->ptr(), 0, buf_size_ * sizeof(uint8_t), sample_stream_); + cudaMemsetAsync( + d_walk_ntype_->ptr(), 0, buf_size_ * sizeof(uint8_t), sample_stream_); } if (!conf_.sage_mode) { conf_.slot_num = (feed_vec.size() - id_offset_of_feed_vec_) / 2; } else { - int offset = conf_.get_degree ? id_offset_of_feed_vec_ + 2 : id_offset_of_feed_vec_ + 1; + int offset = conf_.get_degree ? id_offset_of_feed_vec_ + 2 + : id_offset_of_feed_vec_ + 1; int sample_offset = conf_.return_weight ? 6 : 5; - conf_.slot_num = (feed_vec.size() - offset - samples_.size() * sample_offset) / 2; + conf_.slot_num = + (feed_vec.size() - offset - samples_.size() * sample_offset) / 2; } - d_slot_tensor_ptr_ = - memory::AllocShared(place_, conf_.slot_num * sizeof(uint64_t *), - phi::Stream(reinterpret_cast(sample_stream_))); - d_slot_lod_tensor_ptr_ = - memory::AllocShared(place_, conf_.slot_num * sizeof(uint64_t *), - phi::Stream(reinterpret_cast(sample_stream_))); + d_slot_tensor_ptr_ = memory::AllocShared( + place_, + conf_.slot_num * sizeof(uint64_t *), + phi::Stream(reinterpret_cast(sample_stream_))); + d_slot_lod_tensor_ptr_ = memory::AllocShared( + place_, + conf_.slot_num * sizeof(uint64_t *), + phi::Stream(reinterpret_cast(sample_stream_))); if (conf_.sage_mode) { conf_.reindex_table_size = conf_.batch_size * 2; @@ -3169,13 +3357,14 @@ void GraphDataGenerator::AllocResource( << infer_node_type_index_set_.size(); } - int *stat_ptr = multi_node_sync_stat_.mutable_data(place_, sizeof(int) * 4); + int *stat_ptr = + multi_node_sync_stat_.mutable_data(place_, sizeof(int) * 4); int flags[4] = {0, 1, 2, 0}; cudaMemcpyAsync(stat_ptr, // output - &flags, - sizeof(int) * 4, - cudaMemcpyHostToDevice, - sample_stream_); + &flags, + sizeof(int) * 4, + cudaMemcpyHostToDevice, + sample_stream_); cudaStreamSynchronize(sample_stream_); debug_gpu_memory_info(conf_.gpuid, "AllocResource end"); @@ -3185,10 +3374,10 @@ void GraphDataGenerator::AllocTrainResource(int thread_id) { if (conf_.slot_num > 0) { platform::CUDADeviceGuard guard(conf_.gpuid); if (!conf_.sage_mode) { - d_feature_size_list_buf_ = - memory::AllocShared(place_, (conf_.batch_size * 2) * sizeof(uint32_t)); - d_feature_size_prefixsum_buf_ = - memory::AllocShared(place_, (conf_.batch_size * 2 + 1) * sizeof(uint32_t)); + d_feature_size_list_buf_ = memory::AllocShared( + place_, (conf_.batch_size * 2) * sizeof(uint32_t)); + d_feature_size_prefixsum_buf_ = memory::AllocShared( + place_, (conf_.batch_size * 2 + 1) * sizeof(uint32_t)); } else { d_feature_size_list_buf_ = NULL; d_feature_size_prefixsum_buf_ = NULL; @@ -3211,8 +3400,8 @@ void GraphDataGenerator::SetConfig( conf_.batch_size = conf_.once_sample_startid_len; } repeat_time_ = graph_config.sample_times_one_chunk(); - buf_size_ = - conf_.once_sample_startid_len * conf_.walk_len * conf_.walk_degree * repeat_time_; + buf_size_ = conf_.once_sample_startid_len * conf_.walk_len * + conf_.walk_degree * repeat_time_; conf_.train_table_cap = graph_config.train_table_cap(); conf_.infer_table_cap = graph_config.infer_table_cap(); conf_.get_degree = graph_config.get_degree(); @@ -3233,8 +3422,10 @@ void GraphDataGenerator::SetConfig( std::string str_samples = graph_config.samples(); auto gpu_graph_ptr = GraphGpuWrapper::GetInstance(); debug_gpu_memory_info("init_conf start"); - gpu_graph_ptr->init_conf( - first_node_type, meta_path, graph_config.excluded_train_pair(), graph_config.pair_label()); + gpu_graph_ptr->init_conf(first_node_type, + meta_path, + graph_config.excluded_train_pair(), + 
graph_config.pair_label()); debug_gpu_memory_info("init_conf end"); auto edge_to_id = gpu_graph_ptr->edge_to_id; @@ -3289,7 +3480,8 @@ void GraphDataGenerator::DumpWalkPath(std::string dump_path, size_t dump_rate) { #endif } -int GraphDataGenerator::multi_node_sync_sample(int flag, const ncclRedOp_t& op) { +int GraphDataGenerator::multi_node_sync_sample(int flag, + const ncclRedOp_t &op) { if (flag < 0 && flag > 2) { VLOG(0) << "invalid flag! " << flag; assert(false); @@ -3302,13 +3494,8 @@ int GraphDataGenerator::multi_node_sync_sample(int flag, const ncclRedOp_t& op) auto comm = platform::NCCLCommContext::Instance().Get(0, place_.GetDeviceId()); auto stream = comm->stream(); - PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce(&stat_ptr[flag], - &stat_ptr[3], - 1, - ncclInt, - op, - comm->comm(), - stream)); + PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclAllReduce( + &stat_ptr[flag], &stat_ptr[3], 1, ncclInt, op, comm->comm(), stream)); PADDLE_ENFORCE_GPU_SUCCESS(cudaMemcpyAsync(&ret, // output &stat_ptr[3], sizeof(int), diff --git a/paddle/fluid/framework/data_feed.h b/paddle/fluid/framework/data_feed.h index b38428492796a..90da325c1acdc 100644 --- a/paddle/fluid/framework/data_feed.h +++ b/paddle/fluid/framework/data_feed.h @@ -893,11 +893,12 @@ struct BufState { }; /// Related behaviors and events during sampling -const int EVENT_FINISH_EPOCH = 0; // End of sampling single epoch -const int EVENT_CONTINUE_SAMPLE = 1; // Continue sampling -const int EVENT_WALKBUF_FULL = 2; // d_walk is full, end current pass sampling -const int EVENT_NOT_SWTICH = 0; // Continue sampling on the current metapath. -const int EVENT_SWTICH_METAPATH = 1; // Switch to the next metapath to perform sampling +const int EVENT_FINISH_EPOCH = 0; // End of sampling single epoch +const int EVENT_CONTINUE_SAMPLE = 1; // Continue sampling +const int EVENT_WALKBUF_FULL = 2; // d_walk is full, end current pass sampling +const int EVENT_NOT_SWTICH = 0; // Continue sampling on the current metapath. +const int EVENT_SWTICH_METAPATH = + 1; // Switch to the next metapath to perform sampling struct GraphDataGeneratorConfig { bool need_walk_ntype; @@ -940,8 +941,7 @@ class GraphDataGenerator { int FillInferBuf(); void DoWalkandSage(); int FillSlotFeature(uint64_t* d_walk); - int FillIdShowClkTensor(int total_instance, - bool gpu_graph_training); + int FillIdShowClkTensor(int total_instance, bool gpu_graph_training); int FillGraphIdShowClkTensor(int uniq_instance, int total_instance, int index); @@ -952,7 +952,7 @@ class GraphDataGenerator { int FillSlotFeature(uint64_t* d_walk, size_t key_num); int GetPathNum() { return total_row_; } void ResetPathNum() { total_row_ = 0; } - int GetGraphBatchsize() {return conf_.batch_size;}; + int GetGraphBatchsize() { return conf_.batch_size; }; void SetNewBatchsize(int batch_num) { if (!conf_.gpu_graph_training && !conf_.sage_mode) { conf_.batch_size = (total_row_ + batch_num - 1) / batch_num; @@ -1193,9 +1193,7 @@ class DataFeed { return gpu_graph_data_generator_.get_pass_end(); } - virtual void reset_pass_end() { - gpu_graph_data_generator_.reset_pass_end(); - } + virtual void reset_pass_end() { gpu_graph_data_generator_.reset_pass_end(); } virtual void ResetPathNum() { gpu_graph_data_generator_.ResetPathNum(); }
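
Note on the multi-node synchronization touched by this patch: `multi_node_sync_sample` all-reduces a per-GPU status flag through NCCL so that every rank reaches the same decision about finishing an epoch, ending the current pass, or switching metapaths (the patch passes `ncclProd` for the switch decision and `ncclMax` for the sampling decision). A minimal standalone sketch of that pattern follows; it assumes a one-process-per-GPU setup with an already initialized `ncclComm_t` and CUDA stream, and `SyncSampleFlag` is a hypothetical helper written for illustration, not part of this patch.

    #include <cuda_runtime.h>
    #include <nccl.h>

    // Reduce a per-rank status flag so every rank takes the same action.
    // op = ncclMax: stop/continue as soon as any rank reports the event;
    // op = ncclProd: act only if all ranks agree (e.g. metapath switching).
    int SyncSampleFlag(int local_flag, ncclRedOp_t op, ncclComm_t comm,
                       cudaStream_t stream) {
      int *d_buf = nullptr;  // d_buf[0] = send slot, d_buf[1] = recv slot
      cudaMalloc(&d_buf, 2 * sizeof(int));
      cudaMemcpy(d_buf, &local_flag, sizeof(int), cudaMemcpyHostToDevice);
      // Every rank contributes its flag; the reduction resolves disagreements.
      ncclAllReduce(d_buf, d_buf + 1, 1, ncclInt, op, comm, stream);
      cudaStreamSynchronize(stream);
      int global_flag = 0;
      cudaMemcpy(&global_flag, d_buf + 1, sizeof(int), cudaMemcpyDeviceToHost);
      cudaFree(d_buf);
      return global_flag;
    }

The production code avoids the per-call allocation and host/device copies by keeping a small preloaded `multi_node_sync_stat_` buffer on the device and indexing it with the flag value, but the reduction semantics are the same as in the sketch above.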