Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

safe map in heter server #42276

Merged
merged 135 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
135 commits
Select commit Hold shift + click to select a range
d9bb853
back fl
ziyoujiyi Mar 25, 2022
6073452
delete ssl cert
ziyoujiyi Mar 25, 2022
66fa8c8
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Mar 25, 2022
4bb3d3f
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Mar 25, 2022
7a02e84
.
ziyoujiyi Mar 25, 2022
883b55a
make warning
ziyoujiyi Mar 26, 2022
f917402
.
ziyoujiyi Mar 26, 2022
fa4ab2e
unittest paral degree
ziyoujiyi Mar 28, 2022
a129afc
solve unittest
ziyoujiyi Mar 28, 2022
a54e061
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Mar 29, 2022
ed7e38f
heter & multi cloud commm ready
ziyoujiyi Mar 29, 2022
3e86455
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Mar 29, 2022
b5a34fc
.
ziyoujiyi Mar 29, 2022
0e4b998
Merge branch 'develop' of https://github.com/ziyoujiyi/Paddle into de…
ziyoujiyi Mar 29, 2022
eeec283
.
ziyoujiyi Mar 29, 2022
d293d97
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Mar 29, 2022
c1759b5
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Mar 30, 2022
d9aa775
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Mar 31, 2022
b299a3c
arm_brpc compile
ziyoujiyi Apr 1, 2022
2863287
.
ziyoujiyi Apr 1, 2022
6848d18
.
ziyoujiyi Apr 1, 2022
7b6fc75
.
ziyoujiyi Apr 1, 2022
e65b6e5
.
ziyoujiyi Apr 1, 2022
e679cd0
.
ziyoujiyi Apr 1, 2022
a2a1c63
.
ziyoujiyi Apr 1, 2022
7a7634e
.
ziyoujiyi Apr 1, 2022
66fa157
.
ziyoujiyi Apr 1, 2022
f7cc89f
.
ziyoujiyi Apr 1, 2022
a96195e
.
ziyoujiyi Apr 1, 2022
7f1e278
.
ziyoujiyi Apr 1, 2022
fba8eee
.
ziyoujiyi Apr 1, 2022
360d494
.
ziyoujiyi Apr 1, 2022
4d4b6b6
.
ziyoujiyi Apr 1, 2022
4930772
only output is ok
ziyoujiyi Apr 1, 2022
185cf4e
base is ok
ziyoujiyi Apr 1, 2022
b3aa41e
.
ziyoujiyi Apr 1, 2022
57056d7
.
ziyoujiyi Apr 1, 2022
d32e424
.
ziyoujiyi Apr 1, 2022
092d2e3
.
ziyoujiyi Apr 1, 2022
978547e
.
ziyoujiyi Apr 1, 2022
e645632
.
ziyoujiyi Apr 1, 2022
1244efb
.
ziyoujiyi Apr 1, 2022
cc6a0fe
.
ziyoujiyi Apr 1, 2022
7105730
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Apr 2, 2022
06304ea
Merge branch 'develop' of https://github.com/ziyoujiyi/Paddle into he…
ziyoujiyi Apr 2, 2022
e00d1af
add switch server bin
ziyoujiyi Apr 2, 2022
867c823
.
ziyoujiyi Apr 2, 2022
e4863a8
.
ziyoujiyi Apr 2, 2022
3914e95
Merge branch 'heter_rpc' of https://github.com/ziyoujiyi/Paddle into …
ziyoujiyi Apr 2, 2022
1f0ddc2
.
ziyoujiyi Apr 2, 2022
cd060ed
.
ziyoujiyi Apr 2, 2022
84ce07d
.
ziyoujiyi Apr 2, 2022
f89383a
.
ziyoujiyi Apr 2, 2022
dc396d7
.
ziyoujiyi Apr 2, 2022
d66d9e4
.
ziyoujiyi Apr 2, 2022
5f0cc8c
.
ziyoujiyi Apr 2, 2022
e8c9d53
.
ziyoujiyi Apr 2, 2022
254e6a3
.
ziyoujiyi Apr 7, 2022
4c4aec7
.
ziyoujiyi Apr 7, 2022
e3303cb
.
ziyoujiyi Apr 7, 2022
b2f9d24
.
ziyoujiyi Apr 7, 2022
bc60040
.
ziyoujiyi Apr 7, 2022
0991930
.
ziyoujiyi Apr 7, 2022
5d0c6d1
.
ziyoujiyi Apr 7, 2022
ee756bd
.
ziyoujiyi Apr 7, 2022
57c2b6e
.
ziyoujiyi Apr 7, 2022
f815742
.
ziyoujiyi Apr 7, 2022
7e91374
.
ziyoujiyi Apr 8, 2022
7ece084
.
ziyoujiyi Apr 8, 2022
82dee5b
.
ziyoujiyi Apr 8, 2022
07b4f3b
.
ziyoujiyi Apr 8, 2022
beb93cc
.
ziyoujiyi Apr 8, 2022
0f164b6
.
ziyoujiyi Apr 8, 2022
2e816ca
.
ziyoujiyi Apr 8, 2022
d2c918e
.
ziyoujiyi Apr 8, 2022
be07755
.
ziyoujiyi Apr 8, 2022
95b6cb8
.
ziyoujiyi Apr 8, 2022
b3b089a
.
ziyoujiyi Apr 8, 2022
d1cc0b6
.
ziyoujiyi Apr 8, 2022
01d808f
.
ziyoujiyi Apr 8, 2022
5540d82
.
ziyoujiyi Apr 8, 2022
8f4f0fe
.
ziyoujiyi Apr 11, 2022
472c1bc
adapt brpc ssl
ziyoujiyi Apr 11, 2022
226d157
.
ziyoujiyi Apr 11, 2022
e2a988d
.
ziyoujiyi Apr 11, 2022
0840268
.
ziyoujiyi Apr 11, 2022
69ac0a2
.
ziyoujiyi Apr 11, 2022
f4ab3ec
.
ziyoujiyi Apr 11, 2022
b5af304
.
ziyoujiyi Apr 11, 2022
13d1a6f
.
ziyoujiyi Apr 11, 2022
678640a
.
ziyoujiyi Apr 11, 2022
74d1d04
.
ziyoujiyi Apr 11, 2022
73ea318
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Apr 11, 2022
44ec521
merge origin/develop
ziyoujiyi Apr 11, 2022
6dc9f6c
.
ziyoujiyi Apr 11, 2022
d9dd5ff
.
ziyoujiyi Apr 11, 2022
750e0de
.
ziyoujiyi Apr 11, 2022
3aa2fad
.
ziyoujiyi Apr 11, 2022
e09c2d5
.
ziyoujiyi Apr 11, 2022
be297f3
.
ziyoujiyi Apr 11, 2022
088a3bb
.
ziyoujiyi Apr 11, 2022
385ff3b
.
ziyoujiyi Apr 11, 2022
409cb71
.
ziyoujiyi Apr 12, 2022
77479b4
.
ziyoujiyi Apr 12, 2022
fa836e1
.
ziyoujiyi Apr 13, 2022
a0bba86
.
ziyoujiyi Apr 13, 2022
5bd81fd
.
ziyoujiyi Apr 14, 2022
f2fe0a1
.
ziyoujiyi Apr 14, 2022
6ea2a61
.
ziyoujiyi Apr 14, 2022
6a42307
.
ziyoujiyi Apr 14, 2022
534f22e
.
ziyoujiyi Apr 14, 2022
4221b3b
.
ziyoujiyi Apr 14, 2022
7f5f685
.
ziyoujiyi Apr 14, 2022
e81b10f
.
ziyoujiyi Apr 14, 2022
e9b7081
.
ziyoujiyi Apr 14, 2022
09d053e
.
ziyoujiyi Apr 14, 2022
ac133a6
.
ziyoujiyi Apr 14, 2022
4451a52
.
ziyoujiyi Apr 14, 2022
77a344b
.
ziyoujiyi Apr 14, 2022
413b6fc
.
ziyoujiyi Apr 15, 2022
7dc2091
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Apr 19, 2022
1462541
fix heter_server & heter_client
ziyoujiyi Apr 24, 2022
2019a5f
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Apr 24, 2022
8ecf9c0
merge dev
ziyoujiyi Apr 24, 2022
2944b41
.
ziyoujiyi Apr 24, 2022
e16c418
.
ziyoujiyi Apr 24, 2022
a288444
int->int64_t
ziyoujiyi Apr 25, 2022
721560b
.
ziyoujiyi Apr 25, 2022
52be1cb
safe map in multithread
ziyoujiyi Apr 26, 2022
f22bbcd
Merge branch 'PaddlePaddle:develop' into develop
ziyoujiyi Apr 26, 2022
67102d8
merge dev
ziyoujiyi Apr 26, 2022
07f6c87
fix heter unitest
ziyoujiyi Apr 27, 2022
ae088cf
.
ziyoujiyi Apr 27, 2022
e21b9a3
fix code_style
ziyoujiyi Apr 27, 2022
3593498
.
ziyoujiyi Apr 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions paddle/fluid/distributed/ps/service/heter_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ DEFINE_int32(switch_send_recv_timeout_s, 600, "switch_send_recv_timeout_s");
namespace paddle {
namespace distributed {
std::shared_ptr<HeterClient> HeterClient::s_instance_ = nullptr;
std::mutex HeterClient::mtx_;
std::shared_ptr<HeterClient> HeterClient::switch_s_instance_ = nullptr;

int GetMicroId(const platform::DeviceContext& ctx,
const framework::Scope* scope) {
Expand Down
24 changes: 16 additions & 8 deletions paddle/fluid/distributed/ps/service/heter_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,22 @@ class HeterClient {
}

// switch client singleton
static HeterClient& GetSwitchInstance(
static std::shared_ptr<HeterClient> GetSwitchInstance(
const std::vector<std::string>& peer_endpoints, int32_t peer_role) {
static HeterClient switch_s_instance_;
if (peer_endpoints.empty()) {
VLOG(4) << "init switch client failed, null peer_endpoints";
if (switch_s_instance_ == nullptr) {
std::unique_lock<std::mutex> lock(mtx_);
if (peer_endpoints.empty()) {
VLOG(4) << "init switch client failed, null peer_endpoints";
}
VLOG(4) << "peer role is: " << peer_role
<< ", addr is: " << peer_endpoints[0];
if (switch_s_instance_ == nullptr) {
switch_s_instance_.reset(new HeterClient());
switch_s_instance_->SetPeerSwitchList(peer_endpoints);
switch_s_instance_->InitClientChannels(false, peer_endpoints,
peer_role);
}
}
VLOG(4) << "peer role is: " << peer_role
<< ", addr is: " << peer_endpoints[0];
switch_s_instance_.SetPeerSwitchList(peer_endpoints);
switch_s_instance_.InitClientChannels(false, peer_endpoints, peer_role);
return switch_s_instance_;
}

Expand Down Expand Up @@ -230,6 +236,8 @@ class HeterClient {
HeterClient(const HeterClient&);

static std::shared_ptr<HeterClient> s_instance_;
static std::mutex mtx_;
static std::shared_ptr<HeterClient> switch_s_instance_;
std::vector<std::shared_ptr<brpc::Channel>> xpu_channels_;
std::vector<std::shared_ptr<brpc::Channel>> previous_xpu_channels_;

Expand Down
37 changes: 24 additions & 13 deletions paddle/fluid/distributed/ps/service/heter_server.h
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -144,31 +144,41 @@ class SendAndRecvVariableHandler final : public ServiceHandlerBase {
brpc::Controller* cntl);

void WaitForVarsConsumed(int32_t group_id, const std::string& var_name) {
timeline_.Start();
// timeline_.Start();
while (true) {
if (vars_ready_flag[group_id][var_name] == 0) {
break;
{
std::lock_guard<std::mutex> lock(scope_mutex_);
if (vars_ready_flag[group_id][var_name] == 0) {
break;
}
}
/*
timeline_.Pause();
if (timeline_.ElapsedSec() > FLAGS_switch_send_recv_timeout_s) {
VLOG(0) << "vars not consumed exceed 10 miniutes";
break;
}
*/
}
return;
}

void WaitForVarsProduced(int32_t group_id, const std::string& var_name) {
timeline_.Start();
// timeline_.Start();
while (true) {
if (vars_ready_flag[group_id][var_name] == 1) {
break;
{
std::lock_guard<std::mutex> lock(scope_mutex_);
if (vars_ready_flag[group_id][var_name] == 1) {
break;
}
}
/*
timeline_.Pause();
if (timeline_.ElapsedSec() > FLAGS_switch_send_recv_timeout_s) {
VLOG(0) << "vars not produced exceed 10 miniutes";
break;
}
*/
}
return;
}
Expand Down Expand Up @@ -379,12 +389,12 @@ class HeterService : public PsService {
::google::protobuf::Closure* done) {
VLOG(4) << "entering SendToSwitch";
brpc::ClosureGuard done_guard(done);
auto& switch_client_ptr_ =
std::shared_ptr<HeterClient> switch_client_ptr_ =
HeterClient::GetSwitchInstance(peer_endpoints_, PEER_ROLE_IS_SWITCH);
if (switch_client_ptr_.peer_switch_channels_.empty()) {
LOG(ERROR) << "switch_client_ptr_.peer_switch_channels_ null";
if (switch_client_ptr_->peer_switch_channels_.empty()) {
LOG(ERROR) << "switch_client_ptr_->peer_switch_channels_ null";
}
brpc::Channel* channel = switch_client_ptr_.peer_switch_channels_[0].get();
brpc::Channel* channel = switch_client_ptr_->peer_switch_channels_[0].get();
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
// proxy: 定义新的 OnHeterRpcDone 对象(或者在类 OnHeterRpcDone 中 reset)
OnHeterRpcDone* closure2 = new OnHeterRpcDone([](void* done) {
Expand Down Expand Up @@ -414,6 +424,7 @@ class HeterService : public PsService {
std_cntl.response_attachment().movable());
fut.wait();
VLOG(4) << "SendToSwitch done";
delete closure2;
}

void SendS2S(::google::protobuf::RpcController* controller,
Expand Down Expand Up @@ -446,11 +457,11 @@ class HeterService : public PsService {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
VLOG(4) << "SendToWorker(client addr) =" << cntl->remote_side();
auto& switch_client_ptr_ =
std::shared_ptr<distributed::HeterClient> switch_client_ptr_ =
HeterClient::GetSwitchInstance(peer_endpoints_, PEER_ROLE_IS_WORKER);
VLOG(4) << "in switch client, peer worker 0: "
<< switch_client_ptr_.peer_worker_list_[0];
brpc::Channel* channel = switch_client_ptr_.peer_worker_channels_[0].get();
<< switch_client_ptr_->peer_worker_list_[0];
brpc::Channel* channel = switch_client_ptr_->peer_worker_channels_[0].get();

auto* closure = reinterpret_cast<OnHeterRpcDone*>(done);
PsService_Stub stub(channel);
Expand Down
9 changes: 5 additions & 4 deletions paddle/fluid/operators/pscore/heter_cloud_comm_cpu_test.cc
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,21 @@ void TestShardSendRecv(
void PressTestSendRecv(
std::shared_ptr<distributed::HeterClient> heter_client_ptr_) {
// long l = 0, m = 0;
// https://paddlerec.bj.bcebos.com/online_infer/arm_brpc_ubuntu18/send_20_34
std::ifstream file("/send_20_34", std::ios::in | std::ios::binary);
// l = file.tellg();
// file.seekg(0, std::ios::end);
// m = file.tellg();
// file.close();
// VLOG(0) << "size of file " << "20_34" << " is " << (m - l) << " bytes.\n";
int64_t vars_len = 2359296 * sizeof(float);
int64_t data_size = vars_len * sizeof(float);
int64_t data_size = vars_len;
VLOG(0) << "float num: " << data_size;
float* data_ptr = new float[data_size];
file.read((char*)data_ptr, 9437184);
VLOG(0) << "send data is: " << data_ptr[0] << ", " << data_ptr[1];
std::vector<std::string> var_names{"34"};
int loopCnt = 600;
int loopCnt = 10000;
auto send_async = [&]() -> void {
int i = 0;
while (i++ < loopCnt) {
Expand Down Expand Up @@ -254,8 +255,8 @@ TEST(HETERSENDANDRECV, CPU) {
exe.Prepare(program, 0); // solve undefined symbol: tensor_table.cc

// TestScopeSendRecv(heter_client_ptr_);
TestShardSendRecv(heter_client_ptr_);
// PressTestSendRecv(heter_client_ptr_);
// TestShardSendRecv(heter_client_ptr_);
PressTestSendRecv(heter_client_ptr_);

switch_server_ptr_a->Stop();
LOG(INFO) << "switch server A stopped";
Expand Down