Skip to content

Commit

Permalink
Abstract out the common thread functionality into Rdb_thread class
Browse files Browse the repository at this point in the history
Summary:
Introduce Rdb_thread base class and make background and
drop index class inherit from it.

Reviewed By: jkedgar

Differential Revision: https://reviews.facebook.net/D58725

fbshipit-source-id: b5cf7a1b12f
  • Loading branch information
Vasile Paraschiv authored and facebook-github-bot committed Dec 23, 2019
1 parent c9dff5c commit 73415f0
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 138 deletions.
1 change: 1 addition & 0 deletions storage/rocksdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ SET(ROCKSDB_SOURCES
rdb_perf_context.cc rdb_perf_context.h
rdb_mutex_wrapper.cc rdb_mutex_wrapper.h
rdb_utils.cc rdb_utils.h rdb_buff.h
rdb_threads.cc rdb_threads.h
${ROCKSDB_LIB_SOURCES}
)

Expand Down
199 changes: 61 additions & 138 deletions storage/rocksdb/ha_rocksdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "./rdb_datadic.h"
#include "./rdb_i_s.h"
#include "./rdb_mutex_wrapper.h"
#include "./rdb_threads.h"

#ifdef TARGET_OS_LINUX
extern my_bool cachedev_enabled;
Expand Down Expand Up @@ -140,48 +141,10 @@ Rdb_binlog_manager binlog_manager;

/**
MyRocks background thread control
N.B. This is on top of RocksDB's own background threads
N.B. This is besides RocksDB's own background threads
(@see rocksdb::CancelAllBackgroundWork())
*/
namespace // anonymous namespace = not visible outside this source file
{
struct Rdb_background_thread
{
struct st_control
{
bool m_stop= false;
bool m_save_stats= false;

void reset()
{
*this= st_control();
}
};

mysql_mutex_t m_mutex;
mysql_mutex_t m_stop_mutex;
mysql_cond_t m_stop_cond;
st_control m_control;
pthread_t m_handle;

void stop_signal()
{
mysql_mutex_lock(&m_stop_mutex);
m_control.m_stop= true;
mysql_cond_signal(&m_stop_cond);
mysql_mutex_unlock(&m_stop_mutex);
}

void request_save_stats()
{
mysql_mutex_lock(&m_stop_mutex);
m_control.m_save_stats= true;
mysql_mutex_unlock(&m_stop_mutex);
}

static void* thread_func(void* not_used_arg MY_ATTRIBUTE((__unused__)));
};
} // anonymous namespace
static Rdb_background_thread rdb_bg_thread;


Expand Down Expand Up @@ -1225,7 +1188,7 @@ PSI_stage_info stage_waiting_on_row_lock= { 0, "Waiting for row lock", 0};

#ifdef HAVE_PSI_INTERFACE
static PSI_thread_key rdb_background_psi_thread_key;
static PSI_thread_key rdb_drop_index_psi_thread_key;
static PSI_thread_key rdb_drop_idx_psi_thread_key;

static PSI_stage_info *all_rocksdb_stages[]=
{
Expand All @@ -1234,21 +1197,16 @@ static PSI_stage_info *all_rocksdb_stages[]=


static my_core::PSI_mutex_key rdb_psi_open_tbls_mutex_key,
rdb_background_psi_mutex_key, rdb_stop_bg_psi_mutex_key,
rdb_drop_index_psi_mutex_key, rdb_interrupt_drop_index_psi_mutex_key,
rdb_signal_bg_psi_mutex_key, rdb_signal_drop_idx_psi_mutex_key,
rdb_collation_data_mutex_key,
key_mutex_tx_list, rdb_sysvars_psi_mutex_key;

static PSI_mutex_info all_rocksdb_mutexes[]=
{
{ &rdb_psi_open_tbls_mutex_key, "rocksdb open tables", PSI_FLAG_GLOBAL},
{ &rdb_background_psi_mutex_key, "background", PSI_FLAG_GLOBAL},
{ &rdb_stop_bg_psi_mutex_key, "stop background", PSI_FLAG_GLOBAL},
{ &rdb_drop_index_psi_mutex_key, "drop index", PSI_FLAG_GLOBAL},
{ &rdb_interrupt_drop_index_psi_mutex_key, "drop index interrupt",
PSI_FLAG_GLOBAL},
{ &rdb_collation_data_mutex_key, "collation data init",
PSI_FLAG_GLOBAL},
{ &rdb_psi_open_tbls_mutex_key, "open tables", PSI_FLAG_GLOBAL},
{ &rdb_signal_bg_psi_mutex_key, "stop background", PSI_FLAG_GLOBAL},
{ &rdb_signal_drop_idx_psi_mutex_key, "signal drop index", PSI_FLAG_GLOBAL},
{ &rdb_collation_data_mutex_key, "collation data init", PSI_FLAG_GLOBAL},
{ &key_mutex_tx_list, "tx_list", PSI_FLAG_GLOBAL},
{ &rdb_sysvars_psi_mutex_key, "setting sysvar", PSI_FLAG_GLOBAL},
};
Expand All @@ -1261,19 +1219,19 @@ static PSI_rwlock_info all_rocksdb_rwlocks[]=
PSI_FLAG_GLOBAL},
};

PSI_cond_key rdb_stop_bg_psi_cond_key, rdb_drop_index_interrupt_psi_cond_key;
PSI_cond_key rdb_signal_bg_psi_cond_key, rdb_signal_drop_idx_psi_cond_key;

static PSI_cond_info all_rocksdb_conds[]=
{
{ &rdb_stop_bg_psi_cond_key, "cond_stop_background", PSI_FLAG_GLOBAL},
{ &rdb_drop_index_interrupt_psi_cond_key, "cond_stop_drop_index",
{ &rdb_signal_bg_psi_cond_key, "cond signal background", PSI_FLAG_GLOBAL},
{ &rdb_signal_drop_idx_psi_cond_key, "cond signal drop index",
PSI_FLAG_GLOBAL},
};

static PSI_thread_info all_rocksdb_threads[]=
{
{ &rdb_background_psi_thread_key, "background", PSI_FLAG_GLOBAL},
{ &rdb_drop_index_psi_thread_key, "drop index", PSI_FLAG_GLOBAL},
{ &rdb_drop_idx_psi_thread_key, "drop index", PSI_FLAG_GLOBAL},
};

static void init_rocksdb_psi_keys()
Expand Down Expand Up @@ -1304,24 +1262,9 @@ static void init_rocksdb_psi_keys()


/*
Drop index thread control
Drop index thread's control
*/

namespace // anonymous namespace = not visible outside this source file
{
struct Rdb_drop_index_thread
{
mysql_mutex_t m_mutex;
mysql_mutex_t m_interrupt_mutex;
mysql_cond_t m_interrupt_cond;
bool m_stop;
pthread_t m_handle;

void signal(bool stop_thread= false);

static void* thread_func(void* not_used_arg MY_ATTRIBUTE((__unused__)));
};
} // anonymous namespace
static Rdb_drop_index_thread rdb_drop_idx_thread;

static void rocksdb_drop_index_wakeup_thread(
Expand Down Expand Up @@ -2730,18 +2673,21 @@ static int rocksdb_init_func(void *p)
rocksdb_hton= (handlerton *)p;
mysql_mutex_init(rdb_psi_open_tbls_mutex_key, &rdb_open_tables.m_mutex,
MY_MUTEX_INIT_FAST);
mysql_mutex_init(rdb_background_psi_mutex_key, &rdb_bg_thread.m_mutex,
MY_MUTEX_INIT_FAST);
mysql_mutex_init(rdb_stop_bg_psi_mutex_key, &rdb_bg_thread.m_stop_mutex,
MY_MUTEX_INIT_FAST);
#ifdef HAVE_PSI_INTERFACE
rdb_bg_thread.init(rdb_signal_bg_psi_mutex_key,
rdb_signal_bg_psi_cond_key);
rdb_drop_idx_thread.init(rdb_signal_drop_idx_psi_mutex_key,
rdb_signal_drop_idx_psi_cond_key);
#else
rdb_bg_thread.init();
rdb_drop_idx_thread.init();
#endif
mysql_mutex_init(rdb_collation_data_mutex_key, &rdb_collation_data_mutex,
MY_MUTEX_INIT_FAST);
mysql_rwlock_init(key_rwlock_collation_exception_list,
&collation_exception_list_rwlock);
mysql_mutex_init(rdb_sysvars_psi_mutex_key, &rdb_sysvars_mutex,
MY_MUTEX_INIT_FAST);
mysql_cond_init(rdb_stop_bg_psi_cond_key, &rdb_bg_thread.m_stop_cond,
nullptr);
rdb_open_tables.init_hash();
Rdb_transaction::init_mutex();

Expand All @@ -2768,15 +2714,6 @@ static int rocksdb_init_func(void *p)
HTON_SUPPORTS_EXTENDED_KEYS |
HTON_CAN_RECREATE;

mysql_mutex_init(rdb_drop_index_psi_mutex_key, &rdb_drop_idx_thread.m_mutex,
MY_MUTEX_INIT_FAST);
mysql_mutex_init(rdb_interrupt_drop_index_psi_mutex_key,
&rdb_drop_idx_thread.m_interrupt_mutex,
MY_MUTEX_INIT_FAST);
mysql_cond_init(rdb_drop_index_interrupt_psi_cond_key,
&rdb_drop_idx_thread.m_interrupt_cond,
nullptr);

DBUG_ASSERT(!mysqld_embedded);

rocksdb_stats= rocksdb::CreateDBStatistics();
Expand Down Expand Up @@ -3031,10 +2968,10 @@ static int rocksdb_init_func(void *p)
DBUG_RETURN(1);
}

auto err= mysql_thread_create(
rdb_background_psi_thread_key, &rdb_bg_thread.m_handle,
nullptr,
Rdb_background_thread::thread_func, nullptr
auto err= rdb_bg_thread.create_thread(
#ifdef HAVE_PSI_INTERFACE
rdb_background_psi_thread_key
#endif
);
if (err != 0) {
sql_print_error("RocksDB: Couldn't start the background thread: (errno=%d)",
Expand All @@ -3043,11 +2980,10 @@ static int rocksdb_init_func(void *p)
DBUG_RETURN(1);
}

rdb_drop_idx_thread.m_stop= false;
err= mysql_thread_create(
rdb_drop_index_psi_thread_key, &rdb_drop_idx_thread.m_handle,
nullptr,
Rdb_drop_index_thread::thread_func, nullptr
err= rdb_drop_idx_thread.create_thread(
#ifdef HAVE_PSI_INTERFACE
rdb_drop_idx_psi_thread_key
#endif
);
if (err != 0) {
sql_print_error("RocksDB: Couldn't start the drop index thread: (errno=%d)",
Expand Down Expand Up @@ -3089,10 +3025,10 @@ static int rocksdb_done_func(void *p)
// memtable, but since the memtables were just flushed, it should not trigger
// a flush that can stall due to background threads being stopped. As long
// as these keys are stored in a WAL file, they can be retrieved on restart.
rdb_bg_thread.stop_signal();
rdb_bg_thread.signal(true);

// Wait for the background thread to finish.
auto err= pthread_join(rdb_bg_thread.m_handle, nullptr);
auto err= rdb_bg_thread.join();
if (err != 0) {
// We'll log the message and continue because we're shutting down and
// continuation is the optimal strategy.
Expand All @@ -3102,7 +3038,7 @@ static int rocksdb_done_func(void *p)
}

// Wait for the drop index thread to finish.
err= pthread_join(rdb_drop_idx_thread.m_handle, nullptr);
err= rdb_drop_idx_thread.join();
if (err != 0) {
// NO_LINT_DEBUG
sql_print_error("RocksDB: Couldn't stop the index thread: (errno=%d)",
Expand Down Expand Up @@ -7868,29 +7804,21 @@ rocksdb::Range ha_rocksdb::get_range(
}


void Rdb_drop_index_thread::signal(bool stop_thread)
{
mysql_mutex_lock(&m_interrupt_mutex);
if (stop_thread) {
m_stop= true;
}
mysql_cond_signal(&m_interrupt_cond);
mysql_mutex_unlock(&m_interrupt_mutex);
}
/*
Drop index thread's main logic
*/

void* Rdb_drop_index_thread::thread_func(
void* not_used_arg MY_ATTRIBUTE((__unused__)))
void Rdb_drop_index_thread::run()
{
mysql_mutex_lock(&rdb_drop_idx_thread.m_mutex);
mysql_mutex_lock(&rdb_drop_idx_thread.m_interrupt_mutex);
mysql_mutex_lock(&m_signal_mutex);

for (;;) {
// "stop_drop_index_thread = true" might be set by shutdown command
// after drop_index_thread releases drop_index_interrupt_mutex
// The stop flag might be set by shutdown command
// after drop_index_thread releases signal_mutex
// (i.e. while executing expensive Seek()). To prevent drop_index_thread
// from entering long cond_timedwait, checking if stop_drop_index_thread
// from entering long cond_timedwait, checking if stop flag
// is true or not is needed, with drop_index_interrupt_mutex held.
if (rdb_drop_idx_thread.m_stop) {
if (m_stop) {
break;
}

Expand All @@ -7901,14 +7829,13 @@ void* Rdb_drop_index_thread::thread_func(
: 60; // filtering

auto ret MY_ATTRIBUTE((__unused__)) = mysql_cond_timedwait(
&rdb_drop_idx_thread.m_interrupt_cond,
&rdb_drop_idx_thread.m_interrupt_mutex, &ts);
if (rdb_drop_idx_thread.m_stop) {
&m_signal_cond, &m_signal_mutex, &ts);
if (m_stop) {
break;
}
// make sure, no program error is returned
DBUG_ASSERT(ret == 0 || ret == ETIMEDOUT);
mysql_mutex_unlock(&rdb_drop_idx_thread.m_interrupt_mutex);
mysql_mutex_unlock(&m_signal_mutex);

std::vector<GL_INDEX_ID> indices;
dict_manager.get_drop_indexes_ongoing(&indices);
Expand Down Expand Up @@ -7997,15 +7924,13 @@ void* Rdb_drop_index_thread::thread_func(
dict_manager.done_drop_indexes(finished);
}
}
mysql_mutex_lock(&rdb_drop_idx_thread.m_interrupt_mutex);
mysql_mutex_lock(&m_signal_mutex);
}

mysql_mutex_unlock(&rdb_drop_idx_thread.m_interrupt_mutex);
mysql_mutex_unlock(&rdb_drop_idx_thread.m_mutex);

return nullptr;
mysql_mutex_unlock(&m_signal_mutex);
}


Rdb_tbl_def* ha_rocksdb::get_table_if_exists(const char* tablename)
{
DBUG_ASSERT(tablename != nullptr);
Expand Down Expand Up @@ -9089,33 +9014,35 @@ static SHOW_VAR rocksdb_status_vars[]= {
};


void* Rdb_background_thread::thread_func(
void* not_used_arg MY_ATTRIBUTE((__unused__)))
{
mysql_mutex_lock(&rdb_bg_thread.m_mutex);
/*
Background thread's main logic
*/

void Rdb_background_thread::run()
{
timespec ts_next_sync;
clock_gettime(CLOCK_REALTIME, &ts_next_sync);
ts_next_sync.tv_sec++;

for (;;)
{
// wait for 1 second or until we received a condition to stop the thread
mysql_mutex_lock(&rdb_bg_thread.m_stop_mutex);
mysql_mutex_lock(&m_signal_mutex);
auto ret MY_ATTRIBUTE((__unused__)) = mysql_cond_timedwait(
&rdb_bg_thread.m_stop_cond, &rdb_bg_thread.m_stop_mutex, &ts_next_sync);
&m_signal_cond, &m_signal_mutex, &ts_next_sync);
// make sure that no program error is returned
DBUG_ASSERT(ret == 0 || ret == ETIMEDOUT);
auto local_bg_control= rdb_bg_thread.m_control;
rdb_bg_thread.m_control.reset();
mysql_mutex_unlock(&rdb_bg_thread.m_stop_mutex);
bool local_stop= m_stop;
bool local_save_stats= m_save_stats;
reset();
mysql_mutex_unlock(&m_signal_mutex);

if (local_bg_control.m_stop)
if (local_stop)
{
break;
}

if (local_bg_control.m_save_stats)
if (local_save_stats)
{
ddl_manager.persist_stats();
}
Expand All @@ -9139,10 +9066,6 @@ void* Rdb_background_thread::thread_func(

// save remaining stats which might've left unsaved
ddl_manager.persist_stats();

mysql_mutex_unlock(&rdb_bg_thread.m_mutex);

return nullptr;
}


Expand Down
Loading

0 comments on commit 73415f0

Please sign in to comment.