Skip to content

Commit

Permalink
code refine && several minor bug fix (#174)
Browse files Browse the repository at this point in the history
* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* handle error in coprocessor request

* refine code

* use Clear to clear a protobuf message completely

* refine code

* code refine && several minor bug fix

* address comments

* address comments
  • Loading branch information
windtalker authored and zanmato1984 committed Aug 12, 2019
1 parent 4a76e91 commit 2d093a8
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 40 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void DAGBlockOutputStream::writeSuffix()
if (current_chunk != nullptr && records_per_chunk > 0)
{
current_chunk->set_rows_data(current_ss.str());
dag_response.add_output_counts(records_per_chunk);
}
}

Expand All @@ -62,6 +63,7 @@ void DAGBlockOutputStream::write(const Block & block)
{
// set the current ss to current chunk
current_chunk->set_rows_data(current_ss.str());
dag_response.add_output_counts(current_records_num);
}
current_chunk = dag_response.add_chunks();
current_ss.str("");
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ try
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(streamPtr.get()))
{
time_processed_ns += p_stream->getProfileInfo().total_stopwatch.elapsed();
time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().total_stopwatch.elapsed());
num_produced_rows += p_stream->getProfileInfo().rows;
num_iterations += p_stream->getProfileInfo().blocks;
}
Expand Down
13 changes: 11 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ void DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const

const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return after_agg ? aggregated_columns : source_columns; }

void DAGExpressionAnalyzer::appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project)
{
initChain(chain, getCurrentInputColumns());
for (auto name : final_project)
{
chain.steps.back().required_output.push_back(name.first);
}
}

void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation)
{
initChain(chain, getCurrentInputColumns());
Expand Down Expand Up @@ -199,11 +208,11 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons

String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name)
{
if (!expr.has_field_type())
if (!expr.has_field_type() && context.getSettingsRef().dag_expr_field_type_strict_check)
{
throw Exception("Expression without field type", ErrorCodes::COP_BAD_DAG_REQUEST);
}
if (isFunctionExpr(expr))
if (expr.has_field_type() && isFunctionExpr(expr))
{
DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type());
DataTypePtr actual_type = actions->getSampleBlock().getByName(expr_name).type;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
chain.steps.emplace_back(std::make_shared<ExpressionActions>(columns, settings));
}
}
void appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project);
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const NamesAndTypesList & getCurrentInputColumns();
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<t
switch (executor.tp())
{
case tipb::ExecType::TypeTableScan:
for (auto ci : executor.tbl_scan().columns())
for (auto & ci : executor.tbl_scan().columns())
{
field_type.set_tp(ci.tp());
field_type.set_flag(ci.flag());
Expand Down Expand Up @@ -136,7 +136,7 @@ std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
throw Exception("Do not found result field type for current dag request", ErrorCodes::COP_BAD_DAG_REQUEST);
}
// tispark assumes that if there is a agg, the output offset is
// ignored and the request out put is the same as the agg's output.
// ignored and the request output is the same as the agg's output.
// todo should always use output offset to re-construct the output field types
if (hasAggregation())
{
Expand Down
11 changes: 2 additions & 9 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,8 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
}
// Append final project results if needed.
// TODO: Refine this logic by an `analyzer.appendFinalProject()`-like call.
if (dag.hasSelection() || dag.hasAggregation() || dag.hasTopN())
{
for (auto & name : final_project)
{
chain.steps.back().required_output.push_back(name.first);
}
res.before_order_and_select = chain.getLastActions();
}
analyzer.appendFinalProject(chain, final_project);
res.before_order_and_select = chain.getLastActions();
chain.finalize();
chain.clear();
//todo need call prependProjectInput??
Expand Down
75 changes: 50 additions & 25 deletions dbms/src/Flash/Coprocessor/tests/cop_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,11 @@ class FlashClient
};

using ClientPtr = std::shared_ptr<FlashClient>;
grpc::Status rpcTest()

void appendTS(tipb::DAGRequest & dag_request, size_t & result_field_num)
{
ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials());
ClientPtr clientPtr = std::make_shared<FlashClient>(cp);
size_t result_field_num = 0;
// construct a dag request
tipb::DAGRequest dagRequest;
dagRequest.set_start_ts(18446744073709551615uL);
// table scan: s,i
tipb::Executor * executor = dagRequest.add_executors();
tipb::Executor * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeTableScan);
tipb::TableScan * ts = executor->mutable_tbl_scan();
ts->set_table_id(44);
Expand All @@ -119,13 +114,16 @@ grpc::Status rpcTest()
ci->set_column_id(2);
ci->set_tp(8);
ci->set_flag(0);
dagRequest.add_output_offsets(1);
dagRequest.add_output_offsets(0);
dagRequest.add_output_offsets(1);
dag_request.add_output_offsets(1);
dag_request.add_output_offsets(0);
dag_request.add_output_offsets(1);
result_field_num = 3;
}

void appendSelection(tipb::DAGRequest & dag_request)
{
// selection: less(i, 123)
executor = dagRequest.add_executors();
auto * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeSelection);
tipb::Selection * selection = executor->mutable_selection();
tipb::Expr * expr = selection->add_conditions();
Expand All @@ -150,16 +148,19 @@ grpc::Status rpcTest()
type = expr->mutable_field_type();
type->set_tp(1);
type->set_flag(1 << 5);
}

void appendAgg(tipb::DAGRequest & dag_request, size_t & result_field_num)
{
// agg: count(s) group by i;
executor = dagRequest.add_executors();
auto * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeAggregation);
auto agg = executor->mutable_aggregation();
auto agg_func = agg->add_agg_func();
agg_func->set_tp(tipb::ExprType::Count);
auto child = agg_func->add_children();
child->set_tp(tipb::ExprType::ColumnRef);
ss.str("");
std::stringstream ss;
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(0, ss);
child->set_val(ss.str());
auto f_type = agg_func->mutable_field_type();
Expand All @@ -174,32 +175,56 @@ grpc::Status rpcTest()
f_type->set_tp(8);
f_type->set_flag(1);
result_field_num = 2;
}

// topn
/*
executor = dagRequest.add_executors();
void appendTopN(tipb::DAGRequest & dag_request)
{
auto * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeTopN);
tipb::TopN * topN = executor->mutable_topn();
topN->set_limit(3);
tipb::ByItem * byItem = topN->add_order_by();
byItem->set_desc(false);
tipb::Expr * expr1 = byItem->mutable_expr();
expr1->set_tp(tipb::ExprType::ColumnRef);
ss.str("");
std::stringstream ss;
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(1, ss);
expr1->set_val(ss.str());
type = expr1->mutable_field_type();
auto * type = expr1->mutable_field_type();
type->set_tp(8);
type->set_tp(0);
*/
// limit
/*
executor = dagRequest.add_executors();
}

void appendLimit(tipb::DAGRequest & dag_request)
{
auto * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeLimit);
tipb::Limit *limit = executor->mutable_limit();
tipb::Limit * limit = executor->mutable_limit();
limit->set_limit(5);
*/
}

grpc::Status rpcTest()
{
ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials());
ClientPtr clientPtr = std::make_shared<FlashClient>(cp);
size_t result_field_num = 0;
bool has_selection = false;
bool has_agg = true;
bool has_topN = false;
bool has_limit = false;
// construct a dag request
tipb::DAGRequest dagRequest;
dagRequest.set_start_ts(18446744073709551615uL);

appendTS(dagRequest, result_field_num);
if (has_selection)
appendSelection(dagRequest);
if (has_agg)
appendAgg(dagRequest, result_field_num);
if (has_topN)
appendTopN(dagRequest);
if (has_limit)
appendLimit(dagRequest);

// construct a coprocessor request
coprocessor::Request request;
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ std::tuple<Context, ::grpc::Status> FlashService::createDBContext(grpc::ServerCo
{
context.setSetting("dag_records_per_chunk", dag_records_per_chunk_str);
}
std::string planner = getClientMetaVarWithDefault(grpc_context, "dag_planner", "sql");
std::string planner = getClientMetaVarWithDefault(grpc_context, "dag_planner", "optree");
context.setSetting("dag_planner", planner);
std::string expr_field_type_check = getClientMetaVarWithDefault(grpc_context, "dag_expr_field_type_strict_check", "1");
context.setSetting("dag_expr_field_type_strict_check", expr_field_type_check);

return std::make_tuple(context, ::grpc::Status::OK);
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct Settings
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \
M(SettingString, dag_planner, "sql", "planner for DAG query, sql builds the SQL string, optree builds the internal operator(stream) tree.") \
M(SettingBool, dag_expr_field_type_strict_check, true, "when set to true, every expr in the dag request must provide field type, otherwise only the result expr will be checked.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
Expand Down
9 changes: 9 additions & 0 deletions dbms/src/Storages/Transaction/Codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ inline void EncodeVarUInt(UInt64 num, std::stringstream & ss)
TiKV::writeVarUInt(num, ss);
}

inline void EncodeNull(std::stringstream &ss)
{
writeIntBinary(UInt8(TiDB::CodecFlagNil), ss);
}

inline void EncodeDecimal(const Decimal & dec, std::stringstream & ss)
{
writeIntBinary(UInt8(TiDB::CodecFlagDecimal), ss);
Expand Down Expand Up @@ -394,6 +399,10 @@ T getFieldValue(const Field & field)

inline void EncodeDatum(const Field & field, TiDB::CodecFlag flag, std::stringstream & ss)
{
if (field.isNull())
{
return EncodeNull(ss);
}
switch (flag)
{
case TiDB::CodecFlagDecimal:
Expand Down

0 comments on commit 2d093a8

Please sign in to comment.