Skip to content

Commit

Permalink
columnref index is based on executor output schema (#167)
Browse files Browse the repository at this point in the history
* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error duing cop request handling

* address comments

* add error code

* throw exception when meet error duing cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema
  • Loading branch information
windtalker authored and zanmato1984 committed Aug 8, 2019
1 parent 0174b7e commit 9a1dd23
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 54 deletions.
104 changes: 80 additions & 24 deletions dbms/src/Flash/Coprocessor/tests/cop_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,78 @@ class FlashClient
SubPtr sp;

public:
static std::string decodeDatumToString(size_t & cursor, const std::string & raw_data)
{
switch (raw_data[cursor++])
{
case TiDB::CodecFlagNil:
return "NULL";
case TiDB::CodecFlagInt:
return std::to_string(DB::DecodeInt<Int64>(cursor, raw_data));
case TiDB::CodecFlagUInt:
return std::to_string(DB::DecodeInt<UInt64>(cursor, raw_data));
case TiDB::CodecFlagBytes:
return DB::DecodeBytes(cursor, raw_data);
case TiDB::CodecFlagCompactBytes:
return DB::DecodeCompactBytes(cursor, raw_data);
case TiDB::CodecFlagFloat:
return std::to_string(DB::DecodeFloat64(cursor, raw_data));
case TiDB::CodecFlagVarUInt:
return std::to_string(DB::DecodeVarUInt(cursor, raw_data));
case TiDB::CodecFlagVarInt:
return std::to_string(DB::DecodeVarInt(cursor, raw_data));
case TiDB::CodecFlagDuration:
throw DB::Exception("Not implented yet. DecodeDatum: CodecFlagDuration");
case TiDB::CodecFlagDecimal:
return DB::DecodeDecimal(cursor, raw_data).toString();
default:
throw DB::Exception("Unknown Type:" + std::to_string(raw_data[cursor - 1]));
}
}

FlashClient(ChannelPtr cp) : sp(tikvpb::Tikv::NewStub(cp)) {}
grpc::Status coprocessor(coprocessor::Request * rqst)
grpc::Status coprocessor(coprocessor::Request * rqst, size_t output_column_num)
{
grpc::ClientContext clientContext;
clientContext.AddMetadata("user_name", "");
clientContext.AddMetadata("dag_planner", "optree");
coprocessor::Response response;
grpc::Status status = sp->Coprocessor(&clientContext, *rqst, &response);
size_t column_num = 3;
if (status.ok())
{
// if status is ok, try to decode the result
tipb::SelectResponse selectResponse;
if (selectResponse.ParseFromString(response.data()))
{
for (tipb::Chunk chunk : selectResponse.chunks())
for (const tipb::Chunk & chunk : selectResponse.chunks())
{
size_t cursor = 0;
std::vector<DB::Field> row_result;
const std::string & data = chunk.rows_data();
while (cursor < data.size())
{
row_result.push_back(DB::DecodeDatum(cursor, data));
if (row_result.size() == column_num)
for (size_t i = 0; i < output_column_num; i++)
{
//print the result
std::cout << row_result[0].get<DB::Int64>() << " " << row_result[1].get<DB::String>() << " "
<< row_result[2].get<DB::Int64>() << std::endl;
row_result.clear();
std::cout << decodeDatumToString(cursor, data) << " ";
}
std::cout << std::endl;
}
}
std::cout << "Execute summary: " << std::endl;
for (int i = 0; i < selectResponse.execution_summaries_size(); i++)
{
auto & summary = selectResponse.execution_summaries(i);
std::cout << "Executor " << i;
std::cout << " time = " << summary.time_processed_ns() << " ns ";
std::cout << " rows = " << summary.num_produced_rows();
std::cout << " iter nums = " << summary.num_iterations();
std::cout << std::endl;
}
}
}
else
{
std::cout << "Coprocessor request failed, error code " << status.error_code() << " error msg " << status.error_message();
}
return status;
}
};
Expand All @@ -64,6 +102,7 @@ grpc::Status rpcTest()
{
ChannelPtr cp = grpc::CreateChannel("localhost:9093", grpc::InsecureChannelCredentials());
ClientPtr clientPtr = std::make_shared<FlashClient>(cp);
size_t result_field_num = 0;
// construct a dag request
tipb::DAGRequest dagRequest;
dagRequest.set_start_ts(18446744073709551615uL);
Expand All @@ -75,14 +114,15 @@ grpc::Status rpcTest()
tipb::ColumnInfo * ci = ts->add_columns();
ci->set_column_id(1);
ci->set_tp(0xfe);
ci->set_flag(1);
ci->set_flag(0);
ci = ts->add_columns();
ci->set_column_id(2);
ci->set_tp(8);
ci->set_flag(1);
ci->set_flag(0);
dagRequest.add_output_offsets(1);
dagRequest.add_output_offsets(0);
dagRequest.add_output_offsets(1);
result_field_num = 3;

// selection: less(i, 123)
executor = dagRequest.add_executors();
Expand All @@ -95,15 +135,23 @@ grpc::Status rpcTest()
tipb::Expr * value = expr->add_children();
col->set_tp(tipb::ExprType::ColumnRef);
std::stringstream ss;
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(2, ss);
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(1, ss);
col->set_val(ss.str());
auto * type = col->mutable_field_type();
type->set_tp(8);
type->set_flag(0);
value->set_tp(tipb::ExprType::Int64);
ss.str("");
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(10, ss);
value->set_val(std::string(ss.str()));
type = value->mutable_field_type();
type->set_tp(8);
type->set_flag(1);
type = expr->mutable_field_type();
type->set_tp(1);
type->set_flag(1 << 5);

// agg: count(s) group by i;
/*
executor = dagRequest.add_executors();
executor->set_tp(tipb::ExecType::TypeAggregation);
auto agg = executor->mutable_aggregation();
Expand All @@ -112,36 +160,44 @@ grpc::Status rpcTest()
auto child = agg_func->add_children();
child->set_tp(tipb::ExprType::ColumnRef);
ss.str("");
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(1, ss);
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(0, ss);
child->set_val(ss.str());
auto type = agg_func->mutable_field_type();
type->set_tp(3);
type->set_flag(33);
auto f_type = agg_func->mutable_field_type();
f_type->set_tp(3);
f_type->set_flag(33);
auto group_col = agg->add_group_by();
group_col->set_tp(tipb::ExprType::ColumnRef);
ss.str("");
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(2,ss);
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(1, ss);
group_col->set_val(ss.str());
*/
f_type = group_col->mutable_field_type();
f_type->set_tp(8);
f_type->set_flag(1);
result_field_num = 2;

// topn
/*
executor = dagRequest.add_executors();
executor->set_tp(tipb::ExecType::TypeTopN);
tipb::TopN * topN = executor->mutable_topn();
topN->set_limit(3);
tipb::ByItem * byItem = topN->add_order_by();
byItem->set_desc(true);
byItem->set_desc(false);
tipb::Expr * expr1 = byItem->mutable_expr();
expr1->set_tp(tipb::ExprType::ColumnRef);
ss.str("");
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(2, ss);
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(1, ss);
expr1->set_val(ss.str());
type = expr1->mutable_field_type();
type->set_tp(8);
type->set_tp(0);
*/
// limit
/*
executor = dagRequest.add_executors();
executor->set_tp(tipb::ExecType::TypeLimit);
tipb::Limit *limit = executor->mutable_limit();
limit->set_limit(1);
limit->set_limit(5);
*/


Expand All @@ -156,7 +212,7 @@ grpc::Status rpcTest()
request.set_tp(DAGREQUEST);
request.set_data(dagRequest.SerializeAsString());
//request.add_ranges();
return clientPtr->coprocessor(&request);
return clientPtr->coprocessor(&request, result_field_num);
}

void codecTest()
Expand Down
23 changes: 9 additions & 14 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
extern const int UNSUPPORTED_METHOD;
} // namespace ErrorCodes

static String genCastString(const String & org_name, const String & target_type_name)
Expand Down Expand Up @@ -210,6 +211,7 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type());
DataTypePtr actual_type = actions->getSampleBlock().getByName(expr_name).type;
//todo maybe use a more decent compare method
// todo ignore nullable info??
if (expected_type->getName() != actual_type->getName())
{
// need to add cast function
Expand Down Expand Up @@ -266,9 +268,9 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi
else if (isColumnExpr(expr))
{
ColumnID columnId = getColumnID(expr);
if (columnId < 1 || columnId > (ColumnID)getCurrentInputColumns().size())
if (columnId < 0 || columnId >= (ColumnID)getCurrentInputColumns().size())
{
throw Exception("column id out of bound");
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
//todo check if the column type need to be cast to field type
return expr_name;
Expand All @@ -277,13 +279,13 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi
{
if (isAggFunctionExpr(expr))
{
throw Exception("agg function is not supported yet");
throw Exception("agg function is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
}
const String & func_name = getFunctionName(expr);
if (func_name == "in" || func_name == "notIn" || func_name == "globalIn" || func_name == "globalNotIn")
{
// todo support in
throw Exception(func_name + " is not supported yet");
throw Exception(func_name + " is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
}

const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context);
Expand All @@ -292,15 +294,8 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi
for (auto & child : expr.children())
{
String name = getActions(child, actions);
if (actions->getSampleBlock().has(name))
{
argument_names.push_back(name);
argument_types.push_back(actions->getSampleBlock().getByName(name).type);
}
else
{
throw Exception("Unknown expr: " + child.DebugString());
}
argument_names.push_back(name);
argument_types.push_back(actions->getSampleBlock().getByName(name).type);
}

// re-construct expr_name, because expr_name generated previously is based on expr tree,
Expand All @@ -319,7 +314,7 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi
}
else
{
throw Exception("Unsupported expr type: " + getTypeName(expr));
throw Exception("Unsupported expr type: " + getTypeName(expr), ErrorCodes::UNSUPPORTED_METHOD);
}
}
} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Interpreters/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
return DecodeBytes(cursor, expr.val());
case tipb::ExprType::ColumnRef:
columnId = DecodeInt<Int64>(cursor, expr.val());
if (columnId < 1 || columnId > (ColumnID)input_col.size())
if (columnId < 0 || columnId >= (ColumnID)input_col.size())
{
throw Exception("out of bound");
}
return input_col.getNames()[columnId - 1];
return input_col.getNames()[columnId];
case tipb::ExprType::Count:
case tipb::ExprType::Sum:
case tipb::ExprType::Avg:
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Interpreters/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
// cid out of bound
throw Exception("column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
String name = storage->getTableInfo().columns[cid - 1].name;
String name = storage->getTableInfo().getColumnName(cid);
required_columns.push_back(name);
NameAndTypePair nameAndTypePair = storage->getColumns().getPhysical(name);
source_columns.push_back(nameAndTypePair);
}
if (required_columns.empty())
{
Expand Down Expand Up @@ -168,7 +170,6 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
});
}
ColumnsWithTypeAndName columnsWithTypeAndName = pipeline.firstStream()->getHeader().getColumnsWithTypeAndName();
source_columns = storage->getColumns().getAllPhysical();
}

InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
Expand Down
Loading

0 comments on commit 9a1dd23

Please sign in to comment.