Skip to content

Commit

Permalink
refactor: Improve consistency and isolation semantics by adding Conte…
Browse files Browse the repository at this point in the history
…xt parameter to DB API (#2332)

Co-authored-by: mwish <maplewish117@gmail.com>
  • Loading branch information
PokIsemaine and mapleFU authored Aug 24, 2024
1 parent 111d4e8 commit a1e0957
Show file tree
Hide file tree
Showing 110 changed files with 3,989 additions and 3,303 deletions.
6 changes: 4 additions & 2 deletions src/cluster/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
// 3. Update the map of slots to nodes.
// remember: The atomicity of the process is based on
// the transactionality of ClearKeysOfSlotRange().
engine::Context ctx(srv_->storage);
for (auto [s_start, s_end] : slot_ranges) {
for (int slot = s_start; slot <= s_end; slot++) {
std::shared_ptr<ClusterNode> old_node = slots_nodes_[slot];
Expand All @@ -129,7 +130,7 @@ Status Cluster::SetSlotRanges(const std::vector<SlotRange> &slot_ranges, const s
if (old_node == myself_ && old_node != to_assign_node) {
// If slot is migrated from this node
if (migrated_slots_.count(slot) > 0) {
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace, SlotRange::GetPoint(slot));
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx, kDefaultNamespace, SlotRange::GetPoint(slot));
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slot: " << s.ToString();
}
Expand Down Expand Up @@ -212,9 +213,10 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b

// Clear data of migrated slots
if (!migrated_slots_.empty()) {
engine::Context ctx(srv_->storage);
for (const auto &[slot, _] : migrated_slots_) {
if (slots_nodes_[slot] != myself_) {
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(kDefaultNamespace, SlotRange::GetPoint(slot));
auto s = srv_->slot_migrator->ClearKeysOfSlotRange(ctx, kDefaultNamespace, SlotRange::GetPoint(slot));
if (!s.ok()) {
LOG(ERROR) << "failed to clear data of migrated slots: " << s.ToString();
}
Expand Down
9 changes: 6 additions & 3 deletions src/cluster/slot_import.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ Status SlotImport::Start(const SlotRange &slot_range) {
}

// Clean slot data first
auto s = ClearKeysOfSlotRange(namespace_, slot_range);
engine::Context ctx(srv_->storage);
auto s = ClearKeysOfSlotRange(ctx, namespace_, slot_range);
if (!s.ok()) {
return {Status::NotOK, fmt::format("clear keys of slot(s) error: {}", s.ToString())};
}
Expand Down Expand Up @@ -74,7 +75,8 @@ Status SlotImport::Fail(const SlotRange &slot_range) {
}

// Clean imported slot data
auto s = ClearKeysOfSlotRange(namespace_, slot_range);
engine::Context ctx(srv_->storage);
auto s = ClearKeysOfSlotRange(ctx, namespace_, slot_range);
if (!s.ok()) {
return {Status::NotOK, fmt::format("clear keys of slot(s) error: {}", s.ToString())};
}
Expand All @@ -99,7 +101,8 @@ Status SlotImport::StopForLinkError() {
// from new master.
if (!srv_->IsSlave()) {
// Clean imported slot data
auto s = ClearKeysOfSlotRange(namespace_, import_slot_range_);
engine::Context ctx(srv_->storage);
auto s = ClearKeysOfSlotRange(ctx, namespace_, import_slot_range_);
if (!s.ok()) {
return {Status::NotOK, fmt::format("clear keys of slot error: {}", s.ToString())};
}
Expand Down
3 changes: 2 additions & 1 deletion src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1254,7 +1254,8 @@ Status SlotMigrator::sendSnapshotByRawKV() {
read_options.snapshot = slot_snapshot_;
rocksdb::Slice prefix_slice(prefix);
read_options.iterate_lower_bound = &prefix_slice;
engine::DBIterator iter(storage_, read_options);
auto no_txn_ctx = engine::Context::NoTransactionContext(storage_);
engine::DBIterator iter(no_txn_ctx, read_options);

BatchSender batch_sender(*dst_fd_, migrate_batch_size_bytes_, migrate_batch_bytes_per_sec_);

Expand Down
20 changes: 13 additions & 7 deletions src/commands/cmd_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class CommandGetBit : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
bool bit = false;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.GetBit(args_[1], offset_, &bit);
engine::Context ctx(srv->storage);
auto s = bitmap_db.GetBit(ctx, args_[1], offset_, &bit);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(bit ? 1 : 0);
Expand Down Expand Up @@ -80,7 +81,8 @@ class CommandSetBit : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
bool old_bit = false;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.SetBit(args_[1], offset_, bit_, &old_bit);
engine::Context ctx(srv->storage);
auto s = bitmap_db.SetBit(ctx, args_[1], offset_, bit_, &old_bit);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(old_bit ? 1 : 0);
Expand Down Expand Up @@ -135,7 +137,8 @@ class CommandBitCount : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
uint32_t cnt = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitCount(args_[1], start_, stop_, is_bit_index_, &cnt);
engine::Context ctx(srv->storage);
auto s = bitmap_db.BitCount(ctx, args_[1], start_, stop_, is_bit_index_, &cnt);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(cnt);
Expand Down Expand Up @@ -194,7 +197,8 @@ class CommandBitPos : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
int64_t pos = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, stop_given_, &pos, is_bit_index_);
engine::Context ctx(srv->storage);
auto s = bitmap_db.BitPos(ctx, args_[1], bit_, start_, stop_, stop_given_, &pos, is_bit_index_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(pos);
Expand Down Expand Up @@ -239,7 +243,8 @@ class CommandBitOp : public Commander {

int64_t dest_key_len = 0;
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
auto s = bitmap_db.BitOp(op_flag_, args_[1], args_[2], op_keys, &dest_key_len);
engine::Context ctx(srv->storage);
auto s = bitmap_db.BitOp(ctx, op_flag_, args_[1], args_[2], op_keys, &dest_key_len);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(dest_key_len);
Expand Down Expand Up @@ -336,10 +341,11 @@ class CommandBitfield : public Commander {
redis::Bitmap bitmap_db(srv->storage, conn->GetNamespace());
std::vector<std::optional<BitfieldValue>> rets;
rocksdb::Status s;
engine::Context ctx(srv->storage);
if (read_only_) {
s = bitmap_db.BitfieldReadOnly(args_[1], cmds_, &rets);
s = bitmap_db.BitfieldReadOnly(ctx, args_[1], cmds_, &rets);
} else {
s = bitmap_db.Bitfield(args_[1], cmds_, &rets);
s = bitmap_db.Bitfield(ctx, args_[1], cmds_, &rets);
}
std::vector<std::string> str_rets(rets.size());
for (size_t i = 0; i != rets.size(); ++i) {
Expand Down
24 changes: 16 additions & 8 deletions src/commands/cmd_bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ class CommandBFReserve : public Commander {

Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloomfilter_db(srv->storage, conn->GetNamespace());
auto s = bloomfilter_db.Reserve(args_[1], capacity_, error_rate_, expansion_);
engine::Context ctx(srv->storage);
auto s = bloomfilter_db.Reserve(ctx, args_[1], capacity_, error_rate_, expansion_);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::SimpleString("OK");
Expand All @@ -108,7 +109,8 @@ class CommandBFAdd : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
BloomFilterAddResult ret = BloomFilterAddResult::kOk;
auto s = bloom_db.Add(args_[1], args_[2], &ret);
engine::Context ctx(srv->storage);
auto s = bloom_db.Add(ctx, args_[1], args_[2], &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

switch (ret) {
Expand Down Expand Up @@ -139,7 +141,8 @@ class CommandBFMAdd : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
auto s = bloom_db.MAdd(args_[1], items_, &rets);
engine::Context ctx(srv->storage);
auto s = bloom_db.MAdd(ctx, args_[1], items_, &rets);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(items_.size());
Expand Down Expand Up @@ -234,7 +237,8 @@ class CommandBFInsert : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
auto s = bloom_db.InsertCommon(args_[1], items_, insert_options_, &rets);
engine::Context ctx(srv->storage);
auto s = bloom_db.InsertCommon(ctx, args_[1], items_, insert_options_, &rets);
if (s.IsNotFound()) return {Status::RedisExecErr, "key is not found"};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

Expand Down Expand Up @@ -265,7 +269,8 @@ class CommandBFExists : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
bool exist = false;
auto s = bloom_db.Exists(args_[1], args_[2], &exist);
engine::Context ctx(srv->storage);
auto s = bloom_db.Exists(ctx, args_[1], args_[2], &exist);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::Integer(exist ? 1 : 0);
Expand All @@ -286,7 +291,8 @@ class CommandBFMExists : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
std::vector<bool> exists(items_.size(), false);
auto s = bloom_db.MExists(args_[1], items_, &exists);
engine::Context ctx(srv->storage);
auto s = bloom_db.MExists(ctx, args_[1], items_, &exists);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

*output = redis::MultiLen(items_.size());
Expand Down Expand Up @@ -329,7 +335,8 @@ class CommandBFInfo : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
BloomFilterInfo info;
auto s = bloom_db.Info(args_[1], &info);
engine::Context ctx(srv->storage);
auto s = bloom_db.Info(ctx, args_[1], &info);
if (s.IsNotFound()) return {Status::RedisExecErr, "key is not found"};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};

Expand Down Expand Up @@ -376,7 +383,8 @@ class CommandBFCard : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(srv->storage, conn->GetNamespace());
BloomFilterInfo info;
auto s = bloom_db.Info(args_[1], &info);
engine::Context ctx(srv->storage);
auto s = bloom_db.Info(ctx, args_[1], &info);
if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, s.ToString()};
if (s.IsNotFound()) {
*output = redis::Integer(0);
Expand Down
5 changes: 3 additions & 2 deletions src/commands/cmd_function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "commands/command_parser.h"
#include "parse_util.h"
#include "server/redis_reply.h"
#include "server/server.h"
#include "storage/scripting.h"
#include "string_util.h"

Expand All @@ -30,6 +31,7 @@ namespace redis {
struct CommandFunction : Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
CommandParser parser(args_, 1);
engine::Context ctx(srv->storage);
if (parser.EatEqICase("load")) {
bool replace = false;
if (parser.EatEqICase("replace")) {
Expand Down Expand Up @@ -70,8 +72,7 @@ struct CommandFunction : Commander {
if (!lua::FunctionIsLibExist(conn, libname)) {
return {Status::NotOK, "no such library"};
}

auto s = lua::FunctionDelete(srv, libname);
auto s = lua::FunctionDelete(ctx, srv, libname);
if (!s) return s;

*output = SimpleString("OK");
Expand Down
26 changes: 17 additions & 9 deletions src/commands/cmd_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class CommandGeoAdd : public CommandGeoBase {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
uint64_t ret = 0;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Add(args_[1], &geo_points_, &ret);
engine::Context ctx(srv->storage);
auto s = geo_db.Add(ctx, args_[1], &geo_points_, &ret);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down Expand Up @@ -142,7 +143,8 @@ class CommandGeoDist : public CommandGeoBase {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
double distance = 0;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Dist(args_[1], args_[2], args_[3], &distance);
engine::Context ctx(srv->storage);
auto s = geo_db.Dist(ctx, args_[1], args_[2], args_[3], &distance);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand All @@ -168,7 +170,8 @@ class CommandGeoHash : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<std::string> hashes;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Hash(args_[1], members_, &hashes);
engine::Context ctx(srv->storage);
auto s = geo_db.Hash(ctx, args_[1], members_, &hashes);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down Expand Up @@ -197,7 +200,8 @@ class CommandGeoPos : public Commander {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::map<std::string, GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Pos(args_[1], members_, &geo_points);
engine::Context ctx(srv->storage);
auto s = geo_db.Pos(ctx, args_[1], members_, &geo_points);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
Expand Down Expand Up @@ -304,7 +308,8 @@ class CommandGeoRadius : public CommandGeoBase {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.Radius(args_[1], longitude_, latitude_, GetRadiusMeters(radius_), count_, sort_, store_key_,
engine::Context ctx(srv->storage);
auto s = geo_db.Radius(ctx, args_[1], longitude_, latitude_, GetRadiusMeters(radius_), count_, sort_, store_key_,
store_distance_, GetUnitConversion(), &geo_points);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
Expand Down Expand Up @@ -452,8 +457,9 @@ class CommandGeoSearch : public CommandGeoBase {
std::vector<GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());

auto s = geo_db.Search(args_[1], geo_shape_, origin_point_type_, member_, count_, sort_, false, GetUnitConversion(),
&geo_points);
engine::Context ctx(srv->storage);
auto s = geo_db.Search(ctx, args_[1], geo_shape_, origin_point_type_, member_, count_, sort_, false,
GetUnitConversion(), &geo_points);

if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
Expand Down Expand Up @@ -614,7 +620,8 @@ class CommandGeoSearchStore : public CommandGeoSearch {
std::vector<GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());

auto s = geo_db.SearchStore(args_[2], geo_shape_, origin_point_type_, member_, count_, sort_, store_key_,
engine::Context ctx(srv->storage);
auto s = geo_db.SearchStore(ctx, args_[2], geo_shape_, origin_point_type_, member_, count_, sort_, store_key_,
store_distance_, GetUnitConversion(), &geo_points);

if (!s.ok()) {
Expand Down Expand Up @@ -654,7 +661,8 @@ class CommandGeoRadiusByMember : public CommandGeoRadius {
Status Execute(Server *srv, Connection *conn, std::string *output) override {
std::vector<GeoPoint> geo_points;
redis::Geo geo_db(srv->storage, conn->GetNamespace());
auto s = geo_db.RadiusByMember(args_[1], args_[2], GetRadiusMeters(radius_), count_, sort_, store_key_,
engine::Context ctx(srv->storage);
auto s = geo_db.RadiusByMember(ctx, args_[1], args_[2], GetRadiusMeters(radius_), count_, sort_, store_key_,
store_distance_, GetUnitConversion(), &geo_points);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
Expand Down
Loading

0 comments on commit a1e0957

Please sign in to comment.