Skip to content

Commit

Permalink
chore: pass RedisReplyBuilder explicitly from dragonfly connection (#…
Browse files Browse the repository at this point in the history
…4009)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Oct 29, 2024
1 parent 566f246 commit 6f6897c
Show file tree
Hide file tree
Showing 25 changed files with 156 additions and 230 deletions.
12 changes: 0 additions & 12 deletions src/facade/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,4 @@ size_t ConnectionContext::UsedMemory() const {
return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands);
}

void ConnectionContext::SendError(std::string_view str, std::string_view type) {
rbuilder_->SendError(str, type);
}

void ConnectionContext::SendError(ErrorReply error) {
rbuilder_->SendError(error);
}

void ConnectionContext::SendError(OpStatus status) {
rbuilder_->SendError(status);
}

} // namespace facade
34 changes: 0 additions & 34 deletions src/facade/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,6 @@ class ConnectionContext {
return res;
}

virtual void SendError(std::string_view str, std::string_view type = std::string_view{});

virtual void SendError(ErrorReply error);

virtual void SendError(OpStatus status);

void SendStored() {
rbuilder_->SendStored();
}

void SendSetSkipped() {
rbuilder_->SendSetSkipped();
}

void SendMGetResponse(SinkReplyBuilder::MGetResponse resp) {
rbuilder_->SendMGetResponse(std::move(resp));
}

void SendLong(long val) {
rbuilder_->SendLong(val);
}

void SendSimpleString(std::string_view str) {
rbuilder_->SendSimpleString(str);
}

void SendOk() {
rbuilder_->SendOk();
}

void SendProtocolError(std::string_view str) {
rbuilder_->SendProtocolError(str);
}

virtual size_t UsedMemory() const;

// connection state / properties.
Expand Down
17 changes: 12 additions & 5 deletions src/facade/dragonfly_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,16 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg) {
DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());

self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()},
self->reply_builder_, self->cc_.get());

self->last_interaction_ = time(nullptr);
self->skip_next_squashing_ = false;
}

void Connection::DispatchOperations::operator()(const Connection::MCPipelineMessage& msg) {
self->service_->DispatchMC(msg.cmd, msg.value, self->cc_.get());
self->service_->DispatchMC(msg.cmd, msg.value, static_cast<MCReplyBuilder*>(self->reply_builder_),
self->cc_.get());
self->last_interaction_ = time(nullptr);
}

Expand Down Expand Up @@ -1087,7 +1089,7 @@ Connection::ParserStatus Connection::ParseRedis() {

auto dispatch_sync = [this, &parse_args, &cmd_vec] {
RespExpr::VecToArgList(parse_args, &cmd_vec);
service_->DispatchCommand(absl::MakeSpan(cmd_vec), cc_.get());
service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_, cc_.get());
};
auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle {
return {FromArgs(std::move(parse_args), tlh)};
Expand Down Expand Up @@ -1131,7 +1133,10 @@ auto Connection::ParseMemcache() -> ParserStatus {
MemcacheParser::Command cmd;
string_view value;

auto dispatch_sync = [this, &cmd, &value] { service_->DispatchMC(cmd, value, cc_.get()); };
auto dispatch_sync = [this, &cmd, &value] {
service_->DispatchMC(cmd, value, static_cast<MCReplyBuilder*>(reply_builder_), cc_.get());
};

auto dispatch_async = [&cmd, &value]() -> MessageHandle {
return {make_unique<MCPipelineMessage>(std::move(cmd), value)};
};
Expand Down Expand Up @@ -1353,6 +1358,7 @@ bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) {

void Connection::SquashPipeline() {
DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_);
DCHECK_EQ(reply_builder_->type(), SinkReplyBuilder::REDIS); // Only Redis is supported.

vector<ArgSlice> squash_cmds;
squash_cmds.reserve(dispatch_q_.size());
Expand All @@ -1367,7 +1373,8 @@ void Connection::SquashPipeline() {

cc_->async_dispatch = true;

size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
size_t dispatched =
service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_, cc_.get());

if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared
reply_builder_->FlushBatch();
Expand Down
13 changes: 7 additions & 6 deletions src/facade/ok_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ namespace {

class OkService : public ServiceInterface {
public:
void DispatchCommand(ArgSlice args, ConnectionContext* cntx) final {
cntx->SendOk();
void DispatchCommand(ArgSlice args, SinkReplyBuilder* builder, ConnectionContext* cntx) final {
builder->SendOk();
}

size_t DispatchManyCommands(absl::Span<ArgSlice> args_lists, ConnectionContext* cntx) final {
size_t DispatchManyCommands(absl::Span<ArgSlice> args_lists, SinkReplyBuilder* builder,
ConnectionContext* cntx) final {
for (auto args : args_lists)
DispatchCommand(args, cntx);
DispatchCommand(args, builder, cntx);
return args_lists.size();
}

void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
ConnectionContext* cntx) final {
cntx->SendError("");
MCReplyBuilder* builder, ConnectionContext* cntx) final {
builder->SendError("");
}

ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) final {
Expand Down
11 changes: 7 additions & 4 deletions src/facade/service_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,23 @@ namespace facade {

class ConnectionContext;
class Connection;
struct ConnectionStats;
class SinkReplyBuilder;
class MCReplyBuilder;

class ServiceInterface {
public:
virtual ~ServiceInterface() {
}

virtual void DispatchCommand(ArgSlice args, ConnectionContext* cntx) = 0;
virtual void DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
ConnectionContext* cntx) = 0;

// Returns number of processed commands
virtual size_t DispatchManyCommands(absl::Span<ArgSlice> args_list, ConnectionContext* cntx) = 0;
virtual size_t DispatchManyCommands(absl::Span<ArgSlice> args_list, SinkReplyBuilder* builder,
ConnectionContext* cntx) = 0;

virtual void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
ConnectionContext* cntx) = 0;
MCReplyBuilder* builder, ConnectionContext* cntx) = 0;

virtual ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) = 0;

Expand Down
6 changes: 3 additions & 3 deletions src/server/acl/acl_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,16 +629,16 @@ void AclFamily::DryRun(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
using MemberFunc2 = void (AclFamily::*)(CmdArgList args, Transaction* tx,
facade::SinkReplyBuilder* builder);

using MemberFunc3 = void (AclFamily::*)(CmdArgList args, Transaction* tx,
facade::SinkReplyBuilder* builder, ConnectionContext* cntx);
using MemberFunc = void (AclFamily::*)(CmdArgList args, Transaction* tx,
facade::SinkReplyBuilder* builder, ConnectionContext* cntx);

CommandId::Handler2 HandlerFunc(AclFamily* acl, MemberFunc2 f) {
return [=](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder) {
return (acl->*f)(args, tx, builder);
};
}

CommandId::Handler3 HandlerFunc(AclFamily* acl, MemberFunc3 f) {
CommandId::Handler HandlerFunc(AclFamily* acl, MemberFunc f) {
return [=](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
ConnectionContext* cntx) { return (acl->*f)(args, tx, builder, cntx); };
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* bu

using EngineFunc2 = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder);

inline CommandId::Handler3 HandlerFunc(ClusterFamily* se, EngineFunc f) {
inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) {
return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder, ConnectionContext* cntx) {
return (se->*f)(args, builder, cntx);
};
Expand Down
18 changes: 5 additions & 13 deletions src/server/command_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ bool CommandId::IsMultiTransactional() const {
return CO::IsTransKind(name()) || CO::IsEvalKind(name());
}

uint64_t CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const {
uint64_t CommandId::Invoke(CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
ConnectionContext* cntx) const {
int64_t before = absl::GetCurrentTimeNanos();
handler_(args, cntx);
handler_(args, tx, builder, cntx);
int64_t after = absl::GetCurrentTimeNanos();

ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation
Expand Down Expand Up @@ -95,17 +96,8 @@ optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {
}

CommandId&& CommandId::SetHandler(Handler2 f) && {
handler_ = [f = std::move(f)](CmdArgList args, ConnectionContext* cntx) {
f(args, cntx->transaction, cntx->reply_builder());
};

return std::move(*this);
}

CommandId&& CommandId::SetHandler(Handler3 f) && {
handler_ = [f = std::move(f)](CmdArgList args, ConnectionContext* cntx) {
f(args, cntx->transaction, cntx->reply_builder(), cntx);
};
handler_ = [f = std::move(f)](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
facade::ConnectionContext*) { f(args, tx, builder); };

return std::move(*this);
}
Expand Down
16 changes: 7 additions & 9 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,18 @@ class CommandId : public facade::CommandId {
using Handler =
fu2::function_base<true /*owns*/, true /*copyable*/, fu2::capacity_default,
false /* non-throwing*/, false /* strong exceptions guarantees*/,
void(CmdArgList, ConnectionContext*) const>;
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*,
ConnectionContext*) const>;
using Handler2 =
fu2::function_base<true, true, fu2::capacity_default, false, false,
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*) const>;

using ArgValidator = fu2::function_base<true, true, fu2::capacity_default, false, false,
std::optional<facade::ErrorReply>(CmdArgList) const>;

// Returns the invoke time in usec.
uint64_t Invoke(CmdArgList args, ConnectionContext* cntx) const;
uint64_t Invoke(CmdArgList args, Transaction*, facade::SinkReplyBuilder*,
ConnectionContext* cntx) const;

// Returns error if validation failed, otherwise nullopt
std::optional<facade::ErrorReply> Validate(CmdArgList tail_args) const;
Expand Down Expand Up @@ -122,14 +127,7 @@ class CommandId : public facade::CommandId {
return std::move(*this);
}

using Handler2 =
fu2::function_base<true, true, fu2::capacity_default, false, false,
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*) const>;
using Handler3 = fu2::function_base<true, true, fu2::capacity_default, false, false,
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*,
ConnectionContext*) const>;
CommandId&& SetHandler(Handler2 f) &&;
CommandId&& SetHandler(Handler3 f) &&;

CommandId&& SetValidator(ArgValidator f) && {
validator_ = std::move(f);
Expand Down
23 changes: 0 additions & 23 deletions src/server/conn_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,29 +269,6 @@ size_t ConnectionContext::UsedMemory() const {
return facade::ConnectionContext::UsedMemory() + dfly::HeapSize(conn_state);
}

void ConnectionContext::SendError(std::string_view str, std::string_view type) {
string_view name = cid ? cid->name() : string_view{};

VLOG(1) << "Sending error " << str << " " << type << " during " << name;
facade::ConnectionContext::SendError(str, type);
}

void ConnectionContext::SendError(facade::ErrorReply error) {
string_view name = cid ? cid->name() : string_view{};

VLOG(1) << "Sending error " << error.ToSv() << " during " << name;
facade::ConnectionContext::SendError(std::move(error));
}

void ConnectionContext::SendError(facade::OpStatus status) {
if (status != facade::OpStatus::OK) {
string_view name = cid ? cid->name() : string_view{};
VLOG(1) << "Sending error " << status << " during " << name;
}

facade::ConnectionContext::SendError(status);
}

void ConnectionState::ExecInfo::Clear() {
DCHECK(!preborrowed_interpreter); // Must have been released properly
state = EXEC_INACTIVE;
Expand Down
4 changes: 0 additions & 4 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,6 @@ class ConnectionContext : public facade::ConnectionContext {

size_t UsedMemory() const override;

void SendError(std::string_view str, std::string_view type = std::string_view{}) override;
void SendError(facade::ErrorReply error) override;
void SendError(facade::OpStatus status) override;

// Whether this connection is a connection from a replica to its master.
// This flag is true only on replica side, where we need to setup a special ConnectionContext
// instance that helps applying commands coming from master.
Expand Down
3 changes: 2 additions & 1 deletion src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool
local_tx->StartMultiNonAtomic();
boost::intrusive_ptr<Transaction> stub_tx =
new Transaction{local_tx.get(), EngineShard::tlocal()->shard_id(), nullopt};

absl::InlinedVector<string_view, 5> args_view;
facade::CapturingReplyBuilder crb;
ConnectionContext local_cntx{cntx, stub_tx.get(), &crb};
Expand All @@ -171,7 +172,7 @@ void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx->ns, local_cntx.conn_state.db_index, args_span);

sf->service().InvokeCmd(cid, args_span, &local_cntx);
sf->service().InvokeCmd(cid, args_span, &crb, &local_cntx);
}

local_cntx.Inject(nullptr);
Expand Down
2 changes: 1 addition & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1615,7 +1615,7 @@ void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* buil
};
shard_set->RunBriefInParallel(std::move(cb));

return cntx->SendOk();
return builder->SendOk();
}

void GenericFamily::Dump(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
Expand Down
5 changes: 2 additions & 3 deletions src/server/http_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,11 @@ void HttpAPI(const http::QueryArgs& args, HttpRequest&& req, Service* service,
DCHECK(context);

facade::CapturingReplyBuilder reply_builder;
auto* prev = context->Inject(&reply_builder);

// TODO: to finish this.
service->DispatchCommand(absl::MakeSpan(cmd_slices), context);
service->DispatchCommand(absl::MakeSpan(cmd_slices), &reply_builder, context);
facade::CapturingReplyBuilder::Payload payload = reply_builder.Take();

context->Inject(prev);
auto response = http::MakeStringResponse();
http::SetMime(http::kJsonMime, &response);

Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void JournalExecutor::FlushSlots(const cluster::SlotRange& slot_range) {

void JournalExecutor::Execute(journal::ParsedEntry::CmdData& cmd) {
auto span = CmdArgList{cmd.cmd_args.data(), cmd.cmd_args.size()};
service_->DispatchCommand(span, &conn_context_);
service_->DispatchCommand(span, &reply_builder_, &conn_context_);
}

void JournalExecutor::SelectDb(DbIndex dbid) {
Expand Down
Loading

0 comments on commit 6f6897c

Please sign in to comment.