Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine schema sync on read logic and add test #120

Merged
merged 29 commits into from
Jul 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
26b6dd7
Add sync schema on read
zanmato1984 Jul 16, 2019
d1ac4f4
Simplify schema syncer interface and adjust mock stuff
zanmato1984 Jul 16, 2019
d3e2298
Rename default schema version setting
zanmato1984 Jul 18, 2019
e690046
Compensate last commit
zanmato1984 Jul 18, 2019
2c57e8c
Merge dll branch
zanmato1984 Jul 18, 2019
49b3fdf
Remove curl library
zanmato1984 Jul 18, 2019
b5c2a85
Remove curl from builder image
zanmato1984 Jul 18, 2019
e515c30
Remove useless code, init schema syncer based on pd config
zanmato1984 Jul 18, 2019
e084771
Minor fix to schema debug
zanmato1984 Jul 18, 2019
e89c697
Fix alter tmt and pass tests
zanmato1984 Jul 18, 2019
293c880
Merge ddl
zanmato1984 Jul 18, 2019
aa8072a
Merge branch 'ddl' into ddl-ruoxi
zanmato1984 Jul 18, 2019
14bc79b
Merge ddl
zanmato1984 Jul 18, 2019
074f521
Fix build fail
zanmato1984 Jul 18, 2019
a1e9b57
Merge remote
zanmato1984 Jul 18, 2019
3140836
Add lock for mock schema syncer
zanmato1984 Jul 19, 2019
23ff96c
Fix schema sync service init context
zanmato1984 Jul 19, 2019
fb638a7
Adjust schema tests
zanmato1984 Jul 19, 2019
fc10c2e
Not sync if no schema change detected
zanmato1984 Jul 19, 2019
d3b0af9
Adjust txn mock tests
zanmato1984 Jul 19, 2019
9590357
Merge ddl
zanmato1984 Jul 19, 2019
f6c7275
Merge branch 'ddl' into ddl-ruoxi
zanmato1984 Jul 19, 2019
517793b
Fix default value bug
zanmato1984 Jul 19, 2019
0e510f3
Rename some tests
zanmato1984 Jul 19, 2019
32e9f3d
Remove sync schema test
zanmato1984 Jul 19, 2019
bbe3743
Merge branch 'ddl' into ddl-ruoxi
zanmato1984 Jul 19, 2019
c6d4f86
Remove a lot of useless code
zanmato1984 Jul 19, 2019
9851e66
Refine schema sync on read, and add drop on read test
zanmato1984 Jul 20, 2019
b001bcc
Merge branch 'ddl' into ddl-ruoxi
zanmato1984 Jul 20, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions dbms/src/Debug/MockSchemaSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -328,24 +328,22 @@ void MockSchemaSyncer::syncTable(Context & context, MockTiDB::TablePtr table)
/// Table existing, detect schema changes and apply.
const TableInfo & orig_table_info = storage->getTableInfo();
AlterCommands alter_commands = detectSchemaChanges(table_info, orig_table_info);

std::stringstream ss;
ss << "Detected schema changes: ";
for (const auto & command : alter_commands)
{
// TODO: Other command types.
if (command.type == AlterCommand::ADD_COLUMN)
ss << "ADD COLUMN " << command.column_name << " " << command.data_type->getName() << ", ";
else if (command.type == AlterCommand::DROP_COLUMN)
ss << "DROP COLUMN " << command.column_name << ", ";
else if (command.type == AlterCommand::MODIFY_COLUMN)
ss << "MODIFY COLUMN " << command.column_name << " " << command.data_type->getName() << ", ";
}

LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str());

if (!alter_commands.empty())
{
std::stringstream ss;
ss << "Detected schema changes: ";
for (const auto & command : alter_commands)
{
// TODO: Other command types.
if (command.type == AlterCommand::ADD_COLUMN)
ss << "ADD COLUMN " << command.column_name << " " << command.data_type->getName() << ", ";
else if (command.type == AlterCommand::DROP_COLUMN)
ss << "DROP COLUMN " << command.column_name << ", ";
else if (command.type == AlterCommand::MODIFY_COLUMN)
ss << "MODIFY COLUMN " << command.column_name << " " << command.data_type->getName() << ", ";
}
LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": " << ss.str());

// Call storage alter to apply schema changes.
storage->alterForTMT(alter_commands, table_info, table->table_info.db_name, context);

Expand Down
75 changes: 49 additions & 26 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names)
{
/// Read from table function.
storage = context.getQueryContext().executeTableFunction(table_expression);
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
}
else
{
Expand All @@ -159,12 +160,17 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names)

getDatabaseAndTableNames(database_name, table_name);

storage = context.getTable(database_name, table_name);
if (settings.schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION)
{
storage = context.getTable(database_name, table_name);
table_lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
}
else
{
getAndLockStorageWithSchemaVersion(database_name, table_name, settings.schema_version);
}
}

if (storage)
table_lock = alignStorageSchemaAndLock(settings.schema_version);

query_analyzer = std::make_unique<ExpressionAnalyzer>(
query_ptr, context, storage, source_columns, required_result_column_names, subquery_depth, !only_analyze);

Expand All @@ -187,47 +193,64 @@ void InterpreterSelectQuery::init(const Names & required_result_column_names)
}


TableStructureReadLockPtr InterpreterSelectQuery::alignStorageSchemaAndLock(Int64 schema_version)
void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 schema_version)
{
/// Regular read lock for non-TMT or schema version unspecified.
const auto merge_tree = dynamic_cast<const StorageMergeTree *>(storage.get());
if (schema_version == DEFAULT_UNSPECIFIED_SCHEMA_VERSION || !merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
return storage->lockStructure(false, __PRETTY_FUNCTION__);
String qualified_name = database_name + "." + table_name;

/// Lambda for get storage, then align schema version under the read lock.
auto get_and_lock_storage = [&](bool schema_synced) -> std::tuple<StoragePtr, TableStructureReadLockPtr, Int64, bool> {
/// Get storage in case it's dropped then re-created.
// If schema synced, call getTable without try, leading to exception on table not existing.
auto storage_ = schema_synced ? context.getTable(database_name, table_name) : context.tryGetTable(database_name, table_name);
if (!storage_)
return std::make_tuple(nullptr, nullptr, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, false);

const auto merge_tree = dynamic_cast<const StorageMergeTree *>(storage_.get());
zanmato1984 marked this conversation as resolved.
Show resolved Hide resolved
if (!merge_tree || merge_tree->getData().merging_params.mode != MergeTreeData::MergingParams::Txn)
throw Exception("Specifying schema_version for non-TMT storage: " + storage->getName() + ", table: " + qualified_name + " is not allowed", ErrorCodes::LOGICAL_ERROR);

/// Lambda for schema version check under the read lock.
auto checkSchemaVersionAndLock = [&](bool schema_synced) -> std::tuple<TableStructureReadLockPtr, Int64> {
auto lock = storage->lockStructure(false, __PRETTY_FUNCTION__);
/// Lock storage.
auto lock = storage_->lockStructure(false, __PRETTY_FUNCTION__);

/// Check schema version.
auto storage_schema_version = merge_tree->getTableInfo().schema_version;
if (storage_schema_version > schema_version)
throw Exception("Storage schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);
throw Exception("Table " + qualified_name + " schema version " + std::to_string(storage_schema_version) + " newer than query schema version " + std::to_string(schema_version), ErrorCodes::SCHEMA_VERSION_ERROR);

if ((schema_synced && storage_schema_version <= schema_version) || (!schema_synced && storage_schema_version == schema_version))
return std::make_tuple(lock, storage_schema_version);
return std::make_tuple(storage_, lock, storage_schema_version, true);

return std::make_tuple(nullptr, storage_schema_version);
return std::make_tuple(nullptr, nullptr, storage_schema_version, false);
};

/// Try check and lock once.
/// Try get storage and lock once.
StoragePtr storage_;
TableStructureReadLockPtr lock;
Int64 storage_schema_version;
bool ok;
{
auto [lock, storage_schema_version] = checkSchemaVersionAndLock(false);
if (lock)
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(false);
if (ok)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check OK, no syncing required.");
return lock;
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK, no syncing required.");
storage = storage_;
table_lock = lock;
return;
}
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check not OK.");
}

/// If first try failed, sync schema and check again.
/// If first try failed, sync schema and try again.
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", not OK, syncing schemas.");
context.getTMTContext().getSchemaSyncer()->syncSchemas(context);

auto [lock, storage_schema_version] = checkSchemaVersionAndLock(true);
if (lock)
std::tie(storage_, lock, storage_schema_version, ok) = get_and_lock_storage(true);
if (ok)
{
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " storage schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", schema check OK after syncing.");
return lock;
LOG_DEBUG(log, __PRETTY_FUNCTION__ << " Table " << qualified_name << " schema version: " << storage_schema_version << ", query schema version: " << schema_version << ", OK after syncing.");
storage = storage_;
table_lock = lock;
return;
}

throw Exception("Shouldn't reach here", ErrorCodes::UNKNOWN_EXCEPTION);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/IBlockInputStream.h>
#include <Storages/Transaction/Types.h>


namespace Poco { class Logger; }
Expand Down Expand Up @@ -111,7 +112,7 @@ class InterpreterSelectQuery : public IInterpreter

void init(const Names & required_result_column_names);

TableStructureReadLockPtr alignStorageSchemaAndLock(Int64 schema_version);
void getAndLockStorageWithSchemaVersion(const String & database_name, const String & table_name, Int64 schema_version);

void executeImpl(Pipeline & pipeline, const BlockInputStreamPtr & input, bool dry_run);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct Settings

#define APPLY_FOR_SETTINGS(M) \
M(SettingString, regions, "", "the region need to be read.") \
M(SettingBool, resolve_locks, false, "tmt read tso.") \
M(SettingBool, resolve_locks, false, "tmt resolve locks.") \
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
Expand Down
26 changes: 26 additions & 0 deletions tests/mutable-test/txn_schema/drop_on_read.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
=> DBGInvoke __enable_schema_sync_service('false')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)
=> DBGInvoke __mock_schema_syncer('true')

=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __drop_tidb_table(default, test, 'false')
=> select * from default.test
=> select * from default.test " --schema_version "100
Received exception from server (version {#WORD}):
Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist..

=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Nullable(Int8)')
=> select * from default.test
Received exception from server (version {#WORD}):
Code: 60. DB::Exception: Received from {#WORD} DB::Exception: Table default.test doesn't exist..
=> select * from default.test " --schema_version "100

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
=> DBGInvoke __enable_schema_sync_service('true')