Skip to content

Commit

Permalink
remove iteration type check
Browse files Browse the repository at this point in the history
  • Loading branch information
Yangsx-1 committed Jul 17, 2024
1 parent 1deb510 commit bbf7cdf
Showing 1 changed file with 38 additions and 41 deletions.
79 changes: 38 additions & 41 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1733,54 +1733,51 @@ rocksdb::Status Stream::GetPendingEntries(StreamPendingOptions &options, StreamG
StreamEntryID first_entry_id{StreamEntryID::Maximum()};
StreamEntryID last_entry_id{StreamEntryID::Minimum()};
uint64_t count = 0;
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
if (identifySubkeyType(iter->key()) == StreamSubkeyType::StreamPelEntry) {
if (options.with_count && options.count <= count) {
break;
}
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
if (tmp_group_name != group_name) continue;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
if (options.with_count && options.count <= count) {
break;
}
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);

if (first_entry_id > entry_id) {
first_entry_id = entry_id;
}
if (last_entry_id < entry_id) {
last_entry_id = entry_id;
}
StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
if (options.with_time && util::GetTimeStampMS() - pel_entry.last_delivery_time_ms < options.idle_time) {
continue;
}
if (first_entry_id > entry_id) {
first_entry_id = entry_id;
}
if (last_entry_id < entry_id) {
last_entry_id = entry_id;
}
StreamPelEntry pel_entry = decodeStreamPelEntryValue(iter->value().ToString());
if (options.with_time && util::GetTimeStampMS() - pel_entry.last_delivery_time_ms < options.idle_time) {
continue;
}

const std::string &consumer_name = pel_entry.consumer_name;
const std::string &consumer_name = pel_entry.consumer_name;

if (options.with_consumer && options.consumer != consumer_name) {
continue;
}
if (options.with_consumer && options.consumer != consumer_name) {
continue;
}

ext_results.push_back({entry_id, pel_entry.last_delivery_time_ms, pel_entry.last_delivery_count, consumer_name});
ext_results.push_back({entry_id, pel_entry.last_delivery_time_ms, pel_entry.last_delivery_count, consumer_name});

if (options.with_count) {
continue;
}
std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
std::string get_consumer_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
return rocksdb::Status::OK();
}
if (options.with_count) {
continue;
}
std::string consumer_key = internalKeyFromConsumerName(ns_key, metadata, group_name, consumer_name);
std::string get_consumer_value;
s = storage_->Get(rocksdb::ReadOptions(), stream_cf_handle_, consumer_key, &get_consumer_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
return rocksdb::Status::OK();
}

StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
if (consumer_names.find(consumer_name) == consumer_names.end()) {
consumer_names.insert(consumer_name);
pending_infos.consumer_infos.emplace_back(consumer_name, consumer_metadata.pending_number);
}
count++;
StreamConsumerMetadata consumer_metadata = decodeStreamConsumerMetadataValue(get_consumer_value);
if (consumer_names.find(consumer_name) == consumer_names.end()) {
consumer_names.insert(consumer_name);
pending_infos.consumer_infos.emplace_back(consumer_name, consumer_metadata.pending_number);
}
count++;
}
pending_infos.last_entry_id = last_entry_id;
pending_infos.first_entry_id = first_entry_id;
Expand Down

0 comments on commit bbf7cdf

Please sign in to comment.