Skip to content

Commit

Permalink
Merge branch 'unstable' into speedb
Browse files Browse the repository at this point in the history
  • Loading branch information
PragmaTwice authored Oct 12, 2023
2 parents 49c8424 + e38021c commit 2f9a7c2
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/commands/cmd_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,15 @@ class CommandXGroup : public Commander {
return Status::OK();
}

if (subcommand_ == "createconsumer") {
if (args.size() != 5) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
consumer_name_ = GET_OR_RET(parser.TakeStr());

return Status::OK();
}

return {Status::RedisParseErr, "unknown subcommand"};
}

Expand Down Expand Up @@ -278,13 +287,24 @@ class CommandXGroup : public Commander {
}
}

if (subcommand_ == "createconsumer") {
int created_number = 0;
auto s = stream_db.CreateConsumer(stream_name_, group_name_, consumer_name_, &created_number);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}

*output = redis::Integer(created_number);
}

return Status::OK();
}

private:
std::string subcommand_;
std::string stream_name_;
std::string group_name_;
std::string consumer_name_;
StreamXGroupCreateOptions xgroup_create_options_;
};

Expand Down
74 changes: 74 additions & 0 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <vector>

#include "db_util.h"
#include "time_util.h"

namespace redis {

Expand Down Expand Up @@ -211,6 +212,26 @@ StreamConsumerGroupMetadata Stream::decodeStreamConsumerGroupMetadataValue(const
return consumer_group_metadata;
}

std::string Stream::internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata,
const std::string &group_name, const std::string &consumer_name) const {
std::string sub_key;
PutFixed64(&sub_key, group_name.size());
sub_key += group_name;
PutFixed64(&sub_key, consumer_name.size());
sub_key += consumer_name;
sub_key += consumerGroupMetadataDelimiter;
std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
return entry_key;
}

std::string Stream::encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata) {
std::string dst;
PutFixed64(&dst, consumer_metadata.pending_number);
PutFixed64(&dst, consumer_metadata.last_idle);
PutFixed64(&dst, consumer_metadata.last_active);
return dst;
}

rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options,
const std::string &group_name) {
if (std::isdigit(group_name[0])) {
Expand Down Expand Up @@ -313,6 +334,59 @@ rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const std::string
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}

rocksdb::Status Stream::CreateConsumer(const Slice &stream_name, const std::string &group_name,
const std::string &consumer_name, int *created_number) {
if (std::isdigit(consumer_name[0])) {
return rocksdb::Status::InvalidArgument("consumer name cannot start with number");
}
std::string ns_key = AppendNamespacePrefix(stream_name);
LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
rocksdb::Status s = GetMetadata(ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
return rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
}

std::string entry_key = internalKeyFromGroupName(ns_key, metadata, group_name);
std::string get_entry_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &get_entry_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " + group_name + " for key name " +
stream_name.ToString());
}

StreamConsumerMetadata consumer_metadata;
auto now = util::GetTimeStampMS();
consumer_metadata.last_idle = now;
consumer_metadata.last_active = now;
std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
std::string consumer_value = encodeStreamConsumerMetadataValue(consumer_metadata);
std::string get_consumer_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
if (!s.IsNotFound()) {
return s;
}

auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisStream);
batch->PutLogData(log_data.Encode());

batch->Put(stream_cf_handle_, consumer_key, consumer_value);
StreamConsumerGroupMetadata consumer_group_metadata = decodeStreamConsumerGroupMetadataValue(get_entry_value);
consumer_group_metadata.consumer_number += 1;
std::string consumer_group_metadata_bytes = encodeStreamConsumerGroupMetadataValue(consumer_group_metadata);
batch->Put(stream_cf_handle_, entry_key, consumer_group_metadata_bytes);
s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
if (s.ok()) *created_number = 1;
return s;
}

rocksdb::Status Stream::DeleteEntries(const Slice &stream_name, const std::vector<StreamEntryID> &ids,
uint64_t *deleted_cnt) {
*deleted_cnt = 0;
Expand Down
5 changes: 5 additions & 0 deletions src/types/redis_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class Stream : public SubKeyScanner {
rocksdb::Status CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options,
const std::string &group_name);
rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string &group_name, uint64_t *delete_cnt);
rocksdb::Status CreateConsumer(const Slice &stream_name, const std::string &group_name,
const std::string &consumer_name, int *created_number);
rocksdb::Status DeleteEntries(const Slice &stream_name, const std::vector<StreamEntryID> &ids, uint64_t *deleted_cnt);
rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions &options, uint64_t *size);
rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t count, StreamInfo *info);
Expand Down Expand Up @@ -69,6 +71,9 @@ class Stream : public SubKeyScanner {
std::string groupNameFromInternalKey(rocksdb::Slice key) const;
static std::string encodeStreamConsumerGroupMetadataValue(const StreamConsumerGroupMetadata &consumer_group_metadata);
static StreamConsumerGroupMetadata decodeStreamConsumerGroupMetadataValue(const std::string &value);
std::string internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata,
const std::string &group_name, const std::string &consumer_name) const;
static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata);
};

} // namespace redis
6 changes: 6 additions & 0 deletions src/types/redis_stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ struct StreamConsumerGroupMetadata {
uint64_t lag = 0;
};

struct StreamConsumerMetadata {
uint64_t pending_number = 0;
uint64_t last_idle;
uint64_t last_active;
};

struct StreamInfo {
uint64_t size;
uint64_t entries_added;
Expand Down
22 changes: 22 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,28 @@ func TestStreamOffset(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(0), result)
})

t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t *testing.T) {
streamName := "test-stream"
groupName := "test-group"
consumerName := "test-consumer"
require.NoError(t, rdb.Del(ctx, streamName).Err())
//No such stream
require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "1-0",
Values: []string{"data", "a"},
}).Err())
//no such group
require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err())

r := rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val()
require.Equal(t, int64(1), r)
r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val()
require.Equal(t, int64(0), r)
})
}

func parseStreamEntryID(id string) (ts int64, seqNum int64) {
Expand Down

0 comments on commit 2f9a7c2

Please sign in to comment.