-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.cc
128 lines (117 loc) · 5.04 KB
/
server.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
// Copyright (c) 2017 Nuxi (https://nuxi.nl/) and contributors.
//
// SPDX-License-Identifier: BSD-2-Clause
#include <thread>
#include <arpc++/arpc++.h>
#include <argdata.hpp>
#include "arpc_protocol.ad.h"
using namespace arpc;
int Server::HandleRequest() {
// Read the next message from the socket. Return end-of-file as -1.
// TODO(ed): Make buffer size configurable!
std::unique_ptr<argdata_reader_t> reader = argdata_reader_t::create(4096, 16);
{
int error = reader->pull(fd_->get());
if (error != 0)
return error;
}
const argdata_t* input = reader->get();
if (input == nullptr)
return -1;
// Parse the received message.
ArgdataParser argdata_parser(reader.get());
arpc_protocol::ClientMessage client_message;
client_message.Parse(*input, &argdata_parser);
if (client_message.has_unary_request()) {
const arpc_protocol::UnaryRequest& unary_request =
client_message.unary_request();
const arpc_protocol::RpcMethod& rpc_method = unary_request.rpc_method();
ArgdataBuilder argdata_builder;
arpc_protocol::ServerMessage server_message;
if (unary_request.server_streaming()) {
// Server-streaming call.
arpc_protocol::StreamingResponseFinish* streaming_response_finish =
server_message.mutable_streaming_response_finish();
auto service = services_.find(rpc_method.service());
if (service == services_.end()) {
// Service not found.
arpc_protocol::Status* status =
streaming_response_finish->mutable_status();
status->set_code(arpc_protocol::StatusCode::UNIMPLEMENTED);
status->set_message("Service not registered");
} else {
// Service found. Invoke call.
ServerContext context;
ServerWriterImpl writer(fd_);
Status rpc_status = service->second->BlockingServerStreamingCall(
rpc_method.rpc(), &context, *unary_request.request(),
&argdata_parser, &writer);
arpc_protocol::Status* status =
streaming_response_finish->mutable_status();
status->set_code(arpc_protocol::StatusCode(rpc_status.error_code()));
status->set_message(rpc_status.error_message());
}
} else {
// Simple unary call.
arpc_protocol::UnaryResponse* unary_response =
server_message.mutable_unary_response();
auto service = services_.find(rpc_method.service());
if (service == services_.end()) {
// Service not found.
arpc_protocol::Status* status = unary_response->mutable_status();
status->set_code(arpc_protocol::StatusCode::UNIMPLEMENTED);
status->set_message("Service not registered");
} else {
// Service found. Invoke call.
ServerContext context;
const argdata_t* response = argdata_t::null();
Status rpc_status = service->second->BlockingUnaryCall(
rpc_method.rpc(), &context, *unary_request.request(),
&argdata_parser, &response, &argdata_builder);
arpc_protocol::Status* status = unary_response->mutable_status();
status->set_code(arpc_protocol::StatusCode(rpc_status.error_code()));
status->set_message(rpc_status.error_message());
unary_response->set_response(response);
}
}
std::unique_ptr<argdata_writer_t> writer = argdata_writer_t::create();
writer->set(server_message.Build(&argdata_builder));
return writer->push(fd_->get());
} else if (client_message.has_streaming_request_start()) {
// Client-streaming call.
// TODO(ed): Implement bidirectional streaming calls?
const arpc_protocol::StreamingRequestStart& streaming_request_start =
client_message.streaming_request_start();
const arpc_protocol::RpcMethod& rpc_method =
streaming_request_start.rpc_method();
arpc_protocol::ServerMessage server_message;
arpc_protocol::UnaryResponse* unary_response =
server_message.mutable_unary_response();
// Find corresponding service.
ArgdataBuilder argdata_builder;
auto service = services_.find(rpc_method.service());
if (service == services_.end()) {
// Service not found.
arpc_protocol::Status* status = unary_response->mutable_status();
status->set_code(arpc_protocol::StatusCode::UNIMPLEMENTED);
status->set_message("Service not registered");
} else {
// Service found. Invoke call.
ServerContext context;
ServerReaderImpl reader(fd_);
const argdata_t* response = argdata_t::null();
Status rpc_status = service->second->BlockingClientStreamingCall(
rpc_method.rpc(), &context, &reader, &response, &argdata_builder);
arpc_protocol::Status* status = unary_response->mutable_status();
status->set_code(arpc_protocol::StatusCode(rpc_status.error_code()));
status->set_message(rpc_status.error_message());
unary_response->set_response(response);
}
std::unique_ptr<argdata_writer_t> writer = argdata_writer_t::create();
writer->set(server_message.Build(&argdata_builder));
return writer->push(fd_->get());
} else {
// Invalid operation.
return EOPNOTSUPP;
}
}