Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): add support of XPENDING command #2387

Merged
merged 8 commits into from
Jul 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "error_constants.h"
#include "event_util.h"
#include "server/server.h"
#include "status.h"
#include "time_util.h"
#include "types/redis_stream.h"

Expand Down Expand Up @@ -824,6 +825,97 @@ class CommandXInfo : public Commander {
}
};

class CommandXPending : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 1);
stream_name_ = GET_OR_RET(parser.TakeStr());
group_name_ = GET_OR_RET(parser.TakeStr());
if (parser.EatEqICase("idle")) {
options_.idle_time = GET_OR_RET(parser.TakeInt<uint64_t>());
options_.with_time = true;
}

if (parser.Good()) {
std::string start_id, end_id;
start_id = GET_OR_RET(parser.TakeStr());
end_id = GET_OR_RET(parser.TakeStr());
if (start_id != "-") {
auto s = ParseStreamEntryID(start_id, &options_.start_id);
if (!s.IsOK()) {
return s;
}
}

if (end_id != "+") {
auto s = ParseStreamEntryID(start_id, &options_.end_id);
if (!s.IsOK()) {
return s;
}
}

options_.count = GET_OR_RET(parser.TakeInt<uint64_t>());
options_.with_count = true;
if (parser.Good()) {
options_.consumer = GET_OR_RET(parser.TakeStr());
options_.with_consumer = true;
}
}
return Status::OK();
}

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::Stream stream_db(srv->storage, conn->GetNamespace());
std::vector<std::pair<std::string, int>> pending_infos;
StreamGetPendingEntryResult results;
options_.stream_name = stream_name_;
options_.group_name = group_name_;
std::vector<StreamNACK> ext_results;
auto s = stream_db.GetPendingEntries(options_, results, ext_results);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
if (options_.with_count) {
return SendExtResults(conn, output, ext_results);
}
return SendResults(conn, output, results);
}

static Status SendResults(Connection *conn, std::string *output, StreamGetPendingEntryResult &results) {
output->append(redis::MultiLen(3 + results.consumer_infos.size()));
output->append(redis::Integer(results.pending_number));
output->append(redis::BulkString(results.first_entry_id.ToString()));
output->append(redis::BulkString(results.last_entry_id.ToString()));
output->append(redis::MultiLen(results.consumer_infos.size()));
for (const auto &entry : results.consumer_infos) {
output->append(redis::MultiLen(2));
output->append(redis::BulkString(entry.first));
output->append(redis::BulkString(std::to_string(entry.second)));
}

return Status::OK();
}

static Status SendExtResults(Connection *conn, std::string *output, std::vector<StreamNACK> &ext_results) {
output->append(redis::MultiLen(ext_results.size()));
for (const auto &entry : ext_results) {
output->append(redis::MultiLen(4));
output->append(redis::BulkString(entry.id.ToString()));
output->append(redis::BulkString(entry.pel_entry.consumer_name));
output->append(redis::Integer(entry.pel_entry.last_delivery_time_ms));
output->append(redis::Integer(entry.pel_entry.last_delivery_count));
}

return Status::OK();
}

private:
std::string group_name_;
std::string stream_name_;
std::string consumer_name_;
StreamPendingOptions options_;
};

class CommandXRange : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
Expand Down Expand Up @@ -1754,6 +1846,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandXAck>("xack", -4, "write no-dbsize-ch
MakeCmdAttr<CommandXGroup>("xgroup", -4, "write", 2, 2, 1),
MakeCmdAttr<CommandXLen>("xlen", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXInfo>("xinfo", -2, "read-only", 0, 0, 0),
MakeCmdAttr<CommandXPending>("xpending", -3, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRange>("xrange", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRevRange>("xrevrange", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandXRead>("xread", -4, "read-only", 0, 0, 0),
Expand Down
88 changes: 88 additions & 0 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1698,4 +1698,92 @@ rocksdb::Status Stream::SetId(const Slice &stream_name, const StreamEntryID &las
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Stream::GetPendingEntries(StreamPendingOptions &options, StreamGetPendingEntryResult &pending_infos,
torwig marked this conversation as resolved.
Show resolved Hide resolved
std::vector<StreamNACK> &ext_results) {
const std::string &stream_name = options.stream_name;
const std::string &group_name = options.group_name;
std::string ns_key = AppendNamespacePrefix(stream_name);

StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(GetOptions{}, ns_key, &metadata);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}

std::string group_key = internalKeyFromGroupName(ns_key, metadata, group_name);
std::string get_group_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, group_key, &get_group_value);
if (!s.ok()) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}

std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.start_id);
std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, options.end_id);

rocksdb::ReadOptions read_options = storage_->DefaultScanOptions();
LatestSnapShot ss(storage_);
read_options.snapshot = ss.GetSnapShot();
rocksdb::Slice upper_bound(end_key);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix_key);
read_options.iterate_lower_bound = &lower_bound;

auto iter = util::UniqueIterator(storage_, read_options, stream_cf_handle_);
std::unordered_set<std::string> consumer_names;
StreamEntryID first_entry_id{StreamEntryID::Maximum()};
StreamEntryID last_entry_id{StreamEntryID::Minimum()};
uint64_t ext_result_count = 0;
uint64_t summary_result_count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
if (options.with_count && options.count <= ext_result_count) {
break;
}
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);

if (first_entry_id > entry_id) {
first_entry_id = entry_id;
}
if (last_entry_id < entry_id) {
last_entry_id = entry_id;
}
StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
if (options.with_time && util::GetTimeStampMS() - pel_entry.last_delivery_time_ms < options.idle_time) {
continue;
}

const std::string &consumer_name = pel_entry.consumer_name;

if (options.with_consumer && options.consumer != consumer_name) {
continue;
}

if (options.with_count) {
torwig marked this conversation as resolved.
Show resolved Hide resolved
ext_results.push_back({entry_id, pel_entry.last_delivery_time_ms, pel_entry.last_delivery_count, consumer_name});
ext_result_count++;
continue;
}
std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
std::string get_consumer_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
return rocksdb::Status::OK();
}

StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
if (consumer_names.find(consumer_name) == consumer_names.end()) {
consumer_names.insert(consumer_name);
pending_infos.consumer_infos.emplace_back(consumer_name, consumer_metadata.pending_number);
}
summary_result_count++;
}
pending_infos.last_entry_id = last_entry_id;
pending_infos.first_entry_id = first_entry_id;
pending_infos.pending_number = summary_result_count;
return rocksdb::Status::OK();
}

} // namespace redis
2 changes: 2 additions & 0 deletions src/types/redis_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class Stream : public SubKeyScanner {
std::vector<std::pair<std::string, StreamConsumerGroupMetadata>> &group_metadata);
rocksdb::Status GetConsumerInfo(const Slice &stream_name, const std::string &group_name,
std::vector<std::pair<std::string, StreamConsumerMetadata>> &consumer_metadata);
rocksdb::Status GetPendingEntries(StreamPendingOptions &options, StreamGetPendingEntryResult &pending_infos,
std::vector<StreamNACK> &ext_results);
rocksdb::Status Range(const Slice &stream_name, const StreamRangeOptions &options, std::vector<StreamEntry> *entries);
rocksdb::Status RangeWithPending(const Slice &stream_name, StreamRangeOptions &options,
std::vector<StreamEntry> *entries, std::string &group_name,
Expand Down
28 changes: 28 additions & 0 deletions src/types/redis_stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,34 @@ struct StreamAutoClaimResult {
std::vector<std::string> deleted_ids;
};

struct StreamPendingOptions {
uint64_t idle_time = 0;
bool with_time = false;

StreamEntryID start_id{StreamEntryID::Minimum()};
StreamEntryID end_id{StreamEntryID::Maximum()};

uint64_t count;
bool with_count = false;
bool with_consumer = false;

std::string consumer;
std::string stream_name;
std::string group_name;
};

struct StreamGetPendingEntryResult {
uint64_t pending_number;
StreamEntryID first_entry_id;
StreamEntryID last_entry_id;
std::vector<std::pair<std::string, int>> consumer_infos;
};

struct StreamNACK {
StreamEntryID id;
StreamPelEntry pel_entry;
};

Status IncrementStreamEntryID(StreamEntryID *id);
Status ParseStreamEntryID(const std::string &input, StreamEntryID *id);
StatusOr<std::unique_ptr<NextStreamEntryIDGenerationStrategy>> ParseNextStreamEntryIDStrategy(const std::string &input);
Expand Down
76 changes: 76 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1932,6 +1932,82 @@ func TestStreamOffset(t *testing.T) {
require.Error(t, cmd.Err())
require.Equal(t, "ERR COUNT must be > 0", cmd.Err().Error())
})

t.Run("XPending with different kinds of commands", func(t *testing.T) {
streamName := "mystream"
groupName := "mygroup"
require.NoError(t, rdb.Del(ctx, streamName).Err())
r, err := rdb.XAck(ctx, streamName, groupName, "0-0").Result()
require.NoError(t, err)
require.Equal(t, int64(0), r)
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"field1", "data1"},
}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "0").Err())
consumerName := "myconsumer"
err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 1,
NoAck: false,
}).Err()
require.NoError(t, err)

r1, err1 := rdb.XPending(ctx, streamName, groupName).Result()
require.NoError(t, err1)

require.Equal(t, &redis.XPending{
Count: 1,
Lower: "1-0",
Higher: "1-0",
Consumers: map[string]int64{"myconsumer": 1},
}, r1)

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "2-0",
Values: []string{"field1", "data1"},
}).Err())

require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "2-2",
Values: []string{"field1", "data1"},
}).Err())

require.NoError(t, rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 2,
NoAck: false,
}).Err())

r1, err1 = rdb.XPending(ctx, streamName, groupName).Result()
require.NoError(t, err1)

require.Equal(t, &redis.XPending{
Count: 3,
Lower: "1-0",
Higher: "2-2",
Consumers: map[string]int64{"myconsumer": 3},
}, r1)

require.NoError(t, rdb.XAck(ctx, streamName, groupName, "2-0").Err())

r1, err1 = rdb.XPending(ctx, streamName, groupName).Result()
require.NoError(t, err1)

require.Equal(t, &redis.XPending{
Count: 2,
Lower: "1-0",
Higher: "2-2",
Consumers: map[string]int64{"myconsumer": 2},
}, r1)
})
}

func parseStreamEntryID(id string) (ts int64, seqNum int64) {
Expand Down
Loading