Skip to content

Commit

Permalink
feat: add iter->status() check for loop iterators (#2395)
Browse files Browse the repository at this point in the history
  • Loading branch information
LindaSummer authored Jul 7, 2024
1 parent 7dd0248 commit 51569ad
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 3 deletions.
16 changes: 16 additions & 0 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,12 @@ Status SlotMigrator::sendSnapshotByCmd() {
}
}

if (auto s = iter->status(); !s.ok()) {
auto err_str = s.ToString();
LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << slot << ": " << err_str;
return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: {}", slot, err_str)};
}

// It's necessary to send commands that are still in the pipeline since the final pipeline may not be sent
// while iterating keys because its size could be less than max_pipeline_size_
auto s = sendCmdsPipelineIfNeed(&restore_cmds, true);
Expand Down Expand Up @@ -820,6 +826,11 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata
}
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK,
fmt::format("failed to iterate values of the complex key {}: {}", key.ToString(), s.ToString())};
}

// Have to check the item count of the last command list
if (item_count % kMaxItemsInCommand != 0) {
*restore_cmds += redis::ArrayOfBulkStrings(user_cmd);
Expand Down Expand Up @@ -880,6 +891,11 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad
}
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK,
fmt::format("failed to iterate values of the stream key {}: {}", key.ToString(), s.ToString())};
}

// commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration
// XSETID is used to adjust stream's info on the destination node according to the current values on the source
*restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), metadata.last_generated_id.ToString(),
Expand Down
4 changes: 4 additions & 0 deletions src/search/index_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ struct IndexManager {
index_map.Insert(std::move(info));
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK, fmt::format("fail to load index metadata: {}", s.ToString())};
}

return Status::OK();
}

Expand Down
4 changes: 4 additions & 0 deletions src/search/indexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@ Status IndexUpdater::Build() const {
if (s.Is<Status::TypeMismatched>()) continue;
if (!s.OK()) return s;
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK, s.ToString()};
}
}

return Status::OK();
Expand Down
11 changes: 10 additions & 1 deletion src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ rocksdb::Status Database::Keys(const std::string &prefix, std::vector<std::strin
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

if (!storage_->IsSlotIdEncoded()) break;
if (prefix.empty()) break;
if (++slot_id >= HASH_SLOTS_SIZE) break;
Expand Down Expand Up @@ -368,6 +372,11 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const
keys->emplace_back(user_key);
cnt++;
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

if (!storage_->IsSlotIdEncoded() || prefix.empty()) {
if (!keys->empty() && cnt >= limit) {
end_cursor->append(user_key);
Expand Down Expand Up @@ -587,7 +596,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const
break;
}
}
return rocksdb::Status::OK();
return iter->status();
}

RedisType WriteBatchLogData::GetRedisType() const { return type_; }
Expand Down
8 changes: 8 additions & 0 deletions src/storage/scripting.cc
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ Status FunctionList(Server *srv, const redis::Connection *conn, const std::strin
result.emplace_back(lib.ToString(), iter->value().ToString());
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK, s.ToString()};
}

output->append(redis::MultiLen(result.size()));
for (const auto &[lib, code] : result) {
output->append(conn->HeaderOfMap(with_code ? 2 : 1));
Expand Down Expand Up @@ -488,6 +492,10 @@ Status FunctionListFunc(Server *srv, const redis::Connection *conn, const std::s
result.emplace_back(func.ToString(), iter->value().ToString());
}

if (auto s = iter->status(); !s.ok()) {
return {Status::NotOK, s.ToString()};
}

output->append(redis::MultiLen(result.size()));
for (const auto &[func, lib] : result) {
output->append(conn->HeaderOfMap(2));
Expand Down
29 changes: 27 additions & 2 deletions src/types/redis_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &g
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

if (has_next_entry) {
std::string tmp_group_name;
StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name);
Expand Down Expand Up @@ -763,6 +767,10 @@ rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const std::string
*delete_cnt += 1;
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

if (*delete_cnt != 0) {
metadata.group_number -= 1;
std::string metadata_bytes;
Expand Down Expand Up @@ -895,6 +903,11 @@ rocksdb::Status Stream::DestroyConsumer(const Slice &stream_name, const std::str
}
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

batch->Delete(stream_cf_handle_, consumer_key);
StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value);
group_metadata.consumer_number -= 1;
Expand Down Expand Up @@ -1101,6 +1114,10 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

return rocksdb::Status::OK();
}

Expand Down Expand Up @@ -1178,6 +1195,10 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}

return rocksdb::Status::OK();
}

Expand Down Expand Up @@ -1341,7 +1362,7 @@ rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name,
group_metadata.push_back(tmp_item);
}
}
return rocksdb::Status::OK();
return iter->status();
}

rocksdb::Status Stream::GetConsumerInfo(
Expand Down Expand Up @@ -1375,7 +1396,7 @@ rocksdb::Status Stream::GetConsumerInfo(
consumer_metadata.push_back(tmp_item);
}
}
return rocksdb::Status::OK();
return iter->status();
}

rocksdb::Status Stream::Range(const Slice &stream_name, const StreamRangeOptions &options,
Expand Down Expand Up @@ -1539,6 +1560,10 @@ rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOp
if (count >= options.count) break;
}
}

if (auto s = iter->status(); !s.ok()) {
return s;
}
}
batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata));
batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));
Expand Down

0 comments on commit 51569ad

Please sign in to comment.