[Cherry-Pick] logclean & embedding doc (#32009)
* fix en doc for emb (#31980)

* fix en doc for emb, test=document_fix;
Change-Id: I4757e67caacd7189f068493ed45a7445f87ffb40

* LOG CLEAN (#31819)

* upgrade vlog

* train from dataset fetch optimize
seiriosPlus authored Apr 2, 2021
1 parent e7542a4 commit 8140485
Showing 19 changed files with 104 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cmake/external/brpc.cmake
@@ -41,7 +41,7 @@ ExternalProject_Add(
${EXTERNAL_PROJECT_LOG_ARGS}
# TODO(gongwb): change to de newst repo when they changed.
GIT_REPOSITORY "https://github.com/wangjiawei04/brpc"
GIT_TAG "6d79e0b17f25107c35b705ea58d888083f59ff47"
GIT_TAG "e203afb794caf027da0f1e0776443e7d20c0c28e"
PREFIX ${BRPC_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
5 changes: 3 additions & 2 deletions paddle/fluid/distributed/service/brpc_ps_server.cc
@@ -57,7 +57,8 @@ uint64_t BrpcPsServer::start(const std::string &ip, uint32_t port) {
std::unique_lock<std::mutex> lock(mutex_);

std::string ip_port = ip + ":" + std::to_string(port);
VLOG(3) << "server of rank " << _rank << " starts at " << ip_port;
VLOG(0) << "running server with rank id: " << _rank
<< ", endpoint: " << ip_port;
brpc::ServerOptions options;

int num_threads = std::thread::hardware_concurrency();
@@ -535,7 +536,7 @@ int32_t BrpcPsService::stop_server(Table *table,
auto *p_server = _server;
std::thread t_stop([p_server]() {
p_server->stop();
LOG(INFO) << "Server Stoped";
VLOG(3) << "Server Stoped";
});
t_stop.detach();
return 0;
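
Note: this commit demotes a number of messages from LOG(INFO)/LOG(WARNING) to VLOG(1)/VLOG(3), so they no longer appear by default. A minimal sketch of how the demoted messages can usually be surfaced again, assuming the build honours the conventional GLOG_v environment variable (the "train.py" entry point is a placeholder, not part of this commit):

```python
# Sketch: raise glog verbosity for a child process so VLOG(3) messages show up again.
# GLOG_v and "train.py" are assumptions for illustration only.
import os
import subprocess

env = dict(os.environ, GLOG_v="3")              # 0 = default; 3 shows VLOG(3) and below
subprocess.run(["python", "train.py"], env=env)
```
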
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/service/brpc_utils.cc
@@ -331,7 +331,7 @@ std::string GetIntTypeEndpoint(const std::string& ip, const uint32_t& port) {

while (hp->h_addr_list[i] != NULL) {
int_ip = inet_ntoa(*(struct in_addr*)hp->h_addr_list[i]);
VLOG(0) << "Brpc Get host by name, host:" << ip << " -> ip: " << int_ip;
VLOG(3) << "Brpc Get host by name, host:" << ip << " -> ip: " << int_ip;
break;
}

10 changes: 4 additions & 6 deletions paddle/fluid/distributed/service/env.h
@@ -39,7 +39,7 @@ struct PSHost {

// |---ip---|---port---|--rank--|
// |-32bit--|--20bit---|--12bit-|
// for pslib

uint64_t serialize_to_uint64() {
uint64_t host_label = 0;
host_label = inet_addr(ip.c_str());
@@ -174,14 +174,12 @@ class PSEnvironment {
host.ip = ip;
host.port = port;
host.rank = rank;
if (sign_set.count(rank) > 0) {
LOG(WARNING) << "ps-host :" << host.ip << ":" << host.port
<< ", rank:" << host.rank
<< " already register, ignore register";
} else {

if (sign_set.count(rank) == 0) {
host_list.push_back(host);
sign_set.insert(rank);
}

return 0;
}

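
Note on the env.h hunk above: the layout comment describes a 64-bit host label split into a 32-bit ip, a 20-bit port and a 12-bit rank, but the body of serialize_to_uint64 is only partially visible here. The sketch below just illustrates that packing under the stated bit widths; the exact shift order and byte handling inside PSHost are assumptions.

```python
import socket

def serialize_host(ip: str, port: int, rank: int) -> int:
    # |---ip---|---port---|--rank--|
    # |-32bit--|--20bit---|--12bit-|
    ip_int = int.from_bytes(socket.inet_aton(ip), "big")  # 32-bit view of the address
    assert port < (1 << 20) and rank < (1 << 12)
    return (ip_int << 32) | (port << 12) | rank

def parse_host(label: int):
    rank = label & 0xFFF
    port = (label >> 12) & 0xFFFFF
    ip = socket.inet_ntoa(((label >> 32) & 0xFFFFFFFF).to_bytes(4, "big"))
    return ip, port, rank

print(parse_host(serialize_host("127.0.0.1", 8200, 3)))  # ('127.0.0.1', 8200, 3)
```
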
3 changes: 1 addition & 2 deletions paddle/fluid/distributed/service/ps_client.cc
@@ -80,8 +80,7 @@ PSClient *PSClientFactory::create(const PSParameter &ps_config) {
}

TableManager::instance().initialize();
LOG(INFO) << "Create PSClient[" << service_param.client_class()
<< "] success";
VLOG(3) << "Create PSClient[" << service_param.client_class() << "] success";
return client;
}
} // namespace distributed
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/service/service.cc
@@ -47,7 +47,7 @@ paddle::distributed::PSParameter load_from_prototxt(
}

void PSCore::init_gflag(const std::string& gflags) {
LOG(INFO) << "Init With Gflags:" << gflags;
VLOG(3) << "Init With Gflags:" << gflags;
std::vector<std::string> flags = paddle::string::split_string(gflags);
if (flags.size() < 1) {
flags.push_back("-max_body_size=314217728");
2 changes: 0 additions & 2 deletions paddle/fluid/distributed/table/depends/dense.h
@@ -89,7 +89,6 @@ class DSGD : public DenseOptimizer {

auto blas = GetBlas<float>();
float lr = *(global_learning_rate_) * (*learning_rate);
VLOG(4) << "DSGD LearningRate: " << lr;
blas.VCOPY(update_numel, update_values + begin, grads.data());
blas.SCAL(update_numel, lr, grads.data());
blas.VSUB(update_numel, param + begin, grads.data(), param + begin);
@@ -157,7 +156,6 @@ class DAdam : public DenseOptimizer {
beta2_pow[0] = beta2_pow[0] * beta2;

float lr_ = *(global_learning_rate_)*learning_rate[0];
VLOG(4) << "DAdam LearningRate: " << lr_;
lr_ *= sqrt(1 - beta2_pow[0]) / (1 - beta1_pow[0]);

float* tmp_ = tmp.data();
2 changes: 0 additions & 2 deletions paddle/fluid/distributed/table/depends/sparse.h
@@ -110,7 +110,6 @@ class SSGD : public SparseOptimizer {
auto* value = block->Get(id);

float learning_rate = *(global_learning_rate_) * (value + lr_offset)[0];
VLOG(4) << "SSGD LearningRate: " << learning_rate;
float* param = value + param_offset;

std::vector<float> grads;
@@ -166,7 +165,6 @@ class SAdam : public SparseOptimizer {
if (!block->GetEntry(id)) continue;
auto* values = block->Get(id);
float lr_ = *(global_learning_rate_) * (values + lr_offset)[0];
VLOG(4) << "SAdam LearningRate: " << lr_;
float* param = values + param_offset;
float* moment1 = values + m1_offset;
float* moment2 = values + m2_offset;
28 changes: 12 additions & 16 deletions paddle/fluid/framework/details/build_strategy.cc
@@ -167,9 +167,6 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
AppendPassWithCheck(strategy_.fuse_bn_add_act_ops_, "fuse_bn_add_act_pass");
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) && !defined(__APPLE__)
AppendPassWithCheck(strategy_.enable_auto_fusion_, "fusion_group_pass");
#else
LOG(WARNING) << "fusion_group is not enabled for Windows/MacOS now, and "
"only effective when running with CUDA GPU.";
#endif
AppendPassWithCheck(strategy_.fuse_elewise_add_act_ops_,
"fuse_elewise_add_act_pass");
@@ -271,12 +268,11 @@ class ParallelExecutorPassBuilder : public ir::PassBuilder {
if (FLAGS_use_mkldnn) {
AppendPass(pass_name);
} else if (!strategy_.mkldnn_enabled_op_types_.empty()) {
LOG(WARNING)
<< "mkldnn_enabled_op_types specify the operator type list to "
"use MKLDNN acceleration. It is null in default, means "
"that all the operators supported by MKLDNN will be "
"accelerated. And it should not be set when "
"FLAGS_use_mkldnn=false.";
VLOG(1) << "mkldnn_enabled_op_types specify the operator type list to "
"use MKLDNN acceleration. It is null in default, means "
"that all the operators supported by MKLDNN will be "
"accelerated. And it should not be set when "
"FLAGS_use_mkldnn=false.";
}
#else
PADDLE_ENFORCE_NE(FLAGS_use_mkldnn, true,
@@ -409,26 +405,26 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph,
<< ", num_trainers:" << num_trainers_;
} else if (pass->Type() == "fuse_relu_depthwise_conv_pass") {
if (use_device != p::kCUDA) {
LOG(WARNING) << "fuse_relu_depthwise_conv_pass is only supported on "
"GPU, skipped.";
VLOG(1) << "fuse_relu_depthwise_conv_pass is only supported on "
"GPU, skipped.";
continue;
}
} else if (pass->Type() == "fusion_group_pass") {
pass->Set<bool>("use_gpu", new bool((use_device == p::kCUDA)));
if (use_device != p::kCUDA) {
LOG(WARNING) << "fusion_group_pass is only supported on GPU, skipped.";
VLOG(1) << "fusion_group_pass is only supported on GPU, skipped.";
continue;
}
} else if (pass->Type() == "fuse_bn_act_pass") {
if (use_device != p::kCUDA) {
LOG(WARNING) << "fuse_bn_act_pass is only supported on "
"GPU, skipped.";
VLOG(1) << "fuse_bn_act_pass is only supported on "
"GPU, skipped.";
continue;
}
} else if (pass->Type() == "fuse_bn_add_act_pass") {
if (use_device != p::kCUDA) {
LOG(WARNING) << "fuse_bn_add_act_pass is only supported on "
"GPU, skipped.";
VLOG(1) << "fuse_bn_add_act_pass is only supported on "
"GPU, skipped.";
continue;
}
} else if (pass->Type() == "mkldnn_placement_pass") {
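
Note: the build_strategy.cc changes only lower the severity of the "pass X is GPU-only, skipped" messages from LOG(WARNING) to VLOG(1); the skipping behaviour itself is unchanged. A small sketch of the user-facing flags those passes are keyed on, on the assumption that the public BuildStrategy API exposes these attribute names:

```python
import paddle

paddle.enable_static()

build_strategy = paddle.static.BuildStrategy()
build_strategy.fuse_bn_act_ops = True           # maps to fuse_bn_act_pass (GPU only)
build_strategy.fuse_relu_depthwise_conv = True  # maps to fuse_relu_depthwise_conv_pass (GPU only)
build_strategy.fuse_elewise_add_act_ops = True  # maps to fuse_elewise_add_act_pass

# On a CPU-only place the GPU-only passes above are skipped; after this commit the
# notice is logged at VLOG(1) instead of LOG(WARNING).
print(build_strategy)
```
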
2 changes: 1 addition & 1 deletion paddle/fluid/framework/device_worker.h
@@ -202,7 +202,7 @@ class DeviceWorker {
Scope* root_scope_ = nullptr;
Scope* thread_scope_;
paddle::platform::Place place_;
int64_t batch_num_;
int64_t batch_num_ = 0;
FetchConfig fetch_config_;
bool use_cvm_;
bool no_cvm_;
31 changes: 25 additions & 6 deletions paddle/fluid/framework/hogwild_worker.cc
@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <ctime>
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/device_worker.h"
#include "paddle/fluid/framework/device_worker_factory.h"
@@ -227,14 +228,32 @@ void HogwildWorker::PrintFetchVars() {
// call count
batch_num_++;
int batch_per_print = fetch_config_.print_period();
if (thread_id_ == 0) {
if (batch_num_ % batch_per_print == 0) {
int fetch_var_num = fetch_config_.fetch_var_names_size();
for (int i = 0; i < fetch_var_num; ++i) {
platform::PrintVar(thread_scope_, fetch_config_.fetch_var_names(i),
fetch_config_.fetch_var_str_format(i));
int fetch_var_num = fetch_config_.fetch_var_names_size();

if (fetch_var_num == 0) {
return;
}

if (thread_id_ == 0 && batch_num_ % batch_per_print == 0) {
time_t curtime;
time(&curtime);
char mbstr[80];
std::strftime(mbstr, sizeof(mbstr), "%Y-%m-%d %H:%M:%S",
std::localtime(&curtime));

std::stringstream ss;
ss << "time: [" << mbstr << "], ";
ss << "batch: [" << batch_num_ << "], ";

for (int i = 0; i < fetch_var_num; ++i) {
platform::PrintVar(thread_scope_, fetch_config_.fetch_var_names(i),
fetch_config_.fetch_var_str_format(i), &ss);
if (i < fetch_var_num - 1) {
ss << ", ";
}
}

std::cout << ss.str() << std::endl;
}
}

30 changes: 22 additions & 8 deletions paddle/fluid/platform/lodtensor_printer.cc
@@ -27,24 +27,38 @@ namespace paddle {
namespace platform {

void PrintVar(framework::Scope* scope, const std::string& var_name,
const std::string& print_info) {
const std::string& print_info, std::stringstream* sstream) {
framework::Variable* var = scope->FindVar(var_name);
if (var == nullptr) {
VLOG(1) << "Variable Name " << var_name << " does not exist in your scope";
VLOG(0) << "Variable Name " << var_name << " does not exist in your scope";
return;
}
framework::LoDTensor* tensor = var->GetMutable<framework::LoDTensor>();
if (tensor == nullptr) {
VLOG(1) << "tensor of variable " << var_name
VLOG(0) << "tensor of variable " << var_name
<< " does not exist in your scope";
return;
}

std::ostringstream sstream;
sstream << print_info << "\t";
sstream << var_name << "\t";
sstream << *tensor << "\t";
std::cout << sstream.str() << std::endl;
*sstream << print_info << ": ";

#define PrintTensorCallback(cpp_type, proto_type) \
do { \
if (tensor->type() == proto_type) { \
*sstream << "["; \
auto* data = tensor->data<cpp_type>(); \
auto element_num = tensor->numel(); \
if (element_num > 0) { \
*sstream << data[0]; \
for (int j = 1; j < element_num; ++j) { \
*sstream << " " << data[j]; \
} \
} \
*sstream << "]"; \
} \
} while (0)

_ForEachDataType_(PrintTensorCallback);
}

} // end namespace platform
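
Note: with the reworked HogwildWorker::PrintFetchVars and PrintVar above, each print period appears to emit one line of the form `time: [...], batch: [...], name: [v0 v1 ...], ...`. A rough Python mock-up of that format (the helper below is purely illustrative, not part of Paddle):

```python
import time

def format_fetch_line(batch_num, fetch_vars):
    # fetch_vars: list of (name, flat_values) pairs, mirroring fetch_var_names
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    parts = ["time: [{}]".format(stamp), "batch: [{}]".format(batch_num)]
    parts += ["{}: [{}]".format(name, " ".join(str(v) for v in values))
              for name, values in fetch_vars]
    return ", ".join(parts)

print(format_fetch_line(10, [("emb0", [0.1, 0.2]), ("emb1", [0.3, 0.4])]))
# e.g. time: [2021-04-02 10:00:00], batch: [10], emb0: [0.1 0.2], emb1: [0.3 0.4]
```
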
2 changes: 1 addition & 1 deletion paddle/fluid/platform/lodtensor_printer.h
@@ -26,6 +26,6 @@ class Scope;
namespace paddle {
namespace platform {
void PrintVar(framework::Scope* scope, const std::string& var_name,
const std::string& print_info);
const std::string& print_info, std::stringstream* out);
} // end namespace platform
} // end namespace paddle
3 changes: 2 additions & 1 deletion paddle/fluid/platform/lodtensor_printer_test.cc
@@ -18,5 +18,6 @@

TEST(LodTensorPrinter, PrintVar) {
paddle::framework::Scope scope;
paddle::platform::PrintVar(&scope, "NotAVar", "We don't have var");
std::stringstream ss;
paddle::platform::PrintVar(&scope, "NotAVar", "We don't have var", &ss);
}
13 changes: 7 additions & 6 deletions python/paddle/distributed/fleet/base/fleet_base.py
@@ -628,12 +628,13 @@ def distributed_optimizer(self, optimizer, strategy=None):
self.user_defined_optimizer = optimizer

if strategy is not None:
warnings.warn(
"It is recommended to use DistributedStrategy "
"in fleet.init(). The strategy here is only for compatibility. "
"If the strategy in fleet.distributed_optimizer() is "
"not None, then it will overwrite the DistributedStrategy in fleet.init(), "
"which will take effect in distributed training.")
if self._is_collective:
warnings.warn(
"It is recommended to use DistributedStrategy "
"in fleet.init(). The strategy here is only for compatibility. "
"If the strategy in fleet.distributed_optimizer() is "
"not None, then it will overwrite the DistributedStrategy in fleet.init(), "
"which will take effect in distributed training.")
self._user_defined_strategy = copy.deepcopy(strategy)

self._context = {}
2 changes: 1 addition & 1 deletion python/paddle/distributed/fleet/runtime/the_one_ps.py
@@ -767,7 +767,7 @@ def _init_server(self, dirname=None, var_names=None, **kwargs):
server = self._get_fleet_proto(is_server=True, is_sync=is_sync)
proto_txt = str(server)

debug = bool(os.getenv("PSERVER_DEBUG", "0"))
debug = bool(int(os.getenv("PSERVER_DEBUG", "0")))
if debug:
print("server: \n{}".format(proto_txt))

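
Note: the PSERVER_DEBUG change is a genuine fix, not just a log tweak. In Python any non-empty string is truthy, so `bool(os.getenv("PSERVER_DEBUG", "0"))` was True even for "0"; converting through `int` first restores the intended off-by-default behaviour:

```python
import os

os.environ.setdefault("PSERVER_DEBUG", "0")

print(bool(os.getenv("PSERVER_DEBUG", "0")))       # True  -- any non-empty string is truthy
print(bool(int(os.getenv("PSERVER_DEBUG", "0"))))  # False -- "0" -> 0 -> False
```
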
17 changes: 15 additions & 2 deletions python/paddle/fluid/tests/unittests/test_monitor.py
@@ -17,6 +17,8 @@

from __future__ import print_function
import paddle
paddle.enable_static()

import paddle.fluid as fluid
import paddle.fluid.core as core
import numpy as np
@@ -52,6 +54,11 @@ def test_dataset_run_with_stat(self):
name=slot, shape=[1], dtype="int64", lod_level=1)
slots_vars.append(var)

embs = []
for x in slots_vars:
emb = fluid.layers.embedding(x, is_sparse=True, size=[100001, 4])
embs.append(emb)

dataset = paddle.distributed.InMemoryDataset()
dataset._set_batch_size(32)
dataset._set_thread(3)
@@ -74,11 +81,17 @@ def test_dataset_run_with_stat(self):
for i in range(self.epoch_num):
for data in data_loader():
exe.run(fluid.default_main_program(), feed=data)

else:
for i in range(self.epoch_num):
try:
exe.train_from_dataset(fluid.default_main_program(),
dataset)
exe.train_from_dataset(
fluid.default_main_program(),
dataset,
fetch_list=[embs[0], embs[1]],
fetch_info=["emb0", "emb1"],
print_period=1)

except Exception as e:
self.assertTrue(False)

4 changes: 1 addition & 3 deletions python/paddle/nn/functional/input.py
@@ -148,9 +148,7 @@ def embedding(x, weight, padding_idx=None, sparse=False, name=None):
sparse(bool): The flag indicating whether to use sparse update. This parameter only
affects the performance of the backwards gradient update. It is recommended to set
True because sparse update is faster. But some optimizers does not support sparse update,
such as :ref:`api_optimizer_AdadeltaOptimizer` , :ref:`api_optimizer_AdamaxOptimizer` ,
:ref:`api_optimizer_DecayedAdagradOptimizer` , :ref:`api_optimizer_FtrlOptimizer` ,
:ref:`api_optimizer_LambOptimizer` and :ref:`api_optimizer_LarsMomentumOptimizer` .
such as :ref:`api_paddle_optimizer_adadelta_Adadelta` , :ref:`api_paddle_optimizer_adamax_Adamax` , :ref:`api_paddle_optimizer_lamb_Lamb`.
In these cases, sparse must be False. Default: False.
padding_idx(int|long|None): padding_idx needs to be in the interval [-weight.shape[0], weight.shape[0]).
If :math:`padding\_idx < 0`, the :math:`padding\_idx` will automatically be converted
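
Note on the docstring fix above: the updated cross-references point at the 2.x optimizer docs; the API behaviour itself is unchanged. A minimal sketch of the functional embedding lookup it documents (dynamic graph, forward only); as the revised note says, with Adadelta, Adamax or Lamb the `sparse` flag must stay False.

```python
import paddle
import paddle.nn.functional as F

x = paddle.to_tensor([[0, 2], [1, 3]], dtype="int64")  # token ids
weight = paddle.rand([10, 4])                          # 10-row, 4-dim embedding table
out = F.embedding(x, weight, sparse=True)              # sparse only affects the backward update
print(out.shape)                                       # [2, 2, 4]
```
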