From e04f9f3c633191bddb0b77594a56930218dba50a Mon Sep 17 00:00:00 2001 From: lihongjian Date: Tue, 7 Mar 2023 20:01:50 +0800 Subject: [PATCH] feat(tianmu):delta store,Modify the handler layer and modify the base class to support the delta layer --- scripts/stonedb_build.sh | 2 +- storage/tianmu/core/engine.cpp | 4 +- storage/tianmu/handler/ha_my_tianmu.cpp | 4 +- storage/tianmu/handler/ha_my_tianmu.h | 4 +- storage/tianmu/handler/ha_tianmu.cpp | 286 ++++++-------------- storage/tianmu/handler/ha_tianmu.h | 8 +- storage/tianmu/index/rdb_meta_manager.cpp | 62 +++-- storage/tianmu/index/rdb_meta_manager.h | 20 +- storage/tianmu/index/tianmu_table_index.cpp | 7 +- storage/tianmu/index/tianmu_table_index.h | 2 +- storage/tianmu/loader/value_cache.h | 20 ++ storage/tianmu/system/configuration.cpp | 3 + storage/tianmu/system/configuration.h | 7 + storage/tianmu/types/bstring.cpp | 3 + storage/tianmu/util/bitset.h | 19 +- 15 files changed, 194 insertions(+), 257 deletions(-) diff --git a/scripts/stonedb_build.sh b/scripts/stonedb_build.sh index 369047000..1ef4b6da3 100755 --- a/scripts/stonedb_build.sh +++ b/scripts/stonedb_build.sh @@ -86,4 +86,4 @@ cmake ../../ \ # step 5. 
make & make install make VERBOSE=1 -j`nproc` 2>&1 | tee -a ${build_log} make install 2>&1 | tee -a ${build_log} -echo "current dir is `pwd`" 2>&1 | tee -a ${build_log} +echo "current dir is `pwd`" 2>&1 | tee -a ${build_log} diff --git a/storage/tianmu/core/engine.cpp b/storage/tianmu/core/engine.cpp index bcaa52ec3..de94df245 100644 --- a/storage/tianmu/core/engine.cpp +++ b/storage/tianmu/core/engine.cpp @@ -919,10 +919,10 @@ AttributeTypeInfo Engine::GetAttrTypeInfo(const Field &field) { } void Engine::CommitTx(THD *thd, bool all) { - if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) { + if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT)) { GetTx(thd)->Commit(thd); - ClearTx(thd); } + ClearTx(thd); } void Engine::Rollback(THD *thd, bool all, bool force_error_message) { diff --git a/storage/tianmu/handler/ha_my_tianmu.cpp b/storage/tianmu/handler/ha_my_tianmu.cpp index 6e38a492d..bc59b5380 100644 --- a/storage/tianmu/handler/ha_my_tianmu.cpp +++ b/storage/tianmu/handler/ha_my_tianmu.cpp @@ -23,7 +23,7 @@ #include "vc/virtual_column.h" namespace Tianmu { -namespace handler { +namespace DBHandler { enum class TianmuEngineReturnValues { kLoadSuccessed = 0, @@ -156,5 +156,5 @@ bool ha_my_tianmu_load(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void return false; } -} // namespace handler +} // namespace DBHandler } // namespace Tianmu diff --git a/storage/tianmu/handler/ha_my_tianmu.h b/storage/tianmu/handler/ha_my_tianmu.h index 24c1b4612..3ba01ecf4 100644 --- a/storage/tianmu/handler/ha_my_tianmu.h +++ b/storage/tianmu/handler/ha_my_tianmu.h @@ -20,7 +20,7 @@ // mysql <--> tianmu interface functions namespace Tianmu { -namespace handler { +namespace DBHandler { enum class QueryRouteTo { kToMySQL = 0, @@ -42,6 +42,6 @@ bool ha_my_tianmu_set_statement_allowed(THD *thd, LEX *lex); // processing the load operation. 
bool ha_my_tianmu_load(THD *thd, sql_exchange *ex, TABLE_LIST *table_list, void *arg); -} // namespace handler +} // namespace DBHandler } // namespace Tianmu #endif // TIANMU_HANDLER_HA_RCENGINE_H_ diff --git a/storage/tianmu/handler/ha_tianmu.cpp b/storage/tianmu/handler/ha_tianmu.cpp index bb0a74820..9ec329950 100644 --- a/storage/tianmu/handler/ha_tianmu.cpp +++ b/storage/tianmu/handler/ha_tianmu.cpp @@ -28,6 +28,7 @@ #include "common/exception.h" #include "core/compilation_tools.h" #include "core/compiled_query.h" +#include "core/delta_record_head.h" #include "core/temp_table.h" #include "core/tools.h" #include "core/transaction.h" @@ -47,11 +48,11 @@ struct st_mysql_sys_var { }; handler *tianmu_create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root) { - return new (mem_root) Tianmu::handler::ha_tianmu(hton, table); + return new (mem_root) Tianmu::DBHandler::ha_tianmu(hton, table); } namespace Tianmu { -namespace handler { +namespace DBHandler { const Alter_inplace_info::HA_ALTER_FLAGS ha_tianmu::TIANMU_SUPPORTED_ALTER_ADD_DROP_ORDER = Alter_inplace_info::ADD_COLUMN | Alter_inplace_info::DROP_COLUMN | Alter_inplace_info::ALTER_STORED_COLUMN_ORDER; @@ -80,101 +81,6 @@ my_bool rcbase_query_caching_of_table_permitted(THD *thd, [[maybe_unused]] char return ((my_bool)FALSE); } -static core::Value GetValueFromField(Field *f) { - core::Value v; - - if (f->is_null()) - return v; - - switch (f->type()) { - case MYSQL_TYPE_TINY: - case MYSQL_TYPE_SHORT: - case MYSQL_TYPE_LONG: - case MYSQL_TYPE_INT24: - case MYSQL_TYPE_LONGLONG: - case MYSQL_TYPE_BIT: - v.SetInt(f->val_int()); - break; - case MYSQL_TYPE_DECIMAL: - case MYSQL_TYPE_FLOAT: - case MYSQL_TYPE_DOUBLE: - v.SetDouble(f->val_real()); - break; - case MYSQL_TYPE_NEWDECIMAL: { - auto dec_f = dynamic_cast(f); - v.SetInt(std::lround(dec_f->val_real() * types::PowOfTen(dec_f->dec))); - break; - } - case MYSQL_TYPE_TIMESTAMP: { - MYSQL_TIME my_time; - std::memset(&my_time, 0, sizeof(my_time)); - 
f->get_time(&my_time); - // convert to UTC - if (!common::IsTimeStampZero(my_time)) { - my_bool myb; - my_time_t secs_utc = current_txn_->Thd()->variables.time_zone->TIME_to_gmt_sec(&my_time, &myb); - common::GMTSec2GMTTime(&my_time, secs_utc); - } - types::DT dt = {}; - dt.year = my_time.year; - dt.month = my_time.month; - dt.day = my_time.day; - dt.hour = my_time.hour; - dt.minute = my_time.minute; - dt.second = my_time.second; - v.SetInt(dt.val); - break; - } - case MYSQL_TYPE_TIME: - case MYSQL_TYPE_TIME2: - case MYSQL_TYPE_DATE: - case MYSQL_TYPE_DATETIME: - case MYSQL_TYPE_NEWDATE: - case MYSQL_TYPE_TIMESTAMP2: - case MYSQL_TYPE_DATETIME2: { - MYSQL_TIME my_time; - std::memset(&my_time, 0, sizeof(my_time)); - f->get_time(&my_time); - types::DT dt = {}; - dt.year = my_time.year; - dt.month = my_time.month; - dt.day = my_time.day; - dt.hour = my_time.hour; - dt.minute = my_time.minute; - dt.second = my_time.second; - v.SetInt(dt.val); - break; - } - case MYSQL_TYPE_YEAR: // what the hell? 
- { - types::DT dt = {}; - dt.year = f->val_int(); - v.SetInt(dt.val); - break; - } - case MYSQL_TYPE_VARCHAR: - case MYSQL_TYPE_TINY_BLOB: - case MYSQL_TYPE_MEDIUM_BLOB: - case MYSQL_TYPE_LONG_BLOB: - case MYSQL_TYPE_BLOB: - case MYSQL_TYPE_VAR_STRING: - case MYSQL_TYPE_STRING: { - String str; - f->val_str(&str); - v.SetString(const_cast(str.ptr()), str.length()); - break; - } - case MYSQL_TYPE_SET: - case MYSQL_TYPE_ENUM: - case MYSQL_TYPE_GEOMETRY: - case MYSQL_TYPE_NULL: - default: - throw common::Exception("unsupported mysql type " + std::to_string(f->type())); - break; - } - return v; -} - ha_tianmu::ha_tianmu(handlerton *hton, TABLE_SHARE *table_arg) : handler(hton, table_arg) { ref_length = sizeof(uint64_t); } @@ -286,10 +192,6 @@ int ha_tianmu::external_lock(THD *thd, int lock_type) { if (thd->lex->sql_command == SQLCOM_LOCK_TABLES) DBUG_RETURN(HA_ERR_WRONG_COMMAND); - if (is_delay_insert(thd) && table_share->tmp_table == NO_TMP_TABLE && lock_type == F_WRLCK) { - DBUG_RETURN(0); - } - try { if (lock_type == F_UNLCK) { if (thd->lex->sql_command == SQLCOM_UNLOCK_TABLES) @@ -467,21 +369,25 @@ int ha_tianmu::write_row([[maybe_unused]] uchar *buf) { } catch (common::OutOfMemoryException &e) { DBUG_RETURN(ER_LOCK_WAIT_TIMEOUT); } catch (common::DatabaseException &e) { - TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s.", e.what()); + TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::AddInsertRecord: %s.", e.what()); my_message(static_cast(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0)); } catch (common::FormatException &e) { - TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s Row: %ld, field %u.", e.what(), - e.m_row_no, e.m_field_no); + TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::AddInsertRecord: %s Row: %ld, field %u.", + e.what(), e.m_row_no, e.m_field_no); my_message(static_cast(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0)); } catch 
(common::FileException &e) { - TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s.", e.what()); + TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::AddInsertRecord: %s.", e.what()); my_message(static_cast(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0)); + } catch (common::DupKeyException &e) { + ret = HA_ERR_FOUND_DUPP_KEY; + TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::AddInsertRecord: %s.", e.what()); + my_message(static_cast(common::ErrorCode::DUPP_KEY), e.what(), MYF(0)); } catch (common::Exception &e) { - TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s.", e.what()); + TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::AddInsertRecord: %s.", e.what()); my_message(static_cast(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0)); } catch (std::exception &e) { my_message(static_cast(common::ErrorCode::UNKNOWN_ERROR), e.what(), MYF(0)); - TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::InsertRow: %s.", e.what()); + TIANMU_LOG(LogCtl_Level::ERROR, "An exception is caught in Engine::AddInsertRecord: %s.", e.what()); } catch (...) { my_message(static_cast(common::ErrorCode::UNKNOWN_ERROR), "An unknown system exception error caught.", MYF(0)); TIANMU_LOG(LogCtl_Level::ERROR, "An unknown system exception error caught."); @@ -513,38 +419,7 @@ int ha_tianmu::update_row(const uchar *old_data, uchar *new_data) { [org_bitmap, this](...) 
{ dbug_tmp_restore_column_map(table->write_set, org_bitmap); }); try { - auto tab = current_txn_->GetTableByPath(table_name_); - utils::result_set res; - for (uint i = 0; i < table->s->fields; i++) { - if (!bitmap_is_set(table->write_set, i)) { - continue; - } - auto field = table->field[i]; - if (field->real_maybe_null()) { - if (field->is_null_in_record(old_data) && field->is_null_in_record(new_data)) { - continue; - } - - if (field->is_null_in_record(new_data)) { - core::Value null; - res.insert(ha_tianmu_engine_->delete_or_update_thread_pool.add_task( - &core::TianmuTable::UpdateItem, tab.get(), current_position_, i, null, current_txn_)); - continue; - } - } - auto o_ptr = field->ptr - table->record[0] + old_data; - auto n_ptr = field->ptr - table->record[0] + new_data; - if (field->is_null_in_record(old_data) || std::memcmp(o_ptr, n_ptr, field->pack_length()) != 0) { - my_bitmap_map *org_bitmap2 = dbug_tmp_use_all_columns(table, table->read_set); - std::shared_ptr defer( - nullptr, [org_bitmap2, this](...) { dbug_tmp_restore_column_map(table->read_set, org_bitmap2); }); - core::Value v = GetValueFromField(field); - res.insert(ha_tianmu_engine_->delete_or_update_thread_pool.add_task(&core::TianmuTable::UpdateItem, tab.get(), - current_position_, i, v, current_txn_)); - } - } - - res.get_all_with_except(); + ha_tianmu_engine_->UpdateRow(table_name_, table, share_, current_position_, old_data, new_data); ha_tianmu_engine_->IncTianmuStatUpdate(); DBUG_RETURN(0); } catch (common::DatabaseException &e) { @@ -586,14 +461,7 @@ int ha_tianmu::delete_row([[maybe_unused]] const uchar *buf) { [org_bitmap, this](...) 
{ dbug_tmp_restore_column_map(table->write_set, org_bitmap); }); try { - auto tab = current_txn_->GetTableByPath(table_name_); - utils::result_set res; - for (uint i = 0; i < table->s->fields; i++) { - res.insert(ha_tianmu_engine_->delete_or_update_thread_pool.add_task(&core::TianmuTable::DeleteItem, tab.get(), - current_position_, i, current_txn_)); - } - res.get_all_with_except(); - + ha_tianmu_engine_->DeleteRow(table_name_, table, share_, current_position_); DBUG_RETURN(0); } catch (common::DatabaseException &e) { TIANMU_LOG(LogCtl_Level::ERROR, "Delete exception: %s.", e.what()); @@ -794,7 +662,7 @@ int ha_tianmu::open(const char *name, [[maybe_unused]] int mode, [[maybe_unused] // have primary key, use table index if (table->s->primary_key != MAX_INDEXES) ha_tianmu_engine_->AddTableIndex(name, table, ha_thd()); - ha_tianmu_engine_->AddMemTable(table, share_); + ha_tianmu_engine_->AddTableDelta(table, share_); ret = 0; } catch (common::Exception &e) { my_message(static_cast(common::ErrorCode::UNKNOWN_ERROR), "Error from Tianmu engine", MYF(0)); @@ -1069,8 +937,6 @@ int ha_tianmu::rnd_init(bool scan) { filter_ptr_.reset(new core::Filter(*filter)); table_ptr_ = push_down_result->GetTableP(0); - table_new_iter_ = ((core::TianmuTable *)table_ptr_)->Begin(GetAttrsUseIndicator(table), *filter); - table_new_iter_end_ = ((core::TianmuTable *)table_ptr_)->End(); } catch (common::Exception const &e) { tianmu_control_ << system::lock << "Error in push-down execution, push-down execution aborted: " << e.what() << system::unlock; @@ -1080,14 +946,13 @@ int ha_tianmu::rnd_init(bool scan) { cq_.reset(); } else { if (scan && filter_ptr_.get()) { - table_new_iter_ = ((core::TianmuTable *)table_ptr_)->Begin(GetAttrsUseIndicator(table), *filter_ptr_); - table_new_iter_end_ = ((core::TianmuTable *)table_ptr_)->End(); + iterator_ = std::make_unique((core::TianmuTable *)table_ptr_, + GetAttrsUseIndicator(table), *filter_ptr_); } else { - std::shared_ptr rctp; - 
ha_tianmu_engine_->GetTableIterator(table_name_, table_new_iter_, table_new_iter_end_, rctp, - GetAttrsUseIndicator(table), table->in_use); + std::shared_ptr rctp = ha_tianmu_engine_->GetTx(table->in_use)->GetTableByPath(table_name_); table_ptr_ = rctp.get(); filter_ptr_.reset(); + iterator_ = std::make_unique(rctp.get(), GetAttrsUseIndicator(table)); } } ret = 0; @@ -1192,17 +1057,14 @@ int ha_tianmu::rnd_pos(uchar *buf, uchar *pos) { ret = index_read(buf, pos, ref_length, HA_READ_KEY_EXACT); } else { uint64_t position = my_get_ptr(pos, ref_length); - filter_ptr_ = std::make_unique(position + 1, share_->PackSizeShift()); filter_ptr_->Reset(); filter_ptr_->Set(position); auto tab_ptr = ha_tianmu_engine_->GetTx(table->in_use)->GetTableByPath(table_name_); - table_new_iter_ = tab_ptr->Begin(GetAttrsUseIndicator(table), *filter_ptr_); - table_new_iter_end_ = tab_ptr->End(); + iterator_ = std::make_unique(tab_ptr.get(), GetAttrsUseIndicator(table), *filter_ptr_); table_ptr_ = tab_ptr.get(); - - table_new_iter_.MoveToRow(position); + iterator_->SeekTo(position); table->status = 0; blob_buffers_.resize(table->s->fields); if (fill_row(buf) == HA_ERR_END_OF_FILE) { @@ -1378,44 +1240,68 @@ uint ha_tianmu::max_supported_key_part_length([[maybe_unused]] HA_CREATE_INFO *c } int ha_tianmu::fill_row(uchar *buf) { - if (table_new_iter_ == table_new_iter_end_) + if (!iterator_ || !iterator_->Valid()) return HA_ERR_END_OF_FILE; my_bitmap_map *org_bitmap = dbug_tmp_use_all_columns(table, table->write_set); + std::shared_ptr defer(nullptr, + [org_bitmap, this](...) { dbug_tmp_restore_column_map(table->write_set, org_bitmap); }); - std::shared_ptr buffer; - + std::unique_ptr buffer; // we should pack the row into `buf` but seems it just use record[0] blindly. // So this is a workaround to handle the case that `buf` is not record[0]. 
if (buf != table->record[0]) { buffer.reset(new char[table->s->reclength]); std::memcpy(buffer.get(), table->record[0], table->s->reclength); } - - for (uint col_id = 0; col_id < table->s->fields; col_id++) { - // when ConvertToField() return true, to judge whether this line has been deleted. + if (iterator_->IsBase()) { + // judge whether this line has been deleted. // if this line has been deleted, data will not be copied. - if (core::Engine::ConvertToField(table->field[col_id], *(table_new_iter_.GetData(col_id)), - &blob_buffers_[col_id]) && - (table_new_iter_.GetAttrs().size() > col_id) && - table_new_iter_.GetAttrs()[col_id]->IsDelete(table_new_iter_.GetCurrentRowId())) { - current_position_ = table_new_iter_.GetCurrentRowId(); - table_new_iter_++; + if (iterator_->BaseCurrentRowIsInvalid()) { + current_position_ = iterator_->Position(); + iterator_->Next(); dbug_tmp_restore_column_map(table->write_set, org_bitmap); return HA_ERR_RECORD_DELETED; } + for (uint col_id = 0; col_id < table->s->fields; col_id++) { + core::Engine::ConvertToField(table->field[col_id], *(iterator_->GetBaseData(col_id)), &blob_buffers_[col_id]); + } + } else { + std::string delta_record = iterator_->GetDeltaData(); + if (!delta_record.empty()) { + switch (core::DeltaRecordHead::GetRecordType(delta_record.data())) { + case core::RecordType::kInsert: + if (!core::Engine::DecodeInsertRecordToField(delta_record.data(), table->field)) { + current_position_ = iterator_->Position(); + iterator_->Next(); + dbug_tmp_restore_column_map(table->write_set, org_bitmap); + return HA_ERR_RECORD_DELETED; + } + break; + case core::RecordType::kUpdate: + current_txn_->GetTableByPath(table_name_)->FillRowByRowid(table, iterator_->Position()); + core::Engine::DecodeUpdateRecordToField(delta_record.data(), table->field); + iterator_->InDeltaUpdateRow.insert( + std::unordered_map::value_type(iterator_->Position(), true)); + break; + case core::RecordType::kDelete: + current_position_ = iterator_->Position(); 
+ iterator_->InDeltaDeletedRow.insert(std::unordered_map::value_type(current_position_, true)); + iterator_->Next(); + dbug_tmp_restore_column_map(table->write_set, org_bitmap); + return HA_ERR_RECORD_DELETED; + default: + break; + } + } } if (buf != table->record[0]) { std::memcpy(buf, table->record[0], table->s->reclength); std::memcpy(table->record[0], buffer.get(), table->s->reclength); } - - current_position_ = table_new_iter_.GetCurrentRowId(); - table_new_iter_++; - - dbug_tmp_restore_column_map(table->write_set, org_bitmap); - + current_position_ = iterator_->Position(); + iterator_->Next(); return 0; } @@ -1498,8 +1384,8 @@ int ha_tianmu::set_cond_iter() { filter_ptr_.reset(new core::Filter(*filter)); table_ptr_ = push_down_result->GetTableP(0); - table_new_iter_ = ((core::TianmuTable *)table_ptr_)->Begin(GetAttrsUseIndicator(table), *filter_ptr_); - table_new_iter_end_ = ((core::TianmuTable *)table_ptr_)->End(); + iterator_ = std::make_unique((core::TianmuTable *)table_ptr_, GetAttrsUseIndicator(table), + *filter_ptr_); blob_buffers_.resize(0); if (table_ptr_ != nullptr) blob_buffers_.resize(table_ptr_->NumOfDisplaybleAttrs()); @@ -1521,9 +1407,7 @@ const Item *ha_tianmu::cond_push(const Item *a_cond) { try { if (!query_) { - std::shared_ptr rctp; - ha_tianmu_engine_->GetTableIterator(table_name_, table_new_iter_, table_new_iter_end_, rctp, - GetAttrsUseIndicator(table), table->in_use); + std::shared_ptr rctp = ha_tianmu_engine_->GetTx(table->in_use)->GetTableByPath(table_name_); table_ptr_ = rctp.get(); query_.reset(new core::Query(current_txn_)); cq_.reset(new core::CompiledQuery); @@ -1575,7 +1459,7 @@ const Item *ha_tianmu::cond_push(const Item *a_cond) { tmp_cq->AddConds(tmp_table_, cond_id, core::CondType::WHERE_COND); tmp_cq->ApplyConds(tmp_table_); cq_.reset(tmp_cq.release()); - // reset table_new_iter_ with push condition + // reset iterator with push condition if (!set_cond_iter()) ret = 0; } catch (std::exception &e) { @@ -1593,8 +1477,7 @@ int 
ha_tianmu::reset() { int ret = 1; try { - table_new_iter_ = core::TianmuTable::Iterator(); - table_new_iter_end_ = core::TianmuTable::Iterator(); + iterator_.reset(); table_ptr_ = nullptr; filter_ptr_.reset(); query_.reset(); @@ -1722,8 +1605,8 @@ bool ha_tianmu::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha fw.OpenReadWrite(fname); fw.ReadExact(&hdr, sizeof(hdr)); uint64_t autoinc_ = ha_alter_info->create_info->auto_increment_value; - if (autoinc_ > hdr.auto_inc_next) { // alter table auto_increment must be > current max autoinc - hdr.auto_inc_next = --autoinc_; + if (autoinc_ > hdr.auto_inc) { // alter table auto_increment must be > current max autoinc + hdr.auto_inc = --autoinc_; fw.WriteExact(&hdr, sizeof(hdr)); } fw.Flush(); @@ -2223,7 +2106,7 @@ int get_DelayedBufferUsage_StatusVar([[maybe_unused]] MYSQL_THD thd, SHOW_VAR *v int get_RowStoreUsage_StatusVar([[maybe_unused]] MYSQL_THD thd, SHOW_VAR *var, char *buff) { var->type = SHOW_CHAR; var->value = buff; - std::string str = ha_tianmu_engine_->RowStoreStat(); + std::string str = ha_tianmu_engine_->DeltaStoreStat(); std::memcpy(buff, str.c_str(), str.length() + 1); return 0; } @@ -2519,6 +2402,12 @@ static MYSQL_SYSVAR_UINT(insert_buffer_size, tianmu_sysvar_insert_buffer_size, P nullptr, 512, 512, 10000, 0); static MYSQL_SYSVAR_UINT(delete_or_update_threads, tianmu_sysvar_delete_or_update_threads, PLUGIN_VAR_READONLY, "-", nullptr, nullptr, 0, 0, 100, 0); +static MYSQL_SYSVAR_UINT(merge_rocks_expected_count, tianmu_sysvar_merge_rocks_expected_count, PLUGIN_VAR_READONLY, "-", + nullptr, nullptr, 65536, 0, 6553600, 0); +static MYSQL_SYSVAR_UINT(insert_write_batch_size, tianmu_sysvar_insert_write_batch_size, PLUGIN_VAR_READONLY, "-", + nullptr, nullptr, 10000, 0, 1000000, 0); +static MYSQL_SYSVAR_UINT(log_loop_interval, tianmu_sysvar_log_loop_interval, PLUGIN_VAR_READONLY, "-", nullptr, nullptr, + 60, 0, 6000, 0); static MYSQL_THDVAR_INT(session_debug_level, PLUGIN_VAR_INT, "session debug 
level", nullptr, debug_update, 3, 0, 5, 0); static MYSQL_THDVAR_INT(control_trace, PLUGIN_VAR_OPCMDARG, "ini controltrace", nullptr, trace_update, 0, 0, 100, 0); @@ -2661,6 +2550,9 @@ static struct st_mysql_sys_var *tianmu_showvars[] = {MYSQL_SYSVAR(bg_load_thread MYSQL_SYSVAR(control_trace), MYSQL_SYSVAR(data_distribution_policy), MYSQL_SYSVAR(delete_or_update_threads), + MYSQL_SYSVAR(merge_rocks_expected_count), + MYSQL_SYSVAR(insert_write_batch_size), + MYSQL_SYSVAR(log_loop_interval), MYSQL_SYSVAR(disk_usage_threshold), MYSQL_SYSVAR(distinct_cache_size), MYSQL_SYSVAR(filterevaluation_speedup), @@ -2717,21 +2609,21 @@ static struct st_mysql_sys_var *tianmu_showvars[] = {MYSQL_SYSVAR(bg_load_thread MYSQL_SYSVAR(start_async), MYSQL_SYSVAR(result_sender_rows), nullptr}; -} // namespace handler +} // namespace DBHandler } // namespace Tianmu mysql_declare_plugin(tianmu){ MYSQL_STORAGE_ENGINE_PLUGIN, - &Tianmu::handler::tianmu_storage_engine, + &Tianmu::DBHandler::tianmu_storage_engine, "TIANMU", "StoneAtom Group Holding Limited", "Tianmu storage engine", PLUGIN_LICENSE_GPL, - Tianmu::handler::tianmu_init_func, /* Plugin Init */ - Tianmu::handler::tianmu_done_func, /* Plugin Deinit */ + Tianmu::DBHandler::tianmu_init_func, /* Plugin Init */ + Tianmu::DBHandler::tianmu_done_func, /* Plugin Deinit */ 0x0001 /* 0.1 */, - Tianmu::handler::statusvars, /* status variables */ - Tianmu::handler::tianmu_showvars, /* system variables */ - nullptr, /* config options */ - 0 /* flags for plugin */ + Tianmu::DBHandler::statusvars, /* status variables */ + Tianmu::DBHandler::tianmu_showvars, /* system variables */ + nullptr, /* config options */ + 0 /* flags for plugin */ } mysql_declare_plugin_end; diff --git a/storage/tianmu/handler/ha_tianmu.h b/storage/tianmu/handler/ha_tianmu.h index f84f54c90..ccb0cfde6 100644 --- a/storage/tianmu/handler/ha_tianmu.h +++ b/storage/tianmu/handler/ha_tianmu.h @@ -22,7 +22,7 @@ #include "core/engine.h" namespace Tianmu { -namespace handler { 
+namespace DBHandler { // Class definition for the storage engine class ha_tianmu final : public handler { @@ -175,9 +175,7 @@ class ha_tianmu final : public handler { core::JustATable *table_ptr_ = nullptr; std::unique_ptr filter_ptr_; uint64_t current_position_ = 0; - - core::TianmuTable::Iterator table_new_iter_; - core::TianmuTable::Iterator table_new_iter_end_; + std::unique_ptr iterator_; std::unique_ptr query_; core::TabID tmp_table_; @@ -186,7 +184,7 @@ class ha_tianmu final : public handler { std::vector> blob_buffers_; }; -} // namespace handler +} // namespace DBHandler } // namespace Tianmu #endif // HA_TIANMU_H_ diff --git a/storage/tianmu/index/rdb_meta_manager.cpp b/storage/tianmu/index/rdb_meta_manager.cpp index 0ca006460..df1183800 100644 --- a/storage/tianmu/index/rdb_meta_manager.cpp +++ b/storage/tianmu/index/rdb_meta_manager.cpp @@ -29,8 +29,9 @@ #include "m_ctype.h" #include "my_bit.h" +#include "core/delta_table.h" #include "core/engine.h" -#include "core/tianmu_mem_table.h" +#include "core/merge_operator.h" #include "index/kv_store.h" #include "index/rdb_utils.h" @@ -498,16 +499,16 @@ bool DDLManager::init(DICTManager *const dict, CFManager *const cf_manager_) { } const uint32_t cf_id = be_read_uint32(&ptr); - const uint32_t memtable_id = be_read_uint32(&ptr); + const uint32_t delta_table_id = be_read_uint32(&ptr); std::string table_name = std::string(key.data() + INDEX_NUMBER_SIZE, key.size() - INDEX_NUMBER_SIZE); - if (max_index_id < memtable_id) { + if (max_index_id < delta_table_id) { TIANMU_LOG(LogCtl_Level::ERROR, "RocksDB: Found MAX_MEM_ID %u, but also found larger memtable id %u.", - max_index_id, memtable_id); + max_index_id, delta_table_id); return false; } - std::shared_ptr tb_mem = - std::make_shared(table_name, memtable_id, cf_id); - mem_hash_[table_name] = tb_mem; + std::shared_ptr delta_table = + std::make_shared(table_name, delta_table_id, cf_id); + delta_hash_[table_name] = delta_table; } if (max_index_id < 
static_cast(MetaType::END_DICT_INDEX_ID)) { @@ -603,38 +604,38 @@ void DDLManager::cleanup() { } { std::scoped_lock mem_guard(mem_lock_); - mem_hash_.clear(); + delta_hash_.clear(); } } -std::shared_ptr DDLManager::find_mem(const std::string &table_name) { +std::shared_ptr DDLManager::find_delta(const std::string &table_name) { std::scoped_lock guard(mem_lock_); - auto iter = mem_hash_.find(table_name); - if (iter != mem_hash_.end()) + auto iter = delta_hash_.find(table_name); + if (iter != delta_hash_.end()) return iter->second; return nullptr; } -void DDLManager::put_mem(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch) { +void DDLManager::put_delta(std::shared_ptr delta, rocksdb::WriteBatch *const batch) { std::scoped_lock guard(mem_lock_); StringWriter key; - std::string table_name = tb_mem->FullName(); + std::string delta_name = delta->FullName(); key.write_uint32(static_cast(MetaType::DDL_MEMTABLE)); - key.write((const uchar *)table_name.c_str(), table_name.size()); + key.write((const uchar *)delta_name.c_str(), delta_name.size()); StringWriter value; value.write_uint16(static_cast(VersionType::DDL_VERSION)); - value.write_uint32(tb_mem->GetCFHandle()->GetID()); - value.write_uint32(tb_mem->GetMemID()); + value.write_uint32(delta->GetCFHandle()->GetID()); + value.write_uint32(delta->GetDeltaTableID()); dict_->put_key(batch, {(char *)key.ptr(), key.length()}, {(char *)value.ptr(), value.length()}); - mem_hash_[table_name] = tb_mem; + delta_hash_[delta_name] = delta; } -void DDLManager::remove_mem(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch) { +void DDLManager::remove_delta(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch) { std::scoped_lock guard(mem_lock_); StringWriter key; @@ -643,13 +644,13 @@ void DDLManager::remove_mem(std::shared_ptr tb_mem, rocksd key.write((const uchar *)table_name.c_str(), table_name.size()); dict_->delete_key(batch, {(const char *)key.ptr(), key.length()}); - auto iter = mem_hash_.find(table_name); - if 
(iter != mem_hash_.end()) { - mem_hash_.erase(iter); + auto iter = delta_hash_.find(table_name); + if (iter != delta_hash_.end()) { + delta_hash_.erase(iter); } } -bool DDLManager::rename_mem(std::string &from, std::string &to, rocksdb::WriteBatch *const batch) { +bool DDLManager::rename_delta(std::string &from, std::string &to, rocksdb::WriteBatch *const batch) { std::scoped_lock guard(mem_lock_); StringWriter skey; @@ -665,13 +666,13 @@ bool DDLManager::rename_mem(std::string &from, std::string &to, rocksdb::WriteBa dkey.write((const uchar *)to.c_str(), to.size()); dict_->put_key(batch, {(const char *)dkey.ptr(), dkey.length()}, origin_value); - auto iter = mem_hash_.find(from); - if (iter == mem_hash_.end()) + auto iter = delta_hash_.find(from); + if (iter == delta_hash_.end()) return false; auto tb_mem = iter->second; - mem_hash_.erase(iter); - mem_hash_[to] = tb_mem; + delta_hash_.erase(iter); + delta_hash_[to] = tb_mem; return true; } @@ -1008,8 +1009,13 @@ rocksdb::ColumnFamilyHandle *CFManager::get_or_create_cf(rocksdb::DB *const rdb_ cf_handle = it->second; } else { rocksdb::ColumnFamilyOptions opts; - if (!IsRowStoreCF(cf_name)) - opts.compaction_filter_factory.reset(new index::IndexCompactFilterFactory); + if (IsDeltaStoreCF(cf_name)) { + opts.write_buffer_size = 512 << 20; // test for speed insert/update + opts.merge_operator = std::make_shared(); + } else { + opts.disable_auto_compactions = true; + opts.compaction_filter_factory.reset(new IndexCompactFilterFactory); + } const rocksdb::Status s = rdb_->CreateColumnFamily(opts, cf_name, &cf_handle); if (s.ok()) { cf_name_map_[cf_handle->GetName()] = cf_handle; diff --git a/storage/tianmu/index/rdb_meta_manager.h b/storage/tianmu/index/rdb_meta_manager.h index 81f5c9ada..810e863cd 100644 --- a/storage/tianmu/index/rdb_meta_manager.h +++ b/storage/tianmu/index/rdb_meta_manager.h @@ -33,7 +33,7 @@ namespace Tianmu { namespace core { -class TianmuMemTable; +class DeltaTable; } namespace index { class 
DICTManager; @@ -44,7 +44,7 @@ class RdbTable; const std::string DEFAULT_CF_NAME("default"); const std::string DEFAULT_ROWSTORE_NAME("__rowstore__.default"); -const std::string DEFAULT_ROWSTORE_PREFIX("__rowstore__."); +const std::string DEFAULT_DELTA_STORE_PREFIX("__rowstore__."); const std::string DEFAULT_SYSTEM_CF_NAME("__system__"); const char QUALIFIER_VALUE_SEP = '='; @@ -244,9 +244,9 @@ class DDLManager { // find the handler by table name. std::shared_ptr find(const std::string &table_name); // find the mem handler by table name. - std::shared_ptr find_mem(const std::string &table_name); + std::shared_ptr find_delta(const std::string &table_name); // store a rc mem table into DDL mananger. - void put_mem(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch); + void put_delta(std::shared_ptr delta, rocksdb::WriteBatch *const batch); // write dictionary into tbl. void put_and_write(std::shared_ptr tbl, rocksdb::WriteBatch *const batch); @@ -256,8 +256,8 @@ class DDLManager { // remove the tbl from dictionary. void remove(std::shared_ptr tbl, rocksdb::WriteBatch *const batch); // remove tbl from mem hash table. - void remove_mem(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch); - bool rename_mem(std::string &from, std::string &to, rocksdb::WriteBatch *const batch); + void remove_delta(std::shared_ptr tb_mem, rocksdb::WriteBatch *const batch); + bool rename_delta(std::string &from, std::string &to, rocksdb::WriteBatch *const batch); // get the next seq num. 
uint get_and_update_next_number(DICTManager *const dict) { return seq_gen_.get_and_update_next_number(dict); } @@ -271,7 +271,7 @@ class DDLManager { // Contains RdbTable elements std::unordered_map> ddl_hash_; - std::unordered_map> mem_hash_; + std::unordered_map> delta_hash_; std::recursive_mutex lock_; std::recursive_mutex mem_lock_; SeqGenerator seq_gen_; @@ -395,11 +395,11 @@ class CFManager { std::recursive_mutex cf_mutex_; }; -inline bool IsRowStoreCF(std::string cf_name) { - if (cf_name.size() < DEFAULT_ROWSTORE_PREFIX.size()) +inline bool IsDeltaStoreCF(std::string cf_name) { + if (cf_name.size() < DEFAULT_DELTA_STORE_PREFIX.size()) return false; - return (strncmp(cf_name.data(), DEFAULT_ROWSTORE_PREFIX.data(), DEFAULT_ROWSTORE_PREFIX.size()) == 0); + return (strncmp(cf_name.data(), DEFAULT_DELTA_STORE_PREFIX.data(), DEFAULT_DELTA_STORE_PREFIX.size()) == 0); }; } // namespace index diff --git a/storage/tianmu/index/tianmu_table_index.cpp b/storage/tianmu/index/tianmu_table_index.cpp index dab479405..95dcf4e28 100644 --- a/storage/tianmu/index/tianmu_table_index.cpp +++ b/storage/tianmu/index/tianmu_table_index.cpp @@ -159,7 +159,7 @@ common::ErrorCode TianmuTableIndex::CheckUniqueness(core::Transaction *tx, const return common::ErrorCode::FAILED; } - if (!s.IsNotFound()) { + if (s.ok()) { return common::ErrorCode::DUPP_KEY; } @@ -206,12 +206,9 @@ common::ErrorCode TianmuTableIndex::UpdateIndex(core::Transaction *tx, std::stri return err_code; } -common::ErrorCode TianmuTableIndex::DeleteIndex(core::Transaction *tx, std::string ¤tRowKey, +common::ErrorCode TianmuTableIndex::DeleteIndex(core::Transaction *tx, std::vector &fields, uint64_t row [[maybe_unused]]) { StringWriter value, packkey; - std::vector fields; - - fields.emplace_back(currentRowKey); rocksdb_key_->pack_key(packkey, fields, value); common::ErrorCode rc = CheckUniqueness(tx, {(const char *)packkey.ptr(), packkey.length()}); diff --git a/storage/tianmu/index/tianmu_table_index.h 
b/storage/tianmu/index/tianmu_table_index.h index 7011eecac..725a9efac 100644 --- a/storage/tianmu/index/tianmu_table_index.h +++ b/storage/tianmu/index/tianmu_table_index.h @@ -56,7 +56,7 @@ class TianmuTableIndex final { common::ErrorCode RenameIndexTable(const std::string &from, const std::string &to); common::ErrorCode InsertIndex(core::Transaction *tx, std::vector &fields, uint64_t row); common::ErrorCode UpdateIndex(core::Transaction *tx, std::string &nkey, std::string &okey, uint64_t row); - common::ErrorCode DeleteIndex(core::Transaction *tx, std::string &currentRowKey, uint64_t row); + common::ErrorCode DeleteIndex(core::Transaction *tx, std::vector &fields, uint64_t row); common::ErrorCode GetRowByKey(core::Transaction *tx, std::vector &fields, uint64_t &row); public: diff --git a/storage/tianmu/loader/value_cache.h b/storage/tianmu/loader/value_cache.h index 870d6f6aa..42b993fec 100644 --- a/storage/tianmu/loader/value_cache.h +++ b/storage/tianmu/loader/value_cache.h @@ -48,6 +48,10 @@ class ValueCache final { DEBUG_ASSERT(size_ <= capacity_); expected_size_ = 0; expected_null_ = false; + if (expected_delete_) { + deletes_.emplace(values_.size() - 1, true); + expected_delete_ = false; + } } void Rollback() { @@ -73,6 +77,11 @@ class ValueCache final { null_cnt_++; } + void SetDelete(size_t ono) { + DEBUG_ASSERT(ono < values_.size()); + deletes_.emplace(ono, true); + } + void ExpectedSize(size_t expectedSize) { DEBUG_ASSERT((size_ + expectedSize) <= capacity_); expected_size_ = expectedSize; @@ -80,6 +89,10 @@ class ValueCache final { size_t ExpectedSize() const { return expected_size_; } void ExpectedNull(bool null) { expected_null_ = null; } + void ExpectedDelete() { + expected_delete_ = true; + expected_null_ = true; + } bool ExpectedNull() const { return expected_null_; } void *PreparedBuffer() { return (static_cast(data_) + size_); } @@ -103,9 +116,13 @@ class ValueCache final { bool IsNull(size_t ono) const { return nulls_[ono]; } bool NotNull(size_t
ono) const { return !nulls_[ono]; } + bool IsDelete(size_t ono) const { return (deletes_.find(ono) != deletes_.end()); } + size_t NumOfNulls() const { return null_cnt_; } size_t NumOfValues() const { return values_.size(); } + size_t NumOfDeletes() const { return deletes_.size(); } + void CalcIntStats(std::optional nv); void CalcRealStats(std::optional nv); void CalcStrStats(types::BString &min_s, types::BString &max_s, uint &maxlen, const DTCollation &col) const; @@ -124,9 +141,12 @@ class ValueCache final { size_t value_count_; size_t expected_size_ = 0; bool expected_null_ = false; + bool expected_delete_ = false; std::vector values_; std::vector nulls_; + // The delta layer is used to merge data of insert type and delete type + std::map deletes_; size_t null_cnt_ = 0; int64_t min_i_, max_i_, sum_i_; diff --git a/storage/tianmu/system/configuration.cpp b/storage/tianmu/system/configuration.cpp index 1127beb17..27b6da9af 100644 --- a/storage/tianmu/system/configuration.cpp +++ b/storage/tianmu/system/configuration.cpp @@ -54,6 +54,9 @@ unsigned int tianmu_sysvar_threadpoolsize; unsigned int tianmu_sysvar_join_parallel; unsigned int tianmu_sysvar_join_splitrows; unsigned int tianmu_sysvar_delete_or_update_threads; +unsigned int tianmu_sysvar_merge_rocks_expected_count; +unsigned int tianmu_sysvar_insert_write_batch_size; +unsigned int tianmu_sysvar_log_loop_interval; my_bool tianmu_sysvar_compensation_start; my_bool tianmu_sysvar_filterevaluation_speedup; my_bool tianmu_sysvar_groupby_speedup; diff --git a/storage/tianmu/system/configuration.h b/storage/tianmu/system/configuration.h index 5bc9e450b..fbeaf803c 100644 --- a/storage/tianmu/system/configuration.h +++ b/storage/tianmu/system/configuration.h @@ -88,6 +88,13 @@ extern char tianmu_sysvar_enable_histogram_cmap_bloom; extern unsigned int tianmu_sysvar_result_sender_rows; // Number of threads executing (delete_row) and (update_row) in parallel extern unsigned int tianmu_sysvar_delete_or_update_threads; + 
+extern unsigned int tianmu_sysvar_merge_rocks_expected_count; +// Threshold to submit in insert request +extern unsigned int tianmu_sysvar_insert_write_batch_size; + +extern unsigned int tianmu_sysvar_log_loop_interval; + // The parallelism degree of multithreaded aggregation // the number of threads executing group by multithreaded aggregation extern unsigned int tianmu_sysvar_groupby_parallel_degree; diff --git a/storage/tianmu/types/bstring.cpp b/storage/tianmu/types/bstring.cpp index ac6cfcc42..07b1dce7b 100644 --- a/storage/tianmu/types/bstring.cpp +++ b/storage/tianmu/types/bstring.cpp @@ -96,6 +96,9 @@ bool BString::Parse(BString &in, BString &out) { common::ColumnType BString::Type() const { return common::ColumnType::STRING; } void BString::PutString(char *&dest, ushort len, bool move_ptr) const { + if (this->len_ > len) { + ASSERT(this->len_ <= len, "should be 'this->len_ <= len'"); + } ASSERT(this->len_ <= len, "should be 'this->len_ <= len'"); if (this->len_ == 0) std::memset(dest, ' ', len); diff --git a/storage/tianmu/util/bitset.h b/storage/tianmu/util/bitset.h index 9a99f5e54..9a08e1d8b 100644 --- a/storage/tianmu/util/bitset.h +++ b/storage/tianmu/util/bitset.h @@ -23,13 +23,13 @@ namespace utils { class BitSet { public: - BitSet() = delete; - BitSet(size_t sz_, char *data = nullptr) : sz_(sz_) { + BitSet() = default; + BitSet(size_t sz, char *data = nullptr) : sz_(sz) { if (data) { ptr_ = data; allocated_ = false; } else { - ptr_ = new char[(sz_ + NO_OF_BITS - 1) / NO_OF_BITS](); + ptr_ = new char[(sz + NO_OF_BITS - 1) / NO_OF_BITS](); allocated_ = true; } } @@ -41,6 +41,17 @@ class BitSet { } } + void Init(size_t sz, char *data = nullptr) { + sz_ = sz; + if (data) { + ptr_ = data; + allocated_ = false; + } else { + ptr_ = new char[(sz + NO_OF_BITS - 1) / NO_OF_BITS](); + allocated_ = true; + } + } + bool operator[](size_t pos) const { return ptr_[pos / NO_OF_BITS] & (1U << (pos % NO_OF_BITS)); } void set(size_t pos) { ptr_[pos / NO_OF_BITS] |= 
(1U << (pos % NO_OF_BITS)); } void reset(size_t pos) { ptr_[pos / NO_OF_BITS] &= ~(1U << (pos % NO_OF_BITS)); } @@ -48,7 +59,7 @@ class BitSet { size_t data_size() const { return (sz_ + NO_OF_BITS - 1) / NO_OF_BITS; } private: - char *ptr_; + char *ptr_ = nullptr; size_t sz_; bool allocated_; static constexpr size_t NO_OF_BITS = 8;