From 2d093a82d4c1e8c2f57cd59880a0afc32eca27ab Mon Sep 17 00:00:00 2001 From: xufei Date: Mon, 12 Aug 2019 16:04:19 +0800 Subject: [PATCH] code refine && several minor bug fix (#174) * fix cop test regression * address comments * format code * fix npe for dag execute * format code * address comment * add some comments * throw exception when meet error duing cop request handling * address comments * add error code * throw exception when meet error duing cop request handling * address comments * add DAGContext so InterpreterDAG can exchange information with DAGDriver * fix bug * 1. refine code, 2. address comments * update comments * columnref index is based on executor output schema * handle error in coprocessor request * refine code * use Clear to clear a protobuf message completely * refine code * code refine && several minor bug fix * address comments * address comments --- .../Coprocessor/DAGBlockOutputStream.cpp | 2 + dbms/src/Flash/Coprocessor/DAGDriver.cpp | 2 +- .../Coprocessor/DAGExpressionAnalyzer.cpp | 13 +++- .../Flash/Coprocessor/DAGExpressionAnalyzer.h | 1 + dbms/src/Flash/Coprocessor/DAGQuerySource.cpp | 4 +- dbms/src/Flash/Coprocessor/InterpreterDAG.cpp | 11 +-- dbms/src/Flash/Coprocessor/tests/cop_test.cpp | 75 ++++++++++++------- dbms/src/Flash/FlashService.cpp | 4 +- dbms/src/Interpreters/Settings.h | 1 + dbms/src/Storages/Transaction/Codec.h | 9 +++ 10 files changed, 82 insertions(+), 40 deletions(-) diff --git a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp index 74cf847118f..0ef25b08700 100644 --- a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp +++ b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp @@ -42,6 +42,7 @@ void DAGBlockOutputStream::writeSuffix() if (current_chunk != nullptr && records_per_chunk > 0) { current_chunk->set_rows_data(current_ss.str()); + dag_response.add_output_counts(records_per_chunk); } } @@ -62,6 +63,7 @@ void DAGBlockOutputStream::write(const Block & block) { // set the current ss to current chunk current_chunk->set_rows_data(current_ss.str()); + dag_response.add_output_counts(current_records_num); } current_chunk = dag_response.add_chunks(); current_ss.str(""); diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index 6e25308f5ba..62e6f861db0 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -78,7 +78,7 @@ try { if (auto * p_stream = dynamic_cast(streamPtr.get())) { - time_processed_ns += p_stream->getProfileInfo().total_stopwatch.elapsed(); + time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().total_stopwatch.elapsed()); num_produced_rows += p_stream->getProfileInfo().rows; num_iterations += p_stream->getProfileInfo().blocks; } diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp index e407d5c0d6b..5b8b5fa9165 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp @@ -143,6 +143,15 @@ void DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return after_agg ? aggregated_columns : source_columns; } +void DAGExpressionAnalyzer::appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project) +{ + initChain(chain, getCurrentInputColumns()); + for (auto name : final_project) + { + chain.steps.back().required_output.push_back(name.first); + } +} + void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation) { initChain(chain, getCurrentInputColumns()); @@ -199,11 +208,11 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name) { - if (!expr.has_field_type()) + if (!expr.has_field_type() && context.getSettingsRef().dag_expr_field_type_strict_check) { throw Exception("Expression without field type", ErrorCodes::COP_BAD_DAG_REQUEST); } - if (isFunctionExpr(expr)) + if (expr.has_field_type() && isFunctionExpr(expr)) { DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type()); DataTypePtr actual_type = actions->getSampleBlock().getByName(expr_name).type; diff --git a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h index b05b974d37a..cdc3acbac5b 100644 --- a/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h +++ b/dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h @@ -44,6 +44,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable chain.steps.emplace_back(std::make_shared(columns, settings)); } } + void appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project); String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions); const NamesAndTypesList & getCurrentInputColumns(); }; diff --git a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp index d59e93401ea..3b2d3f4b8c3 100644 --- a/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQuerySource.cpp @@ -90,7 +90,7 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector DAGQuerySource::getResultFieldTypes() const throw Exception("Do not found result field type for current dag request", ErrorCodes::COP_BAD_DAG_REQUEST); } // tispark assumes that if there is a agg, the output offset is - // ignored and the request out put is the same as the agg's output. + // ignored and the request output is the same as the agg's output. // todo should always use output offset to re-construct the output field types if (hasAggregation()) { diff --git a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp index beb8c0a3bd0..be2f8700a04 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterDAG.cpp @@ -208,15 +208,8 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions() analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names); } // Append final project results if needed. - // TODO: Refine this logic by an `analyzer.appendFinalProject()`-like call. - if (dag.hasSelection() || dag.hasAggregation() || dag.hasTopN()) - { - for (auto & name : final_project) - { - chain.steps.back().required_output.push_back(name.first); - } - res.before_order_and_select = chain.getLastActions(); - } + analyzer.appendFinalProject(chain, final_project); + res.before_order_and_select = chain.getLastActions(); chain.finalize(); chain.clear(); //todo need call prependProjectInput?? diff --git a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp b/dbms/src/Flash/Coprocessor/tests/cop_test.cpp index b82525eb901..e18c3c4dd74 100644 --- a/dbms/src/Flash/Coprocessor/tests/cop_test.cpp +++ b/dbms/src/Flash/Coprocessor/tests/cop_test.cpp @@ -98,16 +98,11 @@ class FlashClient }; using ClientPtr = std::shared_ptr; -grpc::Status rpcTest() + +void appendTS(tipb::DAGRequest & dag_request, size_t & result_field_num) { - ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials()); - ClientPtr clientPtr = std::make_shared(cp); - size_t result_field_num = 0; - // construct a dag request - tipb::DAGRequest dagRequest; - dagRequest.set_start_ts(18446744073709551615uL); // table scan: s,i - tipb::Executor * executor = dagRequest.add_executors(); + tipb::Executor * executor = dag_request.add_executors(); executor->set_tp(tipb::ExecType::TypeTableScan); tipb::TableScan * ts = executor->mutable_tbl_scan(); ts->set_table_id(44); @@ -119,13 +114,16 @@ grpc::Status rpcTest() ci->set_column_id(2); ci->set_tp(8); ci->set_flag(0); - dagRequest.add_output_offsets(1); - dagRequest.add_output_offsets(0); - dagRequest.add_output_offsets(1); + dag_request.add_output_offsets(1); + dag_request.add_output_offsets(0); + dag_request.add_output_offsets(1); result_field_num = 3; +} +void appendSelection(tipb::DAGRequest & dag_request) +{ // selection: less(i, 123) - executor = dagRequest.add_executors(); + auto * executor = dag_request.add_executors(); executor->set_tp(tipb::ExecType::TypeSelection); tipb::Selection * selection = executor->mutable_selection(); tipb::Expr * expr = selection->add_conditions(); @@ -150,16 +148,19 @@ grpc::Status rpcTest() type = expr->mutable_field_type(); type->set_tp(1); type->set_flag(1 << 5); +} +void appendAgg(tipb::DAGRequest & dag_request, size_t & result_field_num) +{ // agg: count(s) group by i; - executor = dagRequest.add_executors(); + auto * executor = dag_request.add_executors(); executor->set_tp(tipb::ExecType::TypeAggregation); auto agg = executor->mutable_aggregation(); auto agg_func = agg->add_agg_func(); agg_func->set_tp(tipb::ExprType::Count); auto child = agg_func->add_children(); child->set_tp(tipb::ExprType::ColumnRef); - ss.str(""); + std::stringstream ss; DB::EncodeNumber(0, ss); child->set_val(ss.str()); auto f_type = agg_func->mutable_field_type(); @@ -174,10 +175,11 @@ grpc::Status rpcTest() f_type->set_tp(8); f_type->set_flag(1); result_field_num = 2; +} - // topn - /* - executor = dagRequest.add_executors(); +void appendTopN(tipb::DAGRequest & dag_request) +{ + auto * executor = dag_request.add_executors(); executor->set_tp(tipb::ExecType::TypeTopN); tipb::TopN * topN = executor->mutable_topn(); topN->set_limit(3); @@ -185,21 +187,44 @@ grpc::Status rpcTest() byItem->set_desc(false); tipb::Expr * expr1 = byItem->mutable_expr(); expr1->set_tp(tipb::ExprType::ColumnRef); - ss.str(""); + std::stringstream ss; DB::EncodeNumber(1, ss); expr1->set_val(ss.str()); - type = expr1->mutable_field_type(); + auto * type = expr1->mutable_field_type(); type->set_tp(8); type->set_tp(0); - */ - // limit - /* - executor = dagRequest.add_executors(); +} + +void appendLimit(tipb::DAGRequest & dag_request) +{ + auto * executor = dag_request.add_executors(); executor->set_tp(tipb::ExecType::TypeLimit); - tipb::Limit *limit = executor->mutable_limit(); + tipb::Limit * limit = executor->mutable_limit(); limit->set_limit(5); - */ +} + +grpc::Status rpcTest() +{ + ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials()); + ClientPtr clientPtr = std::make_shared(cp); + size_t result_field_num = 0; + bool has_selection = false; + bool has_agg = true; + bool has_topN = false; + bool has_limit = false; + // construct a dag request + tipb::DAGRequest dagRequest; + dagRequest.set_start_ts(18446744073709551615uL); + appendTS(dagRequest, result_field_num); + if (has_selection) + appendSelection(dagRequest); + if (has_agg) + appendAgg(dagRequest, result_field_num); + if (has_topN) + appendTopN(dagRequest); + if (has_limit) + appendLimit(dagRequest); // construct a coprocessor request coprocessor::Request request; diff --git a/dbms/src/Flash/FlashService.cpp b/dbms/src/Flash/FlashService.cpp index 2b8941f0472..0489b6b8777 100644 --- a/dbms/src/Flash/FlashService.cpp +++ b/dbms/src/Flash/FlashService.cpp @@ -96,8 +96,10 @@ std::tuple FlashService::createDBContext(grpc::ServerCo { context.setSetting("dag_records_per_chunk", dag_records_per_chunk_str); } - std::string planner = getClientMetaVarWithDefault(grpc_context, "dag_planner", "sql"); + std::string planner = getClientMetaVarWithDefault(grpc_context, "dag_planner", "optree"); context.setSetting("dag_planner", planner); + std::string expr_field_type_check = getClientMetaVarWithDefault(grpc_context, "dag_expr_field_type_strict_check", "1"); + context.setSetting("dag_expr_field_type_strict_check", expr_field_type_check); return std::make_tuple(context, ::grpc::Status::OK); } diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h index 1efd1dfcad3..86b42cf9ce2 100644 --- a/dbms/src/Interpreters/Settings.h +++ b/dbms/src/Interpreters/Settings.h @@ -31,6 +31,7 @@ struct Settings M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \ M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \ M(SettingString, dag_planner, "sql", "planner for DAG query, sql builds the SQL string, optree builds the internal operator(stream) tree.") \ + M(SettingBool, dag_expr_field_type_strict_check, true, "when set to true, every expr in the dag request must provide field type, otherwise only the result expr will be checked.") \ M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \ M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \ M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \ diff --git a/dbms/src/Storages/Transaction/Codec.h b/dbms/src/Storages/Transaction/Codec.h index ff32d38e39d..e41295a1e2e 100644 --- a/dbms/src/Storages/Transaction/Codec.h +++ b/dbms/src/Storages/Transaction/Codec.h @@ -324,6 +324,11 @@ inline void EncodeVarUInt(UInt64 num, std::stringstream & ss) TiKV::writeVarUInt(num, ss); } +inline void EncodeNull(std::stringstream &ss) +{ + writeIntBinary(UInt8(TiDB::CodecFlagNil), ss); +} + inline void EncodeDecimal(const Decimal & dec, std::stringstream & ss) { writeIntBinary(UInt8(TiDB::CodecFlagDecimal), ss); @@ -394,6 +399,10 @@ T getFieldValue(const Field & field) inline void EncodeDatum(const Field & field, TiDB::CodecFlag flag, std::stringstream & ss) { + if (field.isNull()) + { + return EncodeNull(ss); + } switch (flag) { case TiDB::CodecFlagDecimal: