Skip to content

Commit

Permalink
make consumer decrement pending number when message is acknowledged.
Browse files Browse the repository at this point in the history
  • Loading branch information
LindaSummer committed Jun 2, 2024
1 parent 309ea7b commit e04ee84
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
20 changes: 20 additions & 0 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,20 +339,40 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st
WriteBatchLogData log_data(kRedisStream);
batch->PutLogData(log_data.Encode());

std::map<std::string, uint64_t> consumer_acknowledges;
for (const auto &id : entry_ids) {
std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id);
std::string value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &value);
if (s.ok()) {
*acknowledged += 1;
batch->Delete(stream_cf_handle_, entry_key);

// increment ack for each related consumer
auto pel_entry = decodeStreamPelEntryValue(value);
if (consumer_acknowledges.find(pel_entry.consumer_name) == consumer_acknowledges.cend()) {
consumer_acknowledges[pel_entry.consumer_name] = 1;
} else {
consumer_acknowledges[pel_entry.consumer_name] += 1;
}
}
}
if (*acknowledged > 0) {
StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
group_metadata.pending_number -= *acknowledged;
std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata);
batch->Put(stream_cf_handle_, group_key, group_value);

for (const auto &consumer : consumer_acknowledges) {
auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer.first);
std::string consumer_meta_original;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_meta_key, &consumer_meta_original);
if (s.ok()) {
auto consumer_metadata = decodeStreamConsumerMetadataValue(consumer_meta_original);
consumer_metadata.pending_number -= consumer.second;
batch->Put(stream_cf_handle_, consumer_meta_key, encodeStreamConsumerMetadataValue(consumer_metadata));
}
}
}
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
Expand Down
35 changes: 35 additions & 0 deletions tests/gocase/unit/type/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,6 +1064,41 @@ func TestStreamOffset(t *testing.T) {
require.Equal(t, consumer3, r1[0].Name)
})

t.Run("XINFO after delete pending message and related consumer, for issue #2350", func(t *testing.T) {
streamName := "test-stream-2350"
groupName := "test-group-2350"
consumerName := "test-consumer-2350"
require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "$").Err())
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamName,
ID: "*",
Values: []string{"testing", "overflow"},
}).Err())
readRsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 1,
NoAck: false,
})
require.NoError(t, readRsp.Err())
require.Len(t, readRsp.Val(), 1)
streamRsp := readRsp.Val()[0]
require.Len(t, streamRsp.Messages, 1)
msgID := streamRsp.Messages[0]
require.NoError(t, rdb.XAck(ctx, streamName, groupName, msgID.ID).Err())
require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err())
infoRsp := rdb.XInfoGroups(ctx, streamName)
require.NoError(t, infoRsp.Err())
infoGroups := infoRsp.Val()
require.Len(t, infoGroups, 1)
infoGroup := infoGroups[0]
require.Equal(t, groupName, infoGroup.Name)
require.Equal(t, int64(0), infoGroup.Consumers)
require.Equal(t, int64(0), infoGroup.Pending)
require.Equal(t, msgID.ID, infoGroup.LastDeliveredID)
})

t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) {
streamName := "test-stream"
group := "group"
Expand Down

0 comments on commit e04ee84

Please sign in to comment.