Skip to content

Commit

Permalink
Performance Improvements (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgqss authored May 19, 2021
1 parent 43998f9 commit 08c4430
Show file tree
Hide file tree
Showing 52 changed files with 1,017 additions and 697 deletions.
67 changes: 56 additions & 11 deletions include/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <base/time.h>
#include <base/third_party/murmurhash3/murmurhash3.h>
#include <base/containers/doubly_buffered_data.h>
#include <base/containers/flat_map.h>
#include <base/endpoint.h>
#include <base/base64.h>
#include <webfoot_naming.h>
Expand All @@ -41,6 +42,7 @@
#include <butil/time.h>
#include <butil/third_party/murmurhash3/murmurhash3.h>
#include <butil/containers/doubly_buffered_data.h>
#include <butil/containers/flat_map.h>
#include <butil/endpoint.h>
#include <butil/base64.h>
#include <butil/fast_rand.h>
Expand Down Expand Up @@ -503,7 +505,7 @@ class ThreadSafeMap {
}
}
}
void traverse_with_key_value(const std::function<void(const KEY key, VALUE& value)>& call) {
void traverse_with_key_value(const std::function<void(const KEY& key, VALUE& value)>& call) {
for (uint32_t i = 0; i < MAP_COUNT; i++) {
BAIDU_SCOPED_LOCK(_mutex[i]);
for (auto& pair : _map[i]) {
Expand Down Expand Up @@ -582,9 +584,14 @@ class ThreadSafeMap {
DISALLOW_COPY_AND_ASSIGN(ThreadSafeMap);
};

template <typename T>
// 通常使用butil::DoublyBufferedData
// 只在几个自己控制gc和写很少,需要很高性能时候使用这个
template <typename T, int64_t SLEEP = 1000>
class DoubleBuffer {
public:
DoubleBuffer() {
bthread::execution_queue_start(&_queue_id, nullptr, run_function, (void*)this);
}
T* read() {
return _data + _index;
}
Expand All @@ -594,9 +601,31 @@ class DoubleBuffer {
void swap() {
_index = ! _index;
}
void modify(const std::function<void(T&)>& fn) {
bthread::execution_queue_execute(_queue_id, fn);
}
private:
ExecutionQueue _queue;
T _data[2];
int _index = 0;
static int run_function(void* meta, bthread::TaskIterator<std::function<void(T&)>>& iter) {
if (iter.is_queue_stopped()) {
return 0;
}
DoubleBuffer* db = (DoubleBuffer*)meta;
std::vector<std::function<void(T&)>> vec;
for (; iter; ++iter) {
(*iter)(*db->read_background());
vec.emplace_back(*iter);
}
db->swap();
bthread_usleep(SLEEP);
for (auto& c : vec) {
c(*db->read_background());
}
return 0;
}
bthread::ExecutionQueueId<std::function<void(T&)>> _queue_id = {0};
};

DECLARE_int64(incremental_info_gc_time);
Expand Down Expand Up @@ -823,10 +852,22 @@ struct RocksdbVars {
return &_instance;
}

bvar::LatencyRecorder rocksdb_put_time_cost;
bvar::LatencyRecorder rocksdb_get_time_cost;
bvar::LatencyRecorder rocksdb_scan_time_cost;
bvar::LatencyRecorder rocksdb_seek_time_cost;
bvar::IntRecorder rocksdb_put_time;
bvar::Adder<int64_t> rocksdb_put_count;
bvar::Window<bvar::IntRecorder> rocksdb_put_time_cost_latency;
bvar::PerSecond<bvar::Adder<int64_t> > rocksdb_put_time_cost_qps;
bvar::IntRecorder rocksdb_get_time;
bvar::Adder<int64_t> rocksdb_get_count;
bvar::Window<bvar::IntRecorder> rocksdb_get_time_cost_latency;
bvar::PerSecond<bvar::Adder<int64_t> > rocksdb_get_time_cost_qps;
bvar::IntRecorder rocksdb_scan_time;
bvar::Adder<int64_t> rocksdb_scan_count;
bvar::Window<bvar::IntRecorder> rocksdb_scan_time_cost_latency;
bvar::PerSecond<bvar::Adder<int64_t> > rocksdb_scan_time_cost_qps;
bvar::IntRecorder rocksdb_seek_time;
bvar::Adder<int64_t> rocksdb_seek_count;
bvar::Window<bvar::IntRecorder> rocksdb_seek_time_cost_latency;
bvar::PerSecond<bvar::Adder<int64_t> > rocksdb_seek_time_cost_qps;
bvar::LatencyRecorder qos_fetch_tokens_wait_time_cost;
bvar::Adder<int64_t> qos_fetch_tokens_wait_count;
bvar::Adder<int64_t> qos_fetch_tokens_count;
Expand All @@ -835,13 +876,17 @@ struct RocksdbVars {
bvar::PerSecond<bvar::Adder<int64_t> > qos_token_waste_qps;
// 统计未提交的binlog最大时间
bvar::Maxer<int64_t> binlog_not_commit_max_cost;
bvar::Window<bvar::Maxer<int64_t> > binlog_not_commit_max_cost_minute;
bvar::Window<bvar::Maxer<int64_t>> binlog_not_commit_max_cost_minute;

private:
RocksdbVars(): rocksdb_put_time_cost("rocksdb_put_time_cost"),
rocksdb_get_time_cost("rocksdb_get_time_cost"),
rocksdb_scan_time_cost("rocksdb_scan_time_cost"),
rocksdb_seek_time_cost("rocksdb_seek_time_cost"),
RocksdbVars(): rocksdb_put_time_cost_latency("rocksdb_put_time_cost_latency", &rocksdb_put_time, -1),
rocksdb_put_time_cost_qps("rocksdb_put_time_cost_qps", &rocksdb_put_count),
rocksdb_get_time_cost_latency("rocksdb_get_time_cost_latency", &rocksdb_get_time, -1),
rocksdb_get_time_cost_qps("rocksdb_get_time_cost_qps", &rocksdb_get_count),
rocksdb_scan_time_cost_latency("rocksdb_scan_time_cost_latency", &rocksdb_scan_time, -1),
rocksdb_scan_time_cost_qps("rocksdb_scan_time_cost_qps", &rocksdb_scan_count),
rocksdb_seek_time_cost_latency("rocksdb_seek_time_cost_latency", &rocksdb_seek_time, -1),
rocksdb_seek_time_cost_qps("rocksdb_seek_time_cost_qps", &rocksdb_seek_count),
qos_fetch_tokens_wait_time_cost("qos_fetch_tokens_wait_time_cost"),
qos_fetch_tokens_wait_count("qos_fetch_tokens_wait_count"),
qos_fetch_tokens_qps("qos_fetch_tokens_qps", &qos_fetch_tokens_count),
Expand Down
35 changes: 22 additions & 13 deletions include/common/schema_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,14 +382,16 @@ struct SchemaMapping {
using DoubleBufferedTable = butil::DoublyBufferedData<SchemaMapping>;

struct InstanceDBStatus {
InstanceDBStatus() : InstanceDBStatus(pb::NORMAL, "") {}
InstanceDBStatus(pb::Status status, const std::string& logical_room) :
status(status), logical_room(logical_room) {}
// NORMAL 正常
// FAULTY 故障
// DEAD hang住假死,需要做rpc cancel
pb::Status status;
pb::Status status = pb::NORMAL;
// 只cancel一次,cancel操作后设置false
bool need_cancel = true;
std::string logical_room;
// 正常探测CHECK_COUNT次后才置NORMAL
int64_t normal_count = 0;
static const int64_t CHECK_COUNT = 5;
};
struct IdcMapping {
// store => logical_room
Expand Down Expand Up @@ -436,13 +438,10 @@ typedef ::google::protobuf::RepeatedPtrField<pb::Statistics> StatisticsVec;
void update_table(const pb::SchemaInfo& table);
//void update_table(DoubleBufferedTable& double_buffered_table, const pb::SchemaInfo& table);

static int update_tables_double_buffer(
void* meta, bthread::TaskIterator<pb::SchemaInfo>& iter);
void update_tables_double_buffer(bthread::TaskIterator<pb::SchemaInfo>& iter);

// _sync系统初始化的时候调用,防止meta信息获取延迟导致系统不可用
void update_tables_double_buffer_sync(const SchemaVec& tables);

void update_instance_canceled(const std::string& addr);
void update_instance(const std::string& addr, pb::Status s);
int update_instance_internal(IdcMapping& idc_mapping, const std::string& addr, pb::Status s);
void update_idc(const pb::IdcInfo& idc_info);
Expand Down Expand Up @@ -521,6 +520,14 @@ typedef ::google::protobuf::RepeatedPtrField<pb::Statistics> StatisticsVec;

IndexInfo get_index_info(int64_t indexid);
SmartIndex get_index_info_ptr(int64_t indexid);
// split使用的index_info,只加不删
IndexInfo* get_split_index_info(int64_t indexid) {
auto iter = _split_index_map.read()->seek(indexid);
if (iter != nullptr) {
return *iter;
}
return nullptr;
}

std::string get_index_name(int64_t index_id);

Expand All @@ -545,7 +552,6 @@ typedef ::google::protobuf::RepeatedPtrField<pb::Statistics> StatisticsVec;
int64_t& index_id);

// functions for region info access
int get_region_info(int64_t region_id, pb::RegionInfo& info);
int get_region_info(int64_t table_id, int64_t region_id, pb::RegionInfo& info);

int get_region_capacity(int64_t global_index_id, int64_t& region_capacity);
Expand Down Expand Up @@ -624,7 +630,7 @@ typedef ::google::protobuf::RepeatedPtrField<pb::Statistics> StatisticsVec;
} else {
DB_WARNING("read double_buffer_idc error.");
}
return {pb::NORMAL, ""};
return InstanceDBStatus();
}
int get_all_instance_status(std::unordered_map<std::string, InstanceDBStatus>* info_map) {
DoubleBufferedIdc::ScopedPtr idc_ptr;
Expand Down Expand Up @@ -849,6 +855,8 @@ typedef ::google::protobuf::RepeatedPtrField<pb::Statistics> StatisticsVec;
return _virtual_index_info.reset();
}
int is_unique_field_ids(int64_t table_id, const std::set<int32_t>& field_ids);

int fill_default_value(SmartRecord record, FieldInfo& field);
private:
SchemaFactory() {
_is_inited = false;
Expand All @@ -869,7 +877,7 @@ typedef ::google::protobuf::RepeatedPtrField<pb::Statistics> StatisticsVec;
pb::SchemaConf& mem_conf);
// 全量更新
void update_index(TableInfo& info, const pb::IndexInfo& index,
const pb::IndexInfo* pk_indexi, SchemaMapping& background);
const pb::IndexInfo* pk_index, SchemaMapping& background);
//delete table和index
void delete_table(const pb::SchemaInfo& table, SchemaMapping& background);

Expand All @@ -885,15 +893,16 @@ typedef ::google::protobuf::RepeatedPtrField<pb::Statistics> StatisticsVec;
std::unordered_map<std::string, std::shared_ptr<UserInfo>> _user_info_mapping;

DoubleBufferedTable _double_buffer_table;
bthread::ExecutionQueueId<pb::SchemaInfo> _table_queue_id = {0};
// index_id => IndexInfo*
// 提供给SplitCompactionFilter使用,使用普通双buf,split减少开销
DoubleBuffer<butil::FlatMap<int64_t, IndexInfo*>> _split_index_map;

DoubleBufferedIdc _double_buffer_idc;

DoubleBufferedTableRegionInfo _table_region_mapping;
bthread::ExecutionQueueId<RegionVec> _region_queue_id = {0};

DoubleBufferStringSet _double_buffer_big_sql;
bthread::ExecutionQueueId<std::string> _big_sql_queue_id = {0};

std::string _physical_room;
std::string _logical_room;
Expand Down
Loading

0 comments on commit 08c4430

Please sign in to comment.