Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix lock #2

Merged
merged 1 commit into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions codis/pkg/utils/version.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package utils

const (
Version = "2018-11-04 16:22:35 +0800 @de1ad026e329561c22e2a3035fbfe89dc7fef764 @3.2.2-12-gde1ad026"
Compile = "2023-02-23 11:25:09 +0800 by go version go1.19.6 linux/amd64"
Version = "2024-03-14 16:55:09 +0800 @de1ad026e329561c22e2a3035fbfe89dc7fef764 @Pika 3.5.3"
Compile = "2024-03-14 16:55:09 +0800 by go version go1.19.6 linux/amd64"
)
2 changes: 0 additions & 2 deletions include/pika_repl_server_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ class PikaReplServerConn : public net::PbConn {
const InnerMessage::InnerRequest::TrySync& try_sync_request,
const std::shared_ptr<net::PbConn>& conn,
InnerMessage::InnerResponse::TrySync* try_sync_response);
static void BuildConsensusMeta(const bool& reject, const std::vector<LogOffset>& hints, const uint32_t& term,
InnerMessage::InnerResponse* response);

static void HandleDBSyncRequest(void* arg);
static void HandleBinlogSyncRequest(void* arg);
Expand Down
17 changes: 0 additions & 17 deletions src/pika_repl_server_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,6 @@ bool PikaReplServerConn::TrySyncOffsetCheck(const std::shared_ptr<SyncMasterDB>&
return true;
}

void PikaReplServerConn::BuildConsensusMeta(const bool& reject, const std::vector<LogOffset>& hints,
const uint32_t& term, InnerMessage::InnerResponse* response) {
InnerMessage::ConsensusMeta* consensus_meta = response->mutable_consensus_meta();
consensus_meta->set_term(term);
consensus_meta->set_reject(reject);
if (!reject) {
return;
}
for (const auto& hint : hints) {
InnerMessage::BinlogOffset* offset = consensus_meta->add_hint();
offset->set_filenum(hint.b_offset.filenum);
offset->set_offset(hint.b_offset.offset);
offset->set_term(hint.l_offset.term);
offset->set_index(hint.l_offset.index);
}
}

void PikaReplServerConn::HandleDBSyncRequest(void* arg) {
std::unique_ptr<ReplServerTaskArg> task_arg(static_cast<ReplServerTaskArg*>(arg));
const std::shared_ptr<InnerMessage::InnerRequest> req = task_arg->req;
Expand Down
1 change: 1 addition & 0 deletions src/storage/src/redis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Redis::Redis(Storage* const s, const DataType& type)
scan_cursors_store_->SetCapacity(5000);
default_compact_range_options_.exclusive_manual_compaction = false;
default_compact_range_options_.change_level = true;
default_write_options_.sync = true;
handles_.clear();
}

Expand Down
10 changes: 0 additions & 10 deletions src/storage/src/redis_hashes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ Status RedisHashes::HDel(const Slice& key, const std::vector<std::string>& field
std::string meta_value;
int32_t del_cnt = 0;
int32_t version = 0;
ScopeRecordLock l(lock_mgr_, key);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;
Status s = db_->Get(read_options, handles_[0], key, &meta_value);
Expand Down Expand Up @@ -357,7 +356,6 @@ Status RedisHashes::HGetallWithTTL(const Slice& key, std::vector<FieldValue>* fv
Status RedisHashes::HIncrby(const Slice& key, const Slice& field, int64_t value, int64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

int32_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -428,7 +426,6 @@ Status RedisHashes::HIncrby(const Slice& key, const Slice& field, int64_t value,
Status RedisHashes::HIncrbyfloat(const Slice& key, const Slice& field, const Slice& by, std::string* new_value) {
new_value->clear();
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

int32_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -606,7 +603,6 @@ Status RedisHashes::HMSet(const Slice& key, const std::vector<FieldValue>& fvs)
}

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

int32_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -665,7 +661,6 @@ Status RedisHashes::HMSet(const Slice& key, const std::vector<FieldValue>& fvs)

Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& value, int32_t* res) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

int32_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -724,7 +719,6 @@ Status RedisHashes::HSet(const Slice& key, const Slice& field, const Slice& valu

Status RedisHashes::HSetnx(const Slice& key, const Slice& field, const Slice& value, int32_t* ret) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

int32_t version = 0;
std::string meta_value;
Expand Down Expand Up @@ -1160,7 +1154,6 @@ Status RedisHashes::PKRScanRange(const Slice& key_start, const Slice& key_end, c

Status RedisHashes::Expire(const Slice& key, int32_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
Expand All @@ -1183,7 +1176,6 @@ Status RedisHashes::Expire(const Slice& key, int32_t ttl) {

Status RedisHashes::Del(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
Expand Down Expand Up @@ -1278,7 +1270,6 @@ bool RedisHashes::PKExpireScan(const std::string& start_key, int32_t min_timesta

Status RedisHashes::Expireat(const Slice& key, int32_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
Expand All @@ -1300,7 +1291,6 @@ Status RedisHashes::Expireat(const Slice& key, int32_t timestamp) {

Status RedisHashes::Persist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedHashesMetaValue parsed_hashes_meta_value(&meta_value);
Expand Down
14 changes: 0 additions & 14 deletions src/storage/src/redis_lists.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,6 @@ Status RedisLists::LInsert(const Slice& key, const BeforeOrAfter& before_or_afte
const std::string& value, int64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
Expand Down Expand Up @@ -349,7 +348,6 @@ Status RedisLists::LPop(const Slice& key, int64_t count, std::vector<std::string
elements->clear();

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
Expand Down Expand Up @@ -392,7 +390,6 @@ Status RedisLists::LPop(const Slice& key, int64_t count, std::vector<std::string
Status RedisLists::LPush(const Slice& key, const std::vector<std::string>& values, uint64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t index = 0;
int32_t version = 0;
Expand Down Expand Up @@ -436,7 +433,6 @@ Status RedisLists::LPush(const Slice& key, const std::vector<std::string>& value
Status RedisLists::LPushx(const Slice& key, const std::vector<std::string>& values, uint64_t* len) {
*len = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
Expand Down Expand Up @@ -579,7 +575,6 @@ Status RedisLists::LRangeWithTTL(const Slice& key, int64_t start, int64_t stop,
Status RedisLists::LRem(const Slice& key, int64_t count, const Slice& value, uint64_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
Expand Down Expand Up @@ -695,7 +690,6 @@ Status RedisLists::LRem(const Slice& key, int64_t count, const Slice& value, uin

Status RedisLists::LSet(const Slice& key, int64_t index, const Slice& value) {
uint32_t statistic = 0;
ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
Expand Down Expand Up @@ -724,7 +718,6 @@ Status RedisLists::LSet(const Slice& key, int64_t index, const Slice& value) {

Status RedisLists::LTrim(const Slice& key, int64_t start, int64_t stop) {
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint32_t statistic = 0;
std::string meta_value;
Expand Down Expand Up @@ -786,7 +779,6 @@ Status RedisLists::RPop(const Slice& key, int64_t count, std::vector<std::string
elements->clear();

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

std::string meta_value;
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
Expand Down Expand Up @@ -831,7 +823,6 @@ Status RedisLists::RPoplpush(const Slice& source, const Slice& destination, std:
uint32_t statistic = 0;
Status s;
rocksdb::WriteBatch batch;
MultiScopeRecordLock l(lock_mgr_, {source.ToString(), destination.ToString()});
if (source.compare(destination) == 0) {
std::string meta_value;
s = db_->Get(default_read_options_, handles_[0], source, &meta_value);
Expand Down Expand Up @@ -986,7 +977,6 @@ Status RedisLists::RPushx(const Slice& key, const std::vector<std::string>& valu
*len = 0;
rocksdb::WriteBatch batch;

ScopeRecordLock l(lock_mgr_, key);
std::string meta_value;
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
Expand Down Expand Up @@ -1120,7 +1110,6 @@ Status RedisLists::PKRScanRange(const Slice& key_start, const Slice& key_end, co

Status RedisLists::Expire(const Slice& key, int32_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
Expand All @@ -1143,7 +1132,6 @@ Status RedisLists::Expire(const Slice& key, int32_t ttl) {

Status RedisLists::Del(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
Expand Down Expand Up @@ -1237,7 +1225,6 @@ bool RedisLists::PKExpireScan(const std::string& start_key, int32_t min_timestam

Status RedisLists::Expireat(const Slice& key, int32_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
Expand All @@ -1259,7 +1246,6 @@ Status RedisLists::Expireat(const Slice& key, int32_t timestamp) {

Status RedisLists::Persist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
Expand Down
11 changes: 0 additions & 11 deletions src/storage/src/redis_sets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ rocksdb::Status RedisSets::SAdd(const Slice& key, const std::vector<std::string>
}

rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
int32_t version = 0;
std::string meta_value;
rocksdb::Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
Expand Down Expand Up @@ -365,7 +364,6 @@ rocksdb::Status RedisSets::SDiffstore(const Slice& destination, const std::vecto

std::string meta_value;
int32_t version = 0;
ScopeRecordLock l(lock_mgr_, destination);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;
std::vector<KeyVersion> vaild_sets;
Expand Down Expand Up @@ -540,7 +538,6 @@ rocksdb::Status RedisSets::SInterstore(const Slice& destination, const std::vect
std::string meta_value;
int32_t version = 0;
bool have_invalid_sets = false;
ScopeRecordLock l(lock_mgr_, destination);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;
std::vector<KeyVersion> vaild_sets;
Expand Down Expand Up @@ -841,7 +838,6 @@ rocksdb::Status RedisSets::SPop(const Slice& key, std::vector<std::string>* memb

std::string meta_value;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

uint64_t start_us = pstd::NowMicros();
Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
Expand Down Expand Up @@ -935,7 +931,6 @@ rocksdb::Status RedisSets::SRandmember(const Slice& key, int32_t count, std::vec

std::string meta_value;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);
std::vector<int32_t> targets;
std::unordered_set<int32_t> unique;

Expand Down Expand Up @@ -996,7 +991,6 @@ rocksdb::Status RedisSets::SRandmember(const Slice& key, int32_t count, std::vec
rocksdb::Status RedisSets::SRem(const Slice& key, const std::vector<std::string>& members, int32_t* ret) {
*ret = 0;
rocksdb::WriteBatch batch;
ScopeRecordLock l(lock_mgr_, key);

int32_t version = 0;
uint32_t statistic = 0;
Expand Down Expand Up @@ -1099,7 +1093,6 @@ rocksdb::Status RedisSets::SUnionstore(const Slice& destination, const std::vect

std::string meta_value;
int32_t version = 0;
ScopeRecordLock l(lock_mgr_, destination);
ScopeSnapshot ss(db_, &snapshot);
read_options.snapshot = snapshot;
std::vector<KeyVersion> vaild_sets;
Expand Down Expand Up @@ -1345,7 +1338,6 @@ rocksdb::Status RedisSets::PKRScanRange(const Slice& key_start, const Slice& key

rocksdb::Status RedisSets::Expire(const Slice& key, int32_t ttl) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
Expand All @@ -1368,7 +1360,6 @@ rocksdb::Status RedisSets::Expire(const Slice& key, int32_t ttl) {

rocksdb::Status RedisSets::Del(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
Expand Down Expand Up @@ -1462,7 +1453,6 @@ bool RedisSets::PKExpireScan(const std::string& start_key, int32_t min_timestamp

rocksdb::Status RedisSets::Expireat(const Slice& key, int32_t timestamp) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
Expand All @@ -1484,7 +1474,6 @@ rocksdb::Status RedisSets::Expireat(const Slice& key, int32_t timestamp) {

rocksdb::Status RedisSets::Persist(const Slice& key) {
std::string meta_value;
ScopeRecordLock l(lock_mgr_, key);
rocksdb::Status s = db_->Get(default_read_options_, handles_[0], key, &meta_value);
if (s.ok()) {
ParsedSetsMetaValue parsed_sets_meta_value(&meta_value);
Expand Down
Loading
Loading