diff --git a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp b/dbms/src/Flash/Coprocessor/tests/cop_test.cpp index 47071244d20..b82525eb901 100644 --- a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp +++ b/dbms/src/Flash/Coprocessor/tests/cop_test.cpp @@ -21,40 +21,78 @@ class FlashClient SubPtr sp; public: + static std::string decodeDatumToString(size_t & cursor, const std::string & raw_data) + { + switch (raw_data[cursor++]) + { + case TiDB::CodecFlagNil: + return "NULL"; + case TiDB::CodecFlagInt: + return std::to_string(DB::DecodeInt(cursor, raw_data)); + case TiDB::CodecFlagUInt: + return std::to_string(DB::DecodeInt(cursor, raw_data)); + case TiDB::CodecFlagBytes: + return DB::DecodeBytes(cursor, raw_data); + case TiDB::CodecFlagCompactBytes: + return DB::DecodeCompactBytes(cursor, raw_data); + case TiDB::CodecFlagFloat: + return std::to_string(DB::DecodeFloat64(cursor, raw_data)); + case TiDB::CodecFlagVarUInt: + return std::to_string(DB::DecodeVarUInt(cursor, raw_data)); + case TiDB::CodecFlagVarInt: + return std::to_string(DB::DecodeVarInt(cursor, raw_data)); + case TiDB::CodecFlagDuration: + throw DB::Exception("Not implented yet. DecodeDatum: CodecFlagDuration"); + case TiDB::CodecFlagDecimal: + return DB::DecodeDecimal(cursor, raw_data).toString(); + default: + throw DB::Exception("Unknown Type:" + std::to_string(raw_data[cursor - 1])); + } + } + FlashClient(ChannelPtr cp) : sp(tikvpb::Tikv::NewStub(cp)) {} - grpc::Status coprocessor(coprocessor::Request * rqst) + grpc::Status coprocessor(coprocessor::Request * rqst, size_t output_column_num) { grpc::ClientContext clientContext; clientContext.AddMetadata("user_name", ""); clientContext.AddMetadata("dag_planner", "optree"); coprocessor::Response response; grpc::Status status = sp->Coprocessor(&clientContext, *rqst, &response); - size_t column_num = 3; if (status.ok()) { // if status is ok, try to decode the result tipb::SelectResponse selectResponse; if (selectResponse.ParseFromString(response.data())) { - for (tipb::Chunk chunk : selectResponse.chunks()) + for (const tipb::Chunk & chunk : selectResponse.chunks()) { size_t cursor = 0; - std::vector row_result; const std::string & data = chunk.rows_data(); while (cursor < data.size()) { - row_result.push_back(DB::DecodeDatum(cursor, data)); - if (row_result.size() == column_num) + for (size_t i = 0; i < output_column_num; i++) { - //print the result - std::cout << row_result[0].get() << " " << row_result[1].get() << " " - << row_result[2].get() << std::endl; - row_result.clear(); + std::cout << decodeDatumToString(cursor, data) << " "; } + std::cout << std::endl; } } + std::cout << "Execute summary: " << std::endl; + for (int i = 0; i < selectResponse.execution_summaries_size(); i++) + { + auto & summary = selectResponse.execution_summaries(i); + std::cout << "Executor " << i; + std::cout << " time = " << summary.time_processed_ns() << " ns "; + std::cout << " rows = " << summary.num_produced_rows(); + std::cout << " iter nums = " << summary.num_iterations(); + std::cout << std::endl; + } } } + else + { + std::cout << "Coprocessor request failed, error code " << status.error_code() << " error msg " << status.error_message(); + } return status; } }; @@ -64,6 +102,7 @@ grpc::Status rpcTest() { ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials()); ClientPtr clientPtr = std::make_shared(cp); + size_t result_field_num = 0; // construct a dag request tipb::DAGRequest dagRequest; dagRequest.set_start_ts(18446744073709551615uL); @@ -75,14 +114,15 @@ grpc::Status rpcTest() tipb::ColumnInfo * ci = ts->add_columns(); ci->set_column_id(1); ci->set_tp(0xfe); - ci->set_flag(1); + ci->set_flag(0); ci = ts->add_columns(); ci->set_column_id(2); ci->set_tp(8); - ci->set_flag(1); + ci->set_flag(0); dagRequest.add_output_offsets(1); dagRequest.add_output_offsets(0); dagRequest.add_output_offsets(1); + result_field_num = 3; // selection: less(i, 123) executor = dagRequest.add_executors(); @@ -95,15 +135,23 @@ grpc::Status rpcTest() tipb::Expr * value = expr->add_children(); col->set_tp(tipb::ExprType::ColumnRef); std::stringstream ss; - DB::EncodeNumber(2, ss); + DB::EncodeNumber(1, ss); col->set_val(ss.str()); + auto * type = col->mutable_field_type(); + type->set_tp(8); + type->set_flag(0); value->set_tp(tipb::ExprType::Int64); ss.str(""); DB::EncodeNumber(10, ss); value->set_val(std::string(ss.str())); + type = value->mutable_field_type(); + type->set_tp(8); + type->set_flag(1); + type = expr->mutable_field_type(); + type->set_tp(1); + type->set_flag(1 << 5); // agg: count(s) group by i; - /* executor = dagRequest.add_executors(); executor->set_tp(tipb::ExecType::TypeAggregation); auto agg = executor->mutable_aggregation(); @@ -112,36 +160,44 @@ grpc::Status rpcTest() auto child = agg_func->add_children(); child->set_tp(tipb::ExprType::ColumnRef); ss.str(""); - DB::EncodeNumber(1, ss); + DB::EncodeNumber(0, ss); child->set_val(ss.str()); - auto type = agg_func->mutable_field_type(); - type->set_tp(3); - type->set_flag(33); + auto f_type = agg_func->mutable_field_type(); + f_type->set_tp(3); + f_type->set_flag(33); auto group_col = agg->add_group_by(); group_col->set_tp(tipb::ExprType::ColumnRef); ss.str(""); - DB::EncodeNumber(2,ss); + DB::EncodeNumber(1, ss); group_col->set_val(ss.str()); - */ + f_type = group_col->mutable_field_type(); + f_type->set_tp(8); + f_type->set_flag(1); + result_field_num = 2; // topn + /* executor = dagRequest.add_executors(); executor->set_tp(tipb::ExecType::TypeTopN); tipb::TopN * topN = executor->mutable_topn(); topN->set_limit(3); tipb::ByItem * byItem = topN->add_order_by(); - byItem->set_desc(true); + byItem->set_desc(false); tipb::Expr * expr1 = byItem->mutable_expr(); expr1->set_tp(tipb::ExprType::ColumnRef); ss.str(""); - DB::EncodeNumber(2, ss); + DB::EncodeNumber(1, ss); expr1->set_val(ss.str()); + type = expr1->mutable_field_type(); + type->set_tp(8); + type->set_tp(0); + */ // limit /* executor = dagRequest.add_executors(); executor->set_tp(tipb::ExecType::TypeLimit); tipb::Limit *limit = executor->mutable_limit(); - limit->set_limit(1); + limit->set_limit(5); */ @@ -156,7 +212,7 @@ grpc::Status rpcTest() request.set_tp(DAGREQUEST); request.set_data(dagRequest.SerializeAsString()); //request.add_ranges(); - return clientPtr->coprocessor(&request); + return clientPtr->coprocessor(&request, result_field_num); } void codecTest() diff --git a/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp b/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp index f8f230f5a79..8d36848c82c 100644 --- a/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Interpreters/DAGExpressionAnalyzer.cpp @@ -15,6 +15,7 @@ namespace DB namespace ErrorCodes { extern const int COP_BAD_DAG_REQUEST; +extern const int UNSUPPORTED_METHOD; } // namespace ErrorCodes static String genCastString(const String & org_name, const String & target_type_name) @@ -210,6 +211,7 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type()); DataTypePtr actual_type = actions->getSampleBlock().getByName(expr_name).type; //todo maybe use a more decent compare method + // todo ignore nullable info?? if (expected_type->getName() != actual_type->getName()) { // need to add cast function @@ -266,9 +268,9 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi else if (isColumnExpr(expr)) { ColumnID columnId = getColumnID(expr); - if (columnId < 1 || columnId > (ColumnID)getCurrentInputColumns().size()) + if (columnId < 0 || columnId >= (ColumnID)getCurrentInputColumns().size()) { - throw Exception("column id out of bound"); + throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); } //todo check if the column type need to be cast to field type return expr_name; @@ -277,13 +279,13 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi { if (isAggFunctionExpr(expr)) { - throw Exception("agg function is not supported yet"); + throw Exception("agg function is not supported yet", ErrorCodes::UNSUPPORTED_METHOD); } const String & func_name = getFunctionName(expr); if (func_name == "in" || func_name == "notIn" || func_name == "globalIn" || func_name == "globalNotIn") { // todo support in - throw Exception(func_name + " is not supported yet"); + throw Exception(func_name + " is not supported yet", ErrorCodes::UNSUPPORTED_METHOD); } const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context); @@ -292,15 +294,8 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi for (auto & child : expr.children()) { String name = getActions(child, actions); - if (actions->getSampleBlock().has(name)) - { - argument_names.push_back(name); - argument_types.push_back(actions->getSampleBlock().getByName(name).type); - } - else - { - throw Exception("Unknown expr: " + child.DebugString()); - } + argument_names.push_back(name); + argument_types.push_back(actions->getSampleBlock().getByName(name).type); } // re-construct expr_name, because expr_name generated previously is based on expr tree, @@ -319,7 +314,7 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi } else { - throw Exception("Unsupported expr type: " + getTypeName(expr)); + throw Exception("Unsupported expr type: " + getTypeName(expr), ErrorCodes::UNSUPPORTED_METHOD); } } } // namespace DB diff --git a/dbms/src/Interpreters/DAGUtils.cpp b/dbms/src/Interpreters/DAGUtils.cpp index 0cfa906cc02..ce1e88cd65b 100644 --- a/dbms/src/Interpreters/DAGUtils.cpp +++ b/dbms/src/Interpreters/DAGUtils.cpp @@ -92,11 +92,11 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col return DecodeBytes(cursor, expr.val()); case tipb::ExprType::ColumnRef: columnId = DecodeInt(cursor, expr.val()); - if (columnId < 1 || columnId > (ColumnID)input_col.size()) + if (columnId < 0 || columnId >= (ColumnID)input_col.size()) { throw Exception("out of bound"); } - return input_col.getNames()[columnId - 1]; + return input_col.getNames()[columnId]; case tipb::ExprType::Count: case tipb::ExprType::Sum: case tipb::ExprType::Avg: diff --git a/dbms/src/Interpreters/InterpreterDAG.cpp b/dbms/src/Interpreters/InterpreterDAG.cpp index d5d5c739508..5a12143d8bd 100644 --- a/dbms/src/Interpreters/InterpreterDAG.cpp +++ b/dbms/src/Interpreters/InterpreterDAG.cpp @@ -71,8 +71,10 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) // cid out of bound throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST); } - String name = storage->getTableInfo().columns[cid - 1].name; + String name = storage->getTableInfo().getColumnName(cid); required_columns.push_back(name); + NameAndTypePair nameAndTypePair = storage->getColumns().getPhysical(name); + source_columns.push_back(nameAndTypePair); } if (required_columns.empty()) { @@ -168,7 +170,6 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline) }); } ColumnsWithTypeAndName columnsWithTypeAndName = pipeline.firstStream()->getHeader().getColumnsWithTypeAndName(); - source_columns = storage->getColumns().getAllPhysical(); } InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() diff --git a/dbms/src/Storages/Transaction/TiDB.cpp b/dbms/src/Storages/Transaction/TiDB.cpp index 83715fd5bb4..5483b5ee26b 100644 --- a/dbms/src/Storages/Transaction/TiDB.cpp +++ b/dbms/src/Storages/Transaction/TiDB.cpp @@ -15,7 +15,8 @@ ColumnInfo::ColumnInfo(Poco::JSON::Object::Ptr json) { deserialize(json); } Field ColumnInfo::defaultValueToField() const { auto & value = origin_default_value; - if (value.isEmpty()) { + if (value.isEmpty()) + { return Field(); } switch (tp) @@ -55,7 +56,8 @@ Field ColumnInfo::defaultValueToField() const return Field(); } -Poco::JSON::Object::Ptr ColumnInfo::getJSONObject() const try +Poco::JSON::Object::Ptr ColumnInfo::getJSONObject() const +try { Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); @@ -98,7 +100,8 @@ catch (const Poco::Exception & e) std::string(__PRETTY_FUNCTION__) + ": Serialize TiDB schema JSON failed (ColumnInfo): " + e.displayText(), DB::Exception(e)); } -void ColumnInfo::deserialize(Poco::JSON::Object::Ptr json) try +void ColumnInfo::deserialize(Poco::JSON::Object::Ptr json) +try { id = json->getValue("id"); name = json->getObject("name")->getValue("L"); @@ -132,7 +135,8 @@ catch (const Poco::Exception & e) PartitionDefinition::PartitionDefinition(Poco::JSON::Object::Ptr json) { deserialize(json); } -Poco::JSON::Object::Ptr PartitionDefinition::getJSONObject() const try +Poco::JSON::Object::Ptr PartitionDefinition::getJSONObject() const +try { Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); json->set("id", id); @@ -153,7 +157,8 @@ catch (const Poco::Exception & e) std::string(__PRETTY_FUNCTION__) + ": Serialize TiDB schema JSON failed (PartitionDef): " + e.displayText(), DB::Exception(e)); } -void PartitionDefinition::deserialize(Poco::JSON::Object::Ptr json) try +void PartitionDefinition::deserialize(Poco::JSON::Object::Ptr json) +try { id = json->getValue("id"); name = json->getObject("name")->getValue("L"); @@ -168,7 +173,8 @@ catch (const Poco::Exception & e) PartitionInfo::PartitionInfo(Poco::JSON::Object::Ptr json) { deserialize(json); } -Poco::JSON::Object::Ptr PartitionInfo::getJSONObject() const try +Poco::JSON::Object::Ptr PartitionInfo::getJSONObject() const +try { Poco::JSON::Object::Ptr json = new Poco::JSON::Object(); @@ -197,7 +203,8 @@ catch (const Poco::Exception & e) std::string(__PRETTY_FUNCTION__) + ": Serialize TiDB schema JSON failed (PartitionInfo): " + e.displayText(), DB::Exception(e)); } -void PartitionInfo::deserialize(Poco::JSON::Object::Ptr json) try +void PartitionInfo::deserialize(Poco::JSON::Object::Ptr json) +try { type = static_cast(json->getValue("type")); expr = json->getValue("expr"); @@ -221,7 +228,8 @@ catch (const Poco::Exception & e) TableInfo::TableInfo(const String & table_info_json) { deserialize(table_info_json); } -String TableInfo::serialize(bool escaped) const try +String TableInfo::serialize(bool escaped) const +try { std::stringstream buf; @@ -279,7 +287,8 @@ catch (const Poco::Exception & e) std::string(__PRETTY_FUNCTION__) + ": Serialize TiDB schema JSON failed (TableInfo): " + e.displayText(), DB::Exception(e)); } -void DBInfo::deserialize(const String & json_str) try +void DBInfo::deserialize(const String & json_str) +try { Poco::JSON::Parser parser; Poco::Dynamic::Var result = parser.parse(json_str); @@ -297,7 +306,8 @@ catch (const Poco::Exception & e) DB::Exception(e)); } -void TableInfo::deserialize(const String & json_str) try +void TableInfo::deserialize(const String & json_str) +try { if (json_str.empty()) { @@ -334,7 +344,8 @@ void TableInfo::deserialize(const String & json_str) try belonging_table_id = obj->getValue("belonging_table_id"); partition.deserialize(partition_obj); } - if (obj->has("schema_version")) { + if (obj->has("schema_version")) + { schema_version = obj->getValue("schema_version"); } } @@ -382,7 +393,7 @@ CodecFlag ColumnInfo::getCodecFlag() const ColumnID TableInfo::getColumnID(const String & name) const { - for (auto col : columns) + for (auto & col : columns) { if (name == col.name) { @@ -396,4 +407,19 @@ ColumnID TableInfo::getColumnID(const String & name) const throw DB::Exception(std::string(__PRETTY_FUNCTION__) + ": Unknown column name " + name, DB::ErrorCodes::LOGICAL_ERROR); } +String TableInfo::getColumnName(const ColumnID id) const +{ + for (auto & col : columns) + { + if (id == col.id) + { + return col.name; + } + } + + throw DB::Exception( + std::string(__PRETTY_FUNCTION__) + ": Invalidate column id " + std::to_string(id) + " for table " + db_name + "." + name, + DB::ErrorCodes::LOGICAL_ERROR); +} + } // namespace TiDB diff --git a/dbms/src/Storages/Transaction/TiDB.h b/dbms/src/Storages/Transaction/TiDB.h index dfd38f294cb..cd6fc0651d9 100644 --- a/dbms/src/Storages/Transaction/TiDB.h +++ b/dbms/src/Storages/Transaction/TiDB.h @@ -263,6 +263,7 @@ struct TableInfo Int64 schema_version = -1; ColumnID getColumnID(const String & name) const; + String getColumnName(const ColumnID id) const; TableInfo producePartitionTableInfo(TableID table_or_partition_id) const {