Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Introduce small buffer in redis parser #4076

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions src/facade/redis_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
#include "facade/redis_parser.h"

#include <absl/strings/escaping.h>
#include <absl/strings/numbers.h>

#include "base/logging.h"
Expand All @@ -18,6 +19,9 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
*consumed = 0;
res->clear();

DVLOG(2) << "Parsing: "
<< absl::CHexEscape(string_view{reinterpret_cast<const char*>(str.data()), str.size()});

if (state_ == CMD_COMPLETE_S) {
if (InitStart(str[0], res)) {
// We recognized a non-INLINE state, starting with a special char.
Expand Down Expand Up @@ -62,6 +66,18 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
case BULK_STR_S:
resultc = ConsumeBulk(str);
break;
case SLASH_N_S:
if (str[0] != '\n') {
resultc.first = BAD_STRING;
} else {
resultc = {OK, 1};
if (arg_c_ == '_') {
cached_expr_->emplace_back(RespExpr::NIL);
cached_expr_->back().u = Buffer{};
}
HandleFinishArg();
}
break;
default:
LOG(FATAL) << "Unexpected state " << int(state_);
}
Expand All @@ -76,13 +92,16 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
}

if (resultc.first == INPUT_PENDING) {
DCHECK(str.empty());
StashState(res);
}
return resultc.first;
}

if (resultc.first == OK) {
DCHECK(cached_expr_);
DCHECK_EQ(0, small_len_);

if (res != cached_expr_) {
DCHECK(!stash_.empty());

Expand Down Expand Up @@ -233,15 +252,27 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
const char* s = reinterpret_cast<const char*>(str.data());
const char* pos = reinterpret_cast<const char*>(memchr(s, '\n', str.size()));
if (!pos) {
Result r = INPUT_PENDING;
if (str.size() >= 32) {
LOG(WARNING) << "Unexpected format " << string_view{s, str.size()};
r = BAD_ARRAYLEN;
if (str.size() + small_len_ < small_buf_.size()) {
memcpy(&small_buf_[small_len_], str.data(), str.size());
small_len_ += str.size();
return {INPUT_PENDING, str.size()};
}
return {r, 0};
LOG(WARNING) << "Unexpected format " << string_view{s, str.size()};
return ResultConsumed{BAD_ARRAYLEN, 0};
}

unsigned consumed = pos - s + 1;
if (small_len_ > 0) {
if (small_len_ + consumed >= small_buf_.size()) {
return ResultConsumed{BAD_ARRAYLEN, consumed};
}
memcpy(&small_buf_[small_len_], str.data(), consumed);
small_len_ += consumed;
s = small_buf_.data();
pos = s + small_len_ - 1;
small_len_ = 0;
}

if (pos[-1] != '\r') {
return {BAD_ARRAYLEN, consumed};
}
Expand Down Expand Up @@ -320,12 +351,6 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
DCHECK(!str.empty());

unsigned min_len = 2 + int(arg_c_ != '_');

if (str.size() < min_len) {
return {INPUT_PENDING, 0};
}

if (arg_c_ == '$') {
int64_t len;

Expand All @@ -352,12 +377,21 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
}

DCHECK(!server_mode_);

if (arg_c_ == '_') { // Resp3 NIL
// '\r','\n'
if (str[0] != '\r' || str[1] != '\n') {
// "_\r\n", with '_' consumed into arg_c_.
DCHECK_LT(small_len_, 2u); // must be because we never fill here with more than 2 bytes.
DCHECK_GE(str.size(), 1u);

if (str[0] != '\r' || (str.size() > 1 && str[1] != '\n')) {
return {BAD_STRING, 0};
}

if (str.size() == 1) {
state_ = SLASH_N_S;
return {INPUT_PENDING, 1};
}

cached_expr_->emplace_back(RespExpr::NIL);
cached_expr_->back().u = Buffer{};
HandleFinishArg();
Expand All @@ -371,6 +405,9 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
char* s = reinterpret_cast<char*>(str.data());
char* eol = reinterpret_cast<char*>(memchr(s, '\n', str.size()));

// TODO: in client mode we may still not consume everything (see INPUT_PENDING below).
// This is not a problem, because we need to consume all the input only in server mode.

if (arg_c_ == '+' || arg_c_ == '-') { // Simple string or error.
DCHECK(!server_mode_);
if (!eol) {
Expand Down Expand Up @@ -421,9 +458,9 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
}

auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
auto& bulk_str = get<Buffer>(cached_expr_->back().u);

DCHECK_EQ(small_len_, 0);
uint32_t consumed = 0;
auto& bulk_str = get<Buffer>(cached_expr_->back().u);

if (str.size() >= bulk_len_) {
consumed = bulk_len_;
Expand All @@ -446,6 +483,12 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
}
HandleFinishArg();
return {OK, consumed + 2};
} else if (str.size() == 1) {
if (str[0] != '\r') {
return {BAD_STRING, consumed};
}
state_ = SLASH_N_S;
consumed++;
}
return {INPUT_PENDING, consumed};
}
Expand Down Expand Up @@ -490,6 +533,7 @@ void RedisParser::HandleFinishArg() {
}
cached_expr_ = parse_stack_.back().second;
}
small_len_ = 0;
}

void RedisParser::ExtendLastString(Buffer str) {
Expand Down
6 changes: 4 additions & 2 deletions src/facade/redis_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ class RedisParser {
* part of str because parser caches the intermediate state internally according to 'consumed'
* result.
*
* Note: A parser does not always guarantee progress, i.e. if a small buffer was passed, it may
* return INPUT_PENDING with consumed == 0.
*
*/

Expand Down Expand Up @@ -93,13 +91,16 @@ class RedisParser {
PARSE_ARG_TYPE, // Parse [$:+-]
PARSE_ARG_S, // Parse string\r\n
BULK_STR_S,
SLASH_N_S,
CMD_COMPLETE_S,
};

State state_ = CMD_COMPLETE_S;
bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing.
bool server_mode_ = true;
uint8_t small_len_ = 0;
char arg_c_ = 0;

uint32_t bulk_len_ = 0;
uint32_t last_stashed_level_ = 0, last_stashed_index_ = 0;
uint32_t max_arr_len_;
Expand All @@ -114,6 +115,7 @@ class RedisParser {

using Blob = std::vector<uint8_t>;
std::vector<Blob> buf_stash_;
std::array<char, 32> small_buf_;
};

} // namespace facade
34 changes: 24 additions & 10 deletions src/facade/redis_parser_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,19 @@ TEST_F(RedisParserTest, ClientMode) {

ASSERT_EQ(RedisParser::OK, Parse("-ERR foo bar\r\n"));
EXPECT_THAT(args_, ElementsAre(ErrArg("ERR foo")));

ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("_"));
EXPECT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r"));
EXPECT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\n"));
EXPECT_EQ(1, consumed_);
EXPECT_THAT(args_, ElementsAre(ArgType(RespExpr::NIL)));
ASSERT_EQ(RedisParser::OK, Parse("*2\r\n_\r\n_\r\n"));
ASSERT_EQ(10, consumed_);

ASSERT_EQ(RedisParser::OK, Parse("*3\r\n+OK\r\n$1\r\n1\r\n*2\r\n$1\r\n1\r\n$-1\r\n"));
ASSERT_THAT(args_, ElementsAre("OK", "1", ArrLen(2)));
}

TEST_F(RedisParserTest, Hierarchy) {
Expand Down Expand Up @@ -183,9 +196,9 @@ TEST_F(RedisParserTest, LargeBulk) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half));
ASSERT_EQ(512, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r"));
ASSERT_EQ(0, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
ASSERT_EQ(2, consumed_);
ASSERT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\n"));
EXPECT_EQ(1, consumed_);

string part1 = absl::StrCat(prefix, half);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(part1));
Expand All @@ -208,7 +221,8 @@ TEST_F(RedisParserTest, LargeBulk) {
TEST_F(RedisParserTest, NILs) {
ASSERT_EQ(RedisParser::BAD_ARRAYLEN, Parse("_\r\n"));
parser_.SetClientMode();
ASSERT_EQ(RedisParser::OK, Parse("_\r\n"));
ASSERT_EQ(RedisParser::OK, Parse("_\r\nfooobar"));
EXPECT_EQ(3, consumed_);
}

TEST_F(RedisParserTest, NestedArray) {
Expand Down Expand Up @@ -245,15 +259,15 @@ TEST_F(RedisParserTest, UsedMemory) {

TEST_F(RedisParserTest, Eol) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r"));
EXPECT_EQ(1, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("1\r\n$5\r\n"));
EXPECT_EQ(7, consumed_);
EXPECT_EQ(3, consumed_);
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n$5\r\n"));
EXPECT_EQ(5, consumed_);
}

TEST_F(RedisParserTest, BulkSplit) {
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$4\r\nSADD"));
ASSERT_EQ(12, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$4\r\nSADD\r"));
ASSERT_EQ(13, consumed_);
ASSERT_EQ(RedisParser::OK, Parse("\n"));
}

TEST_F(RedisParserTest, InlineSplit) {
Expand Down
Loading