-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchannel.cc
76 lines (64 loc) · 2.74 KB
/
channel.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Copyright (c) 2017 Nuxi (https://nuxi.nl/) and contributors.
//
// SPDX-License-Identifier: BSD-2-Clause
#include <poll.h>
#include <cstring>
#include <memory>
#include <arpc++/arpc++.h>
#include "arpc_protocol.ad.h"
using namespace arpc;
Status Channel::BlockingUnaryCall(const RpcMethod& method,
ClientContext* context,
const Message& request, Message* response) {
// Send the request.
arpc_protocol::ClientMessage client_message;
arpc_protocol::UnaryRequest* unary_request =
client_message.mutable_unary_request();
arpc_protocol::RpcMethod* rpc_method = unary_request->mutable_rpc_method();
rpc_method->set_service(method.first);
rpc_method->set_rpc(method.second);
ArgdataBuilder argdata_builder;
unary_request->set_request(request.Build(&argdata_builder));
std::unique_ptr<argdata_writer_t> writer = argdata_writer_t::create();
writer->set(client_message.Build(&argdata_builder));
int error = writer->push(fd_->get());
if (error != 0)
return Status(StatusCode::INTERNAL, strerror(error));
// Process the response.
return FinishUnaryResponse(response);
}
Status Channel::FinishUnaryResponse(Message* response) {
// TODO(ed): Make message size configurable.
std::unique_ptr<argdata_reader_t> reader = argdata_reader_t::create(4096, 16);
int error = reader->pull(fd_->get());
if (error != 0)
return Status(StatusCode::INTERNAL, strerror(error));
const argdata_t* server_response = reader->get();
if (server_response == nullptr)
return Status(StatusCode::INTERNAL, "Channel closed by server");
ArgdataParser argdata_parser(reader.get());
arpc_protocol::ServerMessage server_message;
server_message.Parse(*server_response, &argdata_parser);
if (!server_message.has_unary_response())
return Status(StatusCode::INTERNAL, "Server sent invalid response");
const arpc_protocol::UnaryResponse& unary_response =
server_message.unary_response();
// TODO(ed): Only do the parsing upon success!
response->Clear();
response->Parse(*unary_response.response(), &argdata_parser);
const arpc_protocol::Status& status = unary_response.status();
return Status(StatusCode(status.code()), status.message());
}
arpc_connectivity_state Channel::GetState(bool try_to_connect) {
// Perform a non-blocking poll() call to check file descriptor state.
struct pollfd pfd = {.fd = fd_->get(), .events = POLLIN | POLLOUT};
if (poll(&pfd, 1, 0) == -1)
return ARPC_CHANNEL_SHUTDOWN;
return (pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) == 0
? ARPC_CHANNEL_READY
: ARPC_CHANNEL_SHUTDOWN;
}
std::shared_ptr<Channel> arpc::CreateChannel(
const std::shared_ptr<FileDescriptor>& fd) {
return std::make_shared<Channel>(fd);
}