Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tianmu): storage support delta store #1358

Merged
merged 6 commits into from
Mar 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions mysql-test/suite/tianmu/r/unsigned_type.result
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ UPDATE j1 SET CUTINYINT=253;
UPDATE j1 SET CUSMALLINT=65533;
UPDATE j1 SET CUINTEGER=4294967293;
UPDATE j1 SET CUBIGINT=18446744073709551613;
ERROR 22003: Out of range[0, 9223372036854775807] for column 'CUBIGINT' value: 18446744073709551613
UPDATE j1 SET CTINYINT=-127 where j1_key=2;
UPDATE j1 SET CTINYINT=0 where j1_key=3;
UPDATE j1 SET CTINYINT=null where j1_key=4;
Expand Down Expand Up @@ -60,7 +61,7 @@ LN(CUINTEGER)
22.180709777219757
SELECT LN(CUBIGINT) FROM j1 WHERE j1_key=1;
LN(CUBIGINT)
44.3614195558365
NULL
SELECT LOG(CUTINYINT) FROM j1 WHERE j1_key=1;
LOG(CUTINYINT)
5.53338948872752
Expand All @@ -72,7 +73,7 @@ LOG(CUINTEGER)
22.180709777219757
SELECT LOG(CUBIGINT) FROM j1 WHERE j1_key=1;
LOG(CUBIGINT)
44.3614195558365
NULL
SELECT LOG2(CUTINYINT) FROM j1 WHERE j1_key=1;
LOG2(CUTINYINT)
7.98299357469431
Expand All @@ -84,7 +85,7 @@ LOG2(CUINTEGER)
31.99999999899229
SELECT LOG2(CUBIGINT) FROM j1 WHERE j1_key=1;
LOG2(CUBIGINT)
64
NULL
SELECT LN(CTINYINT) FROM j1 WHERE j1_key=2;
LN(CTINYINT)
NULL
Expand Down
1 change: 1 addition & 0 deletions mysql-test/suite/tianmu/t/unsigned_type.test
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ALTER TABLE j1 ADD COLUMN CUBIGINT BIGINT UNSIGNED;
UPDATE j1 SET CUTINYINT=253;
UPDATE j1 SET CUSMALLINT=65533;
UPDATE j1 SET CUINTEGER=4294967293;
--error 1264
UPDATE j1 SET CUBIGINT=18446744073709551613;
UPDATE j1 SET CTINYINT=-127 where j1_key=2;
UPDATE j1 SET CTINYINT=0 where j1_key=3;
Expand Down
2 changes: 1 addition & 1 deletion scripts/stonedb_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,4 @@ cmake ../../ \
# step 5. make & make install
make VERBOSE=1 -j`nproc` 2>&1 | tee -a ${build_log}
make install 2>&1 | tee -a ${build_log}
echo "current dir is `pwd`" 2>&1 | tee -a ${build_log}
echo "current dir is `pwd`" 2>&1 | tee -a ${build_log}
10 changes: 5 additions & 5 deletions sql/log_event_push_cond.cc
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,20 @@ static bool checklist_have_number_or_time(Field **field, uint field_num)
}
default:
break;
}
}
}
return false;
}


bool Rows_log_event::column_information_to_conditions(std::string &sql_statemens,
bool Rows_log_event::column_information_to_conditions(std::string &sql_statemens,
std::string &prefix)
{
sql_statemens += prefix;
Field **field = m_table->field;
if(!field) return false;
/*
If there is a unique constraint,
If there is a unique constraint,
use the field of the unique constraint as the push down condition
*/
std::string key_field_name;
Expand All @@ -210,7 +210,7 @@ bool Rows_log_event::column_information_to_conditions(std::string &sql_statemens
int cond_num = 0;
bool unwanted_str = false;
/*
If there is a number or time type in the table,
If there is a number or time type in the table,
the string type value is not used as the push down condition.
*/
if(checklist_have_number_or_time(field, m_table->s->fields)) unwanted_str = true;
Expand All @@ -222,7 +222,7 @@ bool Rows_log_event::column_information_to_conditions(std::string &sql_statemens
std::string str_cond;

if(!key_field_name.empty()) {

if(key_field_name.compare(f->field_name) != 0) {
continue;
}
Expand Down
2 changes: 1 addition & 1 deletion sql/sql_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ int mysql_load(THD *thd,sql_exchange *ex,TABLE_LIST *table_list,
DBUG_RETURN(TRUE);
}

if (!Tianmu::handler::ha_my_tianmu_load(thd, ex, table_list,
if (!Tianmu::DBHandler::ha_my_tianmu_load(thd, ex, table_list,
(void *)&lf_info)) {
DBUG_RETURN(FALSE);
}
Expand Down
10 changes: 5 additions & 5 deletions sql/sql_parse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3282,8 +3282,8 @@ case SQLCOM_PREPARE:
// Tianmu hook added

int tianmu_res, free_join_from_tianmu, is_optimize_after_tianmu;
if (Tianmu::handler::QueryRouteTo::kToMySQL ==
Tianmu::handler::ha_my_tianmu_query(thd, lex, result, 0, tianmu_res, is_optimize_after_tianmu, free_join_from_tianmu, (int)true))
if (Tianmu::DBHandler::QueryRouteTo::kToMySQL ==
Tianmu::DBHandler::ha_my_tianmu_query(thd, lex, result, 0, tianmu_res, is_optimize_after_tianmu, free_join_from_tianmu, (int)true))
res = handle_query(thd, lex, result, SELECT_NO_UNLOCK, (ulong)0, is_optimize_after_tianmu, free_join_from_tianmu);
else
res = tianmu_res;
Expand Down Expand Up @@ -3748,7 +3748,7 @@ case SQLCOM_PREPARE:
if ((check_table_access(thd, SELECT_ACL, all_tables, FALSE, UINT_MAX, FALSE)
|| open_and_lock_tables(thd, all_tables, 0)))
goto error;
if (!Tianmu::handler::ha_my_tianmu_set_statement_allowed(thd, lex)) {
if (!Tianmu::DBHandler::ha_my_tianmu_set_statement_allowed(thd, lex)) {
goto error;
}
if (!(res= sql_set_variables(thd, lex_var_list)))
Expand Down Expand Up @@ -5204,8 +5204,8 @@ static bool execute_sqlcom_select(THD *thd, TABLE_LIST *all_tables)
//res= handle_query(thd, lex, result, 0, 0, 0, 0);

int tianmu_res, free_join_from_tianmu, is_optimize_after_tianmu;
if (Tianmu::handler::QueryRouteTo::kToMySQL ==
Tianmu::handler::ha_my_tianmu_query(thd, lex, result, (ulong)0, tianmu_res, is_optimize_after_tianmu, free_join_from_tianmu)) {
if (Tianmu::DBHandler::QueryRouteTo::kToMySQL ==
Tianmu::DBHandler::ha_my_tianmu_query(thd, lex, result, (ulong)0, tianmu_res, is_optimize_after_tianmu, free_join_from_tianmu)) {
res = handle_query(thd, lex, result, (ulonglong)0, (ulonglong)0, is_optimize_after_tianmu, free_join_from_tianmu);
}
else
Expand Down
2 changes: 1 addition & 1 deletion sql/sql_show.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5535,7 +5535,7 @@ static int get_schema_column_record(THD *thd, TABLE_LIST *tables,
table->field[IS_COLUMNS_GENERATION_EXPRESSION]->set_null();
//table->field[IS_COLUMNS_COLUMN_COMMENT]->store(field->comment.str,
// field->comment.length, cs);
Tianmu::handler::ha_my_tianmu_update_and_store_col_comment(table, IS_COLUMNS_COLUMN_COMMENT, field, count - 1, cs);//TIANMU UPGRADE
Tianmu::DBHandler::ha_my_tianmu_update_and_store_col_comment(table, IS_COLUMNS_COLUMN_COMMENT, field, count - 1, cs);//TIANMU UPGRADE
if (schema_table_store_record(thd, table))
DBUG_RETURN(1);
}
Expand Down
17 changes: 9 additions & 8 deletions storage/tianmu/core/blocked_mem_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ void MemBlockManager::FreeBlock(void *b) {
current_size -= block_size;
} else
free_blocks.push_back(b);
} else
// often freed block are immediately required by other - keep some of them
// above the limit in the pool
if (current_size > size_limit + block_size * (no_threads + 1)) {
dealloc(b);
current_size -= block_size;
} else
free_blocks.push_back(b);
} else {
// often freed block are immediately required by other - keep some of them
// above the limit in the pool
if (current_size > size_limit + block_size * (no_threads + 1)) {
dealloc(b);
current_size -= block_size;
} else
free_blocks.push_back(b);
}
} // else not found (already erased)
}

Expand Down
3 changes: 3 additions & 0 deletions storage/tianmu/core/column_share.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ void ColumnShare::scan_dpn(common::TX_ID xid) {

ASSERT(hdr.numOfPacks <= capacity, "bad dpn index");

// get column saved auto inc
auto_inc_.store(hdr.auto_inc);

if (hdr.numOfPacks == 0) {
for (uint32_t i = 0; i < capacity; i++) {
start[i].reset();
Expand Down
5 changes: 4 additions & 1 deletion storage/tianmu/core/column_share.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct alignas(128) COL_VER_HDR_V3 {
uint64_t numOfPacks; // number of packs
uint64_t numOfDeleted; // number of deleted

uint64_t auto_inc_next;
uint64_t auto_inc; // only use in commit write
int64_t min;
int64_t max;
uint32_t dict_ver; // dict file version name. 0 means n/a
Expand Down Expand Up @@ -100,6 +100,8 @@ class ColumnShare final {
}
std::string GetFieldName() const { return field_name_; }

void Truncate() { auto_inc_.store(0); }

private:
void Init(common::TX_ID xid);
void map_dpn();
Expand All @@ -115,6 +117,7 @@ class ColumnShare final {
common::PackType pt;
uint32_t col_id;
std::string field_name_;
std::atomic<uint64_t> auto_inc_{0};
struct seg {
uint64_t offset;
uint64_t len;
Expand Down
1 change: 1 addition & 0 deletions storage/tianmu/core/column_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ struct ColumnType {
void OverrideInternalSize(uint size) { internal_size = size; };
int GetDisplaySize() const { return display_size; }
bool Lookup() const { return fmt == common::PackFmt::LOOKUP; }
bool IsLookup() const { return fmt == common::PackFmt::LOOKUP; }
ColumnType RemovedLookup() const;

bool IsNumeric() const {
Expand Down
95 changes: 95 additions & 0 deletions storage/tianmu/core/combined_iterator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/* Copyright (c) 2022 StoneAtom, Inc. All rights reserved.
Use is subject to license terms

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*/

//
// Created by dfx on 22-12-19.
//

#include "combined_iterator.h"

namespace Tianmu::core {

CombinedIterator::CombinedIterator(TianmuTable *base_table, const std::vector<bool> &attrs, const Filter &filter)
: base_table_(base_table), attrs_(attrs) {
base_iter_ = std::make_unique<TianmuIterator>(base_table, attrs, filter);
delta_iter_ = std::make_unique<DeltaIterator>(base_table_->GetDelta().get(), attrs_);
is_base_ = !delta_iter_->Valid();
}

CombinedIterator::CombinedIterator(TianmuTable *base_table, const std::vector<bool> &attrs)
: base_table_(base_table), attrs_(attrs) {
base_iter_ = std::make_unique<TianmuIterator>(base_table, attrs);
delta_iter_ = std::make_unique<DeltaIterator>(base_table_->GetDelta().get(), attrs_);
is_base_ = !delta_iter_->Valid();
}

bool CombinedIterator::operator==(const CombinedIterator &o) {
return is_base_ == o.is_base_ && (is_base_ ? base_iter_ == o.base_iter_ : delta_iter_ == o.delta_iter_);
}

bool CombinedIterator::operator!=(const CombinedIterator &other) { return !(*this == other); }

void CombinedIterator::Next() {
if (!is_base_) {
delta_iter_->Next();
if (!delta_iter_->Valid()) {
is_base_ = true;
}
} else {
base_iter_->Next();
if (base_iter_->Valid()) {
TIANMU_LOG(LogCtl_Level::ERROR, "base pos: %d", base_iter_->Position());
}
}
}

std::shared_ptr<types::TianmuDataType> &CombinedIterator::GetBaseData(int col) { return base_iter_->GetData(col); }

std::string CombinedIterator::GetDeltaData() { return delta_iter_->GetData(); }

void CombinedIterator::SeekTo(int64_t row_id) {
int64_t base_max_row_id = base_table_->NumOfObj();
if (row_id <= base_max_row_id) {
base_iter_->SeekTo(row_id);
is_base_ = true;
} else {
delta_iter_->SeekTo(row_id);
is_base_ = false;
}
}

int64_t CombinedIterator::Position() const {
if (is_base_) {
return base_iter_->Position();
} else {
return delta_iter_->Position();
}
}

bool CombinedIterator::Valid() const { return Position() != -1; }

bool CombinedIterator::IsBase() const { return is_base_; }

bool CombinedIterator::BaseCurrentRowIsInvalid() const {
if (base_iter_->CurrentRowIsDeleted() || InDeltaDeletedRow.find(base_iter_->Position()) != InDeltaDeletedRow.end() ||
InDeltaUpdateRow.find(base_iter_->Position()) != InDeltaUpdateRow.end()) {
return true;
}
return false;
}

} // namespace Tianmu::core
71 changes: 71 additions & 0 deletions storage/tianmu/core/combined_iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/* Copyright (c) 2022 StoneAtom, Inc. All rights reserved.
Use is subject to license terms

This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
*/

//
// Created by dfx on 22-12-19.
//

#ifndef MYSQL_STORAGE_TIANMU_CORE_MERGE_ITERATOR_H_
#define MYSQL_STORAGE_TIANMU_CORE_MERGE_ITERATOR_H_

#include "delta_table.h"
#include "tianmu_attr_typeinfo.h"
#include "tianmu_table.h"

// new iterator not Begin() and End() function;
// only use Valid() to check iterator is valid.

namespace Tianmu::core {
class CombinedIterator {
public:
CombinedIterator() = default;
~CombinedIterator() = default;
CombinedIterator(TianmuTable *base_table, const std::vector<bool> &attrs, const Filter &filter);
CombinedIterator(TianmuTable *base_table, const std::vector<bool> &attrs);
// check two iterator is same
bool operator==(const CombinedIterator &other);
bool operator!=(const CombinedIterator &other);
// goto next row
void Next();
// get tianmu record col data
std::shared_ptr<types::TianmuDataType> &GetBaseData(int col = -1);
// get delta record row data
std::string GetDeltaData();
// move position to this row (:row_id)
void SeekTo(int64_t row_id);
// get the current row_id
int64_t Position() const;
// check the iterator is valid
bool Valid() const;
// check the iterator is delta || base
bool IsBase() const;
// Check whether the current line is invalid
bool BaseCurrentRowIsInvalid() const;

std::unordered_map<int64_t, bool> InDeltaUpdateRow;
std::unordered_map<int64_t, bool> InDeltaDeletedRow;
const TianmuTable *GetBaseTable() const { return base_table_; }

private:
TianmuTable *base_table_ = nullptr;
std::vector<bool> attrs_;
bool is_base_ = false; // make sure that is_base_=true base_iter_ is Valid
std::unique_ptr<DeltaIterator> delta_iter_;
std::unique_ptr<TianmuIterator> base_iter_;
};
} // namespace Tianmu::core
#endif // MYSQL_STORAGE_TIANMU_CORE_MERGE_ITERATOR_H_
Loading