diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 0ec71270e65..8b624a2ea8c 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -339,13 +339,21 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st WriteBatchLogData log_data(kRedisStream); batch->PutLogData(log_data.Encode()); + std::map consumer_acknowledges; for (const auto &id : entry_ids) { std::string entry_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, group_name, id); std::string value; s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, entry_key, &value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } if (s.ok()) { *acknowledged += 1; batch->Delete(stream_cf_handle_, entry_key); + + // increment ack for each related consumer + auto pel_entry = decodeStreamPelEntryValue(value); + consumer_acknowledges[pel_entry.consumer_name]++; } } if (*acknowledged > 0) { @@ -353,6 +361,20 @@ rocksdb::Status Stream::DeletePelEntries(const Slice &stream_name, const std::st group_metadata.pending_number -= *acknowledged; std::string group_value = encodeStreamConsumerGroupMetadataValue(group_metadata); batch->Put(stream_cf_handle_, group_key, group_value); + + for (const auto &[consumer_name, ack_count] : consumer_acknowledges) { + auto consumer_meta_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); + std::string consumer_meta_original; + s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_meta_key, &consumer_meta_original); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.ok()) { + auto consumer_metadata = decodeStreamConsumerMetadataValue(consumer_meta_original); + consumer_metadata.pending_number -= ack_count; + batch->Put(stream_cf_handle_, consumer_meta_key, encodeStreamConsumerMetadataValue(consumer_metadata)); + } + } } return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch()); } diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go index d6a09a9e3bf..7297bb66036 100644 --- a/tests/gocase/unit/type/stream/stream_test.go +++ b/tests/gocase/unit/type/stream/stream_test.go @@ -1064,6 +1064,41 @@ func TestStreamOffset(t *testing.T) { require.Equal(t, consumer3, r1[0].Name) }) + t.Run("XINFO after delete pending message and related consumer, for issue #2350", func(t *testing.T) { + streamName := "test-stream-2350" + groupName := "test-group-2350" + consumerName := "test-consumer-2350" + require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamName, groupName, "$").Err()) + require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + ID: "*", + Values: []string{"testing", "overflow"}, + }).Err()) + readRsp := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: consumerName, + Streams: []string{streamName, ">"}, + Count: 1, + NoAck: false, + }) + require.NoError(t, readRsp.Err()) + require.Len(t, readRsp.Val(), 1) + streamRsp := readRsp.Val()[0] + require.Len(t, streamRsp.Messages, 1) + msgID := streamRsp.Messages[0] + require.NoError(t, rdb.XAck(ctx, streamName, groupName, msgID.ID).Err()) + require.NoError(t, rdb.XGroupDelConsumer(ctx, streamName, groupName, consumerName).Err()) + infoRsp := rdb.XInfoGroups(ctx, streamName) + require.NoError(t, infoRsp.Err()) + infoGroups := infoRsp.Val() + require.Len(t, infoGroups, 1) + infoGroup := infoGroups[0] + require.Equal(t, groupName, infoGroup.Name) + require.Equal(t, int64(0), infoGroup.Consumers) + require.Equal(t, int64(0), infoGroup.Pending) + require.Equal(t, msgID.ID, infoGroup.LastDeliveredID) + }) + t.Run("XREAD After XGroupCreate and XGroupCreateConsumer, for issue #2109", func(t *testing.T) { streamName := "test-stream" group := "group"