Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Feature/floyd rebase unstable #2413

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ elseif(${BUILD_TYPE} STREQUAL RELWITHDEBINFO)
else()
set(LIB_BUILD_TYPE RELEASE)
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -g -DNDEBUG")

endif()

if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
Expand Down Expand Up @@ -172,7 +171,6 @@ set(GTEST_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
set(GTEST_MAIN_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
set(GMOCK_INCLUDE_DIR ${INSTALL_INCLUDEDIR})


ExternalProject_Add(gflags
URL
https://github.com/gflags/gflags/archive/refs/tags/v2.2.2.tar.gz
Expand Down
4 changes: 4 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
# Port 10221 is used for Rsync, and port 11221 is used for Replication, while the listening port is 9221.
port : 9221

db-instance-num : 3
rocksdb-ttl-second : 86400 * 7;
rocksdb-periodic-second : 86400 * 3;

# Random value identifying the Pika server, its string length must be 40.
# If not set, Pika will generate a random string with a length of 40 random characters.
# run-id :
Expand Down
2 changes: 1 addition & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ class DiskRecoveryCmd : public Cmd {

private:
void DoInitial() override;
std::map<std::string, uint64_t> background_errors_;
std::map<int, uint64_t> background_errors_;
};

class ClearReplicationIDCmd : public Cmd {
Expand Down
2 changes: 0 additions & 2 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_status.h"
#include "pstd/include/noncopyable.h"

#include "include/pika_define.h"


std::string NewFileName(const std::string& name, uint32_t current);

class Version final : public pstd::noncopyable {
Expand Down
2 changes: 1 addition & 1 deletion include/pika_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,4 @@ class PikaCache : public pstd::noncopyable, public std::enable_shared_from_this<
std::vector<std::shared_ptr<pstd::Mutex>> cache_mutexs_;
};

#endif
#endif
2 changes: 1 addition & 1 deletion include/pika_cache_load_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class PikaCacheLoadThread : public net::Thread {
private:
std::atomic_bool should_exit_;
std::deque<std::tuple<const char, std::string, const std::shared_ptr<DB>>> loadkeys_queue_;

pstd::CondVar loadkeys_cond_;
pstd::Mutex loadkeys_mutex_;

Expand Down
27 changes: 24 additions & 3 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
#include <set>
#include <unordered_set>

#include "rocksdb/compression_type.h"

#include "pstd/include/base_conf.h"
#include "pstd/include/pstd_mutex.h"
#include "pstd/include/pstd_string.h"

#include "acl.h"
#include "include/pika_define.h"
#include "include/pika_meta.h"
#include "rocksdb/compression_type.h"

#define kBinlogReadWinDefaultSize 9000
Expand Down Expand Up @@ -76,6 +77,15 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return db_path_;
}
int db_instance_num() {
return db_instance_num_;
}
uint64_t rocksdb_ttl_second() {
return rocksdb_ttl_second_.load();
}
uint64_t rocksdb_periodic_compaction_second() {
return rocksdb_periodic_second_.load();
}
std::string db_sync_path() {
std::shared_lock l(rwlock_);
return db_sync_path_;
Expand Down Expand Up @@ -376,7 +386,6 @@ class PikaConf : public pstd::BaseConf {
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
int binlog_file_size() { return binlog_file_size_; }
PikaMeta* local_meta() { return local_meta_.get(); }
std::vector<rocksdb::CompressionType> compression_per_level();
std::string compression_all_levels() const { return compression_per_level_; };
static rocksdb::CompressionType GetCompression(const std::string& value);
Expand Down Expand Up @@ -416,6 +425,15 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("slaveof", value);
slaveof_ = value;
}

void SetRocksdbTTLSecond(uint64_t ttl) {
rocksdb_ttl_second_.store(ttl);
}

void SetRocksdbPeriodicSecond(uint64_t value) {
rocksdb_periodic_second_.store(value);
}

void SetReplicationID(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("replication-id", value);
Expand Down Expand Up @@ -655,6 +673,7 @@ class PikaConf : public pstd::BaseConf {
int ConfigRewriteReplicationID();

private:
// TODO: replace mutex with atomic value
int port_ = 0;
int slave_priority_ = 0;
int thread_num_ = 0;
Expand All @@ -668,6 +687,7 @@ class PikaConf : public pstd::BaseConf {
std::string log_path_;
std::string log_level_;
std::string db_path_;
int db_instance_num_ = 0;
std::string db_sync_path_;
std::string compact_cron_;
std::string compact_interval_;
Expand Down Expand Up @@ -719,6 +739,8 @@ class PikaConf : public pstd::BaseConf {
int max_background_compactions_ = 0;
int max_background_jobs_ = 0;
int max_cache_files_ = 0;
std::atomic<uint64_t> rocksdb_ttl_second_ = 0;
std::atomic<uint64_t> rocksdb_periodic_second_ = 0;
int max_bytes_for_level_multiplier_ = 0;
int64_t block_size_ = 0;
int64_t block_cache_ = 0;
Expand Down Expand Up @@ -787,7 +809,6 @@ class PikaConf : public pstd::BaseConf {
int64_t blob_file_size_ = 256 * 1024 * 1024; // 256M
std::string blob_compression_type_ = "none";

std::unique_ptr<PikaMeta> local_meta_;
std::shared_mutex rwlock_;

// Rsync Rate limiting configuration
Expand Down
4 changes: 2 additions & 2 deletions include/pika_consensus.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

#include <utility>

#include "include/pika_define.h"
#include "pstd/include/env.h"
#include "include/pika_binlog_transverter.h"
#include "include/pika_client_conn.h"
#include "include/pika_define.h"
#include "include/pika_slave_node.h"
#include "include/pika_stable_log.h"
#include "pstd/include/env.h"

class Context : public pstd::noncopyable {
public:
Expand Down
8 changes: 4 additions & 4 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ const std::string kDefaultRsyncAuth = "default";
const int kMaxRsyncParallelNum = 4;

struct DBStruct {
DBStruct(std::string tn)
: db_name(std::move(tn)) {}
DBStruct(std::string tn, int32_t inst_num)
: db_name(std::move(tn)), db_instance_num(inst_num) {}

bool operator==(const DBStruct& db_struct) const {
return db_name == db_struct.db_name;
return db_name == db_struct.db_name && db_instance_num == db_struct.db_instance_num;
}
std::string db_name;
int32_t db_instance_num = 0;
};

// slave item
struct SlaveItem {
std::string ip_port;
std::string ip;
Expand Down
33 changes: 0 additions & 33 deletions include/pika_meta.h

This file was deleted.

47 changes: 47 additions & 0 deletions include/pika_monitor_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_MONITOR_THREAD_H_
#define PIKA_MONITOR_THREAD_H_

#include <atomic>
#include <deque>
#include <list>
#include <queue>

#include "net/include/net_thread.h"
#include "pstd/include/pstd_mutex.h"
#include "include/pika_define.h"
#include "include/pika_client_conn.h"

class PikaMonitorThread : public net::Thread {
public:
PikaMonitorThread();
~PikaMonitorThread() override;

void AddMonitorClient(const std::shared_ptr<PikaClientConn>& client_ptr);
void AddMonitorMessage(const std::string& monitor_message);
int32_t ThreadClientList(std::vector<ClientInfo>* client = nullptr);
bool ThreadClientKill(const std::string& ip_port = "all");
bool HasMonitorClients();

private:
void AddCronTask(const MonitorCronTask& task);
bool FindClient(const std::string& ip_port);
net::WriteStatus SendMessage(int32_t fd, std::string& message);
void RemoveMonitorClient(const std::string& ip_port);

std::atomic<bool> has_monitor_clients_;
pstd::Mutex monitor_mutex_protector_;
pstd::CondVar monitor_cond_;

std::list<ClientInfo> monitor_clients_;
std::deque<std::string> monitor_messages_;
std::queue<MonitorCronTask> cron_tasks_;

void* ThreadMain() override;
void RemoveMonitorClient(int32_t client_fd);
};
#endif
2 changes: 1 addition & 1 deletion include/pika_repl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
#include "net/include/net_conn.h"
#include "net/include/thread_pool.h"
#include "pstd/include/pstd_status.h"
#include "include/pika_define.h"

#include "include/pika_binlog_reader.h"
#include "include/pika_define.h"
#include "include/pika_repl_bgworker.h"
#include "include/pika_repl_client_thread.h"

Expand Down
2 changes: 1 addition & 1 deletion include/pika_rsync_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#ifndef PIKA_RSYNC_SERVICE_H_
#define PIKA_RSYNC_SERVICE_H_

#include "iostream"
#include <iostream>

class PikaRsyncService {
public:
Expand Down
17 changes: 4 additions & 13 deletions include/pika_slot_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,14 @@
const std::string SlotKeyPrefix = "_internal:slotkey:4migrate:";
const std::string SlotTagPrefix = "_internal:slottag:4migrate:";

extern uint32_t crc32tab[256];
const size_t MaxKeySendSize = 10 * 1024;

void CRC32TableInit(uint32_t poly);

extern void InitCRC32Table();

extern uint32_t CRC32Update(uint32_t crc, const char* buf, int len);
extern uint32_t CRC32CheckSum(const char* buf, int len);

int GetSlotID(const std::string &str);
int GetKeyType(const std::string& key, std::string& key_type, const std::shared_ptr<DB>& db);
int DeleteKey(const std::string& key, const char key_type, const std::shared_ptr<DB>& db);
int GetSlotsID(const std::string& str, uint32_t* pcrc, int* phastag);
int GetKeyType(const std::string& key, std::string &key_type, const std::shared_ptr<DB>& db);
void AddSlotKey(const std::string& type, const std::string& key, const std::shared_ptr<DB>& db);
void RemSlotKey(const std::string& key, const std::shared_ptr<DB>& db);
int DeleteKey(const std::string& key, const char key_type, const std::shared_ptr<DB>& db);
void RemSlotKeyByType(const std::string& type, const std::string& key, const std::shared_ptr<DB>& db);
std::string GetSlotKey(int slot);
std::string GetSlotKey(uint32_t slot);
std::string GetSlotsTagKey(uint32_t crc);

class PikaMigrate {
Expand Down
4 changes: 2 additions & 2 deletions src/cache/include/cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class RedisCache {
static void ResetHitAndMissNum(void);
Status Open(void);
int32_t ActiveExpireCycle(void);

// Normal Commands
bool Exists(std::string& key);
int64_t DbSize(void);
Expand Down Expand Up @@ -163,7 +163,7 @@ class RedisCache {
void FreeHitemList(hitem *items, uint32_t size);
void FreeZitemList(zitem *items, uint32_t size);
void ConvertObjectToString(robj *obj, std::string *value);

private:
RedisCache(const RedisCache&);
RedisCache& operator=(const RedisCache&);
Expand Down
2 changes: 1 addition & 1 deletion src/cache/src/hash.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,4 +309,4 @@ Status RedisCache::HStrlen(std::string& key, std::string &field, uint64_t *len)

} // namespace cache

/* EOF */
/* EOF */
5 changes: 3 additions & 2 deletions src/net/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ namespace net {
using TaskFunc = void (*)(void *);

struct Task {
TaskFunc func;
void* arg;
Task() = default;
TaskFunc func = nullptr;
void* arg = nullptr;
Task(TaskFunc _func, void* _arg) : func(_func), arg(_arg) {}
};

Expand Down
18 changes: 9 additions & 9 deletions src/pika.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,19 @@
#include <csignal>
#include <memory.h>

#include "include/build_version.h"
#include "net/include/net_stats.h"
#include "pstd/include/pika_codis_slot.h"
#include "include/pika_define.h"
#include "pstd/include/pstd_defer.h"
#include "include/pika_conf.h"
#include "pstd/include/env.h"
#include "include/pika_cmd_table_manager.h"
#include "include/pika_slot_command.h"
#include "include/build_version.h"
#include "include/pika_command.h"
#include "include/pika_conf.h"
#include "include/pika_define.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "include/pika_slot_command.h"
#include "include/pika_version.h"
#include "net/include/net_stats.h"
#include "pstd/include/env.h"
#include "pstd/include/pstd_defer.h"
#include "include/pika_rm.h"

std::unique_ptr<PikaConf> g_pika_conf;
// todo : change to unique_ptr will coredump
Expand Down Expand Up @@ -202,7 +203,6 @@ int main(int argc, char* argv[]) {

PikaGlogInit();
PikaSignalSetup();
InitCRC32Table();

LOG(INFO) << "Server at: " << path;
g_pika_cmd_table_manager = std::make_unique<PikaCmdTableManager>();
Expand Down
Loading
Loading