Skip to content

Commit

Permalink
Add the ability to use the '*-123' pattern to specify stream entry ID…
Browse files Browse the repository at this point in the history
… via XADD (#1405)

It allows Kvrocks server to set current timestamp as the first part of the
entry ID and provided by the client sequence number as the second part of it.
  • Loading branch information
torwig authored Apr 29, 2023
1 parent 6889b59 commit 8cd5291
Show file tree
Hide file tree
Showing 8 changed files with 599 additions and 409 deletions.
32 changes: 13 additions & 19 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*
*/

#include <memory>
#include <stdexcept>

#include "commander.h"
#include "error_constants.h"
#include "server/server.h"
Expand Down Expand Up @@ -102,26 +105,21 @@ class CommandXAdd : public Commander {
return {Status::RedisParseErr, errLimitOptionNotAllowed};
}

if (val == "*" && !entry_id_found) {
entry_id_found = true;
++i;
continue;
} else if (!entry_id_found) {
auto s = ParseNewStreamEntryID(val, &entry_id_);
if (!s.IsOK()) {
return {Status::RedisParseErr, s.Msg()};
if (!entry_id_found) {
auto result = ParseNextStreamEntryIDStrategy(val);
if (!result.IsOK()) {
return {Status::RedisParseErr, result.Msg()};
}

next_id_strategy_ = std::move(*result);

entry_id_found = true;
with_entry_id_ = true;
++i;
continue;
}

if (entry_id_found) {
name_value_pairs_.push_back(val);
++i;
}
name_value_pairs_.push_back(val);
++i;
}

if (name_value_pairs_.empty() || name_value_pairs_.size() % 2 != 0) {
Expand All @@ -142,10 +140,7 @@ class CommandXAdd : public Commander {
options.trim_options.strategy = StreamTrimStrategy::MinID;
options.trim_options.min_id = min_id_;
}
if (with_entry_id_) {
options.with_entry_id = true;
options.entry_id = entry_id_;
}
options.next_id_strategy = std::move(next_id_strategy_);

redis::Stream stream_db(svr->storage, conn->GetNamespace());
StreamEntryID entry_id;
Expand All @@ -170,12 +165,11 @@ class CommandXAdd : public Commander {
std::string stream_name_;
uint64_t max_len_ = 0;
redis::StreamEntryID min_id_;
redis::NewStreamEntryID entry_id_;
std::unique_ptr<redis::NextStreamEntryIDGenerationStrategy> next_id_strategy_;
std::vector<std::string> name_value_pairs_;
bool nomkstream_ = false;
bool with_max_len_ = false;
bool with_min_id_ = false;
bool with_entry_id_ = false;
};

class CommandXDel : public Commander {
Expand Down
60 changes: 2 additions & 58 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ namespace redis {

const char *errSetEntryIdSmallerThanLastGenerated =
"The ID specified in XSETID is smaller than the target stream top item";
const char *errEntryIdOutOfRange = "The ID specified in XADD must be greater than 0-0";
const char *errStreamExhaustedEntryId = "The stream has exhausted the last possible ID, unable to add more items";
const char *errAddEntryIdSmallerThanLastGenerated =
"The ID specified in XADD is equal or smaller than the target stream top item";
const char *errEntriesAddedSmallerThanStreamSize =
"The entries_added specified in XSETID is smaller than the target stream length";
const char *errMaxDeletedIdGreaterThanLastGenerated =
Expand Down Expand Up @@ -108,11 +104,9 @@ rocksdb::Status Stream::Add(const Slice &stream_name, const StreamAddOptions &op
return s;
}

bool first_entry = s.IsNotFound();

StreamEntryID next_entry_id;
s = getNextEntryID(metadata, options, first_entry, &next_entry_id);
if (!s.ok()) return s;
auto status = options.next_id_strategy->GenerateID(metadata.last_generated_id, &next_entry_id);
if (!status.IsOK()) return rocksdb::Status::InvalidArgument(status.Msg());

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisStream);
Expand Down Expand Up @@ -169,56 +163,6 @@ rocksdb::Status Stream::Add(const Slice &stream_name, const StreamAddOptions &op
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Stream::getNextEntryID(const StreamMetadata &metadata, const StreamAddOptions &options,
bool first_entry, StreamEntryID *next_entry_id) {
if (options.with_entry_id) {
if (options.entry_id.ms == 0 && !options.entry_id.any_seq_number && options.entry_id.seq == 0) {
return rocksdb::Status::InvalidArgument(errEntryIdOutOfRange);
}

if (metadata.last_generated_id.ms == UINT64_MAX && metadata.last_generated_id.seq == UINT64_MAX) {
return rocksdb::Status::InvalidArgument(errStreamExhaustedEntryId);
}

if (!first_entry) {
if (metadata.last_generated_id.ms > options.entry_id.ms) {
return rocksdb::Status::InvalidArgument(errAddEntryIdSmallerThanLastGenerated);
}

if (metadata.last_generated_id.ms == options.entry_id.ms) {
if (!options.entry_id.any_seq_number && metadata.last_generated_id.seq >= options.entry_id.seq) {
return rocksdb::Status::InvalidArgument(errAddEntryIdSmallerThanLastGenerated);
}

if (options.entry_id.any_seq_number && metadata.last_generated_id.seq == UINT64_MAX) {
return rocksdb::Status::InvalidArgument(
"Elements are too large to be stored"); // Redis responds with exactly this message
}
}

if (options.entry_id.any_seq_number) {
if (options.entry_id.ms == metadata.last_generated_id.ms) {
next_entry_id->seq = metadata.last_generated_id.seq + 1;
} else {
next_entry_id->seq = 0;
}
} else {
next_entry_id->seq = options.entry_id.seq;
}
} else {
if (options.entry_id.any_seq_number) {
next_entry_id->seq = options.entry_id.ms != 0 ? 0 : 1;
} else {
next_entry_id->seq = options.entry_id.seq;
}
}
next_entry_id->ms = options.entry_id.ms;
return rocksdb::Status::OK();
} else {
return GetNextStreamEntryID(metadata.last_generated_id, next_entry_id);
}
}

rocksdb::Status Stream::DeleteEntries(const rocksdb::Slice &stream_name, const std::vector<StreamEntryID> &ids,
uint64_t *ret) {
*ret = 0;
Expand Down
2 changes: 0 additions & 2 deletions src/types/redis_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class Stream : public SubKeyScanner {
StreamEntryID entryIDFromInternalKey(const rocksdb::Slice &key) const;
std::string internalKeyFromEntryID(const std::string &ns_key, const StreamMetadata &metadata,
const StreamEntryID &id) const;
static rocksdb::Status getNextEntryID(const StreamMetadata &metadata, const StreamAddOptions &options,
bool first_entry, StreamEntryID *next_entry_id);
uint64_t trim(const std::string &ns_key, const StreamTrimOptions &options, StreamMetadata *metadata,
rocksdb::WriteBatch *batch);
};
Expand Down
137 changes: 102 additions & 35 deletions src/types/redis_stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,20 @@ namespace redis {
const char *kErrLastEntryIdReached = "last possible entry id reached";
const char *kErrInvalidEntryIdSpecified = "Invalid stream ID specified as stream command argument";
const char *kErrDecodingStreamEntryValueFailure = "failed to decode stream entry value";

rocksdb::Status IncrementStreamEntryID(StreamEntryID *id) {
const char *errAddEntryIdSmallerThanLastGenerated =
"The ID specified in XADD is equal or smaller than the target stream top item";
const char *errSequenceNumberOverflow =
"Elements are too large to be stored"; // Redis responds with exactly this message
const char *errEntryIdOutOfRange = "The ID specified in XADD must be greater than 0-0";
const char *errStreamExhaustedEntryID = "The stream has exhausted the last possible ID, unable to add more items";

Status IncrementStreamEntryID(StreamEntryID *id) {
if (id->seq == UINT64_MAX) {
if (id->ms == UINT64_MAX) {
// special case where 'id' is the last possible entry ID
id->ms = 0;
id->seq = 0;
return rocksdb::Status::InvalidArgument(kErrLastEntryIdReached);
return {Status::RedisExecErr, kErrLastEntryIdReached};
} else {
id->ms++;
id->seq = 0;
Expand All @@ -45,19 +51,7 @@ rocksdb::Status IncrementStreamEntryID(StreamEntryID *id) {
id->seq++;
}

return rocksdb::Status::OK();
}

rocksdb::Status GetNextStreamEntryID(const StreamEntryID &last_id, StreamEntryID *new_id) {
uint64_t ms = util::GetTimeStampMS();
if (ms > last_id.ms) {
new_id->ms = ms;
new_id->seq = 0;
return rocksdb::Status::OK();
} else {
*new_id = last_id;
return IncrementStreamEntryID(new_id);
}
return Status::OK();
}

Status ParseStreamEntryID(const std::string &input, StreamEntryID *id) {
Expand Down Expand Up @@ -85,39 +79,49 @@ Status ParseStreamEntryID(const std::string &input, StreamEntryID *id) {
return Status::OK();
}

Status ParseNewStreamEntryID(const std::string &input, NewStreamEntryID *id) {
StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> ParseNextStreamEntryIDStrategy(
const std::string &input) {
if (input == "*") {
return std::make_unique<AutoGeneratedEntryID>();
}

auto pos = input.find('-');
if (pos != std::string::npos) {
auto ms_str = input.substr(0, pos);
auto seq_str = input.substr(pos + 1);
auto parse_ms = ParseInt<uint64_t>(ms_str, 10);
if (!parse_ms) {
return {Status::RedisParseErr, kErrInvalidEntryIdSpecified};
}

id->ms = *parse_ms;

if (seq_str == "*") {
id->any_seq_number = true;
} else {
if (ms_str == "*") {
auto parse_seq = ParseInt<uint64_t>(seq_str, 10);
if (!parse_seq) {
return {Status::RedisParseErr, kErrInvalidEntryIdSpecified};
return {Status::NotOK, kErrInvalidEntryIdSpecified};
}

id->seq = *parse_seq;
return std::make_unique<CurrentTimestampWithSpecificSequenceNumber>(*parse_seq);
}
} else {
auto parse_input = ParseInt<uint64_t>(input, 10);
if (!parse_input) {
return {Status::RedisParseErr, kErrInvalidEntryIdSpecified};

auto parse_ms = ParseInt<uint64_t>(ms_str, 10);
if (!parse_ms) {
return {Status::NotOK, kErrInvalidEntryIdSpecified};
}

id->ms = *parse_input;
id->seq = 0;
if (seq_str == "*") {
return std::make_unique<SpecificTimestampWithAnySequenceNumber>(*parse_ms);
}

auto parse_seq = ParseInt<uint64_t>(seq_str, 10);
if (!parse_seq) {
return {Status::NotOK, kErrInvalidEntryIdSpecified};
}

return std::make_unique<FullySpecifiedEntryID>(StreamEntryID{*parse_ms, *parse_seq});
}

return Status::OK();
auto parse_ms = ParseInt<uint64_t>(input, 10);
if (!parse_ms) {
return {Status::NotOK, kErrInvalidEntryIdSpecified};
}

return std::make_unique<FullySpecifiedEntryID>(StreamEntryID{*parse_ms, 0});
}

Status ParseRangeStart(const std::string &input, StreamEntryID *id) { return ParseStreamEntryID(input, id); }
Expand Down Expand Up @@ -174,4 +178,67 @@ Status DecodeRawStreamEntryValue(const std::string &value, std::vector<std::stri
return Status::OK();
}

Status FullySpecifiedEntryID::GenerateID(const StreamEntryID &last_id, StreamEntryID *next_id) {
if (last_id.ms == UINT64_MAX && last_id.seq == UINT64_MAX) {
return {Status::RedisExecErr, errStreamExhaustedEntryID};
}

if (id_.ms == 0 && id_.seq == 0) {
return {Status::RedisExecErr, errEntryIdOutOfRange};
}

if (id_ <= last_id) {
return {Status::RedisExecErr, errAddEntryIdSmallerThanLastGenerated};
}

next_id->ms = id_.ms;
next_id->seq = id_.seq;

return Status::OK();
}

Status AutoGeneratedEntryID::GenerateID(const StreamEntryID &last_id, StreamEntryID *next_id) {
uint64_t ms = util::GetTimeStampMS();
if (ms > last_id.ms) {
next_id->ms = ms;
next_id->seq = 0;
return Status::OK();
}

*next_id = last_id;

return IncrementStreamEntryID(next_id);
}

Status SpecificTimestampWithAnySequenceNumber::GenerateID(const StreamEntryID &last_id, StreamEntryID *next_id) {
if (ms_ < last_id.ms) {
return {Status::RedisExecErr, errAddEntryIdSmallerThanLastGenerated};
}

if (ms_ == last_id.ms) {
if (last_id.seq == UINT64_MAX) {
return {Status::RedisExecErr, errSequenceNumberOverflow};
}

next_id->ms = last_id.ms;
next_id->seq = last_id.seq + 1;
} else {
next_id->ms = ms_;
next_id->seq = 0;
}

return Status::OK();
}

Status CurrentTimestampWithSpecificSequenceNumber::GenerateID(const StreamEntryID &last_id, StreamEntryID *next_id) {
next_id->ms = util::GetTimeStampMS();
next_id->seq = seq_;

if (*next_id <= last_id) {
return {Status::RedisExecErr, errAddEntryIdSmallerThanLastGenerated};
}

return Status::OK();
}

} // namespace redis
Loading

0 comments on commit 8cd5291

Please sign in to comment.