diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index a074d536b74..03625dc6275 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -382,6 +382,12 @@ Status SlotMigrator::sendSnapshotByCmd() { } } + if (auto s = iter->status(); !s.ok()) { + auto err_str = s.ToString(); + LOG(ERROR) << "[migrate] Failed to iterate keys of slot " << slot << ": " << err_str; + return {Status::NotOK, fmt::format("failed to iterate keys of slot {}: {}", slot, err_str)}; + } + // It's necessary to send commands that are still in the pipeline since the final pipeline may not be sent // while iterating keys because its size could be less than max_pipeline_size_ auto s = sendCmdsPipelineIfNeed(&restore_cmds, true); @@ -820,6 +826,11 @@ Status SlotMigrator::migrateComplexKey(const rocksdb::Slice &key, const Metadata } } + if (auto s = iter->status(); !s.ok()) { + return {Status::NotOK, + fmt::format("failed to iterate values of the complex key {}: {}", key.ToString(), s.ToString())}; + } + // Have to check the item count of the last command list if (item_count % kMaxItemsInCommand != 0) { *restore_cmds += redis::ArrayOfBulkStrings(user_cmd); @@ -880,6 +891,11 @@ Status SlotMigrator::migrateStream(const Slice &key, const StreamMetadata &metad } } + if (auto s = iter->status(); !s.ok()) { + return {Status::NotOK, + fmt::format("failed to iterate values of the stream key {}: {}", key.ToString(), s.ToString())}; + } + // commands like XTRIM and XDEL affect stream's metadata, but we use only XADD for a slot migration // XSETID is used to adjust stream's info on the destination node according to the current values on the source *restore_cmds += redis::ArrayOfBulkStrings({"XSETID", key.ToString(), metadata.last_generated_id.ToString(), diff --git a/src/search/index_manager.h b/src/search/index_manager.h index 1d7447047ee..0f90961f52c 100644 --- a/src/search/index_manager.h +++ b/src/search/index_manager.h @@ -129,6 +129,10 @@ struct IndexManager { index_map.Insert(std::move(info)); } + if (auto s = iter->status(); !s.ok()) { + return {Status::NotOK, fmt::format("fail to load index metadata: {}", s.ToString())}; + } + return Status::OK(); } diff --git a/src/search/indexer.cc b/src/search/indexer.cc index 7ce0b3d013b..1212dd2f0d8 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -334,6 +334,10 @@ Status IndexUpdater::Build() const { if (s.Is()) continue; if (!s.OK()) return s; } + + if (auto s = iter->status(); !s.ok()) { + return {Status::NotOK, s.ToString()}; + } } return Status::OK(); diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 4f08490bd66..0fd3113778b 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -299,6 +299,10 @@ rocksdb::Status Database::Keys(const std::string &prefix, std::vectorstatus(); !s.ok()) { + return s; + } + if (!storage_->IsSlotIdEncoded()) break; if (prefix.empty()) break; if (++slot_id >= HASH_SLOTS_SIZE) break; @@ -368,6 +372,11 @@ rocksdb::Status Database::Scan(const std::string &cursor, uint64_t limit, const keys->emplace_back(user_key); cnt++; } + + if (auto s = iter->status(); !s.ok()) { + return s; + } + if (!storage_->IsSlotIdEncoded() || prefix.empty()) { if (!keys->empty() && cnt >= limit) { end_cursor->append(user_key); @@ -587,7 +596,7 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const Slice &user_key, const break; } } - return rocksdb::Status::OK(); + return iter->status(); } RedisType WriteBatchLogData::GetRedisType() const { return type_; } diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc index 5197eb5c1b1..987f9f0dfc0 100644 --- a/src/storage/scripting.cc +++ b/src/storage/scripting.cc @@ -452,6 +452,10 @@ Status FunctionList(Server *srv, const redis::Connection *conn, const std::strin result.emplace_back(lib.ToString(), iter->value().ToString()); } + if (auto s = iter->status(); !s.ok()) { + return {Status::NotOK, s.ToString()}; + } + output->append(redis::MultiLen(result.size())); for (const auto &[lib, code] : result) { output->append(conn->HeaderOfMap(with_code ? 2 : 1)); @@ -488,6 +492,10 @@ Status FunctionListFunc(Server *srv, const redis::Connection *conn, const std::s result.emplace_back(func.ToString(), iter->value().ToString()); } + if (auto s = iter->status(); !s.ok()) { + return {Status::NotOK, s.ToString()}; + } + output->append(redis::MultiLen(result.size())); for (const auto &[func, lib] : result) { output->append(conn->HeaderOfMap(2)); diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc index 6bf03d34071..97fcf948082 100644 --- a/src/types/redis_stream.cc +++ b/src/types/redis_stream.cc @@ -654,6 +654,10 @@ rocksdb::Status Stream::AutoClaim(const Slice &stream_name, const std::string &g } } + if (auto s = iter->status(); !s.ok()) { + return s; + } + if (has_next_entry) { std::string tmp_group_name; StreamEntryID entry_id = groupAndEntryIdFromPelInternalKey(iter->key(), tmp_group_name); @@ -763,6 +767,10 @@ rocksdb::Status Stream::DestroyGroup(const Slice &stream_name, const std::string *delete_cnt += 1; } + if (auto s = iter->status(); !s.ok()) { + return s; + } + if (*delete_cnt != 0) { metadata.group_number -= 1; std::string metadata_bytes; @@ -895,6 +903,11 @@ rocksdb::Status Stream::DestroyConsumer(const Slice &stream_name, const std::str } } } + + if (auto s = iter->status(); !s.ok()) { + return s; + } + batch->Delete(stream_cf_handle_, consumer_key); StreamConsumerGroupMetadata group_metadata = decodeStreamConsumerGroupMetadataValue(get_group_value); group_metadata.consumer_number -= 1; @@ -1101,6 +1114,10 @@ rocksdb::Status Stream::Len(const Slice &stream_name, const StreamLenOptions &op } } + if (auto s = iter->status(); !s.ok()) { + return s; + } + return rocksdb::Status::OK(); } @@ -1178,6 +1195,10 @@ rocksdb::Status Stream::range(const std::string &ns_key, const StreamMetadata &m } } + if (auto s = iter->status(); !s.ok()) { + return s; + } + return rocksdb::Status::OK(); } @@ -1341,7 +1362,7 @@ rocksdb::Status Stream::GetGroupInfo(const Slice &stream_name, group_metadata.push_back(tmp_item); } } - return rocksdb::Status::OK(); + return iter->status(); } rocksdb::Status Stream::GetConsumerInfo( @@ -1375,7 +1396,7 @@ rocksdb::Status Stream::GetConsumerInfo( consumer_metadata.push_back(tmp_item); } } - return rocksdb::Status::OK(); + return iter->status(); } rocksdb::Status Stream::Range(const Slice &stream_name, const StreamRangeOptions &options, @@ -1539,6 +1560,10 @@ rocksdb::Status Stream::RangeWithPending(const Slice &stream_name, StreamRangeOp if (count >= options.count) break; } } + + if (auto s = iter->status(); !s.ok()) { + return s; + } } batch->Put(stream_cf_handle_, group_key, encodeStreamConsumerGroupMetadataValue(consumergroup_metadata)); batch->Put(stream_cf_handle_, consumer_key, encodeStreamConsumerMetadataValue(consumer_metadata));