From e04ee8437874e9b7d065d3546d389874cb2ff51f Mon Sep 17 00:00:00 2001 From: edward Date: Sun, 2 Jun 2024 17:42:11 +0800 Subject: [PATCH 1/4] make consumer decrement pending number when message is acknowledged. --- src/types/redis_stream.cc | 20 +++++++++++ tests/gocase/unit/type/stream/stream_test.go | 35 ++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 0ec71270e65..63a7ebc952c 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -339,6 +339,7 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st WriteBatchLogData log_data(kRedisStream); batch->PutLogData(log_data.Encode()); + std::map consumer_acknowledges; for (const auto &id : entry_ids) { std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); std::string value; @@ -346,6 +347,14 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st if (s.ok()) { *acknowledged += 1; batch->Delete(stream_cf_handle_, entry_key); + + // increment ack for each related consumer + auto pel_entry = decodeStreamPelEntryValue(value); + if (consumer_acknowledges.find(pel_entry.consumer_name) == consumer_acknowledges.cend()) { + consumer_acknowledges[pel_entry.consumer_name] = 1; + } else { + consumer_acknowledges[pel_entry.consumer_name] += 1; + } } } if (*acknowledged > 0) { @@ -353,6 +362,17 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st group_metadata.pending_number -= *acknowledged; std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata); batch->Put(stream_cf_handle_, group_key, group_value); + + for (const auto &consumer : consumer_acknowledges) { + auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer.first); + std::string consumer_meta_original; + s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_meta_key, &consumer_meta_original); + if (s.ok()) { + auto consumer_metadata = decodeStreamConsumerMetadataValue(consumer_meta_original); + consumer_metadata.pending_number -= consumer.second; + batch->Put(stream_cf_handle_, consumer_meta_key, encodeStreamConsumerMetadataValue(consumer_metadata)); + } + } } return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index d6a09a9e3bf..7297bb66036 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -1064,6 +1064,41 @@ func TestStreamOffset(t *testing.T) { require.Equal(t, consumer3, r1[0].Name) }) + t.Run("XINFO after delete pending message and related consumer, for issue #2350", func(t *testing.T) { + streamName := "test-stream-2350" + groupName := "test-group-2350" + consumerName := "test-consumer-2350" + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "$").Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "*", + Values: []string{"testing", "overflow"}, + }).Err()) + readRsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 1, + NoAck: false, + }) + require.NoError(t, readRsp.Err()) + require.Len(t, readRsp.Val(), 1) + streamRsp := readRsp.Val()[0] + require.Len(t, streamRsp.Messages, 1) + msgID := streamRsp.Messages[0] + require.NoError(t, rdb.XAck(ctx, streamName, groupName, msgID.ID).Err()) + require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err()) + infoRsp := rdb.XInfoGroups(ctx, streamName) + require.NoError(t, infoRsp.Err()) + infoGroups := infoRsp.Val() + require.Len(t, infoGroups, 1) + infoGroup := infoGroups[0] + require.Equal(t, groupName, infoGroup.Name) + require.Equal(t, int64(0), infoGroup.Consumers) + require.Equal(t, int64(0), infoGroup.Pending) + require.Equal(t, msgID.ID, infoGroup.LastDeliveredID) + }) + t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) { streamName := "test-stream" group := "group" From 1e03f6536db4919dbe75182861aa1fdf943cecde Mon Sep 17 00:00:00 2001 From: edward Date: Sun, 2 Jun 2024 20:25:29 +0800 Subject: [PATCH 2/4] update map int entity construction to auto construction, it will be `uint64()` which is 0 --- src/types/redis_stream.cc | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 63a7ebc952c..27450b1c097 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -350,11 +350,7 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st // increment ack for each related consumer auto pel_entry = decodeStreamPelEntryValue(value); - if (consumer_acknowledges.find(pel_entry.consumer_name) == consumer_acknowledges.cend()) { - consumer_acknowledges[pel_entry.consumer_name] = 1; - } else { - consumer_acknowledges[pel_entry.consumer_name] += 1; - } + consumer_acknowledges[pel_entry.consumer_name]++; } } if (*acknowledged > 0) { From 777f5043e895af1fee055dec8f1bee564aad9cfc Mon Sep 17 00:00:00 2001 From: edward Date: Sun, 2 Jun 2024 23:44:20 +0800 Subject: [PATCH 3/4] add error handling and fix sonar issue --- src/types/redis_stream.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 27450b1c097..86d067442a8 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -344,6 +344,9 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); std::string value; s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } if (s.ok()) { *acknowledged += 1; batch->Delete(stream_cf_handle_, entry_key); @@ -359,13 +362,13 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata); batch->Put(stream_cf_handle_, group_key, group_value); - for (const auto &consumer : consumer_acknowledges) { - auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer.first); + for (const auto &[consumer_name, ack_count] : consumer_acknowledges) { + auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); std::string consumer_meta_original; s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_meta_key, &consumer_meta_original); if (s.ok()) { auto consumer_metadata = decodeStreamConsumerMetadataValue(consumer_meta_original); - consumer_metadata.pending_number -= consumer.second; + consumer_metadata.pending_number -= ack_count; batch->Put(stream_cf_handle_, consumer_meta_key, encodeStreamConsumerMetadataValue(consumer_metadata)); } } From f020a4c522d7f94b293e274c01fbe429da57cf41 Mon Sep 17 00:00:00 2001 From: edward Date: Mon, 3 Jun 2024 08:08:22 +0800 Subject: [PATCH 4/4] add error handling for `rocksdb::Get` --- src/types/redis_stream.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 86d067442a8..8b624a2ea8c 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -366,6 +366,9 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); std::string consumer_meta_original; s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_meta_key, &consumer_meta_original); + if (!s.ok() && !s.IsNotFound()) { + return s; + } if (s.ok()) { auto consumer_metadata = decodeStreamConsumerMetadataValue(consumer_meta_original); consumer_metadata.pending_number -= ack_count;