Skip to content

Commit

Permalink
rpc: reduce context switches and receive calls
Browse files Browse the repository at this point in the history
* When queueing a task to execute on the reactor, avoid writing to the
  eventfd to wake it up if such a write has already been done. This
  should reduce the number of read/write syscalls to the eventfd and
  avoid "spurious" wakeups of the reactor.

* When reading inbound data, read an extra 4 bytes, and if it's
  available, loop around to read another call without putting the
  reactor back to sleep.

The effect on context switches is clearly visible using
  rpc-bench --gtest_filter=\*Async

Before:
I0305 12:50:56.463312  7468 rpc-bench.cc:128] Ctx Sw. per req:  0.640409
I0305 12:50:58.015260  7542 rpc-bench.cc:128] Ctx Sw. per req:  0.613172
I0305 12:50:59.563201  7587 rpc-bench.cc:128] Ctx Sw. per req:  0.589479
I0305 12:51:01.014848  7662 rpc-bench.cc:128] Ctx Sw. per req:  0.562744
I0305 12:51:02.666339  7736 rpc-bench.cc:128] Ctx Sw. per req:  0.569126

After:
I0305 12:52:03.567790  9005 rpc-bench.cc:128] Ctx Sw. per req:  0.383251
I0305 12:52:05.050909  9079 rpc-bench.cc:128] Ctx Sw. per req:  0.454404
I0305 12:52:06.626401  9138 rpc-bench.cc:128] Ctx Sw. per req:  0.3308
I0305 12:52:08.123154  9198 rpc-bench.cc:128] Ctx Sw. per req:  0.317752
I0305 12:52:09.666586  9272 rpc-bench.cc:128] Ctx Sw. per req:  0.391739

And on system CPU:

Before:
I0305 12:50:56.463310  7468 rpc-bench.cc:127] Sys CPU per req:  16.5524us
I0305 12:50:58.015259  7542 rpc-bench.cc:127] Sys CPU per req:  16.1158us
I0305 12:50:59.563199  7587 rpc-bench.cc:127] Sys CPU per req:  17.3184us
I0305 12:51:01.014847  7662 rpc-bench.cc:127] Sys CPU per req:  16.7911us
I0305 12:51:02.666337  7736 rpc-bench.cc:127] Sys CPU per req:  15.7659us

After:
I0305 12:52:03.567787  9005 rpc-bench.cc:127] Sys CPU per req:  13.0533us
I0305 12:52:05.050906  9079 rpc-bench.cc:127] Sys CPU per req:  13.7925us
I0305 12:52:06.626399  9138 rpc-bench.cc:127] Sys CPU per req:  11.6987us
I0305 12:52:08.123152  9198 rpc-bench.cc:127] Sys CPU per req:  11.9214us
I0305 12:52:09.666584  9272 rpc-bench.cc:127] Sys CPU per req:  13.4031us

And on syscalls:

todd@turbo:~/kudu$ grep recvfr /tmp/before /tmp/after
/tmp/before:           1458969      syscalls:sys_enter_recvfrom                                     ( +-  1.99% )
/tmp/before:           1458969      syscalls:sys_exit_recvfrom                                     ( +-  1.99% )
/tmp/after:           1252328      syscalls:sys_enter_recvfrom                                     ( +-  1.82% )
/tmp/after:           1252328      syscalls:sys_exit_recvfrom                                     ( +-  1.82% )

todd@turbo:~/kudu$ grep epoll_ctl /tmp/before /tmp/after
/tmp/before:            915862      syscalls:sys_enter_epoll_ctl                                     ( +-  1.47% )
/tmp/before:            915862      syscalls:sys_exit_epoll_ctl                                     ( +-  1.47% )
/tmp/after:            475978      syscalls:sys_enter_epoll_ctl                                     ( +-  3.61% )
/tmp/after:            475978      syscalls:sys_exit_epoll_ctl                                     ( +-  3.61% )

On a more macro-benchmark (TSBS single-groupby-1-1-1 16 workers on an
8-core machine) this also reduces syscalls a bit, though the end-to-end
improvement is minimal.

Before:

 Performance counter stats for 'system wide' (10 runs):

           340,444      cs                                                            ( +-  0.30% )
           144,024      syscalls:sys_enter_recvfrom                                     ( +-  0.00% )
            94,379      syscalls:sys_enter_epoll_ctl                                     ( +-  0.06% )
           129,376      syscalls:sys_enter_epoll_wait                                     ( +-  0.10% )

       2.025755946 seconds time elapsed                                          ( +-  0.43% )

After:
 Performance counter stats for 'system wide' (10 runs):

           333,865      cs                                                            ( +-  0.27% )
           119,216      syscalls:sys_enter_recvfrom                                     ( +-  0.04% )
            88,731      syscalls:sys_enter_epoll_ctl                                     ( +-  0.08% )
           104,149      syscalls:sys_enter_epoll_wait                                     ( +-  0.08% )

       2.005614271 seconds time elapsed                                          ( +-  0.19% )

Change-Id: I32c5e4d146c25be8e90665a0cb8385fcd017b15c
Reviewed-on: http://gerrit.cloudera.org:8080/15440
Reviewed-by: Andrew Wong <awong@cloudera.com>
Tested-by: Andrew Wong <awong@cloudera.com>
Reviewed-by: Bankim Bhavsar <bankim@cloudera.com>
  • Loading branch information
toddlipcon committed Mar 20, 2020
1 parent 8026237 commit 1a21e92
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 35 deletions.
17 changes: 8 additions & 9 deletions src/kudu/rpc/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/faststring.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/slice.h"
Expand Down Expand Up @@ -645,11 +646,12 @@ void Connection::ReadHandler(ev::io &watcher, int revents) {
}
last_activity_time_ = reactor_thread_->cur_time();

faststring extra_buf;
while (true) {
if (!inbound_) {
inbound_.reset(new InboundTransfer());
}
Status status = inbound_->ReceiveBuffer(*socket_);
Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf);
if (PREDICT_FALSE(!status.ok())) {
if (status.posix_code() == ESHUTDOWN) {
VLOG(1) << ToString() << " shut down by remote end.";
Expand All @@ -673,14 +675,11 @@ void Connection::ReadHandler(ev::io &watcher, int revents) {
LOG(FATAL) << "Invalid direction: " << direction_;
}

// TODO: it would seem that it would be good to loop around and see if
// there is more data on the socket by trying another recv(), but it turns
// out that it really hurts throughput to do so. A better approach
// might be for each InboundTransfer to actually try to read an extra byte,
// and if it succeeds, then we'd copy that byte into a new InboundTransfer
// and loop around, since it's likely the next call also arrived at the
// same time.
break;
if (extra_buf.size() > 0) {
inbound_.reset(new InboundTransfer(std::move(extra_buf)));
} else {
break;
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/kudu/rpc/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ void Reactor::QueueCancellation(const shared_ptr<OutboundCall>& call) {
}

void Reactor::ScheduleReactorTask(ReactorTask *task) {
bool was_empty;
{
std::unique_lock<LockType> l(lock_);
if (closing_) {
Expand All @@ -936,9 +937,12 @@ void Reactor::ScheduleReactorTask(ReactorTask *task) {
task->Abort(ShutdownError(false));
return;
}
was_empty = pending_tasks_.empty();
pending_tasks_.push_back(*task);
}
thread_.WakeThread();
if (was_empty) {
thread_.WakeThread();
}
}

bool Reactor::DrainTaskQueue(boost::intrusive::list<ReactorTask> *tasks) { // NOLINT(*)
Expand Down
70 changes: 47 additions & 23 deletions src/kudu/rpc/transfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <sys/uio.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>
Expand Down Expand Up @@ -88,32 +89,43 @@ TransferCallbacks::~TransferCallbacks()
{}

InboundTransfer::InboundTransfer()
: total_length_(kMsgLengthPrefixLength),
: total_length_(0),
cur_offset_(0) {
buf_.resize(kMsgLengthPrefixLength);
}

Status InboundTransfer::ReceiveBuffer(Socket &socket) {
if (cur_offset_ < kMsgLengthPrefixLength) {
// receive uint32 length prefix
int32_t rem = kMsgLengthPrefixLength - cur_offset_;
int32_t nread;
Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
if (nread == 0) {
return Status::OK();
}
DCHECK_GE(nread, 0);
cur_offset_ += nread;
InboundTransfer::InboundTransfer(faststring initial_buf)
: buf_(std::move(initial_buf)),
total_length_(0),
cur_offset_(buf_.size()) {
buf_.resize(std::max<size_t>(kMsgLengthPrefixLength, buf_.size()));
}

Status InboundTransfer::ReceiveBuffer(Socket* socket, faststring* extra_4) {
static constexpr int kExtraReadLength = kMsgLengthPrefixLength;
if (total_length_ == 0) {
// We haven't yet parsed the message length. It's possible that the
// length is already available in the buffer passed in the constructor.
if (cur_offset_ < kMsgLengthPrefixLength) {
// If we still don't have the full length prefix, we can't continue
// reading yet.
return Status::OK();
// receive uint32 length prefix
int32_t rem = kMsgLengthPrefixLength - cur_offset_;
int32_t nread;
Status status = socket->Recv(&buf_[cur_offset_], rem, &nread);
RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
if (nread == 0) {
return Status::OK();
}
DCHECK_GE(nread, 0);
cur_offset_ += nread;
if (cur_offset_ < kMsgLengthPrefixLength) {
// If we still don't have the full length prefix, we can't continue
// reading yet.
return Status::OK();
}
}
// Since we only read 'rem' bytes above, we should now have exactly
// the length prefix in our buffer and no more.
DCHECK_EQ(cur_offset_, kMsgLengthPrefixLength);

// Parse the message length out of the prefix.
DCHECK_GE(cur_offset_, kMsgLengthPrefixLength);
// The length prefix doesn't include its own 4 bytes, so we have to
// add that back in.
total_length_ = NetworkByteOrder::Load32(&buf_[0]) + kMsgLengthPrefixLength;
Expand All @@ -126,7 +138,7 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
return Status::NetworkError(Substitute("RPC frame had invalid length of $0",
total_length_));
}
buf_.resize(total_length_);
buf_.resize(total_length_ + kExtraReadLength);

// Fall through to receive the message body, which is likely to be already
// available on the socket.
Expand All @@ -139,12 +151,24 @@ Status InboundTransfer::ReceiveBuffer(Socket &socket) {
// INT_MAX. The message will be split across multiple Recv() calls.
// Note that this is only needed when rpc_max_message_size > INT_MAX, which is
// currently only used for unit tests.
int32_t rem = std::min(total_length_ - cur_offset_,
int32_t rem = std::min(total_length_ - cur_offset_ + kExtraReadLength,
static_cast<uint32_t>(std::numeric_limits<int32_t>::max()));
Status status = socket.Recv(&buf_[cur_offset_], rem, &nread);
Status status = socket->Recv(&buf_[cur_offset_], rem, &nread);
RETURN_ON_ERROR_OR_SOCKET_NOT_READY(status);
cur_offset_ += nread;

// We may have read some extra bytes, in which case we need to trim them off
// and write them into the provided buffer.
if (cur_offset_ >= total_length_) {
int64_t extra_read = cur_offset_ - total_length_;
DCHECK_LE(extra_read, kExtraReadLength);
DCHECK_GE(extra_read, 0);
extra_4->clear();
extra_4->append(&buf_[total_length_], extra_read);
cur_offset_ = total_length_;
buf_.resize(total_length_);
}

return Status::OK();
}

Expand All @@ -153,7 +177,7 @@ bool InboundTransfer::TransferStarted() const {
}

bool InboundTransfer::TransferFinished() const {
return cur_offset_ == total_length_;
return total_length_ > 0 && cur_offset_ == total_length_;
}

string InboundTransfer::StatusAsString() const {
Expand Down
11 changes: 9 additions & 2 deletions src/kudu/rpc/transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,15 @@ class InboundTransfer {
public:

InboundTransfer();
explicit InboundTransfer(faststring initial_buf);

// read from the socket into our buffer
Status ReceiveBuffer(Socket &socket);
// Read from the socket into our buffer.
//
// If this is the last read of the transfer (i.e. if TransferFinished() is true
// after this call returns OK), up to 4 extra bytes may have been read
// from the socket and stored in 'extra_4'. In that case, any previous content of
// 'extra_4' is replaced by this extra bytes.
Status ReceiveBuffer(Socket *socket, faststring* extra_4);

// Return true if any bytes have yet been sent.
bool TransferStarted() const;
Expand All @@ -91,6 +97,7 @@ class InboundTransfer {

faststring buf_;

// 0 indicates not yet set
uint32_t total_length_;
uint32_t cur_offset_;

Expand Down

0 comments on commit 1a21e92

Please sign in to comment.