-
Notifications
You must be signed in to change notification settings - Fork 5.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add rpc
interface.
#10805
Add rpc
interface.
#10805
Conversation
const framework::Scope& scope, | ||
const std::string& var_name, | ||
int64_t time_out = 600 * 1000) { | ||
PADDLE_ENFORCE(false, "RPCServer WaitServerReady is not implemented!"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can just make this a pure virtual
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
const platform::DeviceContext& ctx, | ||
const framework::Scope& scope, | ||
const std::string& var_name, | ||
int64_t time_out = 600 * 1000) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use a constant to represent 600 * 1000
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
#include <time.h> | ||
|
||
#include <chrono> // NOLINT |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pls remove all not used includes for this interface
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
||
class RPCServer { | ||
public: | ||
RPCServer(const std::string &address, bool sync_mode) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
address => bind address
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
} | ||
|
||
// functions to sync server barrier status. | ||
void WaitCond(int cond); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking a way to register rpc calls with conditions like :
// register rpc call name to a condition id with will be waiting on
virtual void RegisterBarrier(const std::string& rpc_name, int cond_id) = 0;
// wait the RPC call barrier, which means wait all the clients have
// performed the call.
virtual void WaitCond(const std::string& rpc_name) = 0;
|
||
protected: | ||
std::string address_; | ||
const bool sync_mode_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sync_mode_
is actually what the operator should take care of. I think if we are going to give a "RPC Interface", we must clean up the code and make sure ops and rpc_servers and rpc_clients do what they are.
Also for scope_ and dev_ctx_
public: | ||
virtual bool AsyncSendVariable(const std::string& ep, | ||
const platform::DeviceContext& ctx, | ||
const framework::Scope& scope, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can omit the argument ctx
because we can get "place" from the tensor/selectedrows, and the type can be got from the variable. So we may only need a Variable
, it's name and endpoint here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -138,3 +137,8 @@ TEST(PREFETCH, CPU) { | |||
EXPECT_EQ(ptr[0 + i * value.dims()[1]], static_cast<float>(i * 2)); | |||
} | |||
} | |||
|
|||
TEST(PREFETCH, CPU) { | |||
std::shared_ptr<detail::RPCClient> client(new detail::GRPCClient); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why shared_ptr? would unique_ptr be enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
} else if (var->IsType<framework::SelectedRows>()) { | ||
place = var->Get<framework::LoDTensor>().place(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be Get<framework::SelectedRows>?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
server_thread.join(); | ||
auto* ptr = rpc_service.release(); | ||
delete ptr; | ||
} | ||
|
||
TEST(SendNcclId, Normal) { | ||
std::shared_ptr<detail::RPCClient> client(new detail::GRPCClient()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why shared_ptr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
const std::string& var_name, | ||
int64_t time_out = RPCClient::rpc_time_out); | ||
|
||
virtual bool AsyncGetVariable(const std::string& ep, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should replace virtual with overwrite in subclass
https://stackoverflow.com/questions/39932391/virtual-override-or-both-c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. I will fix it.
Fix part of #10804