[Cherry-Pick] fix entry (#31079) #31182

Merged 1 commit on Feb 25, 2021
5 changes: 3 additions & 2 deletions paddle/fluid/distributed/ps.proto
@@ -140,8 +140,9 @@ message CommonAccessorParameter {
   repeated string params = 4;
   repeated uint32 dims = 5;
   repeated string initializers = 6;
-  optional int32 trainer_num = 7;
-  optional bool sync = 8;
+  optional string entry = 7;
+  optional int32 trainer_num = 8;
+  optional bool sync = 9;
 }
 
 message TableAccessorSaveParameter {
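The new optional `entry` field carries the table's entry policy as one serialized string. For orientation, a sketch of the three values the server-side parser accepts — the `name:threshold` encoding comes from `entry_attr.py` below, and the admission semantics from `large_scale_kv.h`:

    # The three entry strings understood by ValueBlock (see large_scale_kv.h):
    ENTRY_ATTR_EXAMPLES = [
        "none",                   # every new feature id is admitted
        "count_filter_entry:10",  # admitted once its count reaches 10
        "probability_entry:0.1",  # admitted when a uniform [0,1] draw is >= 0.1
    ]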
7 changes: 4 additions & 3 deletions paddle/fluid/distributed/table/common_sparse_table.cc
@@ -238,12 +238,13 @@ int32_t CommonSparseTable::initialize() {
 int32_t CommonSparseTable::initialize_recorder() { return 0; }
 
 int32_t CommonSparseTable::initialize_value() {
+  auto common = _config.common();
   shard_values_.reserve(task_pool_size_);
 
   for (int x = 0; x < task_pool_size_; ++x) {
-    auto shard =
-        std::make_shared<ValueBlock>(value_names_, value_dims_, value_offsets_,
-                                     value_idx_, initializer_attrs_, "none");
+    auto shard = std::make_shared<ValueBlock>(
+        value_names_, value_dims_, value_offsets_, value_idx_,
+        initializer_attrs_, common.entry());
 
     shard_values_.emplace_back(shard);
   }
16 changes: 9 additions & 7 deletions paddle/fluid/distributed/table/depends/large_scale_kv.h
@@ -71,7 +71,7 @@ inline bool count_entry(std::shared_ptr<VALUE> value, int threshold) {
 }
 
 inline bool probility_entry(std::shared_ptr<VALUE> value, float threshold) {
-  UniformInitializer uniform = UniformInitializer({"0", "0", "1"});
+  UniformInitializer uniform = UniformInitializer({"uniform", "0", "0", "1"});
   return uniform.GetValue() >= threshold;
 }
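The pre-fix call passed only three attribute strings. A minimal sketch of why the leading "uniform" token matters, assuming `UniformInitializer` reads its attribute list as `[name, seed, min, max]` — the layout implied by the four-element fixed call:

    # Hypothetical mirror of UniformInitializer's attribute parsing, assuming
    # the [name, seed, min, max] layout implied by the fixed call above.
    def parse_uniform_attrs(attrs):
        name = attrs[0]         # "uniform"
        seed = int(attrs[1])    # 0
        low = float(attrs[2])   # 0.0
        high = float(attrs[3])  # 1.0
        return name, seed, low, high

    # The old call ["0", "0", "1"] shifts every field one slot to the left and
    # leaves attrs[3] missing; the fixed call yields U(0, 1) as intended.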

@@ -93,20 +93,20 @@ class ValueBlock {

// for Entry
{
auto slices = string::split_string<std::string>(entry_attr, "&");
auto slices = string::split_string<std::string>(entry_attr, ":");
if (slices[0] == "none") {
entry_func_ = std::bind(&count_entry, std::placeholders::_1, 0);
} else if (slices[0] == "count_filter") {
} else if (slices[0] == "count_filter_entry") {
int threshold = std::stoi(slices[1]);
entry_func_ = std::bind(&count_entry, std::placeholders::_1, threshold);
} else if (slices[0] == "probability") {
} else if (slices[0] == "probability_entry") {
float threshold = std::stof(slices[1]);
entry_func_ =
std::bind(&probility_entry, std::placeholders::_1, threshold);
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Not supported Entry Type : %s, Only support [count_filter, "
"probability]",
"Not supported Entry Type : %s, Only support [CountFilterEntry, "
"ProbabilityEntry]",
slices[0]));
}
}
@@ -179,10 +179,12 @@ class ValueBlock {
         initializers_[x]->GetValue(value->data_.data() + value_offsets_[x],
                                    value_dims_[x]);
       }
+      value->need_save_ = true;
     }
+  } else {
+    value->need_save_ = true;
   }
 
-  value->need_save_ = true;
   return;
 }

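For reference, a hedged Python mirror of the entry dispatch above. The policy names and the `:`-separated encoding are taken from this diff; `count` is assumed to stand for the per-feature counter that `count_entry` compares against its threshold:

    import random

    def make_entry_func(entry_attr):
        # Illustrative mirror of ValueBlock's entry dispatch (not a Paddle API).
        slices = entry_attr.split(":")
        if slices[0] == "none":
            return lambda count: True  # count_entry with threshold 0: always passes
        elif slices[0] == "count_filter_entry":
            threshold = int(slices[1])
            return lambda count: count >= threshold  # assumed count_entry semantics
        elif slices[0] == "probability_entry":
            threshold = float(slices[1])
            return lambda count: random.uniform(0, 1) >= threshold
        raise ValueError("Not supported Entry Type : %s" % slices[0])

    entry_func = make_entry_func("count_filter_entry:10")
    print(entry_func(3), entry_func(12))  # False True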
@@ -78,6 +78,7 @@ void GetDownpourSparseTableProto(
   common_proto->set_table_name("MergedDense");
   common_proto->set_trainer_num(1);
   common_proto->set_sync(false);
+  common_proto->set_entry("none");
   common_proto->add_params("Param");
   common_proto->add_dims(10);
   common_proto->add_initializers("uniform_random&0&-1.0&1.0");
15 changes: 15 additions & 0 deletions python/paddle/distributed/__init__.py
@@ -25,6 +25,9 @@
 from . import collective
 from .collective import *
 
+from .entry_attr import ProbabilityEntry
+from .entry_attr import CountFilterEntry
+
 # start multiprocess apis
 __all__ = ["spawn"]

@@ -38,5 +41,17 @@
"QueueDataset",
]

# dataset reader
__all__ += [
"InMemoryDataset",
"QueueDataset",
]

# entry for embedding
__all__ += [
"ProbabilityEntry",
"CountFilterEntry",
]

# collective apis
__all__ += collective.__all__
139 changes: 139 additions & 0 deletions python/paddle/distributed/entry_attr.py
@@ -0,0 +1,139 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

__all__ = ['ProbabilityEntry', 'CountFilterEntry']


class EntryAttr(object):
    """
    Entry Config for paddle.static.nn.sparse_embedding with Parameter Server.

    Examples:
        .. code-block:: python

            import paddle

            sparse_feature_dim = 1024
            embedding_size = 64

            entry = paddle.distributed.ProbabilityEntry(0.1)

            input = paddle.static.data(name='ins', shape=[1], dtype='int64')

            emb = paddle.static.nn.sparse_embedding(
                input=input,
                size=[sparse_feature_dim, embedding_size],
                is_test=False,
                entry=entry,
                param_attr=paddle.ParamAttr(name="SparseFeatFactors",
                                            initializer=paddle.nn.initializer.Uniform()))

    """

    def __init__(self):
        self._name = None

    def _to_attr(self):
        """
        Returns the string representation of this entry policy.

        Returns:
            str: the serialized entry attribute, e.g. "probability_entry:0.1".
        """
        raise NotImplementedError("EntryAttr is a base class")


class ProbabilityEntry(EntryAttr):
    """
    Probability-based entry: a new feature id is admitted when a uniform
    [0, 1] draw is greater than or equal to the given threshold (see
    ``probility_entry`` in large_scale_kv.h).

    Examples:
        .. code-block:: python

            import paddle

            sparse_feature_dim = 1024
            embedding_size = 64

            entry = paddle.distributed.ProbabilityEntry(0.1)

            input = paddle.static.data(name='ins', shape=[1], dtype='int64')

            emb = paddle.static.nn.sparse_embedding(
                input=input,
                size=[sparse_feature_dim, embedding_size],
                is_test=False,
                entry=entry,
                param_attr=paddle.ParamAttr(name="SparseFeatFactors",
                                            initializer=paddle.nn.initializer.Uniform()))

    """

    def __init__(self, probability):
        super(ProbabilityEntry, self).__init__()

        if not isinstance(probability, float):
            raise ValueError("probability must be a float in (0, 1)")

        if probability <= 0 or probability >= 1:
            raise ValueError("probability must be a float in (0, 1)")

        self._name = "probability_entry"
        self._probability = probability

    def _to_attr(self):
        return ":".join([self._name, str(self._probability)])


class CountFilterEntry(EntryAttr):
    """
    Count-based entry: a new feature id is admitted once its occurrence
    count reaches the given threshold (see ``count_entry`` in
    large_scale_kv.h).

    Examples:
        .. code-block:: python

            import paddle

            sparse_feature_dim = 1024
            embedding_size = 64

            entry = paddle.distributed.CountFilterEntry(10)

            input = paddle.static.data(name='ins', shape=[1], dtype='int64')

            emb = paddle.static.nn.sparse_embedding(
                input=input,
                size=[sparse_feature_dim, embedding_size],
                is_test=False,
                entry=entry,
                param_attr=paddle.ParamAttr(name="SparseFeatFactors",
                                            initializer=paddle.nn.initializer.Uniform()))

    """

    def __init__(self, count_filter):
        super(CountFilterEntry, self).__init__()

        if not isinstance(count_filter, int):
            raise ValueError("count_filter must be an integer")

        if count_filter < 0:
            raise ValueError("count_filter must be a non-negative integer")

        self._name = "count_filter_entry"
        self._count_filter = count_filter

    def _to_attr(self):
        return ":".join([self._name, str(self._count_filter)])
48 changes: 34 additions & 14 deletions python/paddle/distributed/fleet/runtime/the_one_ps.py
@@ -58,6 +58,7 @@ class CommonAccessor:
     def __init__(self):
         self.accessor_class = ""
         self.table_name = None
+        self.entry = None
         self.attrs = []
         self.params = []
         self.dims = []
@@ -93,6 +94,24 @@ def define_optimize_map(self):
         self.opt_input_map = opt_input_map
         self.opt_init_map = opt_init_map
 
+    def parse_entry(self, varname, o_main_program):
+        from paddle.fluid.incubate.fleet.parameter_server.ir.public import is_distributed_sparse_op
+        from paddle.fluid.incubate.fleet.parameter_server.ir.public import is_sparse_op
+
+        for op in o_main_program.global_block().ops:
+            if not is_distributed_sparse_op(op) and not is_sparse_op(op):
+                continue
+
+            param_name = op.input("W")[0]
+
+            if param_name == varname and op.type == "lookup_table":
+                self.entry = op.attr('entry')
+                break
+
+            if param_name == varname and op.type == "lookup_table_v2":
+                self.entry = "none"
+                break
+
     def get_shard(self, total_dim, shard_num, pserver_id):
         # remainder = total_dim % shard_num
         blocksize = int(total_dim / shard_num + 1)
@@ -188,6 +207,8 @@ def to_string(self, indent):
         if self.table_name:
             attrs += "table_name: \"{}\" ".format(self.table_name)
 
+        if self.entry:
+            attrs += "entry: \"{}\" ".format(self.entry)
         attrs += "trainer_num: {} ".format(self.trainer_num)
         attrs += "sync: {} ".format(self.sync)
 
@@ -655,36 +676,31 @@ def _get_tables():
                 use_origin_program=True,
                 split_dense_table=self.role_maker.
                 _is_heter_parameter_server_mode)
 
             tables = []
             for idx, (name, ctx) in enumerate(send_ctx.items()):
+                if ctx.is_tensor_table() or len(ctx.origin_varnames()) < 1:
+                    continue
+
                 table = Table()
                 table.id = ctx.table_id()
-
-                if ctx.is_tensor_table():
-                    continue
+                common = CommonAccessor()
 
                 if ctx.is_sparse():
-                    if len(ctx.origin_varnames()) < 1:
-                        continue
                     table.type = "PS_SPARSE_TABLE"
+                    table.shard_num = 256
 
                     if self.compiled_strategy.is_geo_mode():
                         table.table_class = "SparseGeoTable"
                     else:
                         table.table_class = "CommonSparseTable"
-                    table.shard_num = 256
-                else:
-                    if len(ctx.origin_varnames()) < 1:
-                        continue
-                    table.type = "PS_DENSE_TABLE"
-                    table.table_class = "CommonDenseTable"
-                    table.shard_num = 256
 
-                common = CommonAccessor()
-                if ctx.is_sparse():
                     common.table_name = self.compiled_strategy.grad_name_to_param_name[
                         ctx.origin_varnames()[0]]
                 else:
+                    table.type = "PS_DENSE_TABLE"
+                    table.table_class = "CommonDenseTable"
+                    table.shard_num = 256
                     common.table_name = "MergedDense"
 
                 common.parse_by_optimizer(ctx.origin_varnames()[0],
@@ -693,6 +709,10 @@ def _get_tables():
                                           else ctx.sections()[0],
                                           self.compiled_strategy)
 
+                if ctx.is_sparse():
+                    common.parse_entry(common.table_name,
+                                       self.origin_main_program)
+
                 if is_sync:
                     common.sync = "true"
                 else:
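To see `parse_entry`'s selection rule in isolation, a sketch with stand-in op objects (hypothetical stubs, not fluid APIs): only a `lookup_table` op whose `W` input matches the table's parameter contributes its `entry` attribute, while a matching `lookup_table_v2` op falls back to "none":

    # Hypothetical op stubs, for illustration only (not fluid APIs).
    class StubOp:
        def __init__(self, op_type, w_name, entry=None):
            self.type = op_type
            self._w, self._entry = w_name, entry

        def input(self, name):
            return [self._w]

        def attr(self, name):
            return self._entry

    ops = [
        StubOp("lookup_table_v2", "OtherParam"),
        StubOp("lookup_table", "SparseFeatFactors", entry="count_filter_entry:10"),
    ]

    # Mirrors the loop body of parse_entry for varname == "SparseFeatFactors":
    entry = None
    for op in ops:
        param_name = op.input("W")[0]
        if param_name == "SparseFeatFactors" and op.type == "lookup_table":
            entry = op.attr("entry")  # -> "count_filter_entry:10"
            break
        if param_name == "SparseFeatFactors" and op.type == "lookup_table_v2":
            entry = "none"
            break
    print(entry)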
11 changes: 6 additions & 5 deletions python/paddle/fluid/contrib/layers/nn.py
@@ -46,7 +46,6 @@

from paddle.fluid import core
from paddle.fluid.param_attr import ParamAttr
from paddle.fluid.entry_attr import ProbabilityEntry, CountFilterEntry

from paddle.fluid.framework import Variable, convert_np_dtype_to_dtype_
from paddle.fluid.layers import slice, reshape
@@ -993,11 +992,13 @@ def sparse_embedding(input,
entry_str = "none"

if entry is not None:
if not isinstance(entry, ProbabilityEntry) and not isinstance(
entry, CountFilterEntry):
if entry.__class__.__name__ not in [
"ProbabilityEntry", "CountFilterEntry"
]:
raise ValueError(
"entry must be instance in [ProbabilityEntry, CountFilterEntry]")
entry_str = entry.to_attr()
"entry must be instance in [paddle.distributed.ProbabilityEntry, paddle.distributed.CountFilterEntry]"
)
entry_str = entry._to_attr()

helper.append_op(
type='lookup_table',
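The stale `paddle.fluid.entry_attr` import is gone (the classes now live in `paddle.distributed`), and the check duck-types on the class name, presumably so fluid does not have to import `paddle.distributed`. A short sketch of the resulting validation path (values hypothetical):

    import paddle

    entry = paddle.distributed.CountFilterEntry(10)

    # What sparse_embedding's validation now reduces to:
    assert entry.__class__.__name__ in ["ProbabilityEntry", "CountFilterEntry"]
    entry_str = entry._to_attr()  # "count_filter_entry:10", stored as the op's
                                  # 'entry' attribute and later read by parse_entry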