Skip to content

Commit

Permalink
feat(connection): Support pipelining with Memcached (#2648)
Browse files Browse the repository at this point in the history
* feat(connection): Support pipelining with Memcached

Adds support for pipelining to Memcached, enhances Memcached pytests

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
  • Loading branch information
dranikpg authored Feb 23, 2024
1 parent 47171c4 commit 5ee61db
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 142 deletions.
179 changes: 113 additions & 66 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -277,28 +277,6 @@ string_view Connection::PubMessage::Message() const {
return {buf.get() + channel_len, message_len};
}

// Visitor applied to each MessageHandle variant pulled off the dispatch
// queue; executes the message by writing replies through `builder`.
struct Connection::DispatchOperations {
DispatchOperations(SinkReplyBuilder* b, Connection* me)
: stats{&tl_facade_stats->conn_stats}, builder{b}, self(me) {
}

// One overload per concrete message type held in MessageHandle's variant.
void operator()(const PubMessage& msg);
void operator()(Connection::PipelineMessage& msg);
void operator()(const MonitorMessage& msg);
void operator()(const AclUpdateMessage& msg);
void operator()(const MigrationRequestMessage& msg);
void operator()(CheckpointMessage msg);
void operator()(const InvalidationMessage& msg);

// Unwraps smart-pointer-held messages and forwards to the matching overload.
template <typename T, typename D> void operator()(unique_ptr<T, D>& ptr) {
operator()(*ptr.get());
}

ConnectionStats* stats = nullptr;    // per-thread connection stats, not owned
SinkReplyBuilder* builder = nullptr; // replies are written here, not owned
Connection* self = nullptr;          // owning connection, not owned
};

void Connection::PipelineMessage::SetArgs(const RespVec& args) {
auto* next = storage.data();
for (size_t i = 0; i < args.size(); ++i) {
Expand All @@ -312,6 +290,41 @@ void Connection::PipelineMessage::SetArgs(const RespVec& args) {
}
}

Connection::MCPipelineMessage::MCPipelineMessage(MemcacheParser::Command cmd_in,
                                                 std::string_view value_in)
    : cmd{std::move(cmd_in)}, value{value_in}, backing_size{0} {
  // The parser produced string_views pointing into the connection's input
  // buffer, which is reused before this queued message runs. Re-home every
  // view (key, value, extra keys) into one owned allocation. This laundering
  // could live in a shared utility once there is a second user.

  // Total bytes needed for all views.
  backing_size = cmd.key.size() + value.size();
  for (const auto& ext_key : cmd.keys_ext)
    backing_size += ext_key.size();

  backing = make_unique<char[]>(backing_size);

  // Copies `src` into the next free slot of `backing` and returns a view over
  // the copy. Empty views are not memcpy'd (data() may be null).
  char* dest = backing.get();
  auto relocate = [&dest](string_view src) -> string_view {
    if (!src.empty())
      memcpy(dest, src.data(), src.size());
    string_view rebound{dest, src.size()};
    dest += src.size();
    return rebound;
  };

  // Same layout as before: key first, then value, then the extended keys.
  cmd.key = relocate(cmd.key);
  value = relocate(value);
  for (auto& ext_key : cmd.keys_ext)
    ext_key = relocate(ext_key);
}

void Connection::MessageDeleter::operator()(PipelineMessage* msg) const {
msg->~PipelineMessage();
mi_free(msg);
Expand Down Expand Up @@ -360,6 +373,10 @@ size_t Connection::MessageHandle::UsedMemory() const {
size_t operator()(const InvalidationMessage& msg) {
return 0;
}
size_t operator()(const MCPipelineMessagePtr& msg) {
return sizeof(MCPipelineMessage) + msg->backing_size +
msg->cmd.keys_ext.size() * sizeof(string_view);
}
};

return sizeof(MessageHandle) + visit(MessageSize{}, this->handle);
Expand All @@ -379,9 +396,34 @@ bool Connection::MessageHandle::IsPubMsg() const {
}

bool Connection::MessageHandle::IsReplying() const {
return IsPipelineMsg() || IsPubMsg() || holds_alternative<MonitorMessage>(handle);
return IsPipelineMsg() || IsPubMsg() || holds_alternative<MonitorMessage>(handle) ||
(holds_alternative<MCPipelineMessagePtr>(handle) &&
!get<MCPipelineMessagePtr>(handle)->cmd.no_reply);
}

// Visitor applied to each MessageHandle variant pulled off the dispatch
// queue; executes the message by writing replies through `builder`.
struct Connection::DispatchOperations {
DispatchOperations(SinkReplyBuilder* b, Connection* me)
: stats{&tl_facade_stats->conn_stats}, builder{b}, self(me) {
}

// One overload per concrete message type held in MessageHandle's variant.
void operator()(const PubMessage& msg);
void operator()(Connection::PipelineMessage& msg);
void operator()(const Connection::MCPipelineMessage& msg);
void operator()(const MonitorMessage& msg);
void operator()(const AclUpdateMessage& msg);
void operator()(const MigrationRequestMessage& msg);
void operator()(CheckpointMessage msg);
void operator()(const InvalidationMessage& msg);

// Unwraps smart-pointer-held messages and forwards to the matching overload.
template <typename T, typename D> void operator()(unique_ptr<T, D>& ptr) {
operator()(*ptr.get());
}

ConnectionStats* stats = nullptr;    // per-thread connection stats, not owned
SinkReplyBuilder* builder = nullptr; // replies are written here, not owned
Connection* self = nullptr;          // owning connection, not owned
};

void Connection::DispatchOperations::operator()(const MonitorMessage& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
rbuilder->SendSimpleString(msg);
Expand Down Expand Up @@ -422,6 +464,11 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg
self->skip_next_squashing_ = false;
}

// Executes a queued (pipelined) memcached command, then refreshes the
// connection's last-activity timestamp.
void Connection::DispatchOperations::operator()(const Connection::MCPipelineMessage& msg) {
  auto* service = self->service_;
  service->DispatchMC(msg.cmd, msg.value, self->cc_.get());
  self->last_interaction_ = time(nullptr);
}

void Connection::DispatchOperations::operator()(const MigrationRequestMessage& msg) {
// no-op: presumably the message only exists to wake up the dispatch fiber,
// with the migration itself handled elsewhere — TODO confirm with the
// enqueueing site (not visible in this chunk).
}
Expand Down Expand Up @@ -846,64 +893,66 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
}

void Connection::DispatchCommand(uint32_t consumed, mi_heap_t* heap) {
bool can_dispatch_sync = (consumed >= io_buf_.InputLen());

if (tl_traffic_logger.log_file) {
// Log command as soon as we receive it
LogTraffic(id_, !can_dispatch_sync, absl::MakeSpan(tmp_parse_args_));
}

// Avoid sync dispatch if an async dispatch is already in progress, or else they'll interleave.
if (cc_->async_dispatch)
can_dispatch_sync = false;
void Connection::DispatchCommand(bool has_more, absl::FunctionRef<void()> dispatch_sync,
absl::FunctionRef<MessageHandle()> dispatch_async) {
// Avoid sync dispatch if we can interleave with an ongoing async dispatch
bool can_dispatch_sync = !cc_->async_dispatch;

// Avoid sync dispatch if we already have pending async messages or
// can potentially receive some (subscriptions > 0). Otherwise the dispatch
// fiber might be constantly blocked by sync_dispatch.
// can potentially receive some (subscriptions > 0)
if (dispatch_q_.size() > 0 || cc_->subscriptions > 0)
can_dispatch_sync = false;

if (can_dispatch_sync) {
ShrinkPipelinePool(); // Gradually release pipeline request pool.

RespExpr::VecToArgList(tmp_parse_args_, &tmp_cmd_vec_);
// Dispatch async if we're handling a pipeline or if we can't dispatch sync.
if (has_more || !can_dispatch_sync) {
SendAsync(dispatch_async());

if (dispatch_q_.size() > 10)
ThisFiber::Yield();
} else {
ShrinkPipelinePool(); // Gradually release pipeline request pool.
{
cc_->sync_dispatch = true;
service_->DispatchCommand(absl::MakeSpan(tmp_cmd_vec_), cc_.get());
dispatch_sync();
cc_->sync_dispatch = false;
}

last_interaction_ = time(nullptr);

// We might have blocked the dispatch queue from processing, wake it up.
if (dispatch_q_.size() > 0)
evc_.notify();

} else {
SendAsync(MessageHandle{FromArgs(std::move(tmp_parse_args_), heap)});
if (dispatch_q_.size() > 10)
ThisFiber::Yield();
}
}

Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder) {
uint32_t consumed = 0;

RedisParser::Result result = RedisParser::OK;
mi_heap_t* tlh = mi_heap_get_backing();

// Re-use connection local resources to reduce allocations
RespVec& parse_args = tmp_parse_args_;
CmdArgVec& cmd_vec = tmp_cmd_vec_;

auto dispatch_sync = [this, &parse_args, &cmd_vec] {
RespExpr::VecToArgList(parse_args, &cmd_vec);
service_->DispatchCommand(absl::MakeSpan(cmd_vec), cc_.get());
};
auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle {
return {FromArgs(std::move(parse_args), tlh)};
};

do {
result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &tmp_parse_args_);
result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &parse_args);

if (result == RedisParser::OK && !tmp_parse_args_.empty()) {
RespExpr& first = tmp_parse_args_.front();
if (first.type == RespExpr::STRING) {
if (result == RedisParser::OK && !parse_args.empty()) {
if (RespExpr& first = parse_args.front(); first.type == RespExpr::STRING)
DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf());
}

DispatchCommand(consumed, tlh);
bool has_more = consumed < io_buf_.InputLen();

if (tl_traffic_logger.log_file) // Log command as soon as we receive it
LogTraffic(id_, has_more, absl::MakeSpan(parse_args));

DispatchCommand(has_more, dispatch_sync, dispatch_async);
}
io_buf_.ConsumeInput(consumed);
} while (RedisParser::OK == result && !orig_builder->GetError());
Expand All @@ -919,10 +968,17 @@ Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder)
}

auto Connection::ParseMemcache() -> ParserStatus {
MemcacheParser::Result result = MemcacheParser::OK;
uint32_t consumed = 0;
MemcacheParser::Result result = MemcacheParser::OK;

MemcacheParser::Command cmd;
string_view value;

auto dispatch_sync = [this, &cmd, &value] { service_->DispatchMC(cmd, value, cc_.get()); };
auto dispatch_async = [&cmd, &value]() -> MessageHandle {
return {make_unique<MCPipelineMessage>(std::move(cmd), value)};
};

MCReplyBuilder* builder = static_cast<MCReplyBuilder*>(cc_->reply_builder());

do {
Expand All @@ -948,20 +1004,11 @@ auto Connection::ParseMemcache() -> ParserStatus {
}

value = parsed_value.substr(0, cmd.bytes_len);
// TODO: dispatch.
} else {
return NEED_MORE;
}
}

// An optimization to skip dispatch_q_ if no pipelining is identified.
// We use ASYNC_DISPATCH as a lock to avoid out-of-order replies when the
// dispatch fiber pulls the last record but is still processing the command and then this
// fiber enters the condition below and executes out of order.
bool is_sync_dispatch = !cc_->async_dispatch;
if (dispatch_q_.empty() && is_sync_dispatch) {
service_->DispatchMC(cmd, value, cc_.get());
}
DispatchCommand(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
io_buf_.ConsumeInput(total_len);
} while (!builder->GetError());

Expand Down
29 changes: 23 additions & 6 deletions src/facade/dragonfly_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "core/fibers.h"
#include "facade/acl_commands_def.h"
#include "facade/facade_types.h"
#include "facade/memcache_parser.h"
#include "facade/resp_expr.h"
#include "util/connection.h"
#include "util/http/http_handler.h"
Expand Down Expand Up @@ -45,7 +46,6 @@ namespace facade {
class ConnectionContext;
class RedisParser;
class ServiceInterface;
class MemcacheParser;
class SinkReplyBuilder;

// Connection represents an active connection for a client.
Expand Down Expand Up @@ -78,7 +78,7 @@ class Connection : public util::Connection {
size_t message_len);
};

// Pipeline message, accumulated command to be executed.
// Pipeline message, accumulated Redis command to be executed.
struct PipelineMessage {
PipelineMessage(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
}
Expand All @@ -97,6 +97,16 @@ class Connection : public util::Connection {
StorageType storage;
};

// Pipeline message, accumulated Memcached command to be executed.
struct MCPipelineMessage {
// Takes ownership of `cmd`; copies every string_view it holds (key, value,
// extended keys) into `backing` so the message outlives the parse buffer.
MCPipelineMessage(MemcacheParser::Command cmd, std::string_view value);

MemcacheParser::Command cmd; // parsed command; its views point into `backing`
std::string_view value; // payload for storage commands; points into `backing`
size_t backing_size; // total bytes in `backing`, used for memory accounting
std::unique_ptr<char[]> backing; // backing for cmd and value
};

// Monitor message, carries a simple payload with the registered event to be sent.
struct MonitorMessage : public std::string {};

Expand Down Expand Up @@ -129,6 +139,8 @@ class Connection : public util::Connection {
// Requests are allocated on the mimalloc heap and thus require a custom deleter.
using PipelineMessagePtr = std::unique_ptr<PipelineMessage, MessageDeleter>;
using PubMessagePtr = std::unique_ptr<PubMessage, MessageDeleter>;

using MCPipelineMessagePtr = std::unique_ptr<MCPipelineMessage>;
using AclUpdateMessagePtr = std::unique_ptr<AclUpdateMessage>;

// Variant wrapper around different message types
Expand All @@ -143,8 +155,9 @@ class Connection : public util::Connection {
bool IsPubMsg() const;
bool IsReplying() const; // control messages don't reply, messages carrying data do

std::variant<MonitorMessage, PubMessagePtr, PipelineMessagePtr, AclUpdateMessagePtr,
MigrationRequestMessage, CheckpointMessage, InvalidationMessage>
std::variant<MonitorMessage, PubMessagePtr, PipelineMessagePtr, MCPipelineMessagePtr,
AclUpdateMessagePtr, MigrationRequestMessage, CheckpointMessage,
InvalidationMessage>
handle;

// time when the message was dispatched to the dispatch queue as reported by
Expand Down Expand Up @@ -329,8 +342,11 @@ class Connection : public util::Connection {
// Returns true if HTTP header is detected.
io::Result<bool> CheckForHttpProto(util::FiberSocketBase* peer);

// Dispatch last command parsed by ParseRedis
void DispatchCommand(uint32_t consumed, mi_heap_t* heap);
// Dispatch Redis or MC command. `has_more` should indicate whether the buffer has more commands
// (pipelining in progress). Performs async dispatch if forced (already in async mode) or if
// has_more is true, otherwise uses synchronous dispatch.
void DispatchCommand(bool has_more, absl::FunctionRef<void()> sync_dispatch,
absl::FunctionRef<MessageHandle()> async_dispatch);

// Handles events from dispatch queue.
void DispatchFiber(util::FiberSocketBase* peer);
Expand Down Expand Up @@ -405,6 +421,7 @@ class Connection : public util::Connection {
BreakerCb breaker_cb_;
std::unique_ptr<Shutdown> shutdown_cb_;

// Used by redis parser to avoid allocations
RespVec tmp_parse_args_;
CmdArgVec tmp_cmd_vec_;

Expand Down
13 changes: 11 additions & 2 deletions src/facade/memcache_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,19 @@ MP::Result ParseValueless(TokensView tokens, MP::Command* res) {
}
++key_pos;
}

// We support only `flushall` or `flushall 0`
if (key_pos < num_tokens && res->type == MP::FLUSHALL) {
int delay = 0;
if (key_pos + 1 == num_tokens && absl::SimpleAtoi(tokens[key_pos], &delay) && delay == 0)
return MP::OK;
return MP::PARSE_ERROR;
}

res->key = tokens[key_pos++];

if (key_pos < num_tokens && base::_in(res->type, {MP::STATS, MP::FLUSHALL}))
return MP::PARSE_ERROR; // we do not support additional arguments for now.
if (key_pos < num_tokens && res->type == MP::STATS)
return MP::PARSE_ERROR; // we don't support additional arguments to stats for now

if (res->type == MP::INCR || res->type == MP::DECR) {
if (key_pos == num_tokens)
Expand Down
Loading

0 comments on commit 5ee61db

Please sign in to comment.