Skip to content

Commit

Permalink
Merge pull request #4 from monographdb/resume_q_zkl
Browse files Browse the repository at this point in the history
Add resume_rq for remote task and improve wait_task.
  • Loading branch information
MrGuin authored Aug 25, 2023
2 parents 7d1df9f + 86048e8 commit d77640f
Show file tree
Hide file tree
Showing 9 changed files with 5,426 additions and 25 deletions.
10 changes: 10 additions & 0 deletions src/brpc/policy/redis_protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "brpc/redis.h"
#include "brpc/redis_command.h"
#include "brpc/policy/redis_protocol.h"
#include "bvar/latency_recorder.h"

namespace brpc {

Expand Down Expand Up @@ -144,6 +145,9 @@ void RedisConnContext::Destroy() {

// ========== impl of RedisConnContext ==========

inline bvar::LatencyRecorder socket_write_latency("socket", "write");
inline bvar::LatencyRecorder consume_cmd_latency("socket", "consume_cmd");

ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
bool read_eof, const void* arg) {
if (read_eof || source->empty()) {
Expand Down Expand Up @@ -174,22 +178,28 @@ ParseResult ParseRedisMessage(butil::IOBuf* source, Socket* socket,
if (err != PARSE_OK) {
break;
}
int64_t start_time_us = butil::cpuwide_time_us();
if (ConsumeCommand(ctx, current_args, false, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
consume_cmd_latency << (butil::cpuwide_time_us() - start_time_us);
current_args.swap(next_args);
}
int64_t start_time_us = butil::cpuwide_time_us();
if (ConsumeCommand(ctx, current_args,
true /*must be the last message*/, &appender) != 0) {
return MakeParseError(PARSE_ERROR_ABSOLUTELY_WRONG);
}
consume_cmd_latency << (butil::cpuwide_time_us() - start_time_us);
butil::IOBuf sendbuf;
appender.move_to(sendbuf);
CHECK(!sendbuf.empty());
Socket::WriteOptions wopt;
wopt.ignore_eovercrowded = true;
start_time_us = butil::cpuwide_time_us();
LOG_IF(WARNING, socket->Write(&sendbuf, &wopt) != 0)
<< "Fail to send redis reply";
socket_write_latency << (butil::cpuwide_time_us() - start_time_us);
if(ctx->parser.ParsedArgsSize() == 0) {
ctx->arena.clear();
}
Expand Down
Loading

0 comments on commit d77640f

Please sign in to comment.