Skip to content

Commit

Permalink
Check if the type is an entry while iterating the stream subkeys (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
jihuayu authored Feb 23, 2024
1 parent 5fb6079 commit 8a19bd5
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 3 deletions.
9 changes: 7 additions & 2 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ StreamConsumerMetadata Stream::decodeStreamConsumerMetadataValue(const std::stri
return consumer_metadata;
}

StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) {
StreamSubkeyType Stream::identifySubkeyType(const rocksdb::Slice &key) const {
InternalKey ikey(key, storage_->IsSlotIdEncoded());
Slice subkey = ikey.GetSubKey();
const size_t entry_id_size = sizeof(StreamEntryID);
Expand Down Expand Up @@ -618,7 +618,9 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op
}

for (; iter->Valid(); options.to_first ? iter->Prev() : iter->Next()) {
*size += 1;
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamEntry) {
*size += 1;
}
}

return rocksdb::Status::OK();
Expand Down Expand Up @@ -674,6 +676,9 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m

for (; iter->Valid() && (options.reverse ? iter->key().ToString() >= end_key : iter->key().ToString() <= end_key);
options.reverse ? iter->Prev() : iter->Next()) {
if (identifySubkeyType(iter->key()) != StreamSubkeyType::StreamEntry) {
continue;
}
if (options.exclude_start && iter->key().ToString() == start_key) {
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class Stream : public SubKeyScanner {
std::string consumerNameFromInternalKey(rocksdb::Slice key) const;
static std::string encodeStreamConsumerMetadataValue(const StreamConsumerMetadata &consumer_metadata);
static StreamConsumerMetadata decodeStreamConsumerMetadataValue(const std::string &value);
StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key);
StreamSubkeyType identifySubkeyType(const rocksdb::Slice &key) const;
};

} // namespace redis
15 changes: 15 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,21 @@ func TestStreamOffset(t *testing.T) {
r1 = rdb.XInfoConsumers(ctx, streamName, group2).Val()
require.Equal(t, consumer3, r1[0].Name)
})

t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) {
streamName := "test-stream"
group := "group"
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "*",
Values: []string{"data1", "b"},
}).Err())
require.NoError(t, rdb.XGroupCreate(ctx, streamName, group, "0").Err())
require.NoError(t, rdb.XGroupCreateConsumer(ctx, streamName, group, "consumer").Err())
require.NoError(t, rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{streamName, "0"},
}).Err())
})
}

func parseStreamEntryID(id string) (ts int64, seqNum int64) {
Expand Down

0 comments on commit 8a19bd5

Please sign in to comment.