Skip to content

Commit

Permalink
Revert Lazily Init Store (#2011)
Browse files Browse the repository at this point in the history
* Revert "Init store in background task. (#1843,#1896) (#1874)"

This reverts commit 0882461.

Conflicts:
	dbms/src/Storages/StorageDeltaMerge.cpp

* format code

* Revert "Lazily initializing DeltaMergeStore  (#1423) (#1751) (#1868)"

This reverts commit bbce050.

Conflicts:
	dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp
	dbms/src/Storages/StorageDeltaMerge.cpp
	dbms/src/Storages/StorageDeltaMerge.h

Co-authored-by: JinheLin <linjinhe@pingcap.com>
Co-authored-by: Flowyi <flowbehappy@gmail.com>
  • Loading branch information
3 people authored May 27, 2021
1 parent 5e00e84 commit b09e4cc
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 500 deletions.
29 changes: 0 additions & 29 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,33 +347,6 @@ struct RaftStoreProxyRunner : boost::noncopyable
Logger * log;
};

// We only need this task run once.
void backgroundInitStores(Context & global_context, Logger * log)
{
auto initStores = [&global_context, log]() {
auto storages = global_context.getTMTContext().getStorages().getAllStorage();
int init_cnt = 0;
int err_cnt = 0;
for (auto & [table_id, storage] : storages)
{
try
{
init_cnt += storage->initStoreIfDataDirExist() ? 1 : 0;
LOG_INFO(log, "Storage inited done [table_id=" << table_id << "]");
}
catch (...)
{
err_cnt++;
tryLogCurrentException(log, "Storage inited fail, [table_id=" + DB::toString(table_id) + "]");
}
}
LOG_INFO(log,
"Storage inited finish. [total_count=" << storages.size() << "] [init_count=" << init_cnt << "] [error_count=" << err_cnt
<< "]");
};
std::thread(initStores).detach();
}

int Server::main(const std::vector<std::string> & /*args*/)
{
setThreadName("TiFlashMain");
Expand Down Expand Up @@ -780,8 +753,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
LOG_DEBUG(log, "Sync schemas done.");

backgroundInitStores(*global_context, log);

// After schema synced, set current database.
global_context->setCurrentDatabase(default_database);

Expand Down
173 changes: 0 additions & 173 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
#include <Parsers/ASTSelectQuery.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/DeltaTree.h>
#define private public
#include <Storages/StorageDeltaMerge.h>
#undef private
#include <Storages/StorageDeltaMergeHelpers.h>
#include <Storages/Transaction/RegionRangeKeys.h>
#include <Storages/Transaction/TiKVRange.h>
Expand Down Expand Up @@ -160,176 +157,6 @@ try
}
CATCH

TEST(StorageDeltaMerge_test, Rename)
try
{
Context ctx = DMTestEnv::getContext();
std::shared_ptr<StorageDeltaMerge> storage;
DataTypes data_types;
Names column_names;
const String path_name = DB::tests::TiFlashTestEnv::getTemporaryPath();
const String table_name = "tmp_table";
const String db_name = "default";
// create table
{
NamesAndTypesList names_and_types_list {
//{"col1", std::make_shared<DataTypeUInt64>()},
{"col1", std::make_shared<DataTypeInt64>()},
{"col2", std::make_shared<DataTypeString>()},
};
for (const auto & name_type : names_and_types_list)
{
data_types.push_back(name_type.type);
column_names.push_back(name_type.name);
}

Poco::File path(path_name);
if (path.exists())
{
path.remove(true);
}

// primary_expr_ast
ASTPtr astptr(new ASTIdentifier(table_name, ASTIdentifier::Kind::Table));
astptr->children.emplace_back(new ASTIdentifier("col1"));

storage = StorageDeltaMerge::create("TiFlash",
db_name,
table_name,
std::nullopt,
ColumnsDescription{names_and_types_list},
astptr,
0,
ctx);
storage->startup();
}

ASSERT_FALSE(storage->storeInited());
ASSERT_EQ(storage->getTableName(), table_name);
ASSERT_FALSE(storage->storeInited());
ASSERT_EQ(storage->getDatabaseName(), db_name);
ASSERT_FALSE(storage->storeInited());

// Rename database name before store object is created.
const String new_db_name = "new_" + storage->getDatabaseName();
storage->rename(path_name, new_db_name, table_name, table_name);
ASSERT_FALSE(storage->storeInited());
ASSERT_EQ(storage->getTableName(), table_name);
ASSERT_EQ(storage->getDatabaseName(), new_db_name);

// prepare block data
Block sample;
{
ColumnWithTypeAndName col1;
col1.name = "col1";
col1.type = std::make_shared<DataTypeInt64>();
{
IColumn::MutablePtr m_col = col1.type->createColumn();
// insert form large to small
for (int i = 0; i < 100; i++)
{
Field field = Int64(99 - i);
m_col->insert(field);
}
col1.column = std::move(m_col);
}
sample.insert(col1);

ColumnWithTypeAndName col2;
col2.name = "col2";
col2.type = std::make_shared<DataTypeString>();
{
IColumn::MutablePtr m_col2 = col2.type->createColumn();
for (int i = 0; i < 100; i++)
{
Field field("a", 1);
m_col2->insert(field);
}
col2.column = std::move(m_col2);
}
sample.insert(col2);
}
// Writing will create store object.
{
ASTPtr insertptr(new ASTInsertQuery());
BlockOutputStreamPtr output = storage->write(insertptr, ctx.getSettingsRef());
output->writePrefix();
output->write(sample);
output->writeSuffix();
ASSERT_TRUE(storage->storeInited());
}

// Rename table name
String new_table_name = "new_" + storage->getTableName();
storage->rename(path_name, new_db_name, new_table_name, new_table_name);
ASSERT_EQ(storage->getTableName(), new_table_name);
ASSERT_EQ(storage->getDatabaseName(), new_db_name);

}
CATCH

TEST(StorageDeltaMerge_test, HandleCol)
try
{
Context ctx = DMTestEnv::getContext();
std::shared_ptr<StorageDeltaMerge> storage;
DataTypes data_types;
Names column_names;
const String path_name = DB::tests::TiFlashTestEnv::getTemporaryPath();
const String table_name = "tmp_table";
const String db_name = "default";
// create table
{
NamesAndTypesList names_and_types_list {
//{"col1", std::make_shared<DataTypeUInt64>()},
{"col1", std::make_shared<DataTypeInt64>()},
{"col2", std::make_shared<DataTypeString>()},
};
for (const auto & name_type : names_and_types_list)
{
data_types.push_back(name_type.type);
column_names.push_back(name_type.name);
}

Poco::File path(path_name);
if (path.exists())
{
path.remove(true);
}

// primary_expr_ast
ASTPtr astptr(new ASTIdentifier(table_name, ASTIdentifier::Kind::Table));
astptr->children.emplace_back(new ASTIdentifier("col1"));

storage = StorageDeltaMerge::create("TiFlash",
db_name,
table_name,
std::nullopt,
ColumnsDescription{names_and_types_list},
astptr,
0,
ctx);
storage->startup();
}

ASSERT_FALSE(storage->storeInited());
auto pk_type = storage->getPKTypeImpl();
auto sort_desc = storage->getPrimarySortDescription();
ASSERT_FALSE(storage->storeInited());

auto& store = storage->getStore();
ASSERT_TRUE(storage->storeInited());
auto pk_type2 = store->getPKDataType();
auto sort_desc2 = store->getPrimarySortDescription();

ASSERT_EQ(pk_type->getTypeId(), pk_type2->getTypeId());
ASSERT_EQ(sort_desc.size(), 1u);
ASSERT_EQ(sort_desc2.size(), 1u);
ASSERT_EQ(sort_desc.front().column_name, sort_desc2.front().column_name);
ASSERT_EQ(sort_desc.front().direction, sort_desc2.front().direction);
ASSERT_EQ(sort_desc.front().nulls_direction, sort_desc2.front().nulls_direction);
}
CATCH

TEST(StorageDeltaMerge_internal_test, GetMergedQueryRanges)
{
Expand Down
3 changes: 0 additions & 3 deletions dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,6 @@ class IManageableStorage : public IStorage

// `limit` is the max number of segments to gc, return value is the number of segments gced
virtual UInt64 onSyncGc(Int64 /*limit*/) { throw Exception("Unsupported"); }

// Return true is data dir exist
virtual bool initStoreIfDataDirExist() { throw Exception("Unsupported"); }

virtual void mergeDelta(const Context &) { throw Exception("Unsupported"); }

Expand Down
Loading

0 comments on commit b09e4cc

Please sign in to comment.