Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

code refine && several minor bug fix #174

Merged
merged 32 commits into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
71c09fb
fix cop test regression
windtalker Aug 6, 2019
6b8a054
address comments
windtalker Aug 6, 2019
6f32efd
format code
windtalker Aug 6, 2019
11b3e09
fix npe for dag execute
windtalker Aug 6, 2019
64fef5c
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 6, 2019
f96fcf4
format code
windtalker Aug 6, 2019
324b64d
address comment
windtalker Aug 6, 2019
6b06122
add some comments
windtalker Aug 6, 2019
2327e9f
throw exception when meet error duing cop request handling
windtalker Aug 6, 2019
72d11ad
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 6, 2019
428459a
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 7, 2019
f3eb6e5
address comments
windtalker Aug 7, 2019
d8bb7d9
add error code
windtalker Aug 7, 2019
b6eaa3b
throw exception when meet error duing cop request handling
windtalker Aug 7, 2019
fe7916e
address comments
windtalker Aug 7, 2019
f1d0bfe
add DAGContext so InterpreterDAG can exchange information with DAGDriver
windtalker Aug 8, 2019
dde6dab
merge pingcap/tics cop
windtalker Aug 8, 2019
3c29365
fix bug
windtalker Aug 8, 2019
b984cb6
1. refine code, 2. address comments
windtalker Aug 8, 2019
ddf64e6
update comments
windtalker Aug 8, 2019
d9c4a0d
columnref index is based on executor output schema
windtalker Aug 8, 2019
947606a
merge pingcap/tics cop
windtalker Aug 8, 2019
85ae8b9
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 9, 2019
aea80d6
handle error in coprocessor request
windtalker Aug 9, 2019
2ed69ef
merge pingcap/tics cop
windtalker Aug 9, 2019
353a2b1
refine code
windtalker Aug 9, 2019
f406893
use Clear to clear a protobuf message completely
windtalker Aug 9, 2019
021e4c3
refine code
windtalker Aug 12, 2019
8994597
code refine && several minor bug fix
windtalker Aug 12, 2019
f959a9e
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 12, 2019
1550ede
address comments
windtalker Aug 12, 2019
7336bea
address comments
windtalker Aug 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void DAGBlockOutputStream::writeSuffix()
if (current_chunk != nullptr && records_per_chunk > 0)
{
current_chunk->set_rows_data(current_ss.str());
dag_response.add_output_counts(records_per_chunk);
}
}

Expand All @@ -62,6 +63,7 @@ void DAGBlockOutputStream::write(const Block & block)
{
// set the current ss to current chunk
current_chunk->set_rows_data(current_ss.str());
dag_response.add_output_counts(current_records_num);
}
current_chunk = dag_response.add_chunks();
current_ss.str("");
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ try
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(streamPtr.get()))
{
time_processed_ns += p_stream->getProfileInfo().total_stopwatch.elapsed();
time_processed_ns = std::max(time_processed_ns, p_stream->getProfileInfo().total_stopwatch.elapsed());
num_produced_rows += p_stream->getProfileInfo().rows;
num_iterations += p_stream->getProfileInfo().blocks;
}
Expand Down
13 changes: 11 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,15 @@ void DAGExpressionAnalyzer::appendOrderBy(ExpressionActionsChain & chain, const

const NamesAndTypesList & DAGExpressionAnalyzer::getCurrentInputColumns() { return after_agg ? aggregated_columns : source_columns; }

void DAGExpressionAnalyzer::appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project)
{
initChain(chain, getCurrentInputColumns());
for (auto name : final_project)
{
chain.steps.back().required_output.push_back(name.first);
}
}

void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & aggregation)
{
initChain(chain, getCurrentInputColumns());
Expand Down Expand Up @@ -199,11 +208,11 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons

String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name)
{
if (!expr.has_field_type())
if (!expr.has_field_type() && context.getSettingsRef().dag_expr_field_type_strict_check)
{
throw Exception("Expression without field type", ErrorCodes::COP_BAD_DAG_REQUEST);
}
if (isFunctionExpr(expr))
if (expr.has_field_type() && isFunctionExpr(expr))
{
DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type());
DataTypePtr actual_type = actions->getSampleBlock().getByName(expr_name).type;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
chain.steps.emplace_back(std::make_shared<ExpressionActions>(columns, settings));
}
}
void appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project);
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const NamesAndTypesList & getCurrentInputColumns();
};
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGQuerySource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<t
switch (executor.tp())
{
case tipb::ExecType::TypeTableScan:
for (auto ci : executor.tbl_scan().columns())
for (auto & ci : executor.tbl_scan().columns())
{
field_type.set_tp(ci.tp());
field_type.set_flag(ci.flag());
Expand Down Expand Up @@ -136,7 +136,7 @@ std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
throw Exception("Do not found result field type for current dag request", ErrorCodes::COP_BAD_DAG_REQUEST);
}
// tispark assumes that if there is a agg, the output offset is
// ignored and the request out put is the same as the agg's output.
// ignored and the request output is the same as the agg's output.
// todo should always use output offset to re-construct the output field types
if (hasAggregation())
{
Expand Down
11 changes: 2 additions & 9 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,8 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
}
// Append final project results if needed.
// TODO: Refine this logic by an `analyzer.appendFinalProject()`-like call.
if (dag.hasSelection() || dag.hasAggregation() || dag.hasTopN())
{
for (auto & name : final_project)
{
chain.steps.back().required_output.push_back(name.first);
}
res.before_order_and_select = chain.getLastActions();
}
analyzer.appendFinalProject(chain, final_project);
res.before_order_and_select = chain.getLastActions();
chain.finalize();
chain.clear();
//todo need call prependProjectInput??
Expand Down
75 changes: 50 additions & 25 deletions dbms/src/Flash/Coprocessor/tests/cop_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,11 @@ class FlashClient
};

using ClientPtr = std::shared_ptr<FlashClient>;
grpc::Status rpcTest()

void appendTS(tipb::DAGRequest & dag_request, size_t & result_field_num)
{
ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials());
ClientPtr clientPtr = std::make_shared<FlashClient>(cp);
size_t result_field_num = 0;
// construct a dag request
tipb::DAGRequest dagRequest;
dagRequest.set_start_ts(18446744073709551615uL);
// table scan: s,i
tipb::Executor * executor = dagRequest.add_executors();
tipb::Executor * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeTableScan);
tipb::TableScan * ts = executor->mutable_tbl_scan();
ts->set_table_id(44);
Expand All @@ -119,13 +114,16 @@ grpc::Status rpcTest()
ci->set_column_id(2);
ci->set_tp(8);
ci->set_flag(0);
dagRequest.add_output_offsets(1);
dagRequest.add_output_offsets(0);
dagRequest.add_output_offsets(1);
dag_request.add_output_offsets(1);
dag_request.add_output_offsets(0);
dag_request.add_output_offsets(1);
result_field_num = 3;
}

void appendSelection(tipb::DAGRequest & dag_request)
{
// selection: less(i, 123)
executor = dagRequest.add_executors();
auto * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeSelection);
tipb::Selection * selection = executor->mutable_selection();
tipb::Expr * expr = selection->add_conditions();
Expand All @@ -150,16 +148,19 @@ grpc::Status rpcTest()
type = expr->mutable_field_type();
type->set_tp(1);
type->set_flag(1 << 5);
}

void appendAgg(tipb::DAGRequest & dag_request, size_t & result_field_num)
{
// agg: count(s) group by i;
executor = dagRequest.add_executors();
auto * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeAggregation);
auto agg = executor->mutable_aggregation();
auto agg_func = agg->add_agg_func();
agg_func->set_tp(tipb::ExprType::Count);
auto child = agg_func->add_children();
child->set_tp(tipb::ExprType::ColumnRef);
ss.str("");
std::stringstream ss;
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(0, ss);
child->set_val(ss.str());
auto f_type = agg_func->mutable_field_type();
Expand All @@ -174,32 +175,56 @@ grpc::Status rpcTest()
f_type->set_tp(8);
f_type->set_flag(1);
result_field_num = 2;
}

// topn
/*
executor = dagRequest.add_executors();
void appendTopN(tipb::DAGRequest & dag_request)
{
auto * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeTopN);
tipb::TopN * topN = executor->mutable_topn();
topN->set_limit(3);
tipb::ByItem * byItem = topN->add_order_by();
byItem->set_desc(false);
tipb::Expr * expr1 = byItem->mutable_expr();
expr1->set_tp(tipb::ExprType::ColumnRef);
ss.str("");
std::stringstream ss;
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(1, ss);
expr1->set_val(ss.str());
type = expr1->mutable_field_type();
auto * type = expr1->mutable_field_type();
type->set_tp(8);
type->set_tp(0);
*/
// limit
/*
executor = dagRequest.add_executors();
}

void appendLimit(tipb::DAGRequest & dag_request)
{
auto * executor = dag_request.add_executors();
executor->set_tp(tipb::ExecType::TypeLimit);
tipb::Limit *limit = executor->mutable_limit();
tipb::Limit * limit = executor->mutable_limit();
limit->set_limit(5);
*/
}

grpc::Status rpcTest()
{
ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials());
ClientPtr clientPtr = std::make_shared<FlashClient>(cp);
size_t result_field_num = 0;
bool has_selection = false;
bool has_agg = true;
bool has_topN = false;
bool has_limit = false;
// construct a dag request
tipb::DAGRequest dagRequest;
dagRequest.set_start_ts(18446744073709551615uL);

appendTS(dagRequest, result_field_num);
if (has_selection)
appendSelection(dagRequest);
if (has_agg)
appendAgg(dagRequest, result_field_num);
if (has_topN)
appendTopN(dagRequest);
if (has_limit)
appendLimit(dagRequest);

// construct a coprocessor request
coprocessor::Request request;
Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,10 @@ std::tuple<Context, ::grpc::Status> FlashService::createDBContext(grpc::ServerCo
{
context.setSetting("dag_records_per_chunk", dag_records_per_chunk_str);
}
std::string planner = getClientMetaVarWithDefault(grpc_context, "dag_planner", "sql");
std::string planner = getClientMetaVarWithDefault(grpc_context, "dag_planner", "optree");
context.setSetting("dag_planner", planner);
std::string expr_field_type_check = getClientMetaVarWithDefault(grpc_context, "dag_expr_field_type_strict_check", "1");
context.setSetting("dag_expr_field_type_strict_check", expr_field_type_check);

return std::make_tuple(context, ::grpc::Status::OK);
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ struct Settings
M(SettingUInt64, read_tso, DEFAULT_MAX_READ_TSO, "tmt read tso.") \
M(SettingInt64, dag_records_per_chunk, DEFAULT_DAG_RECORDS_PER_CHUNK, "default chunk size of a DAG response.") \
M(SettingString, dag_planner, "sql", "planner for DAG query, sql builds the SQL string, optree builds the internal operator(stream) tree.") \
M(SettingBool, dag_expr_field_type_strict_check, true, "when set to true, every expr in the dag request must provide field type, otherwise only the result expr will be checked.") \
M(SettingInt64, schema_version, DEFAULT_UNSPECIFIED_SCHEMA_VERSION, "tmt schema version.") \
M(SettingUInt64, min_compress_block_size, DEFAULT_MIN_COMPRESS_BLOCK_SIZE, "The actual size of the block to compress, if the uncompressed data less than max_compress_block_size is no less than this value and no less than the volume of data for one mark.") \
M(SettingUInt64, max_compress_block_size, DEFAULT_MAX_COMPRESS_BLOCK_SIZE, "The maximum size of blocks of uncompressed data before compressing for writing to a table.") \
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/Transaction/Codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ inline void EncodeVarUInt(UInt64 num, std::stringstream & ss)
TiKV::writeVarUInt(num, ss);
}

inline void EncodeNull(std::stringstream &ss)
{
writeIntBinary(UInt8(TiDB::CodecFlagNil), ss);
}

inline void EncodeDecimal(const Decimal & dec, std::stringstream & ss)
{
writeIntBinary(UInt8(TiDB::CodecFlagDecimal), ss);
Expand Down Expand Up @@ -394,6 +399,9 @@ T getFieldValue(const Field & field)

inline void EncodeDatum(const Field & field, TiDB::CodecFlag flag, std::stringstream & ss)
{
if (field.isNull()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Format

return EncodeNull(ss);
}
switch (flag)
{
case TiDB::CodecFlagDecimal:
Expand Down