diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc index 8e19b9d320e..a950a30ddfb 100644 --- a/src/commands/cmd_stream.cc +++ b/src/commands/cmd_stream.cc @@ -249,6 +249,15 @@ class CommandXGroup : public Commander { return Status::OK(); } + if (subcommand_ == "createconsumer") { + if (args.size() != 5) { + return {Status::RedisParseErr, errWrongNumOfArguments}; + } + consumer_name_ = GET_OR_RET(parser.TakeStr()); + + return Status::OK(); + } + return {Status::RedisParseErr, "unknown subcommand"}; } @@ -278,6 +287,16 @@ class CommandXGroup : public Commander { } } + if (subcommand_ == "createconsumer") { + int created_number = 0; + auto s = stream_db.CreateConsumer(stream_name_, group_name_, consumer_name_, &created_number); + if (!s.ok()) { + return {Status::RedisExecErr, s.ToString()}; + } + + *output = redis::Integer(created_number); + } + return Status::OK(); } @@ -285,6 +304,7 @@ class CommandXGroup : public Commander { std::string subcommand_; std::string stream_name_; std::string group_name_; + std::string consumer_name_; StreamXGroupCreateOptions xgroup_create_options_; }; diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 7a7ffb1605a..daa5fd41e74 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -27,6 +27,7 @@ #include #include "db_util.h" +#include "time_util.h" namespace redis { @@ -211,6 +212,26 @@ StreamConsumerGroupMetadata Stream::decodeStreamConsumerGroupMetadataValue(const return consumer_group_metadata; } +std::string Stream::internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata, + const std::string &group_name, const std::string &consumer_name) const { + std::string sub_key; + PutFixed64(&sub_key, group_name.size()); + sub_key += group_name; + PutFixed64(&sub_key, consumer_name.size()); + sub_key += consumer_name; + sub_key += consumerGroupMetadataDelimiter; + std::string entry_key = InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode(); + return entry_key; +} + +std::string Stream::encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata) { + std::string dst; + PutFixed64(&dst, consumer_metadata.pending_number); + PutFixed64(&dst, consumer_metadata.last_idle); + PutFixed64(&dst, consumer_metadata.last_active); + return dst; +} + rocksdb::Status Stream::CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options, const std::string &group_name) { if (std::isdigit(group_name[0])) { @@ -313,6 +334,59 @@ rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const std::string return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } +rocksdb::Status Stream::CreateConsumer(const Slice &stream_name, const std::string &group_name, + const std::string &consumer_name, int *created_number) { + if (std::isdigit(consumer_name[0])) { + return rocksdb::Status::InvalidArgument("consumer name cannot start with number"); + } + std::string ns_key = AppendNamespacePrefix(stream_name); + LockGuard guard(storage_->GetLockManager(), ns_key); + StreamMetadata metadata; + rocksdb::Status s = GetMetadata(ns_key, &metadata); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.IsNotFound()) { + return rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist); + } + + std::string entry_key = internalKeyFromGroupName(ns_key, metadata, group_name); + std::string get_entry_value; + s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &get_entry_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.IsNotFound()) { + return rocksdb::Status::InvalidArgument("NOGROUP No such consumer group " + group_name + " for key name " + + stream_name.ToString()); + } + + StreamConsumerMetadata consumer_metadata; + auto now = util::GetTimeStampMS(); + consumer_metadata.last_idle = now; + consumer_metadata.last_active = now; + std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); + std::string consumer_value = encodeStreamConsumerMetadataValue(consumer_metadata); + std::string get_consumer_value; + s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value); + if (!s.IsNotFound()) { + return s; + } + + auto batch = storage_->GetWriteBatchBase(); + WriteBatchLogData log_data(kRedisStream); + batch->PutLogData(log_data.Encode()); + + batch->Put(stream_cf_handle_, consumer_key, consumer_value); + StreamConsumerGroupMetadata consumer_group_metadata = decodeStreamConsumerGroupMetadataValue(get_entry_value); + consumer_group_metadata.consumer_number += 1; + std::string consumer_group_metadata_bytes = encodeStreamConsumerGroupMetadataValue(consumer_group_metadata); + batch->Put(stream_cf_handle_, entry_key, consumer_group_metadata_bytes); + s = storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); + if (s.ok()) *created_number = 1; + return s; +} + rocksdb::Status Stream::DeleteEntries(const Slice &stream_name, const std::vector &ids, uint64_t *deleted_cnt) { *deleted_cnt = 0; diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h index 81cdc11152f..75a51ecec27 100644 --- a/src/types/redis_stream.h +++ b/src/types/redis_stream.h @@ -42,6 +42,8 @@ class Stream : public SubKeyScanner { rocksdb::Status CreateGroup(const Slice &stream_name, const StreamXGroupCreateOptions &options, const std::string &group_name); rocksdb::Status DestroyGroup(const Slice &stream_name, const std::string &group_name, uint64_t *delete_cnt); + rocksdb::Status CreateConsumer(const Slice &stream_name, const std::string &group_name, + const std::string &consumer_name, int *created_number); rocksdb::Status DeleteEntries(const Slice &stream_name, const std::vector &ids, uint64_t *deleted_cnt); rocksdb::Status Len(const Slice &stream_name, const StreamLenOptions &options, uint64_t *size); rocksdb::Status GetStreamInfo(const Slice &stream_name, bool full, uint64_t count, StreamInfo *info); @@ -69,6 +71,9 @@ class Stream : public SubKeyScanner { std::string groupNameFromInternalKey(rocksdb::Slice key) const; static std::string encodeStreamConsumerGroupMetadataValue(const StreamConsumerGroupMetadata &consumer_group_metadata); static StreamConsumerGroupMetadata decodeStreamConsumerGroupMetadataValue(const std::string &value); + std::string internalKeyFromConsumerName(const std::string &ns_key, const StreamMetadata &metadata, + const std::string &group_name, const std::string &consumer_name) const; + static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata); }; } // namespace redis diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h index 7b0d7b61348..f5d4496deb8 100644 --- a/src/types/redis_stream_base.h +++ b/src/types/redis_stream_base.h @@ -169,6 +169,12 @@ struct StreamConsumerGroupMetadata { uint64_t lag = 0; }; +struct StreamConsumerMetadata { + uint64_t pending_number = 0; + uint64_t last_idle; + uint64_t last_active; +}; + struct StreamInfo { uint64_t size; uint64_t entries_added; diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index a91d791c55f..8b2b67d343e 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -886,6 +886,28 @@ func TestStreamOffset(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(0), result) }) + + t.Run("XGROUP CREATECONSUMER with different kinds of commands", func(t *testing.T) { + streamName := "test-stream" + groupName := "test-group" + consumerName := "test-consumer" + require.NoError(t, rdb.Del(ctx, streamName).Err()) + //No such stream + require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "1-0", + Values: []string{"data", "a"}, + }).Err()) + //no such group + require.Error(t, rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Err()) + require.NoError(t, rdb.XGroupCreate(ctx, streamName, groupName, "$").Err()) + + r := rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val() + require.Equal(t, int64(1), r) + r = rdb.XGroupCreateConsumer(ctx, streamName, groupName, consumerName).Val() + require.Equal(t, int64(0), r) + }) } func parseStreamEntryID(id string) (ts int64, seqNum int64) {