diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 84882c4597d..dbc3c9986f3 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -1,10 +1,15 @@ +#include #include #include #include #include +#include #include +#include +#include #include #include +#include #include #include #include @@ -21,6 +26,7 @@ namespace DB namespace ErrorCodes { extern const int BAD_ARGUMENTS; +extern const int LOGICAL_ERROR; } // namespace ErrorCodes using DAGField = std::pair; @@ -98,6 +104,110 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args) return outputDAGResponse(context, schema, dag_response); } +struct ExecutorCtx +{ + tipb::Executor * input; + DAGSchema output; + std::unordered_map col_ref_map; +}; + +void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::unordered_set & referred_columns, + std::unordered_map & col_ref_map) +{ + if (ASTIdentifier * id = typeid_cast(ast.get())) + { + auto ft = std::find_if(input.begin(), input.end(), [&](const auto & field) { return field.first == id->getColumnName(); }); + if (ft == input.end()) + throw DB::Exception("No such column " + id->getColumnName(), ErrorCodes::NO_SUCH_COLUMN_IN_TABLE); + expr->set_tp(tipb::ColumnRef); + *(expr->mutable_field_type()) = (*ft).second; + + referred_columns.emplace((*ft).first); + col_ref_map.emplace((*ft).first, expr); + } + else if (ASTFunction * func = typeid_cast(ast.get())) + { + // TODO: Support agg functions. + for (const auto & child_ast : func->arguments->children) + { + tipb::Expr * child = expr->add_children(); + compileExpr(input, child_ast, child, referred_columns, col_ref_map); + } + + String func_name_lowercase = Poco::toLower(func->name); + // TODO: Support more functions. + // TODO: Support type inference. 
+ if (func_name_lowercase == "equals") + { + expr->set_sig(tipb::ScalarFuncSig::EQInt); + auto * ft = expr->mutable_field_type(); + // TODO: TiDB will infer Int64. + ft->set_tp(TiDB::TypeTiny); + ft->set_flag(TiDB::ColumnFlagUnsigned); + } + else if (func_name_lowercase == "and") + { + expr->set_sig(tipb::ScalarFuncSig::LogicalAnd); + auto * ft = expr->mutable_field_type(); + // TODO: TiDB will infer Int64. + ft->set_tp(TiDB::TypeTiny); + ft->set_flag(TiDB::ColumnFlagUnsigned); + } + else if (func_name_lowercase == "or") + { + expr->set_sig(tipb::ScalarFuncSig::LogicalOr); + auto * ft = expr->mutable_field_type(); + // TODO: TiDB will infer Int64. + ft->set_tp(TiDB::TypeTiny); + ft->set_flag(TiDB::ColumnFlagUnsigned); + } + else + { + throw DB::Exception("Unsupported function: " + func_name_lowercase, ErrorCodes::LOGICAL_ERROR); + } + expr->set_tp(tipb::ExprType::ScalarFunc); + } + else if (ASTLiteral * lit = typeid_cast(ast.get())) + { + std::stringstream ss; + switch (lit->value.getType()) + { + case Field::Types::Which::Null: + expr->set_tp(tipb::Null); + // Null literal expr doesn't need value. + break; + case Field::Types::Which::UInt64: + expr->set_tp(tipb::Uint64); + encodeDAGUInt64(lit->value.get(), ss); + break; + case Field::Types::Which::Int64: + expr->set_tp(tipb::Int64); + encodeDAGInt64(lit->value.get(), ss); + break; + case Field::Types::Which::Float64: + expr->set_tp(tipb::Float64); + encodeDAGFloat64(lit->value.get(), ss); + break; + case Field::Types::Which::Decimal: + expr->set_tp(tipb::MysqlDecimal); + encodeDAGDecimal(lit->value.get(), ss); + break; + case Field::Types::Which::String: + expr->set_tp(tipb::String); + // TODO: Align with TiDB. 
+ encodeDAGBytes(lit->value.get(), ss); + break; + default: + throw DB::Exception(String("Unsupported literal type: ") + lit->value.getTypeName(), ErrorCodes::LOGICAL_ERROR); + } + expr->set_val(ss.str()); + } + else + { + throw DB::Exception("Unsupported expression " + ast->getColumnName(), ErrorCodes::LOGICAL_ERROR); + } +} + std::tuple compileQuery( Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts) { @@ -110,49 +220,244 @@ std::tuple compileQuery( ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "from DAG compiler", 0); ASTSelectQuery & ast_query = typeid_cast(*ast); - String database_name, table_name; - auto query_database = ast_query.database(); - auto query_table = ast_query.table(); - if (query_database) - database_name = typeid_cast(*query_database).name; - if (query_table) - table_name = typeid_cast(*query_table).name; - if (!query_table) + /// Get table metadata. + TiDB::TableInfo table_info; + { + String database_name, table_name; + auto query_database = ast_query.database(); + auto query_table = ast_query.table(); + if (query_database) + database_name = typeid_cast(*query_database).name; + if (query_table) + table_name = typeid_cast(*query_table).name; + if (!query_table) + { + database_name = "system"; + table_name = "one"; + } + else if (!query_database) + { + database_name = context.getCurrentDatabase(); + } + + table_info = schema_fetcher(database_name, table_name); + } + + std::unordered_map executor_ctx_map; + std::unordered_set referred_columns; + tipb::TableScan * ts = nullptr; + tipb::Executor * last_executor = nullptr; + + /// Table scan. 
+ { + tipb::Executor * ts_exec = dag_request.add_executors(); + ts_exec->set_tp(tipb::ExecType::TypeTableScan); + ts = ts_exec->mutable_tbl_scan(); + ts->set_table_id(table_info.id); + DAGSchema ts_output; + for (const auto & column_info : table_info.columns) + { + tipb::FieldType field_type; + field_type.set_tp(column_info.tp); + field_type.set_flag(column_info.flag); + field_type.set_flen(column_info.flen); + field_type.set_decimal(column_info.decimal); + ts_output.emplace_back(std::make_pair(column_info.name, std::move(field_type))); + } + executor_ctx_map.emplace(ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map{}}); + last_executor = ts_exec; + } + + /// Filter. + if (ast_query.where_expression) + { + tipb::Executor * filter_exec = dag_request.add_executors(); + filter_exec->set_tp(tipb::ExecType::TypeSelection); + tipb::Selection * filter = filter_exec->mutable_selection(); + tipb::Expr * cond = filter->add_conditions(); + std::unordered_map col_ref_map; + compileExpr(executor_ctx_map[last_executor].output, ast_query.where_expression, cond, referred_columns, col_ref_map); + executor_ctx_map.emplace(filter_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::move(col_ref_map)}); + last_executor = filter_exec; + } + + /// TopN. 
+ if (ast_query.order_expression_list && ast_query.limit_length) { - database_name = "system"; - table_name = "one"; + tipb::Executor * topn_exec = dag_request.add_executors(); + topn_exec->set_tp(tipb::ExecType::TypeTopN); + tipb::TopN * topn = topn_exec->mutable_topn(); + std::unordered_map col_ref_map; + for (const auto & child : ast_query.order_expression_list->children) + { + ASTOrderByElement * elem = typeid_cast(child.get()); + if (!elem) + throw DB::Exception("Invalid order by element", ErrorCodes::LOGICAL_ERROR); + tipb::ByItem * by = topn->add_order_by(); + by->set_desc(elem->direction < 0); + tipb::Expr * expr = by->mutable_expr(); + compileExpr(executor_ctx_map[last_executor].output, elem->children[0], expr, referred_columns, col_ref_map); + } + auto limit = safeGet(typeid_cast(*ast_query.limit_length).value); + topn->set_limit(limit); + executor_ctx_map.emplace(topn_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::move(col_ref_map)}); + last_executor = topn_exec; } - else if (!query_database) + else if (ast_query.limit_length) { - database_name = context.getCurrentDatabase(); + tipb::Executor * limit_exec = dag_request.add_executors(); + limit_exec->set_tp(tipb::ExecType::TypeLimit); + tipb::Limit * limit = limit_exec->mutable_limit(); + auto limit_length = safeGet(typeid_cast(*ast_query.limit_length).value); + limit->set_limit(limit_length); + executor_ctx_map.emplace( + limit_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::unordered_map{}}); + last_executor = limit_exec; } - auto table_info = schema_fetcher(database_name, table_name); - - tipb::Executor * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeTableScan); - tipb::TableScan * ts = executor->mutable_tbl_scan(); - ts->set_table_id(table_info.id); - size_t i = 0; - for (const auto & column_info : table_info.columns) + + /// Column pruner. 
+ std::function column_pruner = [&](ExecutorCtx & executor_ctx) { + if (!executor_ctx.input) + { + executor_ctx.output.erase(std::remove_if(executor_ctx.output.begin(), executor_ctx.output.end(), + [&](const auto & field) { return referred_columns.count(field.first) == 0; }), + executor_ctx.output.end()); + + for (const auto & field : executor_ctx.output) + { + tipb::ColumnInfo * ci = ts->add_columns(); + ci->set_column_id(table_info.getColumnID(field.first)); + ci->set_tp(field.second.tp()); + ci->set_flag(field.second.flag()); + ci->set_columnlen(field.second.flen()); + ci->set_decimal(field.second.decimal()); + } + + return; + } + column_pruner(executor_ctx_map[executor_ctx.input]); + const auto & last_output = executor_ctx_map[executor_ctx.input].output; + for (const auto & pair : executor_ctx.col_ref_map) + { + auto iter = std::find_if(last_output.begin(), last_output.end(), [&](const auto & field) { return field.first == pair.first; }); + if (iter == last_output.end()) + throw DB::Exception("Column not found when pruning: " + pair.first, ErrorCodes::LOGICAL_ERROR); + std::stringstream ss; + encodeDAGInt64(iter - last_output.begin(), ss); + pair.second->set_val(ss.str()); + } + executor_ctx.output = last_output; + }; + + /// Aggregation finalize. 
{ - tipb::ColumnInfo * ci = ts->add_columns(); - ci->set_column_id(column_info.id); - ci->set_tp(column_info.tp); - ci->set_flag(column_info.flag); + bool has_gby = ast_query.group_expression_list != nullptr; + bool has_agg_func = false; + for (const auto & child : ast_query.select_expression_list->children) + { + const ASTFunction * func = typeid_cast(child.get()); + if (func && AggregateFunctionFactory::instance().isAggregateFunctionName(func->name)) + { + has_agg_func = true; + break; + } + } + + if (has_gby || has_agg_func) + { + if (last_executor->has_limit() || last_executor->has_topn()) + throw DB::Exception("Limit/TopN and Agg cannot co-exist.", ErrorCodes::LOGICAL_ERROR); + + tipb::Executor * agg_exec = dag_request.add_executors(); + agg_exec->set_tp(tipb::ExecType::TypeAggregation); + tipb::Aggregation * agg = agg_exec->mutable_aggregation(); + std::unordered_map col_ref_map; + for (const auto & expr : ast_query.select_expression_list->children) + { + const ASTFunction * func = typeid_cast(expr.get()); + if (!func || !AggregateFunctionFactory::instance().isAggregateFunctionName(func->name)) + throw DB::Exception("Only agg function is allowed in select for a query with aggregation", ErrorCodes::LOGICAL_ERROR); + + tipb::Expr * agg_func = agg->add_agg_func(); - tipb::FieldType field_type; - field_type.set_tp(column_info.tp); - field_type.set_flag(column_info.flag); - field_type.set_flen(column_info.flen); - field_type.set_decimal(column_info.decimal); - schema.emplace_back(std::make_pair(column_info.name, std::move(field_type))); + for (const auto & arg : func->arguments->children) + { + tipb::Expr * arg_expr = agg_func->add_children(); + compileExpr(executor_ctx_map[last_executor].output, arg, arg_expr, referred_columns, col_ref_map); + } - dag_request.add_output_offsets(i); + if (func->name == "count") + { + agg_func->set_tp(tipb::Count); + auto ft = agg_func->mutable_field_type(); + ft->set_tp(TiDB::TypeLongLong); + ft->set_flag(TiDB::ColumnFlagUnsigned 
| TiDB::ColumnFlagNotNull); + } + // TODO: Other agg func. + else + { + throw DB::Exception("Unsupported agg function " + func->name, ErrorCodes::LOGICAL_ERROR); + } - i++; + schema.emplace_back(std::make_pair(func->getColumnName(), agg_func->field_type())); + } + + if (has_gby) + { + for (const auto & child : ast_query.group_expression_list->children) + { + tipb::Expr * gby = agg->add_group_by(); + compileExpr(executor_ctx_map[last_executor].output, child, gby, referred_columns, col_ref_map); + schema.emplace_back(std::make_pair(child->getColumnName(), gby->field_type())); + } + } + + executor_ctx_map.emplace(agg_exec, ExecutorCtx{last_executor, DAGSchema{}, std::move(col_ref_map)}); + last_executor = agg_exec; + + column_pruner(executor_ctx_map[last_executor]); + } } - // TODO: Other operator compile. + /// Non-aggregation finalize. + if (!last_executor->has_aggregation()) + { + std::vector final_output; + for (const auto & expr : ast_query.select_expression_list->children) + { + if (ASTIdentifier * id = typeid_cast(expr.get())) + { + referred_columns.emplace(id->getColumnName()); + final_output.emplace_back(id->getColumnName()); + } + else if (typeid_cast(expr.get())) + { + const auto & last_output = executor_ctx_map[last_executor].output; + for (const auto & field : last_output) + { + referred_columns.emplace(field.first); + final_output.push_back(field.first); + } + } + else + { + throw DB::Exception("Unsupported expression type in select", ErrorCodes::LOGICAL_ERROR); + } + } + + column_pruner(executor_ctx_map[last_executor]); + + const auto & last_output = executor_ctx_map[last_executor].output; + for (const auto & field : final_output) + { + auto iter + = std::find_if(last_output.begin(), last_output.end(), [&](const auto & last_field) { return last_field.first == field; }); + if (iter == last_output.end()) + throw DB::Exception("Column not found after pruning: " + field, ErrorCodes::LOGICAL_ERROR); + dag_request.add_output_offsets(iter - 
last_output.begin()); + schema.push_back(*iter); + } + } return std::make_tuple(table_info.id, std::move(schema), std::move(dag_request)); } @@ -160,14 +465,21 @@ std::tuple compileQuery( tipb::SelectResponse executeDAGRequest( Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version) { + static Logger * log = &Logger::get("MockDAG"); + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handling DAG request: " << dag_request.DebugString()); + context.setSetting("dag_planner", "optree"); tipb::SelectResponse dag_response; DAGDriver driver(context, dag_request, region_id, region_version, region_conf_version, dag_response, true); driver.execute(); + LOG_DEBUG(log, __PRETTY_FUNCTION__ << ": Handle DAG request done"); return dag_response; } BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response) { + if (dag_response.has_error()) + throw DB::Exception(dag_response.error().msg(), dag_response.error().code()); + BlocksList blocks; for (const auto & chunk : dag_response.chunks()) { diff --git a/dbms/src/Flash/Coprocessor/DAGCodec.cpp b/dbms/src/Flash/Coprocessor/DAGCodec.cpp new file mode 100644 index 00000000000..9d809cc1258 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGCodec.cpp @@ -0,0 +1,65 @@ +#include + +#include +#include + +namespace DB +{ + +void encodeDAGInt64(Int64 i, std::stringstream & ss) +{ + auto u = RecordKVFormat::encodeInt64(i); + ss.write(reinterpret_cast(&u), sizeof(u)); +} + +void encodeDAGUInt64(UInt64 i, std::stringstream & ss) +{ + auto u = RecordKVFormat::encodeUInt64(i); + ss.write(reinterpret_cast(&u), sizeof(u)); +} + +void encodeDAGFloat32(Float32 f, std::stringstream & ss) { EncodeFloat64(f, ss); } + +void encodeDAGFloat64(Float64 f, std::stringstream & ss) { EncodeFloat64(f, ss); } + +void encodeDAGString(const String & s, std::stringstream & ss) { ss << s; } + +void encodeDAGBytes(const String & bytes, 
std::stringstream & ss) { ss << bytes; } + +void encodeDAGDecimal(const Decimal & d, std::stringstream & ss) { EncodeDecimal(d, ss); } + +Int64 decodeDAGInt64(const String & s) +{ + auto u = *(reinterpret_cast(s.data())); + return RecordKVFormat::decodeInt64(u); +} + +UInt64 decodeDAGUInt64(const String & s) +{ + auto u = *(reinterpret_cast(s.data())); + return RecordKVFormat::decodeUInt64(u); +} + +Float32 decodeDAGFloat32(const String & s) +{ + size_t cursor = 0; + return DecodeFloat64(cursor, s); +} + +Float64 decodeDAGFloat64(const String & s) +{ + size_t cursor = 0; + return DecodeFloat64(cursor, s); +} + +String decodeDAGString(const String & s) { return s; } + +String decodeDAGBytes(const String & s) { return s; } + +Decimal decodeDAGDecimal(const String & s) +{ + size_t cursor = 0; + return DecodeDecimal(cursor, s); +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGCodec.h b/dbms/src/Flash/Coprocessor/DAGCodec.h new file mode 100644 index 00000000000..faecf74df1f --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DAGCodec.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include + +namespace DB +{ + +void encodeDAGInt64(Int64, std::stringstream &); +void encodeDAGUInt64(UInt64, std::stringstream &); +void encodeDAGFloat32(Float32, std::stringstream &); +void encodeDAGFloat64(Float64, std::stringstream &); +void encodeDAGString(const String &, std::stringstream &); +void encodeDAGBytes(const String &, std::stringstream &); +void encodeDAGDecimal(const Decimal &, std::stringstream &); + +Int64 decodeDAGInt64(const String &); +UInt64 decodeDAGUInt64(const String &); +Float32 decodeDAGFloat32(const String &); +Float64 decodeDAGFloat64(const String &); +String decodeDAGString(const String &); +String decodeDAGBytes(const String &); +Decimal decodeDAGDecimal(const String &); + +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index ee125ce3f97..79720f0b37b 100644 
--- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -1,9 +1,8 @@ #include #include +#include #include -#include -#include #include @@ -51,7 +50,6 @@ const String & getFunctionName(const tipb::Expr & expr) String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser) { std::stringstream ss; - size_t cursor = 0; Int64 column_id = 0; String func_name; Field f; @@ -60,19 +58,21 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col case tipb::ExprType::Null: return "NULL"; case tipb::ExprType::Int64: - return std::to_string(RecordKVFormat::decodeInt64(RecordKVFormat::read(expr.val().data()))); + return std::to_string(decodeDAGInt64(expr.val())); case tipb::ExprType::Uint64: - return std::to_string(DecodeInt(cursor, expr.val())); + return std::to_string(decodeDAGUInt64(expr.val())); case tipb::ExprType::Float32: + return std::to_string(decodeDAGFloat32(expr.val())); case tipb::ExprType::Float64: - return std::to_string(DecodeFloat64(cursor, expr.val())); + return std::to_string(decodeDAGFloat64(expr.val())); case tipb::ExprType::String: + return decodeDAGString(expr.val()); case tipb::ExprType::Bytes: - return expr.val(); + return decodeDAGBytes(expr.val()); case tipb::ExprType::MysqlDecimal: - return DecodeDecimal(cursor, expr.val()).toString(); + return decodeDAGDecimal(expr.val()).toString(); case tipb::ExprType::ColumnRef: - column_id = RecordKVFormat::decodeInt64(RecordKVFormat::read(expr.val().data())); + column_id = decodeDAGInt64(expr.val()); if (column_id < 0 || column_id >= (ColumnID)input_col.size()) { throw Exception("Column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); @@ -191,23 +191,24 @@ bool isColumnExpr(const tipb::Expr & expr) { return expr.tp() == tipb::ExprType: Field decodeLiteral(const tipb::Expr & expr) { - size_t cursor = 0; switch (expr.tp()) { case tipb::ExprType::Null: return Field(); case tipb::ExprType::Int64: - return 
RecordKVFormat::decodeInt64(RecordKVFormat::read(expr.val().data())); + return decodeDAGInt64(expr.val()); case tipb::ExprType::Uint64: - return DecodeInt(cursor, expr.val()); + return decodeDAGUInt64(expr.val()); case tipb::ExprType::Float32: + return Float64(decodeDAGFloat32(expr.val())); case tipb::ExprType::Float64: - return DecodeFloat64(cursor, expr.val()); + return decodeDAGFloat64(expr.val()); case tipb::ExprType::String: + return decodeDAGString(expr.val()); case tipb::ExprType::Bytes: - return expr.val(); + return decodeDAGBytes(expr.val()); case tipb::ExprType::MysqlDecimal: - return DecodeDecimal(cursor, expr.val()); + return decodeDAGDecimal(expr.val()); case tipb::ExprType::MysqlBit: case tipb::ExprType::MysqlDuration: case tipb::ExprType::MysqlEnum: @@ -224,7 +225,7 @@ Field decodeLiteral(const tipb::Expr & expr) ColumnID getColumnID(const tipb::Expr & expr) { - auto column_id = RecordKVFormat::decodeInt64(RecordKVFormat::read(expr.val().data())); + auto column_id = decodeDAGInt64(expr.val()); return column_id; } diff --git a/dbms/src/Flash/Coprocessor/tests/CMakeLists.txt b/dbms/src/Flash/Coprocessor/tests/CMakeLists.txt index c236d367c5d..b8e4b57cbca 100644 --- a/dbms/src/Flash/Coprocessor/tests/CMakeLists.txt +++ b/dbms/src/Flash/Coprocessor/tests/CMakeLists.txt @@ -1,4 +1 @@ include_directories (${CMAKE_CURRENT_BINARY_DIR}) - -add_executable (cop_test cop_test.cpp) -target_link_libraries (cop_test dbms) diff --git a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp b/dbms/src/Flash/Coprocessor/tests/cop_test.cpp deleted file mode 100644 index 4babeececd4..00000000000 --- a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp +++ /dev/null @@ -1,332 +0,0 @@ -#include -#include - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wunused-parameter" -#include -#include -#include -#include -#pragma GCC diagnostic pop - -#include - - -using ChannelPtr = std::shared_ptr; -using SubPtr = std::shared_ptr; -static const int DAGREQUEST = 103; -class 
FlashClient -{ -private: - SubPtr sp; - -public: - static std::string decodeDatumToString(size_t & cursor, const std::string & raw_data) - { - switch (raw_data[cursor++]) - { - case TiDB::CodecFlagNil: - return "NULL"; - case TiDB::CodecFlagInt: - return std::to_string(DB::DecodeInt(cursor, raw_data)); - case TiDB::CodecFlagUInt: - return std::to_string(DB::DecodeInt(cursor, raw_data)); - case TiDB::CodecFlagBytes: - return DB::DecodeBytes(cursor, raw_data); - case TiDB::CodecFlagCompactBytes: - return DB::DecodeCompactBytes(cursor, raw_data); - case TiDB::CodecFlagFloat: - return std::to_string(DB::DecodeFloat64(cursor, raw_data)); - case TiDB::CodecFlagVarUInt: - return std::to_string(DB::DecodeVarUInt(cursor, raw_data)); - case TiDB::CodecFlagVarInt: - return std::to_string(DB::DecodeVarInt(cursor, raw_data)); - case TiDB::CodecFlagDuration: - throw DB::Exception("Not implented yet. DecodeDatum: CodecFlagDuration"); - case TiDB::CodecFlagDecimal: - return DB::DecodeDecimal(cursor, raw_data).toString(); - default: - throw DB::Exception("Unknown Type:" + std::to_string(raw_data[cursor - 1])); - } - } - - FlashClient(ChannelPtr cp) : sp(tikvpb::Tikv::NewStub(cp)) {} - grpc::Status coprocessor(coprocessor::Request * rqst, size_t output_column_num) - { - grpc::ClientContext clientContext; - clientContext.AddMetadata("user_name", ""); - clientContext.AddMetadata("dag_planner", "optree"); - clientContext.AddMetadata("dag_expr_field_type_strict_check", "0"); - coprocessor::Response response; - grpc::Status status = sp->Coprocessor(&clientContext, *rqst, &response); - if (status.ok()) - { - // if status is ok, try to decode the result - tipb::SelectResponse selectResponse; - if (selectResponse.ParseFromString(response.data())) - { - if (selectResponse.has_error()) - { - std::cout << "Coprocessor request failed, error code " << selectResponse.error().code() << " error msg " - << selectResponse.error().msg(); - return status; - } - for (const tipb::Chunk & chunk : 
selectResponse.chunks()) - { - size_t cursor = 0; - const std::string & data = chunk.rows_data(); - while (cursor < data.size()) - { - for (size_t i = 0; i < output_column_num; i++) - { - std::cout << decodeDatumToString(cursor, data) << " "; - } - std::cout << std::endl; - } - } - std::cout << "Execute summary: " << std::endl; - for (int i = 0; i < selectResponse.execution_summaries_size(); i++) - { - auto & summary = selectResponse.execution_summaries(i); - std::cout << "Executor " << i; - std::cout << " time = " << summary.time_processed_ns() << " ns "; - std::cout << " rows = " << summary.num_produced_rows(); - std::cout << " iter nums = " << summary.num_iterations(); - std::cout << std::endl; - } - } - } - else - { - std::cout << "Coprocessor request failed, error code " << status.error_code() << " error msg " << status.error_message(); - } - return status; - } -}; - -using ClientPtr = std::shared_ptr; - -void appendTS(tipb::DAGRequest & dag_request, size_t & result_field_num) -{ - // table scan: s,i - tipb::Executor * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeTableScan); - tipb::TableScan * ts = executor->mutable_tbl_scan(); - ts->set_table_id(44); - tipb::ColumnInfo * ci = ts->add_columns(); - ci->set_column_id(1); - ci->set_tp(0xfe); - ci->set_flag(0); - ci = ts->add_columns(); - ci->set_column_id(2); - ci->set_tp(8); - ci->set_flag(0); - dag_request.add_output_offsets(1); - dag_request.add_output_offsets(0); - dag_request.add_output_offsets(1); - result_field_num = 3; -} - -void appendSelection(tipb::DAGRequest & dag_request) -{ - // selection: less(i, 123) - auto * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeSelection); - tipb::Selection * selection = executor->mutable_selection(); - tipb::Expr * expr = selection->add_conditions(); - expr->set_tp(tipb::ExprType::ScalarFunc); - expr->set_sig(tipb::ScalarFuncSig::LTInt); - tipb::Expr * col = expr->add_children(); - tipb::Expr * value = 
expr->add_children(); - col->set_tp(tipb::ExprType::ColumnRef); - std::stringstream ss; - DB::EncodeNumber(1, ss); - col->set_val(ss.str()); - auto * type = col->mutable_field_type(); - type->set_tp(8); - type->set_flag(0); - value->set_tp(tipb::ExprType::Int64); - ss.str(""); - DB::EncodeNumber(10, ss); - value->set_val(std::string(ss.str())); - type = value->mutable_field_type(); - type->set_tp(8); - type->set_flag(1); - type = expr->mutable_field_type(); - type->set_tp(1); - type->set_flag(1 << 5); - - // selection i in (5,10,11) - selection->clear_conditions(); - expr = selection->add_conditions(); - expr->set_tp(tipb::ExprType::ScalarFunc); - expr->set_sig(tipb::ScalarFuncSig::InInt); - col = expr->add_children(); - col->set_tp(tipb::ExprType::ColumnRef); - ss.str(""); - DB::EncodeNumber(1, ss); - col->set_val(ss.str()); - type = col->mutable_field_type(); - type->set_tp(8); - type->set_flag(0); - value = expr->add_children(); - value->set_tp(tipb::ExprType::Int64); - ss.str(""); - DB::EncodeNumber(10, ss); - value->set_val(std::string(ss.str())); - type = value->mutable_field_type(); - type->set_tp(8); - type->set_flag(1); - type = expr->mutable_field_type(); - type->set_tp(1); - type->set_flag(1 << 5); - value = expr->add_children(); - value->set_tp(tipb::ExprType::Int64); - ss.str(""); - DB::EncodeNumber(5, ss); - value->set_val(std::string(ss.str())); - type = value->mutable_field_type(); - type->set_tp(8); - type->set_flag(1); - type = expr->mutable_field_type(); - type->set_tp(1); - type->set_flag(1 << 5); - value = expr->add_children(); - value->set_tp(tipb::ExprType::Int64); - ss.str(""); - DB::EncodeNumber(11, ss); - value->set_val(std::string(ss.str())); - type = value->mutable_field_type(); - type->set_tp(8); - type->set_flag(1); - type = expr->mutable_field_type(); - type->set_tp(1); - type->set_flag(1 << 5); - - // selection i is null - /* - selection->clear_conditions(); - expr = selection->add_conditions(); - 
expr->set_tp(tipb::ExprType::ScalarFunc); - expr->set_sig(tipb::ScalarFuncSig::IntIsNull); - col = expr->add_children(); - col->set_tp(tipb::ExprType::ColumnRef); - ss.str(""); - DB::EncodeNumber(1, ss); - col->set_val(ss.str()); - */ -} - -void appendAgg(tipb::DAGRequest & dag_request, size_t & result_field_num) -{ - // agg: count(s) group by i; - auto * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeAggregation); - auto agg = executor->mutable_aggregation(); - auto agg_func = agg->add_agg_func(); - agg_func->set_tp(tipb::ExprType::Count); - auto child = agg_func->add_children(); - child->set_tp(tipb::ExprType::ColumnRef); - std::stringstream ss; - DB::EncodeNumber(0, ss); - child->set_val(ss.str()); - auto f_type = agg_func->mutable_field_type(); - f_type->set_tp(3); - f_type->set_flag(33); - auto group_col = agg->add_group_by(); - group_col->set_tp(tipb::ExprType::ColumnRef); - ss.str(""); - DB::EncodeNumber(1, ss); - group_col->set_val(ss.str()); - f_type = group_col->mutable_field_type(); - f_type->set_tp(8); - f_type->set_flag(1); - result_field_num = 2; -} - -void appendTopN(tipb::DAGRequest & dag_request) -{ - auto * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeTopN); - tipb::TopN * topN = executor->mutable_topn(); - topN->set_limit(3); - tipb::ByItem * byItem = topN->add_order_by(); - byItem->set_desc(false); - tipb::Expr * expr1 = byItem->mutable_expr(); - expr1->set_tp(tipb::ExprType::ColumnRef); - std::stringstream ss; - DB::EncodeNumber(1, ss); - expr1->set_val(ss.str()); - auto * type = expr1->mutable_field_type(); - type->set_tp(8); - type->set_tp(0); -} - -void appendLimit(tipb::DAGRequest & dag_request) -{ - auto * executor = dag_request.add_executors(); - executor->set_tp(tipb::ExecType::TypeLimit); - tipb::Limit * limit = executor->mutable_limit(); - limit->set_limit(5); -} - -grpc::Status rpcTest() -{ - ChannelPtr cp = grpc::CreateChannel("localhost:9093", 
grpc::InsecureChannelCredentials()); - ClientPtr clientPtr = std::make_shared(cp); - size_t result_field_num = 0; - bool has_selection = true; - bool has_agg = false; - bool has_topN = true; - bool has_limit = false; - // construct a dag request - tipb::DAGRequest dagRequest; - dagRequest.set_start_ts(18446744073709551615uL); - - appendTS(dagRequest, result_field_num); - if (has_selection) - appendSelection(dagRequest); - if (has_agg) - appendAgg(dagRequest, result_field_num); - if (has_topN) - appendTopN(dagRequest); - if (has_limit) - appendLimit(dagRequest); - - // construct a coprocessor request - coprocessor::Request request; - //todo add context info - kvrpcpb::Context * ctx = request.mutable_context(); - ctx->set_region_id(2); - auto region_epoch = ctx->mutable_region_epoch(); - region_epoch->set_version(21); - region_epoch->set_conf_ver(2); - request.set_tp(DAGREQUEST); - request.set_data(dagRequest.SerializeAsString()); - //request.add_ranges(); - return clientPtr->coprocessor(&request, result_field_num); -} - -void codecTest() -{ - Int64 i = 123; - std::stringstream ss; - DB::EncodeNumber(i, ss); - std::string val = ss.str(); - std::stringstream decode_ss; - size_t cursor = 0; - DB::Field f = DB::DecodeDatum(cursor, val); - Int64 r = f.get(); - r++; -} - -int main() -{ - // std::cout << "Before rpcTest"<< std::endl; - grpc::Status ret = rpcTest(); - // codecTest(); - // std::cout << "End rpcTest " << std::endl; - // std::cout << "The ret is " << ret.error_code() << " " << ret.error_details() - // << " " << ret.error_message() << std::endl; - return 0; -} diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index 614f3df2703..fb83777423d 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -97,6 +97,16 @@ enum TP M(PartKey, (1 << 14)) \ M(Num, (1 << 15)) +enum ColumnFlag +{ +#ifdef M +#error "Please undefine macro M first." 
+#endif +#define M(cf, v) ColumnFlag##cf = v, + COLUMN_FLAGS(M) +#undef M +}; + // Codec flags. // In format: TiDB codec flag, int value. #ifdef M diff --git a/tests/mutable-test/txn_dag/aggregation.test b/tests/mutable-test/txn_dag/aggregation.test new file mode 100644 index 00000000000..0f8ec4c30e3 --- /dev/null +++ b/tests/mutable-test/txn_dag/aggregation.test @@ -0,0 +1,32 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 52, 'test3', 777) + +# DAG read by not specifying region id, group by. +=> DBGInvoke dag('select count(col_1) from default.test group by col_2') +┌─count(col_1)─┬─col_2─┐ +│ 2 │ 666 │ +│ 1 │ 777 │ +└──────────────┴───────┘ + +# DAG read by explicitly specifying region id, where + group by. +=> DBGInvoke dag('select count(col_1) from default.test where col_2 = 666 group by col_2', 4) +┌─count(col_1)─┬─col_2─┐ +│ 2 │ 666 │ +└──────────────┴───────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_dag/filter.test b/tests/mutable-test/txn_dag/filter.test new file mode 100644 index 00000000000..9045b9da1b4 --- /dev/null +++ b/tests/mutable-test/txn_dag/filter.test @@ -0,0 +1,37 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. 
+=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 777) + +# DAG read by not specifying region id, where col_2 = 666. +=> DBGInvoke dag('select * from default.test where col_2 = 666') +┌─col_1─┬─col_2─┐ +│ test1 │ 666 │ +└───────┴───────┘ + +# DAG read by explicitly specifying region id, where col_1 = 'test2'. +=> DBGInvoke dag('select col_2 from default.test where col_1 = \'test2\'', 4) +┌─col_2─┐ +│ 777 │ +└───────┘ + +# Mock DAG read, where or. +=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_1 = \'test2\' or col_2 = 666', 4) +┌─col_2─┬─col_1─┬─col_2─┐ +│ 666 │ test1 │ 666 │ +│ 777 │ test2 │ 777 │ +└───────┴───────┴───────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_dag/limit.test b/tests/mutable-test/txn_dag/limit.test new file mode 100644 index 00000000000..ee8d97f75a7 --- /dev/null +++ b/tests/mutable-test/txn_dag/limit.test @@ -0,0 +1,31 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test1', 666) + +# DAG read by not specifying region id, select *. +=> DBGInvoke dag('select * from default.test') +┌─col_1─┬─col_2─┐ +│ test1 │ 666 │ +│ test1 │ 666 │ +└───────┴───────┘ + +# Mock DAG read, where + limit. 
+=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_2 = 666 limit 1', 4) +┌─col_2─┬─col_1─┬─col_2─┐ +│ 666 │ test1 │ 666 │ +└───────┴───────┴───────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_dag/project.test b/tests/mutable-test/txn_dag/project.test new file mode 100644 index 00000000000..8b29b4a7a08 --- /dev/null +++ b/tests/mutable-test/txn_dag/project.test @@ -0,0 +1,41 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) + +# DAG read by not specifying region id, select *. +=> DBGInvoke dag('select * from default.test') " --dag_planner="optree +┌─col_1─┬─col_2─┐ +│ test1 │ 666 │ +└───────┴───────┘ + +# DAG read by not specifying region id, select col_1. +=> DBGInvoke dag('select col_1 from default.test') " --dag_planner="optree +┌─col_1─┐ +│ test1 │ +└───────┘ + +# DAG read by explicitly specifying region id, select col_2. +=> DBGInvoke dag('select col_2 from default.test', 4) " --dag_planner="optree +┌─col_2─┐ +│ 666 │ +└───────┘ + +# Mock DAG read, select col_2, col_1, col_2. +=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test', 4) " --dag_planner="optree +┌─col_2─┬─col_1─┬─col_2─┐ +│ 666 │ test1 │ 666 │ +└───────┴───────┴───────┘ + +# Clean up. 
+=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_dag/table_scan.test b/tests/mutable-test/txn_dag/table_scan.test index 28d6599f6de..953af0cef9d 100644 --- a/tests/mutable-test/txn_dag/table_scan.test +++ b/tests/mutable-test/txn_dag/table_scan.test @@ -6,7 +6,7 @@ => DBGInvoke __set_flush_threshold(1000000, 1000000) -# Data +# Data. => DBGInvoke __mock_tidb_table(default, test, 'col_1 String') => DBGInvoke __refresh_schemas() => DBGInvoke __put_region(4, 0, 100, default, test) diff --git a/tests/mutable-test/txn_dag/topn.test b/tests/mutable-test/txn_dag/topn.test new file mode 100644 index 00000000000..1708402ca40 --- /dev/null +++ b/tests/mutable-test/txn_dag/topn.test @@ -0,0 +1,30 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 777) + +# DAG read by not specifying region id, order by col_2 limit 1. +=> DBGInvoke dag('select * from default.test order by col_2 limit 1') +┌─col_1─┬─col_2─┐ +│ test1 │ 666 │ +└───────┴───────┘ + +# Mock DAG read, where + topn. +=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_1 = \'test2\' or col_2 = 666 order by col_1 desc limit 1', 4) +┌─col_2─┬─col_1─┬─col_2─┐ +│ 777 │ test2 │ 777 │ +└───────┴───────┴───────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test