upgrade async distributed training in pscore #37515

Merged
Changes from all commits (24 commits)
760676b
test
zhaocaibei123 Aug 20, 2021
58712fa
Merge pull request #9 from PaddlePaddle/develop
zhaocaibei123 Oct 14, 2021
6650d87
test
zhaocaibei123 Aug 20, 2021
7c9a7a3
rm test
zhaocaibei123 Oct 26, 2021
d9bb3ad
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Oct 26, 2021
bb5d1f2
Merge branch 'develop' of https://github.com/zhaocaibei123/Paddle int…
zhaocaibei123 Oct 26, 2021
b3d8a01
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Oct 27, 2021
cd7af0f
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Nov 13, 2021
a5a703d
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Nov 15, 2021
c9e4289
Merge branch 'develop' of https://github.com/zhaocaibei123/Paddle int…
zhaocaibei123 Nov 15, 2021
fea03ec
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Nov 16, 2021
cf96c33
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Nov 16, 2021
0aa2c90
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Nov 17, 2021
6edef85
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Nov 17, 2021
3ad8e56
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Nov 19, 2021
f0b2ec5
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Nov 19, 2021
8e379b1
Merge branch 'develop' of https://github.com/zhaocaibei123/Paddle int…
zhaocaibei123 Nov 24, 2021
6148f18
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into…
zhaocaibei123 Nov 24, 2021
d82afca
update
zhaocaibei123 Nov 24, 2021
2effe9b
update
zhaocaibei123 Nov 24, 2021
0318dbf
update
zhaocaibei123 Nov 24, 2021
cd24736
add unittest
zhaocaibei123 Nov 24, 2021
692f911
update
zhaocaibei123 Nov 25, 2021
c2dcc4d
update save
zhaocaibei123 Nov 25, 2021
4 changes: 0 additions & 4 deletions paddle/fluid/distributed/fleet.cc
@@ -570,8 +570,6 @@ void FleetWrapper::LoadModel(const std::string& path, const int mode) {
ret.wait();
if (ret.get() != 0) {
LOG(ERROR) << "load model from path:" << path << " failed";
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
}

@@ -596,8 +594,6 @@ void FleetWrapper::SaveModel(const std::string& path, const int mode) {
int32_t feasign_cnt = ret.get();
if (feasign_cnt == -1) {
LOG(ERROR) << "save model failed";
sleep(sleep_seconds_before_fail_exit_);
exit(-1);
}
}

132 changes: 132 additions & 0 deletions paddle/fluid/operators/pscore/distributed_push_sparse_op.cc
@@ -0,0 +1,132 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <algorithm>

#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/operators/pscore/distributed_push_sparse_op.h"

namespace paddle {
namespace operators {

constexpr int64_t kNoPadding = -1;

class DistributedPushSparseOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;

void InferShape(framework::InferShapeContext *ctx) const override {
PADDLE_ENFORCE_EQ(ctx->HasInputs("Ids"), true,
platform::errors::InvalidArgument(
"Input(Ids) of DistributedPushSparseOp should not be null."));
PADDLE_ENFORCE_EQ(ctx->HasOutputs("Outputs"), true,
platform::errors::InvalidArgument(
"Output(Outputs) of DistributedPushSparseOp should not be null."));

auto ids_dims = ctx->GetInputsDim("Ids");

for (auto &ids_dim : ids_dims) {
PADDLE_ENFORCE_EQ(ids_dim.size(), 2,
platform::errors::InvalidArgument(
"The dimension of the 'Ids' tensor must be 2."));
}

// for fluid.embedding
auto push_sparse_version =
ctx->Attrs().Get<std::string>("push_sparse_version");
}

protected:
framework::OpKernelType GetExpectedKernelType(
const framework::ExecutionContext &ctx) const override {
return framework::OpKernelType(
framework::proto::VarType::Type(ctx.Attr<int>("dtype")),
ctx.GetPlace());
}
};

class DistributedPushSparseOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("Ids",
"(LoDTensor) Ids's type should be LoDTensor"
"THe ids to be looked up in W.")
.AsDuplicable();

AddInput("Shows",
"(LoDTensor) Shows's type should be LoDTensor"
"THe shows default to be 1.")
.AsDuplicable();

AddInput("Clicks",
"(LoDTensor) Clicks's type should be LoDTensor"
"THe clicks usually equal to label.")
.AsDuplicable();

AddOutput("Outputs",
"(LoDTensor) The lookup results, which have the same type as W.")
.AsDuplicable();

AddAttr<int>("table_id", "sparse table id").SetDefault(0);
AddAttr<int>("size", "embedding size").SetDefault(8);

AddAttr<bool>("is_distributed",
"(boolean, default false) distributed lookup table.")
.SetDefault(false);

AddAttr<std::string>(
"push_sparse_version",
"(string, default push_sparse) "
"To distinguish between different versions of embedding OP")
.SetDefault(std::string("push_sparse"));

AddAttr<int64_t>("padding_idx",
"(int64, default -1) "
"If the value is -1, it makes no effect to lookup. "
"Otherwise the given value indicates padding the output "
"with zeros whenever lookup encounters it in Ids.")
.SetDefault(kNoPadding);
AddAttr<int>("dtype",
"(int, default 5 (FP32)) "
"Output data type")
.SetDefault(framework::proto::VarType::FP32);

AddAttr<bool>("is_test",
"(bool, default false) Set to true for inference only, false "
"for training.")
.SetDefault(false);

AddComment(R"DOC(
Lookup Table Prefetch Operator.
This operator is used to perform lookups on the parameter W,
then concatenates the results into a sparse tensor.
The type of Ids(Input) is SelectedRows, and the rows of Ids contain
the ids to be looked up in W;
if an Id is not in the sparse table, this operator will return a
random value and set the value into the table for the next lookup.
)DOC");
}
};
} // namespace operators
} // namespace paddle

namespace ops = paddle::operators;

REGISTER_OPERATOR(distributed_push_sparse, ops::DistributedPushSparseOp,
ops::DistributedPushSparseOpMaker);

REGISTER_OP_CPU_KERNEL(
distributed_push_sparse,
ops::DistributedPushSparseKernel<paddle::platform::CPUDeviceContext, float>,
ops::DistributedPushSparseKernel<paddle::platform::CPUDeviceContext,
double>);
23 changes: 23 additions & 0 deletions paddle/fluid/operators/pscore/distributed_push_sparse_op.cu.cc
@@ -0,0 +1,23 @@
/* Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/operators/pscore/distributed_push_sparse_op.h"

namespace ops = paddle::operators;
namespace plat = paddle::platform;

REGISTER_OP_CUDA_KERNEL(
distributed_push_sparse,
ops::DistributedPushSparseKernel<plat::CUDADeviceContext, float>,
ops::DistributedPushSparseKernel<plat::CUDADeviceContext, double>);
104 changes: 104 additions & 0 deletions paddle/fluid/operators/pscore/distributed_push_sparse_op.h
@@ -0,0 +1,104 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once
#include <algorithm>
#include <string>
#include <vector>
#include "paddle/fluid/distributed/fleet.h"
#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/tensor_util.h"
#include "paddle/fluid/operators/math/math_function.h"

namespace paddle {
namespace operators {

template <typename DeviceContext, typename T>
class DistributedPushSparseKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &context) const override {
auto &scope = context.scope();

auto padding_idx = context.Attr<int64_t>("padding_idx");
auto table_id = context.Attr<int>("table_id");
auto emb_dim = context.Attr<int>("size");
VLOG(1) << "push_sparse.h::emb_dim: " << emb_dim;
bool is_test = context.Attr<bool>("is_test");

auto inputs = context.MultiInput<framework::LoDTensor>("Ids");
auto shows = context.Input<framework::LoDTensor>("Shows");
auto clks = context.Input<framework::LoDTensor>("Clicks");
auto outputs = context.MultiOutput<framework::LoDTensor>("Outputs");

auto fleet = distributed::FleetWrapper::GetInstance();

if (platform::is_cpu_place(context.GetPlace())) {
fleet->PushSparseFromTensorAsync(static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx),
context.GetPlace(), &inputs, shows, clks,
&outputs);
} else {
auto inputs_variable = context.MultiInputVar("Ids");
auto outputs_variable = context.MultiOutputVar("Outputs");
auto inputs_name = context.InputNames("Ids");
auto outputs_name = context.OutputNames("Outputs");

auto cpu_place = platform::CPUPlace();
framework::Scope *tmp_scope = scope.NewTmpScope().release();

std::vector<const framework::LoDTensor *> tmp_input_vec;
auto input_var_size = inputs_variable.size();
std::vector<framework::LoDTensor *> tmp_output_vec;
auto output_var_size = outputs_variable.size();

// create temp input
for (size_t idx = 0; idx < input_var_size; ++idx) {
framework::Variable *tmp_input_var = tmp_scope->Var(inputs_name[idx]);
framework::LoDTensor *tmp_input_tensor =
tmp_input_var->GetMutable<framework::LoDTensor>();
framework::TensorCopy(inputs_variable[idx]->Get<framework::LoDTensor>(),
cpu_place, context.device_context(),
tmp_input_tensor);
tmp_input_vec.push_back(tmp_input_tensor);
}

// create temp output
for (size_t idx = 0; idx < output_var_size; ++idx) {
framework::Variable *tmp_output_var = tmp_scope->Var(outputs_name[idx]);
framework::LoDTensor *tmp_output_tensor =
tmp_output_var->GetMutable<framework::LoDTensor>();
tmp_output_tensor->Resize(outputs[idx]->dims());
tmp_output_vec.push_back(tmp_output_tensor);
}

// use fleet->PullSparse
fleet->PullSparseToTensorSync(static_cast<uint64_t>(table_id), emb_dim,
static_cast<uint64_t>(padding_idx),
cpu_place, !is_test, &tmp_input_vec,
&tmp_output_vec);

// cp temp to origin
for (size_t idx = 0; idx < output_var_size; ++idx) {
framework::Variable *tmp_output_var = tmp_scope->Var(outputs_name[idx]);
framework::LoDTensor *tmp_output_tensor =
tmp_output_var->GetMutable<framework::LoDTensor>();
framework::TensorCopy(
*tmp_output_tensor, context.GetPlace(), context.device_context(),
outputs_variable[idx]->GetMutable<framework::LoDTensor>());
}
delete tmp_scope;
}
}
};

} // namespace operators
} // namespace paddle
12 changes: 7 additions & 5 deletions paddle/fluid/operators/pscore/send_op.cc
@@ -12,6 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/distributed/fleet.h"
#include "paddle/fluid/distributed/service/communicator.h"
#include "paddle/fluid/framework/op_registry.h"

@@ -42,14 +43,15 @@ class SendOp : public framework::OperatorBase {
const platform::Place& place) const override {
auto ins = Inputs("X");
// auto is_sparse = Attr<int>("is_sparse");
// auto table_id = Attr<int>("table_id");
auto table_id = Attr<int>("table_id");

auto send_varnames = Attr<std::vector<std::string>>("send_varnames");

auto* communicator = paddle::distributed::Communicator::GetInstance();
if (communicator->Check(send_varnames)) {
communicator->Send(ins, scope);
}
auto fleet = paddle::distributed::FleetWrapper::GetInstance();
std::vector<::std::future<int32_t>> status;
// Note: only send push_dense now!
// communicator->Send(ins, scope) can be used to push_sparse or push_dense
fleet->PushDenseVarsAsync(scope, table_id, ins, &status, 0, -1);

// auto fleet = paddle::distributed::FleetWrapper::GetInstance();
// if (is_sparse == 0) {
11 changes: 8 additions & 3 deletions paddle/fluid/pybind/fleet_py.cc
@@ -76,7 +76,9 @@ void BindDistFleetWrapper(py::module* m) {
.def("stop_server", &FleetWrapper::StopServer)
.def("stop_worker", &FleetWrapper::FinalizeWorker)
.def("barrier", &FleetWrapper::BarrierWithTable)
.def("shrink_sparse_table", &FleetWrapper::ShrinkSparseTable);
.def("shrink_sparse_table", &FleetWrapper::ShrinkSparseTable)
.def("create_client2client_connection",
&FleetWrapper::CreateClient2ClientConnection);
}

void BindPSHost(py::module* m) {
@@ -159,8 +161,11 @@ void BindDistCommunicator(py::module* m) {
.def("push_sparse_param", &Communicator::RpcSendSparseParam)
.def("is_running", &Communicator::IsRunning)
.def("init_params", &Communicator::InitParams)
.def("pull_dense", &Communicator::PullDense);
// .def("recv", &Communicator::RecvNoBarrier);
.def("pull_dense", &Communicator::PullDense)
.def("create_client_to_client_connection",
&Communicator::CreateC2CConnection)
.def("get_client_info", &Communicator::GetClientInfo)
.def("set_clients", &Communicator::SetClients);
}

void BindHeterClient(py::module* m) {
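
For context, the new Communicator bindings above are presumably consumed by the Python fleet runtime roughly as sketched below: each worker reads its own client endpoint, all endpoints are gathered across workers, and the pairwise connections are opened afterwards. This is a hedged sketch; the gather helper and the argument-free call are assumptions, not shown in this diff.

def connect_workers(communicator, role_maker):
    # Endpoint string of this worker, via the get_client_info binding added above.
    my_info = communicator.get_client_info()
    # Assumed helper: collect every worker's endpoint into a single list.
    all_infos = role_maker._all_gather(my_info)
    # Register the full endpoint list, then open client-to-client connections.
    communicator.set_clients(all_infos)
    communicator.create_client_to_client_connection()
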
2 changes: 2 additions & 0 deletions python/paddle/distributed/__init__.py
@@ -48,6 +48,7 @@

from .entry_attr import ProbabilityEntry # noqa: F401
from .entry_attr import CountFilterEntry # noqa: F401
from .entry_attr import ShowClickEntry # noqa: F401

from paddle.fluid.dygraph.parallel import ParallelEnv # noqa: F401

@@ -69,6 +70,7 @@
"QueueDataset",
"split",
"CountFilterEntry",
"ShowClickEntry",
"get_world_size",
"get_group",
"all_gather",
42 changes: 42 additions & 0 deletions python/paddle/distributed/entry_attr.py
@@ -137,3 +137,45 @@ def __init__(self, count_filter):

def _to_attr(self):
return ":".join([self._name, str(self._count_filter)])


class ShowClickEntry(EntryAttr):
"""
Examples:
.. code-block:: python

import paddle
paddle.enable_static()

sparse_feature_dim = 1024
embedding_size = 64

shows = paddle.static.data(name='show', shape=[1], dtype='int64')
clicks = paddle.static.data(name='click', shape=[1], dtype='int64')
input = paddle.static.data(name='ins', shape=[1], dtype='int64')

entry = paddle.distributed.ShowClickEntry("show", "click")

emb = paddle.static.nn.sparse_embedding(
input=input,
size=[sparse_feature_dim, embedding_size],
is_test=False,
entry=entry,
param_attr=paddle.ParamAttr(name="SparseFeatFactors",
initializer=paddle.nn.initializer.Uniform()))


"""

def __init__(self, show_name, click_name):
super(ShowClickEntry, self).__init__()

if not isinstance(show_name, str) or not isinstance(click_name, str):
raise ValueError("show_name click_name must be a str")

self._name = "show_click_entry"
self._show_name = show_name
self._click_name = click_name

def _to_attr(self):
return ":".join([self._name, self._show_name, self._click_name])