fix entry (#31079) (#31182)
* fix entry

* fix distributed lookup table fuse case

* fix entry bug at first time

* move entry from paddle.fluid -> paddle.distributed

* fix ut with paddle.enable_static()

Co-authored-by: malin10 <malin10@baidu.com>

seiriosPlus and 123malin authored Feb 25, 2021
1 parent fe00d32 commit 8177ece
Showing 12 changed files with 242 additions and 43 deletions.
5 changes: 3 additions & 2 deletions paddle/fluid/distributed/ps.proto
@@ -140,8 +140,9 @@ message CommonAccessorParameter {
  repeated string params = 4;
  repeated uint32 dims = 5;
  repeated string initializers = 6;
  optional int32 trainer_num = 7;
  optional bool sync = 8;
  optional string entry = 7;
  optional int32 trainer_num = 8;
  optional bool sync = 9;
}

message TableAccessorSaveParameter {
7 changes: 4 additions & 3 deletions paddle/fluid/distributed/table/common_sparse_table.cc
@@ -238,12 +238,13 @@ int32_t CommonSparseTable::initialize() {
int32_t CommonSparseTable::initialize_recorder() { return 0; }

int32_t CommonSparseTable::initialize_value() {
  auto common = _config.common();
  shard_values_.reserve(task_pool_size_);

  for (int x = 0; x < task_pool_size_; ++x) {
    auto shard =
        std::make_shared<ValueBlock>(value_names_, value_dims_, value_offsets_,
                                     value_idx_, initializer_attrs_, "none");
    auto shard = std::make_shared<ValueBlock>(
        value_names_, value_dims_, value_offsets_, value_idx_,
        initializer_attrs_, common.entry());

    shard_values_.emplace_back(shard);
  }
16 changes: 9 additions & 7 deletions paddle/fluid/distributed/table/depends/large_scale_kv.h
@@ -71,7 +71,7 @@ inline bool count_entry(std::shared_ptr<VALUE> value, int threshold) {
}

inline bool probility_entry(std::shared_ptr<VALUE> value, float threshold) {
  UniformInitializer uniform = UniformInitializer({"0", "0", "1"});
  UniformInitializer uniform = UniformInitializer({"uniform", "0", "0", "1"});
  return uniform.GetValue() >= threshold;
}
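A note on the fix above: the attribute list passed to UniformInitializer now leads with the initializer name ({"uniform", "0", "0", "1"}, apparently name, seed, min, max, matching the "uniform_random&0&-1.0&1.0" strings used elsewhere in this commit), so probility_entry admits a feature id whenever the uniform draw clears the threshold. A minimal Python sketch of that policy, assuming GetValue() returns a uniform sample in [0, 1]:

import random

def probability_entry(threshold):
    # Admit a new feature id when a uniform draw in [0, 1] clears the
    # threshold, i.e. with probability roughly (1 - threshold).
    return random.uniform(0, 1) >= threshold

admitted = sum(probability_entry(0.1) for _ in range(10000))
print(admitted)  # ~9000: ProbabilityEntry(0.1) admits about 90% of new ids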

@@ -93,20 +93,20 @@ class ValueBlock {

    // for Entry
    {
      auto slices = string::split_string<std::string>(entry_attr, "&");
      auto slices = string::split_string<std::string>(entry_attr, ":");
      if (slices[0] == "none") {
        entry_func_ = std::bind(&count_entry, std::placeholders::_1, 0);
      } else if (slices[0] == "count_filter") {
      } else if (slices[0] == "count_filter_entry") {
        int threshold = std::stoi(slices[1]);
        entry_func_ = std::bind(&count_entry, std::placeholders::_1, threshold);
      } else if (slices[0] == "probability") {
      } else if (slices[0] == "probability_entry") {
        float threshold = std::stof(slices[1]);
        entry_func_ =
            std::bind(&probility_entry, std::placeholders::_1, threshold);
      } else {
        PADDLE_THROW(platform::errors::InvalidArgument(
            "Not supported Entry Type : %s, Only support [count_filter, "
            "probability]",
            "Not supported Entry Type : %s, Only support [CountFilterEntry, "
            "ProbabilityEntry]",
            slices[0]));
      }
    }
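The dispatch above consumes the attribute string built by the Python EntryAttr classes added later in this commit, now in "name:threshold" form and split on ":" rather than "&". A hedged Python sketch of the same selection logic, using a hypothetical Value stand-in with a visit counter:

import random
from collections import namedtuple

# Hypothetical stand-in for the C++ VALUE struct with its visit counter.
Value = namedtuple("Value", ["count"])

def build_entry_func(entry_attr):
    # Mirrors the dispatch in ValueBlock's constructor (sketch only).
    slices = entry_attr.split(":")
    if slices[0] == "none":
        # count_entry with threshold 0: every id is admitted
        return lambda value: value.count >= 0
    elif slices[0] == "count_filter_entry":
        threshold = int(slices[1])
        return lambda value: value.count >= threshold
    elif slices[0] == "probability_entry":
        threshold = float(slices[1])
        return lambda value: random.uniform(0, 1) >= threshold
    raise ValueError("Not supported Entry Type : %s" % slices[0])

entry_func = build_entry_func("count_filter_entry:10")
print(entry_func(Value(count=12)))  # True: id seen 12 times, threshold 10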
@@ -179,10 +179,12 @@ class ValueBlock {
        initializers_[x]->GetValue(value->data_.data() + value_offsets_[x],
                                   value_dims_[x]);
      }
      value->need_save_ = true;
    }
  } else {
    value->need_save_ = true;
  }

  value->need_save_ = true;
  return;
}

@@ -78,6 +78,7 @@ void GetDownpourSparseTableProto(
  common_proto->set_table_name("MergedDense");
  common_proto->set_trainer_num(1);
  common_proto->set_sync(false);
  common_proto->set_entry("none");
  common_proto->add_params("Param");
  common_proto->add_dims(10);
  common_proto->add_initializers("uniform_random&0&-1.0&1.0");
15 changes: 15 additions & 0 deletions python/paddle/distributed/__init__.py
@@ -25,6 +25,9 @@
from . import collective
from .collective import *

from .entry_attr import ProbabilityEntry
from .entry_attr import CountFilterEntry

# start multiprocess apis
__all__ = ["spawn"]

@@ -38,5 +41,17 @@
"QueueDataset",
]

# dataset reader
__all__ += [
"InMemoryDataset",
"QueueDataset",
]

# entry for embedding
__all__ += [
"ProbabilityEntry",
"CountFilterEntry",
]

# collective apis
__all__ += collective.__all__
139 changes: 139 additions & 0 deletions python/paddle/distributed/entry_attr.py
@@ -0,0 +1,139 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

__all__ = ['ProbabilityEntry', 'CountFilterEntry']


class EntryAttr(object):
    """
    Entry Config for paddle.static.nn.sparse_embedding with Parameter Server.

    Examples:
        .. code-block:: python

            import paddle

            sparse_feature_dim = 1024
            embedding_size = 64

            entry = paddle.distributed.ProbabilityEntry(0.1)

            input = paddle.static.data(name='ins', shape=[1], dtype='int64')
            emb = paddle.static.nn.sparse_embedding(
                input=input,
                size=[sparse_feature_dim, embedding_size],
                is_test=False,
                entry=entry,
                param_attr=paddle.ParamAttr(
                    name="SparseFeatFactors",
                    initializer=paddle.nn.initializer.Uniform()))
    """

    def __init__(self):
        self._name = None

    def _to_attr(self):
        """
        Returns the attributes of this parameter.

        Returns:
            Parameter attributes(map): The attributes of this parameter.
        """
        raise NotImplementedError("EntryAttr is base class")


class ProbabilityEntry(EntryAttr):
    """
    Examples:
        .. code-block:: python

            import paddle

            sparse_feature_dim = 1024
            embedding_size = 64

            entry = paddle.distributed.ProbabilityEntry(0.1)

            input = paddle.static.data(name='ins', shape=[1], dtype='int64')
            emb = paddle.static.nn.sparse_embedding(
                input=input,
                size=[sparse_feature_dim, embedding_size],
                is_test=False,
                entry=entry,
                param_attr=paddle.ParamAttr(
                    name="SparseFeatFactors",
                    initializer=paddle.nn.initializer.Uniform()))
    """

    def __init__(self, probability):
        super(ProbabilityEntry, self).__init__()

        if not isinstance(probability, float):
            raise ValueError("probability must be a float in (0,1)")

        if probability <= 0 or probability >= 1:
            raise ValueError("probability must be a float in (0,1)")

        self._name = "probability_entry"
        self._probability = probability

    def _to_attr(self):
        return ":".join([self._name, str(self._probability)])


class CountFilterEntry(EntryAttr):
    """
    Examples:
        .. code-block:: python

            import paddle

            sparse_feature_dim = 1024
            embedding_size = 64

            entry = paddle.distributed.CountFilterEntry(10)

            input = paddle.static.data(name='ins', shape=[1], dtype='int64')
            emb = paddle.static.nn.sparse_embedding(
                input=input,
                size=[sparse_feature_dim, embedding_size],
                is_test=False,
                entry=entry,
                param_attr=paddle.ParamAttr(
                    name="SparseFeatFactors",
                    initializer=paddle.nn.initializer.Uniform()))
    """

    def __init__(self, count_filter):
        super(CountFilterEntry, self).__init__()

        if not isinstance(count_filter, int):
            raise ValueError(
                "count_filter must be a valid integer greater than or equal to 0")

        if count_filter < 0:
            raise ValueError(
                "count_filter must be a valid integer greater than or equal to 0")

        self._name = "count_filter_entry"
        self._count_filter = count_filter

    def _to_attr(self):
        return ":".join([self._name, str(self._count_filter)])
48 changes: 34 additions & 14 deletions python/paddle/distributed/fleet/runtime/the_one_ps.py
@@ -58,6 +58,7 @@ class CommonAccessor:
    def __init__(self):
        self.accessor_class = ""
        self.table_name = None
        self.entry = None
        self.attrs = []
        self.params = []
        self.dims = []
@@ -93,6 +94,24 @@ def define_optimize_map(self):
        self.opt_input_map = opt_input_map
        self.opt_init_map = opt_init_map

    def parse_entry(self, varname, o_main_program):
        from paddle.fluid.incubate.fleet.parameter_server.ir.public import is_distributed_sparse_op
        from paddle.fluid.incubate.fleet.parameter_server.ir.public import is_sparse_op

        for op in o_main_program.global_block().ops:
            if not is_distributed_sparse_op(op) and not is_sparse_op(op):
                continue

            param_name = op.input("W")[0]

            if param_name == varname and op.type == "lookup_table":
                self.entry = op.attr('entry')
                break

            if param_name == varname and op.type == "lookup_table_v2":
                self.entry = "none"
                break

    def get_shard(self, total_dim, shard_num, pserver_id):
        # remainder = total_dim % shard_num
        blocksize = int(total_dim / shard_num + 1)
@@ -188,6 +207,8 @@ def to_string(self, indent):
        if self.table_name:
            attrs += "table_name: \"{}\" ".format(self.table_name)

        if self.entry:
            attrs += "entry: \"{}\" ".format(self.entry)
        attrs += "trainer_num: {} ".format(self.trainer_num)
        attrs += "sync: {} ".format(self.sync)
@@ -655,36 +676,31 @@ def _get_tables():
                use_origin_program=True,
                split_dense_table=self.role_maker.
                _is_heter_parameter_server_mode)

            tables = []
            for idx, (name, ctx) in enumerate(send_ctx.items()):
                if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1:
                    continue

                table = Table()
                table.id = ctx.table_id()

                if ctx.is_tensor_table():
                    continue
                common = CommonAccessor()

                if ctx.is_sparse():
                    if len(ctx.origin_varnames()) < 1:
                        continue
                    table.type = "PS_SPARSE_TABLE"
                    table.shard_num = 256

                    if self.compiled_strategy.is_geo_mode():
                        table.table_class = "SparseGeoTable"
                    else:
                        table.table_class = "CommonSparseTable"
                    table.shard_num = 256
                else:
                    if len(ctx.origin_varnames()) < 1:
                        continue
                    table.type = "PS_DENSE_TABLE"
                    table.table_class = "CommonDenseTable"
                    table.shard_num = 256

                common = CommonAccessor()
                if ctx.is_sparse():
                    common.table_name = self.compiled_strategy.grad_name_to_param_name[
                        ctx.origin_varnames()[0]]
                else:
                    table.type = "PS_DENSE_TABLE"
                    table.table_class = "CommonDenseTable"
                    table.shard_num = 256
                    common.table_name = "MergedDense"

                common.parse_by_optimizer(ctx.origin_varnames()[0],
@@ -693,6 +709,10 @@
                                          else ctx.sections()[0],
                                          self.compiled_strategy)

                if ctx.is_sparse():
                    common.parse_entry(common.table_name,
                                       self.origin_main_program)

                if is_sync:
                    common.sync = "true"
                else:
11 changes: 6 additions & 5 deletions python/paddle/fluid/contrib/layers/nn.py
@@ -46,7 +46,6 @@

from paddle.fluid import core
from paddle.fluid.param_attr import ParamAttr
from paddle.fluid.entry_attr import ProbabilityEntry, CountFilterEntry

from paddle.fluid.framework import Variable, convert_np_dtype_to_dtype_
from paddle.fluid.layers import slice, reshape
@@ -993,11 +992,13 @@ def sparse_embedding(input,
entry_str = "none"

if entry is not None:
if not isinstance(entry, ProbabilityEntry) and not isinstance(
entry, CountFilterEntry):
if entry.__class__.__name__ not in [
"ProbabilityEntry", "CountFilterEntry"
]:
raise ValueError(
"entry must be instance in [ProbabilityEntry, CountFilterEntry]")
entry_str = entry.to_attr()
"entry must be instance in [paddle.distributed.ProbabilityEntry, paddle.distributed.CountFilterEntry]"
)
entry_str = entry._to_attr()

    helper.append_op(
        type='lookup_table',
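One design note on the validation change above: comparing entry.__class__.__name__ instead of using isinstance lets paddle.fluid validate entries without importing the classes, which this commit moves to paddle.distributed (the old paddle.fluid.entry_attr import is deleted at the top of the file), presumably to avoid a circular import. A minimal sketch of the pattern, with a stand-in class:

class CountFilterEntry(object):
    """Stand-in for paddle.distributed.CountFilterEntry."""

    def _to_attr(self):
        return "count_filter_entry:10"

def check_entry(entry):
    # Duck-typed check: match on the class name so this module keeps no
    # import-time dependency on the module defining the entry classes.
    if entry.__class__.__name__ not in ["ProbabilityEntry", "CountFilterEntry"]:
        raise ValueError("entry must be a ProbabilityEntry or CountFilterEntry")
    return entry._to_attr()

print(check_entry(CountFilterEntry()))  # count_filter_entry:10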