【Paddle.Fleet】【Cherry-Pick】fix grad_clip & gaussian_random & dataset & profiler #31945

Merged · 4 commits · Apr 2, 2021
3 changes: 3 additions & 0 deletions paddle/fluid/distributed/service/communicator.cc
@@ -385,6 +385,7 @@ void Communicator::SendGlobalStep(const CommContext &ctx, int batches,
  if (batches == 0) {
    return;
  }
+  platform::RecordEvent record_event("Communicator->SendGlobalStep");
  auto &table_id = ctx.table_id;
  size_t request_call_num = _worker_ptr->get_server_nums();

@@ -788,6 +789,7 @@ void SyncCommunicator::BarrierRecv() {

void GeoCommunicator::Send(const std::vector<std::string> &var_names,
                           const framework::Scope &scope) {
+  platform::RecordEvent record_event("GeoCommunicator->Send");
  waiting_ = false;
  auto before_send = GetCurrentUS();
  auto table_name = var_names[0];
@@ -1024,6 +1026,7 @@ void GeoCommunicator::InitSparse(const std::string &var_name, int table_id) {

std::vector<int64_t> GeoCommunicator::MergeSparseIds(
    const std::string &send_varname) {
+  platform::RecordEvent record_event("GeoCommunicator->MergeSparseIds");
  size_t merge_num = 0, wait_times = 0;
  std::unordered_set<int64_t> sparse_ids;
  while (merge_num < static_cast<size_t>(max_merge_var_num_)) {
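`platform::RecordEvent` is an RAII-style scoped marker: the event opens when the object is constructed and closes when it goes out of scope, so a single line instruments the whole function for the profiler. As a rough Python analogue of that scoped-event pattern (illustrative only, not Paddle's profiler API):

```python
import time
from contextlib import contextmanager

@contextmanager
def record_event(name):
    # Rough analogue of an RAII profiler event: timing starts on entry
    # and is reported when the scope exits, even if an exception is raised.
    start = time.perf_counter()
    try:
        yield
    finally:
        print(f"{name}: {(time.perf_counter() - start) * 1e3:.3f} ms")

with record_event("Communicator->SendGlobalStep"):
    pass  # body of the instrumented function would run here
```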
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/table/common_dense_table.cc
@@ -28,6 +28,8 @@ void CommonDenseTable::create_initializer(const std::string& attr,
    initializers_[name] = new FillConstantInitializer(slices);
  } else if (slices[0] == "uniform_random") {
    initializers_[name] = new UniformInitializer(slices);
+  } else if (slices[0] == "truncated_gaussian_random") {
+    initializers_[name] = new TruncatedGaussianInitializer(slices);
  } else {
    PADDLE_THROW(
        platform::errors::InvalidArgument("%s is not supported", name));
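For context, the initializer is selected by the first slice of an `&`-separated attribute string; the separator is assumed here from the surrounding table code, while the slice positions mirror the `attrs[0..3]` reads in the new initializer's constructor. A hypothetical example of a string the new branch would match:

```python
# Hypothetical attribute string (layout: name & seed & mean & std).
attr = "truncated_gaussian_random&0&0.0&1.0"
slices = attr.split("&")
assert slices[0] == "truncated_gaussian_random"
seed, mean, std = int(slices[1]), float(slices[2]), float(slices[3])
print(seed, mean, std)  # 0 0.0 1.0
```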
37 changes: 37 additions & 0 deletions paddle/fluid/distributed/table/depends/initializers.h
@@ -17,12 +17,15 @@
#include <gflags/gflags.h>
#include <functional>
#include <memory>
+#include <random>
#include <string>
#include <utility>
#include <vector>

#include "paddle/fluid/framework/generator.h"

+#include "paddle/fluid/operators/truncated_gaussian_random_op.h"

namespace paddle {
namespace distributed {

@@ -108,6 +111,40 @@ class GaussianInitializer : public Initializer {
  std::normal_distribution<float> dist_;
};

+class TruncatedGaussianInitializer : public Initializer {
+ public:
+  explicit TruncatedGaussianInitializer(const std::vector<std::string> &attrs) {
+    name_ = attrs[0];
+    seed_ = static_cast<unsigned int>(std::stoi(attrs[1]));
+    mean_ = std::stof(attrs[2]);
+    std_ = std::stof(attrs[3]);
+
+    // Assign to the member distribution; a local declaration of
+    // `std::uniform_real_distribution<float> dist_(...)` here would shadow
+    // the member and leave it default-constructed.
+    dist_ = std::uniform_real_distribution<float>(
+        std::numeric_limits<float>::min(), 1.0);
+    random_engine_ = framework::GetCPURandomEngine(seed_);
+  }
+
+  float GetValue() override {
+    paddle::operators::TruncatedNormal<float> truncated_normal(mean_, std_);
+    float value = truncated_normal(dist_(*random_engine_));
+    return value;
+  }
+
+  void GetValue(float *value, int numel) {
+    paddle::operators::TruncatedNormal<float> truncated_normal(mean_, std_);
+    for (int x = 0; x < numel; ++x) {
+      value[x] = truncated_normal(dist_(*random_engine_));
+    }
+  }
+
+ private:
+  float std_;
+  float mean_;
+
+  std::shared_ptr<std::mt19937_64> random_engine_;
+  std::uniform_real_distribution<float> dist_;
+};

class FillConstantInitializer : public Initializer {
 public:
  explicit FillConstantInitializer(const std::vector<std::string> &attrs) {
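The `TruncatedNormal<float>` functor reused from `truncated_gaussian_random_op.h` maps a uniform sample through the inverse CDF of a normal distribution truncated at two standard deviations. A minimal Python sketch of the same inverse-CDF idea, using only the standard library rather than Paddle's exact implementation:

```python
import random
from statistics import NormalDist

def truncated_gaussian(mean, std, seed=None):
    # Draw a uniform value inside the CDF range of [mean - 2*std, mean + 2*std],
    # then invert the CDF so the sample always lands in that interval.
    rng = random.Random(seed)
    nd = NormalDist(mean, std)
    lo, hi = nd.cdf(mean - 2 * std), nd.cdf(mean + 2 * std)
    return nd.inv_cdf(rng.uniform(lo, hi))

print(truncated_gaussian(0.0, 1.0, seed=2021))  # always within [-2.0, 2.0]
```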
3 changes: 3 additions & 0 deletions paddle/fluid/distributed/table/depends/large_scale_kv.h
@@ -125,6 +125,9 @@ class ValueBlock {
    } else if (slices[0] == "uniform_random") {
      initializers_.emplace_back(
          std::make_shared<UniformInitializer>(slices));
+    } else if (slices[0] == "truncated_gaussian_random") {
+      initializers_.emplace_back(
+          std::make_shared<TruncatedGaussianInitializer>(slices));
    } else {
      PADDLE_THROW(platform::errors::InvalidArgument(
          "%s is not supported", attr));
2 changes: 0 additions & 2 deletions paddle/fluid/framework/parallel_executor.cc
@@ -1121,8 +1121,6 @@ void ParallelExecutor::BCastParamsToDevices(
FetchResultType ParallelExecutor::Run(
    const std::vector<std::string> &fetch_tensors, bool return_merged) {
  VLOG(3) << "enter ParallelExecutor Run";
-  platform::RecordEvent parallel_executor_event(
-      "ParallelExecutor::Run", paddle::platform::EventRole::kSpecial);
#ifdef WITH_GPERFTOOLS
  if (gProfileStarted) {
    ProfilerFlush();
26 changes: 16 additions & 10 deletions python/paddle/distributed/fleet/data_generator/data_generator.py
@@ -32,11 +32,11 @@ def set_batch(self, batch_size):
        '''
        Set batch size of current DataGenerator
        This is necessary only if a user wants to define generate_batch

        Example:

            .. code-block:: python

                import paddle.distributed.fleet.data_generator as dg
                class MyData(dg.DataGenerator):

@@ -52,7 +52,7 @@ def local_iter():
yield ("words", s[1].extend([s[1][0]]))
mydata = MyData()
mydata.set_batch(128)

'''
self.batch_size_ = batch_size

@@ -63,7 +63,7 @@ def run_from_memory(self):

        Example:
            .. code-block:: python

                import paddle.distributed.fleet.data_generator as dg
                class MyData(dg.DataGenerator):

@@ -100,9 +100,9 @@ def run_from_stdin(self):
generated.

        Example:

            .. code-block:: python

                import paddle.distributed.fleet.data_generator as dg
                class MyData(dg.DataGenerator):

@@ -161,7 +161,7 @@ def generate_sample(self, line):
        The data format is list or tuple:
            [(name, [feasign, ...]), ...]
            or ((name, [feasign, ...]), ...)

        For example:
            [("words", [1926, 8, 17]), ("label", [1])]
            or (("words", [1926, 8, 17]), ("label", [1]))
@@ -174,7 +174,7 @@
        Example:

            .. code-block:: python

                import paddle.distributed.fleet.data_generator as dg
                class MyData(dg.DataGenerator):

@@ -206,7 +206,7 @@ def generate_batch(self, samples):
        Example:

            .. code-block:: python

                import paddle.distributed.fleet.data_generator as dg
                class MyData(dg.DataGenerator):

@@ -259,6 +259,9 @@ def _gen_str(self, line):
        Returns:
            Return a string data that can be read directly by the MultiSlotDataFeed.
        '''
+        if sys.version > '3' and isinstance(line, zip):
+            line = list(line)
+
        if not isinstance(line, list) and not isinstance(line, tuple):
            raise ValueError(
                "the output of process() must be in list or tuple type"
@@ -289,7 +292,7 @@ def _gen_str(self, line):
            >>> [ids_num id1 id2 ...] ...
        The proto_info will be in this format:
            >>> [(name, type), ...]

        For example, if the input is like this:
            >>> [("words", [1926, 8, 17]), ("label", [1])]
            >>> or (("words", [1926, 8, 17]), ("label", [1]))
Expand All @@ -304,6 +307,9 @@ def _gen_str(self, line):
        Returns:
            Return a string data that can be read directly by the MultiSlotDataFeed.
        '''
+        if sys.version > '3' and isinstance(line, zip):
+            line = list(line)
+
        if not isinstance(line, list) and not isinstance(line, tuple):
            raise ValueError(
                "the output of process() must be in list or tuple type"
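The two `_gen_str` additions handle a Python 3 pitfall: there `zip()` returns a lazy iterator, not a list or tuple, so a `generate_sample` that yields `zip(feature_name, data)` would otherwise trip the type check right below. A small demonstration using the same data as the new tests:

```python
feature_name = ["words", "label"]
data = [[1, 2, 3, 4], [0]]
sample = zip(feature_name, data)

# Under Python 3 a zip object is neither a list nor a tuple, so _gen_str
# would raise without the new isinstance(line, zip) conversion.
print(isinstance(sample, (list, tuple)))  # False
print(list(sample))  # [('words', [1, 2, 3, 4]), ('label', [0])]
```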
3 changes: 2 additions & 1 deletion python/paddle/distributed/fleet/runtime/the_one_ps.py
@@ -150,7 +150,8 @@ def parse_by_optimizer(self, grad_name, is_sparse, total_dims,
        oop = None

        for op in optimizer_ops:
-            if op.input("Param")[0] == param_name:
+            if ("Param" in op.input_names) and (
+                    op.input("Param")[0] == param_name):
                oop = op
                break

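The extra `"Param" in op.input_names` check matters because not every op in the optimizer block carries a `Param` input (counter and learning-rate ops, for instance), and calling `op.input("Param")` on such an op would fail. A minimal sketch of the guarded lookup, with a hypothetical stand-in for the framework's operator class:

```python
class FakeOp:
    """Hypothetical stand-in for a framework operator, for illustration only."""

    def __init__(self, inputs):
        self._inputs = inputs

    @property
    def input_names(self):
        return list(self._inputs)

    def input(self, name):
        return self._inputs[name]

optimizer_ops = [
    FakeOp({"X": ["learning_rate_0"]}),  # carries no Param input at all
    FakeOp({"Param": ["fc_0.w_0"], "Grad": ["fc_0.w_0@GRAD"]}),
]
param_name = "fc_0.w_0"
oop = next((op for op in optimizer_ops
            if "Param" in op.input_names and op.input("Param")[0] == param_name),
           None)
print(oop is not None)  # True; the first op is skipped instead of raising
```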
@@ -31,7 +31,7 @@
from paddle.fluid.transpiler.details.program_utils import delete_ops

OP_NAME_SCOPE = "op_namescope"
-CLIP_OP_NAME_SCOPE = "@CLIP"
+CLIP_OP_NAME_SCOPE = "gradient_clip"
STEP_COUNTER = "@PS_STEP_COUNTER@"
LEARNING_RATE_DECAY_COUNTER = "@LR_DECAY_COUNTER@"

@@ -32,7 +32,7 @@
from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode

OP_NAME_SCOPE = "op_namescope"
-CLIP_OP_NAME_SCOPE = "@CLIP"
+CLIP_OP_NAME_SCOPE = "gradient_clip"
STEP_COUNTER = "@PS_STEP_COUNTER@"
OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName()
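Renaming `CLIP_OP_NAME_SCOPE` from `"@CLIP"` to `"gradient_clip"` keeps the parameter-server passes in sync with the name scope that the 2.0 clip classes attach to the ops they insert. A hedged sketch of how such a constant is typically used to pick clip ops out of a program (the attribute helpers are assumed, not shown in this diff):

```python
OP_NAME_SCOPE = "op_namescope"
CLIP_OP_NAME_SCOPE = "gradient_clip"

def is_clip_op(op):
    # An op inserted by gradient clipping carries the clip name scope in its
    # op_namescope attribute; ops without the attribute are left alone.
    return (op.has_attr(OP_NAME_SCOPE)
            and CLIP_OP_NAME_SCOPE in op.attr(OP_NAME_SCOPE))
```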
40 changes: 40 additions & 0 deletions python/paddle/fluid/tests/unittests/test_data_generator.py
@@ -95,6 +95,32 @@ def data_iter():
        return data_iter


+class MyMultiSlotStringDataGenerator_zip(fleet.MultiSlotStringDataGenerator):
+    def generate_sample(self, line):
+        def data_iter():
+            for i in range(40):
+                if i == 1:
+                    yield None
+                feature_name = ["words", "label"]
+                data = [["1", "2", "3", "4"], ["0"]]
+                yield zip(feature_name, data)

+        return data_iter


+class MyMultiSlotDataGenerator_zip(fleet.MultiSlotDataGenerator):
+    def generate_sample(self, line):
+        def data_iter():
+            for i in range(40):
+                if i == 1:
+                    yield None
+                feature_name = ["words", "label"]
+                data = [[1, 2, 3, 4], [0]]
+                yield zip(feature_name, data)

+        return data_iter


class TestMultiSlotDataGenerator(unittest.TestCase):
    def test_MultiSlotDataGenerator_basic(self):
        my_ms_dg = MyMultiSlotDataGenerator()
@@ -149,5 +175,19 @@ def test_MultiSlotDataGenerator_error(self):
        my_ms_dg.run_from_memory()


+class TestMultiSlotStringDataGeneratorZip(unittest.TestCase):
+    def test_MultiSlotStringDataGenerator_zip(self):
+        my_ms_dg = MyMultiSlotStringDataGenerator_zip()
+        my_ms_dg.set_batch(1)
+        my_ms_dg.run_from_memory()


+class TestMultiSlotDataGeneratorZip(unittest.TestCase):
+    def test_MultiSlotDataGenerator_zip(self):
+        my_ms_dg = MyMultiSlotDataGenerator_zip()
+        my_ms_dg.set_batch(1)
+        my_ms_dg.run_from_memory()


if __name__ == '__main__':
    unittest.main()
15 changes: 7 additions & 8 deletions python/paddle/fluid/tests/unittests/test_dist_fleet_base.py
@@ -18,6 +18,7 @@
import paddle.distributed.fleet as fleet
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.fluid as fluid
+import paddle
"""
High-level unit tests for distributed fleet.
"""
@@ -112,23 +113,21 @@ def build_strategy(self, args):

    def build_optimizer(self, avg_cost, strategy):
        use_grad_clip = int(os.getenv('GRAD_CLIP', 0))
+        grad_clip = None
        if use_grad_clip:
            # 1: clip_by_value; 2: clip_by_norm; 3: clip_by_global_norm
            if use_grad_clip == 1:
-                fluid.clip.set_gradient_clip(
-                    clip=fluid.clip.GradientClipByValue(2.0))
+                grad_clip = paddle.nn.ClipGradByValue(min=-5.0, max=5.0)
            elif use_grad_clip == 2:
-                fluid.clip.set_gradient_clip(
-                    clip=fluid.clip.GradientClipByNorm(2.0))
+                grad_clip = paddle.nn.ClipGradByNorm(2.0)
            elif use_grad_clip == 3:
-                fluid.clip.set_gradient_clip(
-                    clip=fluid.clip.GradientClipByGlobalNorm(2.0))
+                grad_clip = paddle.nn.ClipGradByGlobalNorm(2.0)

        use_decay = int(os.getenv("USE_DECAY", "0"))
        if use_decay:
            scheduler = paddle.optimizer.lr.ExponentialDecay(
                learning_rate=LEARNING_RATE, gamma=0.999, verbose=True)
-            optimizer = fluid.optimizer.SGD(scheduler)
+            optimizer = fluid.optimizer.SGD(scheduler, grad_clip=grad_clip)
            """
            # learning rate decay method before 2.0
            optimizer = fluid.optimizer.SGD(
@@ -139,7 +138,7 @@ def build_optimizer(self, avg_cost, strategy):
                staircase=True))
            """
        else:
-            optimizer = fluid.optimizer.SGD(LEARNING_RATE)
+            optimizer = fluid.optimizer.SGD(LEARNING_RATE, grad_clip=grad_clip)
        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
        optimizer.minimize(avg_cost)

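The test now builds clipping the 2.0 way: instead of the deprecated `fluid.clip.set_gradient_clip(...)` global, a clip object is passed to the optimizer through `grad_clip`. A minimal dygraph sketch of the same pattern:

```python
import paddle

# Pass the clip object directly to the optimizer rather than setting a
# program-wide clip via the deprecated fluid.clip.set_gradient_clip(...).
linear = paddle.nn.Linear(10, 1)
clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=2.0)
opt = paddle.optimizer.SGD(learning_rate=0.01,
                           parameters=linear.parameters(),
                           grad_clip=clip)
```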