From 9b63bccc4433f3bd3c4f7eab0ad96d7297406d32 Mon Sep 17 00:00:00 2001 From: jakevin Date: Sun, 10 Jul 2022 21:08:41 +0800 Subject: [PATCH 1/3] Return values instead of passing pointers (#722) --- src/replication.cc | 3 +- src/storage.cc | 120 +++++++++++++++--------------- src/storage.h | 4 +- src/util.cc | 29 ++++---- src/util.h | 2 +- tests/cppunit/string_util_test.cc | 3 +- 6 files changed, 80 insertions(+), 81 deletions(-) diff --git a/src/replication.cc b/src/replication.cc index c1a4a29e195..11e4640cbe8 100644 --- a/src/replication.cc +++ b/src/replication.cc @@ -952,8 +952,7 @@ rocksdb::Status ReplicationThread::ParseWriteBatch(const std::string &batch_stri break; case kBatchTypePropagate: if (write_batch_handler.Key() == Engine::kPropagateScriptCommand) { - std::vector tokens; - Util::TokenizeRedisProtocol(write_batch_handler.Value(), &tokens); + std::vector tokens = Util::TokenizeRedisProtocol(write_batch_handler.Value()); if (!tokens.empty()) { srv_->ExecPropagatedCommand(tokens); } diff --git a/src/storage.cc b/src/storage.cc index 2be8f4238c2..b263a0d5c24 100644 --- a/src/storage.cc +++ b/src/storage.cc @@ -93,17 +93,19 @@ void Storage::CloseDB() { db_ = nullptr; } -void Storage::InitTableOptions(rocksdb::BlockBasedTableOptions *table_options) { - table_options->format_version = 5; - table_options->index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; - table_options->filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false)); - table_options->partition_filters = true; - table_options->optimize_filters_for_memory = true; - table_options->metadata_block_size = 4096; - table_options->data_block_index_type = +rocksdb::BlockBasedTableOptions Storage::InitTableOptions() { + rocksdb::BlockBasedTableOptions table_options; + table_options.format_version = 5; + table_options.index_type = rocksdb::BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch; + table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(10, false)); + table_options.partition_filters = true; + table_options.optimize_filters_for_memory = true; + table_options.metadata_block_size = 4096; + table_options.data_block_index_type = rocksdb::BlockBasedTableOptions::DataBlockIndexType::kDataBlockBinaryAndHash; - table_options->data_block_hash_table_util_ratio = 0.75; - table_options->block_size = static_cast(config_->RocksDB.block_size); + table_options.data_block_hash_table_util_ratio = 0.75; + table_options.block_size = static_cast(config_->RocksDB.block_size); + return table_options; } void Storage::SetBlobDB(rocksdb::ColumnFamilyOptions *cf_options) { @@ -116,59 +118,62 @@ void Storage::SetBlobDB(rocksdb::ColumnFamilyOptions *cf_options) { cf_options->blob_garbage_collection_age_cutoff = config_->RocksDB.blob_garbage_collection_age_cutoff / 100.0; } -void Storage::InitOptions(rocksdb::Options *options) { - options->create_if_missing = true; - options->create_missing_column_families = true; +rocksdb::Options Storage::InitOptions() { + rocksdb::Options options; + options.create_if_missing = true; + options.create_missing_column_families = true; // options.IncreaseParallelism(2); // NOTE: the overhead of statistics is 5%-10%, so it should be configurable in prod env // See: https://github.com/facebook/rocksdb/wiki/Statistics - options->statistics = rocksdb::CreateDBStatistics(); - options->stats_dump_period_sec = config_->RocksDB.stats_dump_period_sec; - options->max_open_files = config_->RocksDB.max_open_files; - options->compaction_style = rocksdb::CompactionStyle::kCompactionStyleLevel; - options->max_subcompactions = static_cast(config_->RocksDB.max_sub_compactions); - options->max_background_flushes = config_->RocksDB.max_background_flushes; - options->max_background_compactions = config_->RocksDB.max_background_compactions; - options->max_write_buffer_number = config_->RocksDB.max_write_buffer_number; - options->min_write_buffer_number_to_merge = 2; - options->write_buffer_size = config_->RocksDB.write_buffer_size * MiB; - options->num_levels = 7; - options->compression_per_level.resize(options->num_levels); + options.statistics = rocksdb::CreateDBStatistics(); + options.stats_dump_period_sec = config_->RocksDB.stats_dump_period_sec; + options.max_open_files = config_->RocksDB.max_open_files; + options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleLevel; + options.max_subcompactions = static_cast(config_->RocksDB.max_sub_compactions); + options.max_background_flushes = config_->RocksDB.max_background_flushes; + options.max_background_compactions = config_->RocksDB.max_background_compactions; + options.max_write_buffer_number = config_->RocksDB.max_write_buffer_number; + options.min_write_buffer_number_to_merge = 2; + options.write_buffer_size = config_->RocksDB.write_buffer_size * MiB; + options.num_levels = 7; + options.compression_per_level.resize(options.num_levels); // only compress levels >= 2 - for (int i = 0; i < options->num_levels; ++i) { + for (int i = 0; i < options.num_levels; ++i) { if (i < 2) { - options->compression_per_level[i] = rocksdb::CompressionType::kNoCompression; + options.compression_per_level[i] = rocksdb::CompressionType::kNoCompression; } else { - options->compression_per_level[i] = static_cast(config_->RocksDB.compression); + options.compression_per_level[i] = static_cast(config_->RocksDB.compression); } } if (config_->RocksDB.row_cache_size) { - options->row_cache = rocksdb::NewLRUCache(config_->RocksDB.row_cache_size * MiB); - } - options->enable_pipelined_write = config_->RocksDB.enable_pipelined_write; - options->target_file_size_base = config_->RocksDB.target_file_size_base * MiB; - options->max_manifest_file_size = 64 * MiB; - options->max_log_file_size = 256 * MiB; - options->keep_log_file_num = 12; - options->WAL_ttl_seconds = static_cast(config_->RocksDB.WAL_ttl_seconds); - options->WAL_size_limit_MB = static_cast(config_->RocksDB.WAL_size_limit_MB); - options->max_total_wal_size = static_cast(config_->RocksDB.max_total_wal_size * MiB); - options->listeners.emplace_back(new EventListener(this)); - options->dump_malloc_stats = true; + options.row_cache = rocksdb::NewLRUCache(config_->RocksDB.row_cache_size * MiB); + } + options.enable_pipelined_write = config_->RocksDB.enable_pipelined_write; + options.target_file_size_base = config_->RocksDB.target_file_size_base * MiB; + options.max_manifest_file_size = 64 * MiB; + options.max_log_file_size = 256 * MiB; + options.keep_log_file_num = 12; + options.WAL_ttl_seconds = static_cast(config_->RocksDB.WAL_ttl_seconds); + options.WAL_size_limit_MB = static_cast(config_->RocksDB.WAL_size_limit_MB); + options.max_total_wal_size = static_cast(config_->RocksDB.max_total_wal_size * MiB); + options.listeners.emplace_back(new EventListener(this)); + options.dump_malloc_stats = true; sst_file_manager_ = std::shared_ptr(rocksdb::NewSstFileManager(rocksdb::Env::Default())); - options->sst_file_manager = sst_file_manager_; + options.sst_file_manager = sst_file_manager_; uint64_t max_io_mb = kIORateLimitMaxMb; if (config_->max_io_mb > 0) max_io_mb = static_cast(config_->max_io_mb); rate_limiter_ = std::shared_ptr(rocksdb::NewGenericRateLimiter(max_io_mb * MiB)); - options->rate_limiter = rate_limiter_; - options->delayed_write_rate = static_cast(config_->RocksDB.delayed_write_rate); - options->compaction_readahead_size = static_cast(config_->RocksDB.compaction_readahead_size); - options->level0_slowdown_writes_trigger = config_->RocksDB.level0_slowdown_writes_trigger; - options->level0_stop_writes_trigger = config_->RocksDB.level0_stop_writes_trigger; - options->level0_file_num_compaction_trigger = config_->RocksDB.level0_file_num_compaction_trigger; - options->max_bytes_for_level_base = config_->RocksDB.max_bytes_for_level_base; - options->max_bytes_for_level_multiplier = config_->RocksDB.max_bytes_for_level_multiplier; - options->level_compaction_dynamic_level_bytes = config_->RocksDB.level_compaction_dynamic_level_bytes; + options.rate_limiter = rate_limiter_; + options.delayed_write_rate = static_cast(config_->RocksDB.delayed_write_rate); + options.compaction_readahead_size = static_cast(config_->RocksDB.compaction_readahead_size); + options.level0_slowdown_writes_trigger = config_->RocksDB.level0_slowdown_writes_trigger; + options.level0_stop_writes_trigger = config_->RocksDB.level0_stop_writes_trigger; + options.level0_file_num_compaction_trigger = config_->RocksDB.level0_file_num_compaction_trigger; + options.max_bytes_for_level_base = config_->RocksDB.max_bytes_for_level_base; + options.max_bytes_for_level_multiplier = config_->RocksDB.max_bytes_for_level_multiplier; + options.level_compaction_dynamic_level_bytes = config_->RocksDB.level_compaction_dynamic_level_bytes; + + return options; } Status Storage::SetColumnFamilyOption(const std::string &key, const std::string &value) { @@ -234,8 +239,7 @@ Status Storage::Open(bool read_only) { size_t metadata_block_cache_size = config_->RocksDB.metadata_block_cache_size*MiB; size_t subkey_block_cache_size = config_->RocksDB.subkey_block_cache_size*MiB; - rocksdb::Options options; - InitOptions(&options); + rocksdb::Options options = InitOptions(); CreateColumnFamilies(options); std::shared_ptr shared_block_cache; @@ -244,8 +248,7 @@ Status Storage::Open(bool read_only) { shared_block_cache = rocksdb::NewLRUCache(shared_block_cache_size, -1, false, 0.75); } - rocksdb::BlockBasedTableOptions metadata_table_opts; - InitTableOptions(&metadata_table_opts); + rocksdb::BlockBasedTableOptions metadata_table_opts = InitTableOptions(); metadata_table_opts.block_cache = shared_block_cache ? shared_block_cache : rocksdb::NewLRUCache(metadata_block_cache_size, -1, false, 0.75); metadata_table_opts.pin_l0_filter_and_index_blocks_in_cache = true; @@ -263,8 +266,7 @@ Status Storage::Open(bool read_only) { NewCompactOnExpiredTableCollectorFactory(kMetadataColumnFamilyName, 0.3)); SetBlobDB(&metadata_opts); - rocksdb::BlockBasedTableOptions subkey_table_opts; - InitTableOptions(&subkey_table_opts); + rocksdb::BlockBasedTableOptions subkey_table_opts = InitTableOptions(); subkey_table_opts.block_cache = shared_block_cache ? shared_block_cache : rocksdb::NewLRUCache(subkey_block_cache_size, -1, false, 0.75); subkey_table_opts.pin_l0_filter_and_index_blocks_in_cache = true; @@ -278,16 +280,14 @@ Status Storage::Open(bool read_only) { NewCompactOnExpiredTableCollectorFactory(kSubkeyColumnFamilyName, 0.3)); SetBlobDB(&subkey_opts); - rocksdb::BlockBasedTableOptions pubsub_table_opts; - InitTableOptions(&pubsub_table_opts); + rocksdb::BlockBasedTableOptions pubsub_table_opts = InitTableOptions(); rocksdb::ColumnFamilyOptions pubsub_opts(options); pubsub_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(pubsub_table_opts)); pubsub_opts.compaction_filter_factory = std::make_shared(); pubsub_opts.disable_auto_compactions = config_->RocksDB.disable_auto_compactions; SetBlobDB(&pubsub_opts); - rocksdb::BlockBasedTableOptions propagate_table_opts; - InitTableOptions(&propagate_table_opts); + rocksdb::BlockBasedTableOptions propagate_table_opts = InitTableOptions(); rocksdb::ColumnFamilyOptions propagate_opts(options); propagate_opts.table_factory.reset(rocksdb::NewBlockBasedTableFactory(propagate_table_opts)); propagate_opts.compaction_filter_factory = std::make_shared(); diff --git a/src/storage.h b/src/storage.h index 83ce11a5fb2..7527e1f111a 100644 --- a/src/storage.h +++ b/src/storage.h @@ -68,9 +68,9 @@ class Storage { Status OpenForReadOnly(); void CloseDB(); void EmptyDB(); - void InitTableOptions(rocksdb::BlockBasedTableOptions *table_options); + rocksdb::BlockBasedTableOptions InitTableOptions(); void SetBlobDB(rocksdb::ColumnFamilyOptions *cf_options); - void InitOptions(rocksdb::Options *options); + rocksdb::Options InitOptions(); Status SetColumnFamilyOption(const std::string &key, const std::string &value); Status SetOption(const std::string &key, const std::string &value); Status SetDBOption(const std::string &key, const std::string &value); diff --git a/src/util.cc b/src/util.cc index 21d633fdb42..493101e2b05 100644 --- a/src/util.cc +++ b/src/util.cc @@ -570,11 +570,11 @@ void BytesToHuman(char *buf, size_t size, uint64_t n) { } } -void TokenizeRedisProtocol(const std::string &value, std::vector *tokens) { - tokens->clear(); +std::vector TokenizeRedisProtocol(const std::string &value) { + std::vector tokens; if (value.empty()) { - return; + return tokens; } enum ParserState { stateArrayLen, stateBulkLen, stateBulkData }; @@ -585,12 +585,12 @@ void TokenizeRedisProtocol(const std::string &value, std::vector *t switch (state) { case stateArrayLen: if (start[0] != '*') { - return; + return tokens; } p = strchr(start, '\r'); if (!p || (p == end) || p[1] != '\n') { - tokens->clear(); - return; + tokens.clear(); + return tokens; } array_len = std::stoull(std::string(start+1, p)); start = p + 2; @@ -599,12 +599,12 @@ void TokenizeRedisProtocol(const std::string &value, std::vector *t case stateBulkLen: if (start[0] != '$') { - return; + return tokens; } p = strchr(start, '\r'); if (!p || (p == end) || p[1] != '\n') { - tokens->clear(); - return; + tokens.clear(); + return tokens; } bulk_len = std::stoull(std::string(start+1, p)); start = p + 2; @@ -613,18 +613,19 @@ void TokenizeRedisProtocol(const std::string &value, std::vector *t case stateBulkData: if (bulk_len+2 > static_cast(end-start)) { - tokens->clear(); - return; + tokens.clear(); + return tokens; } - tokens->emplace_back(std::string(start, start+bulk_len)); + tokens.emplace_back(std::string(start, start+bulk_len)); start += bulk_len + 2; state = stateBulkLen; break; } } - if (array_len != tokens->size()) { - tokens->clear(); + if (array_len != tokens.size()) { + tokens.clear(); } + return tokens; } bool IsPortInUse(int port) { diff --git a/src/util.h b/src/util.h index f6735271564..625e16e50e2 100644 --- a/src/util.h +++ b/src/util.h @@ -61,7 +61,7 @@ bool HasPrefix(const std::string &str, const std::string &prefix); int StringMatch(const std::string &pattern, const std::string &in, int nocase); int StringMatchLen(const char *p, int plen, const char *s, int slen, int nocase); std::string StringToHex(const std::string &input); -void TokenizeRedisProtocol(const std::string &value, std::vector *tokens); +std::vector TokenizeRedisProtocol(const std::string &value); void ThreadSetName(const char *name); int aeWait(int fd, int mask, uint64_t milliseconds); diff --git a/tests/cppunit/string_util_test.cc b/tests/cppunit/string_util_test.cc index ea8315ec2f3..e75f3e56479 100644 --- a/tests/cppunit/string_util_test.cc +++ b/tests/cppunit/string_util_test.cc @@ -72,9 +72,8 @@ TEST(StringUtil, Split) { } TEST(StringUtil, TokenizeRedisProtocol) { - std::vector array; std::vector expected = {"this", "is", "a", "test"}; - Util::TokenizeRedisProtocol("*4\r\n$4\r\nthis\r\n$2\r\nis\r\n$1\r\na\r\n$4\r\ntest\r\n", &array); + auto array = Util::TokenizeRedisProtocol("*4\r\n$4\r\nthis\r\n$2\r\nis\r\n$1\r\na\r\n$4\r\ntest\r\n"); ASSERT_EQ(expected, array); } From 3f35369cb18af195e64ad08e314f2e80c8083c25 Mon Sep 17 00:00:00 2001 From: tison Date: Mon, 11 Jul 2022 14:20:43 +0800 Subject: [PATCH 2/3] ci: build docker image with travis ci (#724) Signed-off-by: tison --- .github/workflows/daily-ci.yaml | 51 --------------------------------- .travis.yml | 31 ++++++++++++++++++++ Dockerfile | 18 +++++++----- 3 files changed, 42 insertions(+), 58 deletions(-) delete mode 100644 .github/workflows/daily-ci.yaml create mode 100644 .travis.yml diff --git a/.github/workflows/daily-ci.yaml b/.github/workflows/daily-ci.yaml deleted file mode 100644 index 5d043c45fc1..00000000000 --- a/.github/workflows/daily-ci.yaml +++ /dev/null @@ -1,51 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -name: Daily CI - -on: - pull_request: - types: [opened, synchronize, reopened, labeled] - schedule: - - cron: '0 1 * * *' - workflow_dispatch: - -jobs: - build-docker-image: - name: Build Docker Image - if: | - (github.event_name != 'pull_request') || - (github.event.action == 'labeled' && github.event.label.name == 'needs-daily-ci') || - (github.event.action != 'labeled' && contains(github.event.pull_request.labels.*.name, 'needs-daily-ci')) - runs-on: ubuntu-18.04 - steps: - - name: Checkout Code Base - uses: actions/checkout@v3 - - - name: Set up QEMU - uses: docker/setup-qemu-action@v1 - - - name: Set up Docker Buildx - id: buildx - uses: docker/setup-buildx-action@v1 - - - name: Available platforms - run: echo ${{ steps.buildx.outputs.platforms }} - - - name: Build Docker Image - run: | - docker buildx build --platform linux/amd64,linux/arm64 . diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000000..33ccff8bb1d --- /dev/null +++ b/.travis.yml @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +language: cpp + +arch: + - amd64 + - arm64 + +os: linux +dist: focal + +services: + - docker + +script: + - docker build . diff --git a/Dockerfile b/Dockerfile index 1b16e699ced..f8a88495275 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,23 +15,27 @@ # specific language governing permissions and limitations # under the License. -FROM ubuntu:22.04 as build +FROM ubuntu:focal as build -RUN apt update && apt install -y cmake make git autoconf libtool g++ +# workaround tzdata install hanging +ENV TZ=Asia/Shanghai +RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone + +RUN apt update +RUN apt install -y cmake make git autoconf libtool g++ WORKDIR /kvrocks COPY . . -RUN mkdir docker-build && ./build.sh docker-build - +RUN ./build.sh build -FROM ubuntu:22.04 +FROM ubuntu:focal WORKDIR /kvrocks -COPY --from=build /kvrocks/docker-build/kvrocks ./bin/ +COPY --from=build /kvrocks/build/kvrocks ./bin/ COPY ./kvrocks.conf ./conf/ -RUN sed -i -e 's|dir /tmp/kvrocks|dir /var/lib/kvrocks|g' ./conf/kvrocks.conf +RUN sed -i -e 's%dir /tmp/kvrocks%dir /var/lib/kvrocks%g' ./conf/kvrocks.conf VOLUME /var/lib/kvrocks EXPOSE 6666:6666 From b064b98bc6e3fb8e17c03af14523326e9f650383 Mon Sep 17 00:00:00 2001 From: jakevin Date: Mon, 11 Jul 2022 17:29:57 +0800 Subject: [PATCH 3/3] Use lock_guard to replace the manual mutex lock/unlock(#723) --- src/log_collector.cc | 15 +++++---------- src/server.cc | 22 +++++++--------------- src/task_runner.cc | 11 +++-------- src/worker.cc | 45 ++++++++++++++++++++++---------------------- 4 files changed, 38 insertions(+), 55 deletions(-) diff --git a/src/log_collector.cc b/src/log_collector.cc index fa43aeee16d..c9d7dff9224 100644 --- a/src/log_collector.cc +++ b/src/log_collector.cc @@ -53,36 +53,33 @@ LogCollector::~LogCollector() { template ssize_t LogCollector::Size() { size_t n; - mu_.lock(); + std::lock_guard guard(mu_); n = entries_.size(); - mu_.unlock(); return n; } template void LogCollector::Reset() { - mu_.lock(); + std::lock_guard guard(mu_); while (!entries_.empty()) { delete entries_.front(); entries_.pop_front(); } - mu_.unlock(); } template void LogCollector::SetMaxEntries(int64_t max_entries) { - mu_.lock(); + std::lock_guard guard(mu_); while (max_entries > 0 && static_cast(entries_.size()) > max_entries) { delete entries_.back(); entries_.pop_back(); } max_entries_ = max_entries; - mu_.unlock(); } template void LogCollector::PushEntry(T *entry) { - mu_.lock(); + std::lock_guard guard(mu_); entry->id = ++id_; entry->time = time(nullptr); if (max_entries_ > 0 && !entries_.empty() @@ -91,7 +88,6 @@ void LogCollector::PushEntry(T *entry) { entries_.pop_back(); } entries_.push_front(entry); - mu_.unlock(); } template @@ -99,7 +95,7 @@ std::string LogCollector::GetLatestEntries(int64_t cnt) { size_t n; std::string output; - mu_.lock(); + std::lock_guard guard(mu_); if (cnt > 0) { n = std::min(entries_.size(), static_cast(cnt)); } else { @@ -110,7 +106,6 @@ std::string LogCollector::GetLatestEntries(int64_t cnt) { output.append(entry->ToRedisString()); if (--n == 0) break; } - mu_.unlock(); return output; } diff --git a/src/server.cc b/src/server.cc index 604368e4379..a837c4a14b7 100644 --- a/src/server.cc +++ b/src/server.cc @@ -189,13 +189,12 @@ void Server::Join() { } Status Server::AddMaster(std::string host, uint32_t port, bool force_reconnect) { - slaveof_mu_.lock(); + std::lock_guard guard(slaveof_mu_); // Don't check host and port if 'force_reconnect' argument is set to true if (!force_reconnect && !master_host_.empty() && master_host_ == host && master_port_ == port) { - slaveof_mu_.unlock(); return Status::OK(); } @@ -226,12 +225,11 @@ Status Server::AddMaster(std::string host, uint32_t port, bool force_reconnect) } else { replication_thread_ = nullptr; } - slaveof_mu_.unlock(); return s; } Status Server::RemoveMaster() { - slaveof_mu_.lock(); + std::lock_guard guard(slaveof_mu_); if (!master_host_.empty()) { master_host_.clear(); master_port_ = 0; @@ -240,7 +238,6 @@ Status Server::RemoveMaster() { replication_thread_ = nullptr; storage_->ShiftReplId(); } - slaveof_mu_.unlock(); return Status::OK(); } @@ -258,7 +255,7 @@ Status Server::AddSlave(Redis::Connection *conn, rocksdb::SequenceNumber next_re } void Server::DisconnectSlaves() { - slave_threads_mu_.lock(); + std::lock_guard guard(slaveof_mu_); for (const auto &slave_thread : slave_threads_) { if (!slave_thread->IsStopped()) slave_thread->Stop(); } @@ -268,12 +265,11 @@ void Server::DisconnectSlaves() { slave_thread->Join(); delete slave_thread; } - slave_threads_mu_.unlock(); } void Server::cleanupExitedSlaves() { std::list exited_slave_threads; - slave_threads_mu_.lock(); + std::lock_guard guard(slaveof_mu_); for (const auto &slave_thread : slave_threads_) { if (slave_thread->IsStopped()) exited_slave_threads.emplace_back(slave_thread); @@ -285,7 +281,6 @@ void Server::cleanupExitedSlaves() { t->Join(); delete t; } - slave_threads_mu_.unlock(); } void Server::FeedMonitorConns(Redis::Connection *conn, const std::vector &tokens) { @@ -830,15 +825,13 @@ void Server::GetRoleInfo(std::string *info) { std::string Server::GetLastRandomKeyCursor() { std::string cursor; - last_random_key_cursor_mu_.lock(); + std::lock_guard guard(last_random_key_cursor_mu_); cursor = last_random_key_cursor_; - last_random_key_cursor_mu_.unlock(); return cursor; } void Server::SetLastRandomKeyCursor(const std::string &cursor) { - last_random_key_cursor_mu_.lock(); + std::lock_guard guard(last_random_key_cursor_mu_); last_random_key_cursor_ = cursor; - last_random_key_cursor_mu_.unlock(); } int Server::GetUnixTime() { @@ -1264,11 +1257,10 @@ std::string Server::GetClientsStr() { for (const auto &t : worker_threads_) { clients.append(t->GetWorker()->GetClientsStr()); } - slave_threads_mu_.lock(); + std::lock_guard guard(slave_threads_mu_); for (const auto &st : slave_threads_) { clients.append(st->GetConn()->ToString()); } - slave_threads_mu_.unlock(); return clients; } diff --git a/src/task_runner.cc b/src/task_runner.cc index 2747a4c8844..54dfb886e93 100644 --- a/src/task_runner.cc +++ b/src/task_runner.cc @@ -24,18 +24,15 @@ #include "util.h" Status TaskRunner::Publish(const Task &task) { - mu_.lock(); + std::lock_guard guard(mu_); if (stop_) { - mu_.unlock(); return Status(Status::NotOK, "the runner was stopped"); } if (task_queue_.size() >= max_queue_size_) { - mu_.unlock(); return Status(Status::NotOK, "the task queue was reached max length"); } task_queue_.emplace_back(task); cond_.notify_all(); - mu_.unlock(); return Status::OK(); } @@ -50,10 +47,9 @@ void TaskRunner::Start() { } void TaskRunner::Stop() { - mu_.lock(); + std::lock_guard guard(mu_); stop_ = true; cond_.notify_all(); - mu_.unlock(); } void TaskRunner::Join() { @@ -63,10 +59,9 @@ void TaskRunner::Join() { } void TaskRunner::Purge() { - mu_.lock(); + std::lock_guard guard(mu_); threads_.clear(); task_queue_.clear(); - mu_.unlock(); } void TaskRunner::run() { diff --git a/src/worker.cc b/src/worker.cc index 06faeb44eb9..d0bb4126219 100644 --- a/src/worker.cc +++ b/src/worker.cc @@ -345,10 +345,11 @@ Status Worker::Reply(int fd, const std::string &reply) { } void Worker::BecomeMonitorConn(Redis::Connection *conn) { - conns_mu_.lock(); - conns_.erase(conn->GetFD()); - monitor_conns_[conn->GetFD()] = conn; - conns_mu_.unlock(); + { + std::lock_guard guard(conns_mu_); + conns_.erase(conn->GetFD()); + monitor_conns_[conn->GetFD()] = conn; + } svr_->IncrMonitorClientNum(); conn->EnableFlag(Redis::Connection::kMonitor); } @@ -384,7 +385,7 @@ std::string Worker::GetClientsStr() { void Worker::KillClient(Redis::Connection *self, uint64_t id, std::string addr, uint64_t type, bool skipme, int64_t *killed) { - conns_mu_.lock(); + std::lock_guard guard(conns_mu_); for (const auto &iter : conns_) { Redis::Connection* conn = iter.second; if (skipme && self == conn) continue; @@ -400,29 +401,29 @@ void Worker::KillClient(Redis::Connection *self, uint64_t id, std::string addr, (*killed)++; } } - conns_mu_.unlock(); } void Worker::KickoutIdleClients(int timeout) { - conns_mu_.lock(); std::list> to_be_killed_conns; - if (conns_.empty()) { - conns_mu_.unlock(); - return; - } - int iterations = std::min(static_cast(conns_.size()), 50); - auto iter = conns_.upper_bound(last_iter_conn_fd); - while (iterations--) { - if (iter == conns_.end()) iter = conns_.begin(); - if (static_cast(iter->second->GetIdleTime()) >= timeout) { - to_be_killed_conns.emplace_back(std::make_pair(iter->first, iter->second->GetID())); + { + std::lock_guard guard(conns_mu_); + if (conns_.empty()) { + return; } - iter++; + int iterations = std::min(static_cast(conns_.size()), 50); + auto iter = conns_.upper_bound(last_iter_conn_fd); + while (iterations--) { + if (iter == conns_.end()) + iter = conns_.begin(); + if (static_cast(iter->second->GetIdleTime()) >= timeout) { + to_be_killed_conns.emplace_back( + std::make_pair(iter->first, iter->second->GetID())); + } + iter++; + } + iter--; + last_iter_conn_fd = iter->first; } - iter--; - last_iter_conn_fd = iter->first; - conns_mu_.unlock(); - for (const auto &conn : to_be_killed_conns) { FreeConnectionByID(conn.first, conn.second); }