diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index f4c784e4780..d42347294e7 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -1733,54 +1733,51 @@ rocksdb::Status Stream::GetPendingEntries(StreamPendingOptions &options, StreamG StreamEntryID first_entry_id{StreamEntryID::Maximum()}; StreamEntryID last_entry_id{StreamEntryID::Minimum()}; uint64_t count = 0; - for (iter->SeekToLast(); iter->Valid(); iter->Prev()) { - if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) { - if (options.with_count && options.count <= count) { - break; - } - std::string tmp_group_name; - StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name); - if (tmp_group_name != group_name) continue; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + if (options.with_count && options.count <= count) { + break; + } + std::string tmp_group_name; + StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name); - if (first_entry_id > entry_id) { - first_entry_id = entry_id; - } - if (last_entry_id < entry_id) { - last_entry_id = entry_id; - } - StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString()); - if (options.with_time && util::GetTimeStampMS() - pel_entry.last_delivery_time_ms < options.idle_time) { - continue; - } + if (first_entry_id > entry_id) { + first_entry_id = entry_id; + } + if (last_entry_id < entry_id) { + last_entry_id = entry_id; + } + StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString()); + if (options.with_time && util::GetTimeStampMS() - pel_entry.last_delivery_time_ms < options.idle_time) { + continue; + } - const std::string &consumer_name = pel_entry.consumer_name; + const std::string &consumer_name = pel_entry.consumer_name; - if (options.with_consumer && options.consumer != consumer_name) { - continue; - } + if (options.with_consumer && options.consumer != consumer_name) { + continue; + } - ext_results.push_back({entry_id, pel_entry.last_delivery_time_ms, pel_entry.last_delivery_count, consumer_name}); + ext_results.push_back({entry_id, pel_entry.last_delivery_time_ms, pel_entry.last_delivery_count, consumer_name}); - if (options.with_count) { - continue; - } - std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); - std::string get_consumer_value; - s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value); - if (!s.ok() && !s.IsNotFound()) { - return s; - } - if (s.IsNotFound()) { - return rocksdb::Status::OK(); - } + if (options.with_count) { + continue; + } + std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name); + std::string get_consumer_value; + s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value); + if (!s.ok() && !s.IsNotFound()) { + return s; + } + if (s.IsNotFound()) { + return rocksdb::Status::OK(); + } - StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value); - if (consumer_names.find(consumer_name) == consumer_names.end()) { - consumer_names.insert(consumer_name); - pending_infos.consumer_infos.emplace_back(consumer_name, consumer_metadata.pending_number); - } - count++; + StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value); + if (consumer_names.find(consumer_name) == consumer_names.end()) { + consumer_names.insert(consumer_name); + pending_infos.consumer_infos.emplace_back(consumer_name, consumer_metadata.pending_number); } + count++; } pending_infos.last_entry_id = last_entry_id; pending_infos.first_entry_id = first_entry_id;