Skip to content

Commit

Permalink
Merge branch 'develop' into pubapi_paddle
Browse files Browse the repository at this point in the history
  • Loading branch information
XiaoguangHu01 authored Apr 27, 2021
2 parents 0c98ecf + 9930a58 commit 2825f66
Show file tree
Hide file tree
Showing 190 changed files with 4,761 additions and 2,199 deletions.
14 changes: 10 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ if(WIN32)
endforeach(flag_var)
endif()

# NOTE(zhouwei25): temporarily change MP to 1 for reducing CPU & memory utilization
set(PROCESS_MAX 1)
#math(EXPR PROCESS_MAX "${CPU_CORES} * 1 / 2")
math(EXPR PROCESS_MAX "${CPU_CORES} * 2 / 3")

# Windows build: turn off warnings and use parallel compilation.
foreach(flag_var
Expand All @@ -116,7 +114,10 @@ if(WIN32)
CMAKE_C_FLAGS CMAKE_C_FLAGS_DEBUG CMAKE_C_FLAGS_RELEASE
CMAKE_C_FLAGS_MINSIZEREL CMAKE_C_FLAGS_RELWITHDEBINFO)
string(REGEX REPLACE "/W[1-4]" " /W0 " ${flag_var} "${${flag_var}}")
set(${flag_var} "${${flag_var}} /MP${PROCESS_MAX}")
# NOTE(zhouwei25): GPU builds use too much memory when compiling in parallel
if(NOT WITH_GPU)
set(${flag_var} "${${flag_var}} /MP${PROCESS_MAX}")
endif()
endforeach(flag_var)
foreach(flag_var CMAKE_CXX_FLAGS CMAKE_C_FLAGS)
set(${flag_var} "${${flag_var}} /w")
Expand Down Expand Up @@ -352,6 +353,11 @@ if (WITH_MIPS)
add_definitions(-DPADDLE_WITH_MIPS)
endif()

# Append -faligned-new to the C++ compiler flags when WITH_HETERPS is enabled
# and the C++ compiler version is greater than 7.0.
# NOTE(review): -faligned-new is a GCC-style option; the check looks only at
# the version number, not the compiler ID -- presumably GCC is assumed here.
if (WITH_HETERPS AND CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
  set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -faligned-new")
endif()
set(PADDLE_PYTHON_BUILD_DIR "${CMAKE_CURRENT_BINARY_DIR}/python/build")

set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O3 -g -DNDEBUG")
Expand Down
23 changes: 13 additions & 10 deletions paddle/fluid/distributed/service/brpc_ps_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "paddle/fluid/distributed/service/brpc_ps_server.h"
#include <thread> // NOLINT
#include "butil/object_pool.h"
#include "paddle/fluid/distributed/table/depends/sparse_utils.h"
#include "paddle/fluid/distributed/table/table.h"
#include "paddle/fluid/framework/archive.h"
Expand Down Expand Up @@ -196,12 +197,13 @@ int32_t BrpcPsService::pull_dense(Table *table, const PsRequestMessage &request,
return 0;
}

std::vector<float> res_data;
res_data.resize(num * table->value_accesor()->select_size() / sizeof(float));
table->pull_dense(res_data.data(), num);
auto res_data = butil::get_object<std::vector<float>>();
res_data->resize(num * table->value_accesor()->select_size() / sizeof(float));
table->pull_dense(res_data->data(), num);

cntl->response_attachment().append((char *)res_data.data(),
res_data.size() * sizeof(float));
cntl->response_attachment().append((char *)(res_data->data()),
res_data->size() * sizeof(float));
butil::return_object(res_data);

return 0;
}
Expand Down Expand Up @@ -367,12 +369,13 @@ int32_t BrpcPsService::pull_sparse(Table *table,

value.DeserializeFromBytes(const_cast<void *>(data));

std::vector<float> res_data;
res_data.resize(num * dim);
table->pull_sparse(res_data.data(), value);
auto res_data = butil::get_object<std::vector<float>>();
res_data->resize(num * dim);
table->pull_sparse(res_data->data(), value);

cntl->response_attachment().append((char *)res_data.data(),
res_data.size() * sizeof(float));
cntl->response_attachment().append((char *)(res_data->data()),
res_data->size() * sizeof(float));
butil::return_object(res_data);
return 0;
}

Expand Down
55 changes: 30 additions & 25 deletions paddle/fluid/distributed/table/common_sparse_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,37 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,

int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
const int mode) {
int64_t not_save_num = 0;
for (auto& value : block->values_) {
if (mode == SaveMode::delta && !value.second.need_save_) {
not_save_num++;
continue;
}

auto* vs = value.second.data_;
std::stringstream ss;
auto id = value.first;
ss << id << "\t" << value.second.count_ << "\t" << value.second.unseen_days_
<< "\t" << value.second.is_entry_ << "\t";

for (int i = 0; i < block->value_length_; i++) {
ss << vs[i];
ss << ",";
}
int64_t save_num = 0;
for (auto& table : block->values_) {
for (auto& value : table) {
if (mode == SaveMode::delta && !value.second->need_save_) {
continue;
}
save_num += 1;

auto* vs = value.second->data_.data();
std::stringstream ss;
auto id = value.first;
ss << id << "\t" << value.second->count_ << "\t"
<< value.second->unseen_days_ << "\t" << value.second->is_entry_
<< "\t";

for (int i = 0; i < block->value_length_; i++) {
ss << vs[i];
ss << ",";
}

ss << "\n";
ss << "\n";

os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
os->write(ss.str().c_str(), sizeof(char) * ss.str().size());

if (mode == SaveMode::base || mode == SaveMode::delta) {
value.second.need_save_ = false;
if (mode == SaveMode::base || mode == SaveMode::delta) {
value.second->need_save_ = false;
}
}
}

return block->values_.size() - not_save_num;
return save_num;
}

int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
Expand Down Expand Up @@ -183,7 +186,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,

block->Init(id, false);

auto value_instant = block->GetValue(id);
VALUE* value_instant = block->GetValue(id);
if (values.size() == 5) {
value_instant->count_ = std::stoi(values[1]);
value_instant->unseen_days_ = std::stoi(values[2]);
Expand Down Expand Up @@ -373,8 +376,10 @@ std::pair<int64_t, int64_t> CommonSparseTable::print_table_stat() {
int64_t feasign_size = 0;
int64_t mf_size = 0;

for (auto& value : shard_values_) {
feasign_size += value->values_.size();
for (auto& shard : shard_values_) {
for (auto& table : shard->values_) {
feasign_size += table.size();
}
}

return {feasign_size, mf_size};
Expand Down
Loading

1 comment on commit 2825f66

@paddle-bot-old
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Congratulations! Your pull request passed all required CI. You can now ask the reviewer(s) to approve and merge. 🎉

Please sign in to comment.