Optimize/fleet save #32817

Merged · 10 commits · May 12, 2021
49 changes: 33 additions & 16 deletions paddle/fluid/distributed/table/common_sparse_table.cc
@@ -13,9 +13,9 @@
// limitations under the License.

#include "paddle/fluid/distributed/table/common_sparse_table.h"

#include <sstream>

#include "boost/lexical_cast.hpp"
#include "glog/logging.h"
#include "paddle/fluid/platform/enforce.h"

@@ -25,7 +25,8 @@ class ValueBlock;
} // namespace distributed
} // namespace paddle

#define PSERVER_SAVE_SUFFIX "_txt"
#define PSERVER_SAVE_SUFFIX ".shard"
using boost::lexical_cast;

namespace paddle {
namespace distributed {
@@ -100,7 +101,7 @@ struct Meta {
};

void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
std::vector<std::vector<float>>* values) {
const int64_t id, std::vector<std::vector<float>>* values) {
auto colunmn_size = columns.size();
auto load_values =
paddle::string::split_string<std::string>(columns[colunmn_size - 1], ",");
@@ -116,8 +117,18 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
"The data format in txt does not meet the field "
"requirements defined in meta"));

std::transform(start, end, std::back_inserter(val),
[](std::string va) { return std::stof(va); });
std::transform(start, end, std::back_inserter(val), [id](std::string va) {
float v = 0.0;

try {
v = lexical_cast<float>(va);
} catch (boost::bad_lexical_cast& e) {
VLOG(0) << "id: " << id << " get unexpected value: " << va
<< " and be reset to: 0.0";
}
return v;
});

values->push_back(val);
offset += meta.dims[x];
}
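For illustration, a minimal Python sketch of the tolerant-parsing behaviour this hunk adds to `ProcessALine` (the PR itself uses `boost::lexical_cast` in C++; the function below is hypothetical and only mirrors the fallback-to-0.0 idea):

```python
def parse_floats(raw_values, row_id):
    """Convert strings to floats; a malformed entry is logged and reset to 0.0
    instead of aborting the whole load, mirroring the new ProcessALine logic."""
    parsed = []
    for raw in raw_values:
        try:
            parsed.append(float(raw))
        except ValueError:
            print("id: {} get unexpected value: {} and be reset to: 0.0".format(row_id, raw))
            parsed.append(0.0)
    return parsed


# Example: the bad middle field becomes 0.0 instead of raising.
print(parse_floats(["0.1", "oops", "0.3"], row_id=7))  # [0.1, 0.0, 0.3]
```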
@@ -126,25 +137,29 @@ void ProcessALine(const std::vector<std::string>& columns, const Meta& meta,
int64_t SaveToText(std::ostream* os, std::shared_ptr<ValueBlock> block,
const int mode) {
int64_t save_num = 0;

for (auto& table : block->values_) {
for (auto& value : table) {
if (mode == SaveMode::delta && !value.second->need_save_) {
continue;
}
save_num += 1;

auto* vs = value.second->data_.data();
++save_num;

std::stringstream ss;
auto* vs = value.second->data_.data();

auto id = value.first;

ss << id << "\t" << value.second->count_ << "\t"
<< value.second->unseen_days_ << "\t" << value.second->is_entry_
<< "\t";

for (int i = 0; i < block->value_length_; i++) {
ss << vs[i];
ss << ",";
for (int i = 0; i < block->value_length_ - 1; i++) {
ss << std::to_string(vs[i]) << ",";
}

ss << std::to_string(vs[block->value_length_ - 1]);
ss << "\n";

os->write(ss.str().c_str(), sizeof(char) * ss.str().size());
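Based on the stream writes above, one saved row now looks like `id \t count \t unseen_days \t is_entry \t v0,v1,...,vn`, with no trailing comma after the last value. A hedged Python sketch of parsing such a line (the real loader additionally splits the value list per parameter using the meta dims):

```python
def parse_shard_line(line):
    # Field layout inferred from SaveToText:
    #   id \t count \t unseen_days \t is_entry \t v0,v1,...,vn
    fields = line.rstrip("\n").split("\t")
    return {
        "id": int(fields[0]),
        "count": int(fields[1]),
        "unseen_days": int(fields[2]),
        "is_entry": bool(int(fields[3])),
        "values": [float(v) for v in fields[4].split(",")],
    }


print(parse_shard_line("7\t12\t0\t1\t0.100000,0.200000,0.300000"))
```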
@@ -170,7 +185,7 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,

while (std::getline(file, line)) {
auto values = paddle::string::split_string<std::string>(line, "\t");
auto id = std::stoull(values[0]);
auto id = lexical_cast<int64_t>(values[0]);

if (id % pserver_num != pserver_id) {
VLOG(3) << "will not load " << values[0] << " from " << valuepath
@@ -182,15 +197,17 @@ int64_t LoadFromText(const std::string& valuepath, const std::string& metapath,
auto block = blocks->at(shard_id);

std::vector<std::vector<float>> kvalues;
ProcessALine(values, meta, &kvalues);
ProcessALine(values, meta, id, &kvalues);

block->Init(id, false);

VALUE* value_instant = block->GetValue(id);

if (values.size() == 5) {
value_instant->count_ = std::stoi(values[1]);
value_instant->unseen_days_ = std::stoi(values[2]);
value_instant->is_entry_ = static_cast<bool>(std::stoi(values[3]));
value_instant->count_ = lexical_cast<int>(values[1]);
value_instant->unseen_days_ = lexical_cast<int>(values[2]);
value_instant->is_entry_ =
static_cast<bool>(lexical_cast<int>(values[3]));
}

std::vector<float*> block_values = block->Get(id, meta.names, meta.dims);
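A small sketch of the row-routing rule visible in `LoadFromText`: a pserver only keeps rows whose id hashes to it. How the shard within a server (`shard_id`) is chosen lies outside this diff, so the second function below is only an assumption:

```python
def keep_row(row_id, pserver_id, pserver_num):
    # Matches the visible check: rows that belong to other pservers are skipped.
    return row_id % pserver_num == pserver_id


def select_shard(row_id, block_num):
    # Hypothetical: the actual shard_id computation is not shown in this hunk.
    return row_id % block_num
```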
@@ -475,7 +492,7 @@ int32_t CommonSparseTable::pull_sparse_ptr(char** pull_values,
auto* value = block->InitGet(id);
// std::copy_n(value + param_offset_, param_dim_,
// pull_values + param_dim_ * offset);
pull_values[offset] = (char*)value;
pull_values[offset] = reinterpret_cast<char*>(value);
}

return 0;
49 changes: 49 additions & 0 deletions python/paddle/distributed/fleet/base/fleet_base.py
@@ -578,6 +578,49 @@ def stop_worker(self):
"""
self._runtime_handle._stop_worker()

def save(self, dirname, feed=[], fetch=[], **configs):
inference = True

if not feed and not fetch:
inference = False

place = paddle.CPUPlace()
executor = paddle.static.Executor(place)

if inference:
feeded_var_names = []
fetch_var_names = []

for var in feed:
if isinstance(var, str):
feeded_var_names.append(var)
elif isinstance(var, paddle.static.Variable):
feeded_var_names.append(var.name)
else:
raise ValueError("feed must be [str|Variable]")

for var in fetch:
if isinstance(var, str):
fetch_var_names.append(var)
elif isinstance(var, paddle.static.Variable):
fetch_var_names.append(var.name)
else:
raise ValueError("feed must be [str|Variable]")

fetch_vars = [
paddle.static.default_main_program().global_block().var(name)
for name in fetch_var_names
]

self._runtime_handle._save_inference_model(
executor, dirname, feeded_var_names, fetch_vars, None, True, 0)
else:
increment_mode = 0
if "mode" in configs:
increment_mode = int(configs["mode"])
self._runtime_handle._save_persistables(
executor, dirname, main_program=None, mode=increment_mode)

def save_inference_model(self,
executor,
dirname,
@@ -605,6 +648,9 @@ def save_inference_model(self,
fleet.init_server()

"""
# warnings.warn(
#     "'save_inference_model' is deprecated and will be deleted after v2.2.0. Please use fleet.save instead."
# )

self._runtime_handle._save_inference_model(
executor, dirname, feeded_var_names, target_vars, main_program,
@@ -651,6 +697,9 @@ def save_persistables(self, executor, dirname, main_program=None, mode=0):
fleet.save_persistables(exe, "dirname", paddle.static.default_main_program())

"""
# warnings.warn(
#     "'save_persistables' is deprecated and will be deleted after v2.2.0. Please use fleet.save instead."
# )

self._runtime_handle._save_persistables(executor, dirname, main_program,
mode)
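For reference, a minimal usage sketch of the new `Fleet.save` entry point added above. It assumes a parameter-server network has already been built and `optimizer.minimize(avg_cost)` has run, as in the updated unit test further below; directory paths are illustrative:

```python
import paddle
import paddle.distributed.fleet as fleet

# ... fleet.init(role), build the network (inputs "x", "y", loss avg_cost),
# and run optimizer.minimize(avg_cost) first ...

# Save persistables only; "mode" is forwarded to _save_persistables.
fleet.fleet.save(dirname="/tmp/ps_model", mode=0)

# Save an inference model; feed/fetch accept variable names or Variables.
fleet.fleet.save(dirname="/tmp/ps_infer", feed=["x", "y"], fetch=[avg_cost])
```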
70 changes: 34 additions & 36 deletions python/paddle/distributed/fleet/runtime/the_one_ps.py
@@ -30,7 +30,7 @@ def conv_indent(indent):
return "".join([" "] * indent)


PSERVER_SAVE_SUFFIX = "_txt"
PSERVER_SAVE_SUFFIX = ".shard"


class Accessor:
@@ -914,7 +914,7 @@ def _save_sparse_params(self, executor, dirname, context, main_program,
self.compiled_strategy.origin_main_program, True)
values = []
for id, names in context.items():
if names not in distributed_varnames:
if names[0] not in distributed_varnames:
# only save sparse param to local
self._worker.recv_and_save_model(id, dirname)
# save sparse & distributed param on server
@@ -951,11 +951,11 @@ def _save_distributed_persistables(self,
TheOnePSRuntime.__exclude_vars(saved_varnames),
main_program.list_vars()))

fluid.io.save_vars(
executor,
main_program=main_program,
dirname=dirname,
vars=remaining_vars)
import paddle
for var in remaining_vars:
tensor = var.get_value()
paddle.save(
tensor, os.path.join(dirname, var.name), use_binary_format=True)

def _ps_inference_save_persistables(self,
executor,
@@ -976,20 +976,19 @@ def _ps_inference_save_persistables(self,

if isinstance(executor, ParallelExecutor):
raise TypeError(
"in fleet.save_persistables() function, executor must be as Executor type, ParallelExecutor is not allowed"
"in fleet.save() function, executor must be as Executor type, ParallelExecutor is not allowed"
)

if not isinstance(executor, Executor):
raise TypeError(
"in fleet.save_persistables() function, executor must be as Executor type"
)
"in fleet.save() function, executor must be as Executor type")

if main_program is None:
main_program = self.compiled_strategy.get_origin_ps_main_program()

if isinstance(main_program, CompiledProgram):
raise TypeError(
"in fleet.save_persistables() function, main_program must be as Program type, CompiledProgram is not allowed"
"in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed"
)

# Todo(MrChengmo): Save optimizer status
@@ -1011,37 +1010,36 @@ def _ps_inference_save_inference_model(self,

if isinstance(executor, ParallelExecutor):
raise TypeError(
"in fleet.save_inference_model() function, executor must be as Executor type, ParallelExecutor is not allowed"
"in fleet.save() function, executor must be as Executor type, ParallelExecutor is not allowed"
)

if not isinstance(executor, Executor):
raise TypeError(
"in fleet.save_inference_model() function, executor must be as Executor type"
"in fleet.save() function, executor must be as Executor type")

import paddle
program = self.origin_main_program if main_program is None else main_program

if isinstance(program, CompiledProgram):
raise TypeError(
"in fleet.save() function, main_program must be as Program type, CompiledProgram is not allowed"
)

if main_program is not None:
if isinstance(main_program, CompiledProgram):
raise TypeError(
"in fleet.save_inference_model() function, main_program must be as Program type, CompiledProgram is not allowed"
)
fluid.io.save_inference_model(dirname, feeded_var_names,
target_vars, executor, main_program,
None, None, export_for_deployment)
else:
fluid.io.save_inference_model(dirname, feeded_var_names,
target_vars, executor,
self.origin_main_program, None, None,
export_for_deployment, True)
model_basename = "__model__"
model_filename = os.path.join(dirname, model_basename)

with open(model_filename, "rb") as f:
program_desc_str = f.read()

program = Program.parse_from_string(program_desc_str)
program._copy_dist_param_info_from(fluid.default_main_program())
self._ps_inference_save_persistables(executor, dirname, program,
mode)
feed_vars = [
program.global_block().var(name) for name in feeded_var_names
]

infer_program = paddle.static.normalize_program(program, feed_vars,
target_vars)

infer_program._copy_dist_param_info_from(program)

model_basename = "__model__"
model_basename = os.path.join(dirname, model_basename)
paddle.save(infer_program, model_basename)

self._ps_inference_save_persistables(executor, dirname, infer_program,
mode)

def _save_inference_model(self, *args, **kwargs):
self._ps_inference_save_inference_model(*args, **kwargs)
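Since `_save_distributed_persistables` now writes each remaining dense variable as an individual binary tensor with `paddle.save(..., use_binary_format=True)`, here is a hedged sketch of loading one of those files back. It assumes `paddle.load` can read the binary tensor format and that static `Variable.set_value` is available (the save path above uses `get_value`); names and paths are illustrative:

```python
import os
import paddle

paddle.enable_static()


def load_persistable(program, var_name, dirname):
    # Each non-distributed persistable is stored as <dirname>/<var_name>.
    var = program.global_block().var(var_name)
    tensor = paddle.load(os.path.join(dirname, var_name))
    var.set_value(tensor)
```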
26 changes: 14 additions & 12 deletions python/paddle/fluid/tests/unittests/test_fleet_base_2.py
@@ -14,25 +14,25 @@

import unittest
import paddle
paddle.enable_static()

import os
import paddle.fluid as fluid


class TestFleetBase(unittest.TestCase):
def setUp(self):
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
os.environ["PADDLE_TRAINERS_NUM"] = "2"
os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
"127.0.0.1:36001,127.0.0.2:36001"
"127.0.0.1:36001,127.0.0.2:36001"

def test_ps_minimize(self):
import paddle
import paddle.distributed.fleet as fleet

os.environ["TRAINING_ROLE"] = "PSERVER"
os.environ["POD_IP"] = "127.0.0.1"
os.environ["PADDLE_PORT"] = "36001"
os.environ["TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "1"

input_x = paddle.fluid.layers.data(
name="x", shape=[32], dtype='float32')
@@ -47,24 +47,26 @@ def test_ps_minimize(self):

role = fleet.PaddleCloudRoleMaker(is_collective=False)
fleet.init(role)

strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = False
strategy.a_sync_configs = {"launch_barrier": False}

optimizer = paddle.optimizer.SGD(learning_rate=0.001)
optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
optimizer.minimize(avg_cost)

place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(paddle.static.default_startup_program())
pe = fluid.ParallelExecutor(use_cuda=False, loss_name=avg_cost.name)
compiled_prog = fluid.compiler.CompiledProgram(
fluid.default_main_program())
self.assertRaises(
Exception,
fleet.save_inference_model,
dirname='/tmp/',
feeded_var_names=['x', 'y'],
target_vars=[avg_cost],
executor=pe)

fleet.fleet.save(dirname="/tmp", feed=['x', 'y'], fetch=[avg_cost])
fleet.fleet.save(
dirname="/tmp", feed=[input_x, input_y], fetch=[avg_cost])
fleet.fleet.save(dirname="/tmp")

self.assertRaises(
Exception,