Move learning rate and related op to pserver #8209

Merged
10 commits merged on Feb 11, 2018
17 changes: 9 additions & 8 deletions paddle/fluid/operators/listen_and_serv_op.cc
@@ -106,6 +106,7 @@ class ListenAndServOp : public framework::OperatorBase {
// the gradients arrive, just add suffix 0~n and merge the gradient.
rpc_service_->SetCond(0);
size_t recv_var_cnt = 0;
size_t update_param_cnt = 0;
int batch_barrier = 0;
while (batch_barrier != fan_in) {
const detail::MessageWithName &v = rpc_service_->Get();
@@ -126,13 +127,14 @@ class ListenAndServOp : public framework::OperatorBase {
std::string param_var_name;
if (it != grad_list.end()) {
param_var_name = param_list[it - grad_list.begin()];
update_param_cnt++;
VLOG(3) << "received grad: " << grad_var_name
<< " updating param: " << param_var_name;
} else {
LOG(ERROR) << "grad has no paired param:" << grad_var_name;
VLOG(3) << "received variable: " << grad_var_name
<< " no need to update param";
}
VLOG(3) << "received grad: " << grad_var_name
<< " updating param: " << param_var_name;

if (fan_in > 1) {
if (fan_in > 1 && !param_var_name.empty()) {
grad_var_name = this->GetGradVarNameForTrainer(grad_var_name);
}
auto *var = recv_scope.FindVar(grad_var_name);
@@ -144,19 +146,18 @@ class ListenAndServOp : public framework::OperatorBase {
}
}
VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier.";
// TODO(Yancey1989): merge SelectedRows variables here
if (exit_flag) {
rpc_service_->ShutDown();
}

VLOG(3) << "run optimize graph...";
try {
executor.Run(*program, &recv_scope, block->ID(), /*global_block*/
false /*create_local_scope*/, false /*create_vars*/);
} catch (std::exception &e) {
LOG(ERROR) << "run sub program error " << e.what();
}
rpc_service_->SetCond(1);
rpc_service_->WaitClientGet(recv_var_cnt);
rpc_service_->WaitClientGet(update_param_cnt);
grads_counter_.clear();
} // while(true)
}
246 changes: 172 additions & 74 deletions python/paddle/v2/fluid/distribute_transpiler.py
@@ -33,6 +33,57 @@ def __str__(self):
return "%s:%d:%d" % (self.varname, self.offset, self.size)


class UnionFind(object):
""" Union-find data struct.

Union-find is a data struct that keeps track of a set of elements partitioned
into a number of disjoint (non-overlapping) subsets.

Reference:
https://en.wikipedia.org/wiki/Disjoint-set_data_structure

Args:
elements(list): The initialize element list.
"""

def __init__(self, elements=None):
self._parents = [] # index -> parent index
self._index = {} # element -> index
self._curr_idx = 0
if not elements:
elements = []
for ele in elements:
self._parents.append(self._curr_idx)
self._index.update({ele: self._curr_idx})
self._curr_idx += 1

def find(self, x):
# Find the root index of the given element x,
# doing path compression while locating the root.
if x not in self._index:
return -1
idx = self._index[x]
while idx != self._parents[idx]:
t = self._parents[idx]
self._parents[idx] = self._parents[t]
idx = t
return idx

def union(self, x, y):
# Union the two given elements into one subset.
x_root = self.find(x)
y_root = self.find(y)

if x_root == y_root:
return
self._parents[x_root] = y_root

def is_connected(self, x, y):
# If two given elements have the same root index,
# then they are connected.
return self.find(x) == self.find(y)
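
For reference, a minimal usage sketch of the UnionFind class above; the element values here are hypothetical strings chosen only for illustration (the transpiler itself passes operator objects):

uf = UnionFind(["sgd_0", "lr_decay_0", "sgd_1"])
uf.union("sgd_0", "lr_decay_0")          # merge the two elements into one subset
uf.is_connected("sgd_0", "lr_decay_0")   # True, they now share a root
uf.is_connected("sgd_0", "sgd_1")        # False, still in different subsets
uf.find("unknown")                       # -1, element was never registered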


def same_or_split_var(p_name, var_name):
return p_name == var_name or p_name.startswith(var_name + ".block")

@@ -178,6 +229,21 @@ def transpile(self,
outputs={"Out": [orig_param]},
attrs={"axis": 0})

self.lr_param_mapping = self._create_lr_param_mapping()

def _create_lr_param_mapping(self):
lr_mapping = dict()
for _, opt_op in enumerate(self.optimize_ops):
if not opt_op.inputs or not opt_op.inputs.has_key("LearningRate") \
or not opt_op.inputs.has_key("Param"):
continue
lr = opt_op.inputs["LearningRate"].name
param = opt_op.inputs["Param"].name
if not lr_mapping.has_key(lr):
lr_mapping.update({lr: list()})
lr_mapping[lr].append(param)
return lr_mapping
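
# Illustration of the mapping this helper builds (hypothetical names):
# given two SGD ops that share one learning-rate variable, e.g.
#   sgd(Param=fc_0.w_0, LearningRate=lr_0),
#   sgd(Param=fc_0.b_0, LearningRate=lr_0)
# the result would be lr_param_mapping == {"lr_0": ["fc_0.w_0", "fc_0.b_0"]}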

def _create_vars_from_blocklist(self, program, block_list):
# Create respective variables using the block_list
block_map = dict()
@@ -300,52 +366,15 @@ def _get_optimizer_input_shape(self, op_type, varkey, orig_shape,
pass
return orig_shape

def _op_input_var(self, op, varname):
pass

def _is_op_on_pserver(self, endpoint, all_ops, idx):
"""
Recursively check if the op need to run on current server.
Assume that ops are in the execution order.
"""
param_names = [
p.name for p in self.param_grad_ep_mapping[endpoint]["params"]
]
op = all_ops[idx]
input_names = set(op.input_names)
# TODO(typhoonzero): using Param and Grad input name to identify
# that the operator is an optimization operator, need a better way.
if "Param" in input_names:
if op.input("Param")[0] in param_names:
return True
else:
for n in param_names:
if same_or_split_var(n, op.input("Param")[0]) \
and n != op.input("Param")[0]:
return True
return False
else:
j = idx - 1
while j >= 0:
prev_op = all_ops[j]
# prev_output_names = [o.name for o in prev_op.outputs.values()]
# prev_input_names = [o.name for o in prev_op.inputs.values()]
# NOTE(typhoonzero): consider list input/output
prev_output_names = prev_op.desc.output_arg_names()
prev_input_names = prev_op.desc.input_arg_names()
found1 = False
found2 = False
for varname in op.desc.input_arg_names():
if varname in prev_output_names:
found1 = self._is_op_on_pserver(endpoint, all_ops, j)
# later ops may produce output for prev op's next batch use.
for varname in op.desc.output_arg_names():
if varname in prev_input_names:
found2 = self._is_op_on_pserver(endpoint, all_ops, j)
if found1 or found2:
return True
j -= 1
return False
def _fetch_var_names(self, param_dict):
res = []
if not param_dict:
return res
for _, values in param_dict.iteritems():
if not isinstance(values, list):
values = [values]
res += [v.name for v in values]
return res

def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
program = optimize_block.program
@@ -363,11 +392,7 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
# do not append this op if current endpoint
# is not dealing with this grad block
return
merged_var = program.global_block().create_var(
name=grad_block.name,
persistable=grad_block.persistable,
dtype=grad_block.dtype,
shape=grad_block.shape)
merged_var = program.global_block().vars[grad_block.name]
# append merging ops if trainers > 1
if self.trainers > 1:
vars2merge = self._create_var_for_trainers(
Expand Down Expand Up @@ -398,13 +423,19 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
shape=param_block.shape)

new_inputs[key] = tmpvar
elif key == "LearningRate":
# the learning rate variable has already been created by a
# non-optimize op, so don't create it again.
new_inputs[key] = program.global_block().vars[opt_op.input(key)[
0]]

for key in opt_op.input_names:
if key in ["Param", "Grad"]:
new_shape = None
if key in ["Param", "Grad", "LearningRate"]:
continue
var = program.global_block().vars[opt_op.input(key)[0]]
# update accumulator variable shape
param_shape = new_inputs["Param"].shape
var = program.global_block().vars[opt_op.input(key)[0]]
new_shape = self._get_optimizer_input_shape(opt_op.type, key,
var.shape, param_shape)
tmpvar = program.global_block().create_var(
@@ -415,24 +446,22 @@ def _append_pserver_ops(self, optimize_block, opt_op, endpoint):
new_inputs[key] = tmpvar

# change output's ParamOut variable
outputs = self._get_output_map_from_op(program.global_block(), opt_op)
outputs["ParamOut"] = new_inputs["Param"]
opt_op.outputs["ParamOut"] = new_inputs["Param"]
optimize_block.append_op(
type=opt_op.type,
inputs=new_inputs,
outputs=outputs,
outputs=opt_op.outputs,
attrs=opt_op.attrs)

def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
program = optimize_block.program
# Append the ops for parameters that do not need to be optimized/updated
inputs = self._get_input_map_from_op(self.program.global_block().vars,
opt_op)
for var in inputs.itervalues():
if type(var) == list:
varlist = var
else:
varlist = [var]
for varlist in inputs.itervalues():
if not isinstance(varlist, list):
varlist = [varlist]

for var in varlist:
if not program.global_block().vars.has_key(var.name):
program.global_block().create_var(
@@ -444,12 +473,70 @@ def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
outputs = self._get_output_map_from_op(self.program.global_block().vars,
opt_op)

for varlist in outputs.itervalues():
if not isinstance(varlist, list):
varlist = [varlist]

for var in varlist:
program.global_block().create_var(
name=var.name,
persistable=var.persistable,
dtype=var.dtype,
shape=var.shape)

optimize_block.append_op(
type=opt_op.type,
inputs=inputs,
outputs=outputs,
attrs=opt_op.attrs)

def _is_op_connected(self, op1, op2):
# If one op's input is another op's output, or one op's
# output is another op's input, we say the two operators
# are connected.
op1_input_names = self._fetch_var_names(op1.inputs)
op1_output_names = self._fetch_var_names(op1.outputs)

op2_input_names = self._fetch_var_names(op2.inputs)
op2_output_names = self._fetch_var_names(op2.outputs)
if set(op1_output_names) & set(op2_input_names) or \
set(op1_input_names) & set(op2_output_names):
return True
return False

def _create_ufind(self, optimize_ops):
# Create a union-find data structure from the optimize ops
ufind = UnionFind(optimize_ops)
for i in xrange(len(optimize_ops)):
for j in xrange(i, len(optimize_ops)):
op1 = optimize_ops[i]
op2 = optimize_ops[j]
if self._is_op_connected(op1, op2):
ufind.union(op1, op2)
return ufind

def _is_opt_op(self, op):
# NOTE: this is a HACK implementation.
# Optimize ops: SGDOptimizer, MomentumOptimizer, AdamOptimizer, etc.
if op.inputs and op.inputs.has_key("Param") \
and op.inputs.has_key("LearningRate"):
return True
return False

def _is_opt_op_on_pserver(self, endpoint, op):
param_names = [
p.name for p in self.param_grad_ep_mapping[endpoint]["params"]
]
if op.inputs["Param"].name in param_names:
return True
else:
param = op.inputs["Param"].name
for n in param_names:
if same_or_split_var(n, param) and n != param:
return True
return False

def get_pserver_program(self, endpoint):
"""
Get pserver side program using the endpoint
@@ -469,26 +556,37 @@ def get_pserver_program(self, endpoint):
pserver_program.global_block().create_var(
name=v.name, persistable=True, dtype=v.dtype, shape=v.shape)
for trainer_id in xrange(self.trainers):
print("create variable for program: %s.trainer_%d" %
(v.name, trainer_id))
pserver_program.global_block().create_var(
name="%s.trainer_%d" % (v.name, trainer_id),
persistable=True,
dtype=v.dtype,
shape=v.shape)
# step6
optimize_block = pserver_program.create_block(0)
# Iterate through the ops and append ops as needed
for idx, opt_op in enumerate(self.optimize_ops):
is_op_on_pserver = self._is_op_on_pserver(endpoint,
self.optimize_ops, idx)
if not is_op_on_pserver:
continue
if "Grad" in opt_op.desc.input_arg_names():
self._append_pserver_ops(optimize_block, opt_op, endpoint)
else:
self._append_pserver_non_opt_ops(optimize_block, opt_op)

# step 6.1
# Build a union-find data structure from the optimize ops;
# if two ops are connected, they are placed in the same set.
ufind = self._create_ufind(self.optimize_ops)
# step 6.2
# Iterate through the ops and collect the optimize ops
# that are located on the current pserver
opt_op_on_pserver = []
for _, op in enumerate(self.optimize_ops):
if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op):
opt_op_on_pserver.append(op)
# step 6.3
# Iterate through the ops; if an op is in the same set as an
# optimize op located on the current pserver, append it to
# the sub-program.
for _, op in enumerate(self.optimize_ops):
for _, opt_op in enumerate(opt_op_on_pserver):
if ufind.is_connected(op, opt_op):
if self._is_opt_op(op):
self._append_pserver_ops(optimize_block, op, endpoint)
else:
self._append_pserver_non_opt_ops(optimize_block, op)
break
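
# Illustration of the grouping (hypothetical op/variable names): given
# optimize ops lr_decay(Out=lr_0), sgd(Param=w_0, LearningRate=lr_0) and
# sgd(Param=w_1, LearningRate=lr_0), all three touch lr_0, so the
# union-find places them in one set; every pserver hosting a shard of
# w_0 or w_1 therefore also gets the lr_decay op, while
# _append_pserver_ops still skips parameter updates whose gradient
# shard is not handled by this endpoint.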
# Append the listen_and_serv op
pserver_program.global_block().append_op(
type="listen_and_serv",
1 change: 1 addition & 0 deletions python/paddle/v2/fluid/layers/math_op_patch.py
@@ -117,6 +117,7 @@ def __impl__(self, other_var):

tmp_name = unique_tmp_name()
out = self.block.create_var(name=tmp_name, dtype=lhs_dtype)

self.block.append_op(
type=op_type,
inputs={'X': [self],
@@ -99,7 +99,7 @@
exe.run(fluid.default_startup_program())
for pass_id in range(PASS_NUM):
for data in train_reader():
avg_cost_np = exe.run(fluid.default_main_program(),
avg_cost_np = exe.run(t.get_trainer_program(),
feed=feeder.feed(data),
fetch_list=[avg_cost])
print("avg_cost_np", avg_cost_np)