Optimize/fleet save (#32817)
* fix cpp lint
* fix save/load with unexpected value
* fix save and user interface
seiriosPlus authored May 12, 2021
1 parent e1a4c83 commit 890f626
Showing 4 changed files with 130 additions and 64 deletions.
49 changes: 33 additions & 16 deletions paddle/fluid/distributed/table/common_sparse_table.cc
@@ -13,9 +13,9 @@
// limitations under the License.

#include "paddle/fluid/distributed/table/common_sparse_table.h"

#include <sstream>

#include "boost/lexical_cast.hpp"
#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"

@@ -25,7 +25,8 @@ class ValueBlock;
} // namespace distributed
} // namespace paddle

#define PSERVER_SAVE_SUFFIX "_txt"
#define PSERVER_SAVE_SUFFIX ".shard"
using boost::lexical_cast;

namespace paddle {
namespace distributed {
@@ -100,7 +101,7 @@ struct Meta {
};

void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
std::vector<std::vector<float>>* values) {
const int64_t id, std::vector<std::vector<float>>* values) {
auto colunmn_size = columns.size();
auto load_values =
paddle::string::split_string<std::string>(columns[colunmn_size - 1], ",");
@@ -116,8 +117,18 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
"The data format in txt does not meet the field "
"requirements defined in meta"));

std::transform(start, end, std::back_inserter(val),
[](std::string va) { return std::stof(va); });
std::transform(start, end, std::back_inserter(val), [id](std::string va) {
float v = 0.0;

try {
v = lexical_cast<float>(va);
} catch (boost::bad_lexical_cast& e) {
VLOG(0) << "id: " << id << " get unexpected value: " << va
<< " and be reset to: 0.0";
}
return v;
});

values->push_back(val);
offset += meta.dims[x];
}
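
The reworked lambda no longer aborts the whole load when a field is corrupted: boost::lexical_cast is wrapped in a try/catch, the offending id and token are logged, and the value falls back to 0.0. A minimal Python sketch of the same parse-with-fallback idea (illustration only, helper name made up, not part of the commit):

    def parse_float_or_default(token, row_id, default=0.0):
        # Mirror of the lexical_cast + bad_lexical_cast handling above:
        # keep loading, but report which row carried the unexpected value.
        try:
            return float(token)
        except ValueError:
            print("id: {} got unexpected value: {}, reset to {}".format(row_id, token, default))
            return default

    # "oops" cannot be parsed, so the row loads as [0.5, 0.0, 1.25]
    row = [parse_float_or_default(t, row_id=101) for t in "0.5,oops,1.25".split(",")]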
@@ -126,25 +137,29 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
const int mode) {
int64_t save_num = 0;

for (auto& table : block->values_) {
for (auto& value : table) {
if (mode == SaveMode::delta && !value.second->need_save_) {
continue;
}
save_num += 1;

auto* vs = value.second->data_.data();
++save_num;

std::stringstream ss;
auto* vs = value.second->data_.data();

auto id = value.first;

ss << id << "\t" << value.second->count_ << "\t"
<< value.second->unseen_days_ << "\t" << value.second->is_entry_
<< "\t";

for (int i = 0; i < block->value_length_; i++) {
ss << vs[i];
ss << ",";
for (int i = 0; i < block->value_length_ - 1; i++) {
ss << std::to_string(vs[i]) << ",";
}

ss << std::to_string(vs[block->value_length_ - 1]);
ss << "\n";

os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
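
With this rewrite every saved row is a single tab-separated record: id, count, unseen_days, is_entry, then the comma-joined embedding values with no trailing comma. A hedged Python reader for that layout (field order taken from the stringstream code above; the helper is hypothetical):

    def parse_shard_line(line):
        # id \t count \t unseen_days \t is_entry \t v0,v1,...,vN
        columns = line.rstrip("\n").split("\t")
        return {
            "id": int(columns[0]),
            "count": int(columns[1]),
            "unseen_days": int(columns[2]),
            "is_entry": bool(int(columns[3])),
            "values": [float(v) for v in columns[4].split(",")],
        }

    print(parse_shard_line("7\t3\t0\t1\t0.100000,0.200000,0.300000"))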
@@ -170,7 +185,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,

while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
auto id = std::stoull(values[0]);
auto id = lexical_cast<int64_t>(values[0]);

if (id % pserver_num != pserver_id) {
VLOG(3) << "will not load " << values[0] << " from " << valuepath
@@ -182,15 +197,17 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
auto block = blocks->at(shard_id);

std::vector<std::vector<float>> kvalues;
ProcessALine(values, meta, &kvalues);
ProcessALine(values, meta, id, &kvalues);

block->Init(id, false);

VALUE* value_instant = block->GetValue(id);

if (values.size() == 5) {
value_instant->count_ = std::stoi(values[1]);
value_instant->unseen_days_ = std::stoi(values[2]);
value_instant->is_entry_ = static_cast<bool>(std::stoi(values[3]));
value_instant->count_ = lexical_cast<int>(values[1]);
value_instant->unseen_days_ = lexical_cast<int>(values[2]);
value_instant->is_entry_ =
static_cast<bool>(lexical_cast<int>(values[3]));
}

std::vector<float*> block_values = block->Get(id, meta.names, meta.dims);
@@ -475,7 +492,7 @@ int32_t CommonSparseTable::pull_sparse_ptr(char** pull_values,
auto* value = block->InitGet(id);
// std::copy_n(value + param_offset_, param_dim_,
// pull_values + param_dim_ * offset);
pull_values[offset] = (char*)value;
pull_values[offset] = reinterpret_cast<char*>(value);
}

return 0;
49 changes: 49 additions & 0 deletions python/paddle/distributed/fleet/base/fleet_base.py
@@ -580,6 +580,49 @@ def stop_worker(self):
"""
self._runtime_handle._stop_worker()

def save(self, dirname, feed=[], fetch=[], **configs):
inference = True

if not feed and not fetch:
inference = False

place = paddle.CPUPlace()
executor = paddle.static.Executor(place)

if inference:
feeded_var_names = []
fetch_var_names = []

for var in feed:
if isinstance(var, str):
feeded_var_names.append(var)
elif isinstance(var, paddle.static.Variable):
feeded_var_names.append(var.name)
else:
raise ValueError("feed must be [str|Variable]")

for var in fetch:
if isinstance(var, str):
fetch_var_names.append(var)
elif isinstance(var, paddle.static.Variable):
fetch_var_names.append(var.name)
else:
raise ValueError("feed must be [str|Variable]")

fetch_vars = [
paddle.static.default_main_program().global_block().var(name)
for name in fetch_var_names
]

self._runtime_handle._save_inference_model(
executor, dirname, feeded_var_names, fetch_vars, None, True, 0)
else:
increment_mode = 0
if "mode" in configs:
increment_mode = int(configs["mode"])
self._runtime_handle._save_persistables(
executor, dirname, main_program=None, mode=increment_mode)

def save_inference_model(self,
executor,
dirname,
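
fleet.save is the new single entry point: when feed/fetch are given it exports an inference model through the runtime handle, otherwise it saves persistables, with an optional mode key selecting full or delta saving. A usage sketch, assuming a fleet role has already been initialized and avg_cost built and minimized as in the updated test at the bottom of this commit:

    import paddle
    import paddle.distributed.fleet as fleet

    # export an inference model; feed/fetch accept variable names or Variables
    fleet.fleet.save(dirname="/tmp/model", feed=["x", "y"], fetch=[avg_cost])

    # save persistables only; "mode" is forwarded to _save_persistables
    fleet.fleet.save(dirname="/tmp/ckpt", mode=0)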
@@ -607,6 +650,9 @@ def save_inference_model(self,
fleet.init_server()
"""
# warnings.warn(
# "'save_inference_model' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
# )

self._runtime_handle._save_inference_model(
executor, dirname, feeded_var_names, target_vars, main_program,
@@ -653,6 +699,9 @@ def save_persistables(self, executor, dirname, main_program=None, mode=0):
fleet.save_persistables(exe, "dirname", paddle.static.default_main_program())
"""
# warnings.warn(
# "'save_persistables' is a deprecated, will be deleted after v2.2.0, Please use fleet.save instead."
# )

self._runtime_handle._save_persistables(executor, dirname, main_program,
mode)
70 changes: 34 additions & 36 deletions python/paddle/distributed/fleet/runtime/the_one_ps.py
@@ -32,7 +32,7 @@ def conv_indent(indent):
return "".join([" "] * indent)


PSERVER_SAVE_SUFFIX = "_txt"
PSERVER_SAVE_SUFFIX = ".shard"


class Accessor:
@@ -916,7 +916,7 @@ def _save_sparse_params(self, executor, dirname, context, main_program,
self.compiled_strategy.origin_main_program, True)
values = []
for id, names in context.items():
if names not in distributed_varnames:
if names[0] not in distributed_varnames:
# only save sparse param to local
self._worker.recv_and_save_model(id, dirname)
# save sparse & distributed param on server
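
The old condition tested whether the whole names list was an element of distributed_varnames, which is never true, so the distributed branch was effectively unreachable; checking names[0] fixes the lookup. A tiny illustration of the difference:

    distributed_varnames = ["embedding_0.w_0"]
    names = ["embedding_0.w_0"]

    print(names in distributed_varnames)     # False: a list is never an element of this list
    print(names[0] in distributed_varnames)  # True: compares the varname itself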
@@ -953,11 +953,11 @@ def _save_distributed_persistables(self,
TheOnePSRuntime.__exclude_vars(saved_varnames),
main_program.list_vars()))

fluid.io.save_vars(
executor,
main_program=main_program,
dirname=dirname,
vars=remaining_vars)
import paddle
for var in remaining_vars:
tensor = var.get_value()
paddle.save(
tensor, os.path.join(dirname, var.name), use_binary_format=True)

def _ps_inference_save_persistables(self,
executor,
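
fluid.io.save_vars is replaced by one paddle.save call per remaining variable, writing each tensor to <dirname>/<var name> in binary format. A hedged round-trip sketch on a toy static-graph program; the load half (paddle.load plus Variable.set_value) is an assumption, the commit only shows saving:

    import os
    import paddle

    paddle.enable_static()

    x = paddle.static.data(name="x", shape=[None, 4], dtype="float32")
    y = paddle.static.nn.fc(x, size=2)

    exe = paddle.static.Executor(paddle.CPUPlace())
    exe.run(paddle.static.default_startup_program())

    prog = paddle.static.default_main_program()
    dirname = "/tmp/demo_persistables"
    os.makedirs(dirname, exist_ok=True)

    # save half, mirroring _save_distributed_persistables above
    for var in prog.list_vars():
        if var.persistable:
            paddle.save(var.get_value(), os.path.join(dirname, var.name),
                        use_binary_format=True)

    # assumed load half: restore each tensor back into the same program
    for var in prog.list_vars():
        if var.persistable:
            var.set_value(paddle.load(os.path.join(dirname, var.name)))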
@@ -978,20 +978,19 @@ def _ps_inference_save_persistables(self,

if isinstance(executor, ParallelExecutor):
raise TypeError(
"in fleet.save_persistables() function, executor must be as Executor type, ParallelExecutor is not allowed"
"in fleet.save() function, executor must be as Executor type, ParallelExecutor is not allowed"
)

if not isinstance(executor, Executor):
raise TypeError(
"in fleet.save_persistables() function, executor must be as Executor type"
)
"in fleet.save() function, executor must be as Executor type")

if main_program is None:
main_program = self.compiled_strategy.get_origin_ps_main_program()

if isinstance(main_program, CompiledProgram):
raise TypeError(
"in fleet.save_persistables() function, main_program must be as Program type, CompiledProgram is not allowed"
"in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed"
)

# Todo(MrChengmo): Save optimizer status
@@ -1013,37 +1012,36 @@ def _ps_inference_save_inference_model(self,

if isinstance(executor, ParallelExecutor):
raise TypeError(
"in fleet.save_inference_model() function, executor must be as Executor type, ParallelExecutor is not allowed"
"in fleet.save() function, executor must be as Executor type, ParallelExecutor is not allowed"
)

if not isinstance(executor, Executor):
raise TypeError(
"in fleet.save_inference_model() function, executor must be as Executor type"
"in fleet.save() function, executor must be as Executor type")

import paddle
program = self.origin_main_program if main_program is None else main_program

if isinstance(program, CompiledProgram):
raise TypeError(
"in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed"
)

if main_program is not None:
if isinstance(main_program, CompiledProgram):
raise TypeError(
"in fleet.save_inference_model() function, main_program must be as Program type, CompiledProgram is not allowed"
)
fluid.io.save_inference_model(dirname, feeded_var_names,
target_vars, executor, main_program,
None, None, export_for_deployment)
else:
fluid.io.save_inference_model(dirname, feeded_var_names,
target_vars, executor,
self.origin_main_program, None, None,
export_for_deployment, True)
model_basename = "__model__"
model_filename = os.path.join(dirname, model_basename)

with open(model_filename, "rb") as f:
program_desc_str = f.read()

program = Program.parse_from_string(program_desc_str)
program._copy_dist_param_info_from(fluid.default_main_program())
self._ps_inference_save_persistables(executor, dirname, program,
mode)
feed_vars = [
program.global_block().var(name) for name in feeded_var_names
]

infer_program = paddle.static.normalize_program(program, feed_vars,
target_vars)

infer_program._copy_dist_param_info_from(program)

model_basename = "__model__"
model_basename = os.path.join(dirname, model_basename)
paddle.save(infer_program, model_basename)

self._ps_inference_save_persistables(executor, dirname, infer_program,
mode)

def _save_inference_model(self, *args, **kwargs):
self._ps_inference_save_inference_model(*args, **kwargs)
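
The inference export no longer round-trips through fluid.io.save_inference_model and a re-parsed __model__ file: the program is pruned to the feed/fetch targets with paddle.static.normalize_program, the distributed parameter info is copied over, the pruned program is stored with paddle.save, and the persistables path is reused. A standalone sketch of that prune-and-save flow on a toy network (outside fleet, so there is no dist-param info to copy; paths are examples):

    import os
    import paddle

    paddle.enable_static()

    x = paddle.static.data(name="x", shape=[None, 32], dtype="float32")
    y = paddle.static.nn.fc(x, size=1)

    exe = paddle.static.Executor(paddle.CPUPlace())
    exe.run(paddle.static.default_startup_program())

    # prune to the feed/fetch targets, as _ps_inference_save_inference_model does
    infer_prog = paddle.static.normalize_program(
        paddle.static.default_main_program(), [x], [y])

    dirname = "/tmp/demo_inference"
    os.makedirs(dirname, exist_ok=True)
    paddle.save(infer_prog, os.path.join(dirname, "__model__"))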
26 changes: 14 additions & 12 deletions python/paddle/fluid/tests/unittests/test_fleet_base_2.py
@@ -14,25 +14,25 @@

import unittest
import paddle
paddle.enable_static()

import os
import paddle.fluid as fluid


class TestFleetBase(unittest.TestCase):
def setUp(self):
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
"127.0.0.1:36001,127.0.0.2:36001"

def test_ps_minimize(self):
import paddle
import paddle.distributed.fleet as fleet

os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "1"

input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
@@ -47,24 +47,26 @@ def test_ps_minimize(self):

role = fleet.PaddleCloudRoleMaker(is_collective=False)
fleet.init(role)

strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = False
strategy.a_sync_configs = {"launch_barrier": False}

optimizer = paddle.optimizer.SGD(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)

place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(paddle.static.default_startup_program())
pe = fluid.ParallelExecutor(use_cuda=False, loss_name=avg_cost.name)
compiled_prog = fluid.compiler.CompiledProgram(
fluid.default_main_program())
self.assertRaises(
Exception,
fleet.save_inference_model,
dirname='/tmp/',
feeded_var_names=['x', 'y'],
target_vars=[avg_cost],
executor=pe)

fleet.fleet.save(dirname="/tmp", feed=['x', 'y'], fetch=[avg_cost])
fleet.fleet.save(
dirname="/tmp", feed=[input_x, input_y], fetch=[avg_cost])
fleet.fleet.save(dirname="/tmp")

self.assertRaises(
Exception,
