add DAGContext so InterpreterDAG can exchange information with DAGDriver (#166)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meeting an error during cop request handling

* address comments

* add error code

* throw exception when meeting an error during cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments
windtalker authored and zanmato1984 committed Aug 8, 2019
1 parent 5fe66ee commit 0174b7e
Showing 13 changed files with 176 additions and 54 deletions.
9 changes: 5 additions & 4 deletions dbms/src/DataStreams/DAGBlockOutputStream.cpp
@@ -2,6 +2,7 @@

#include <DataTypes/DataTypeNullable.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>

namespace DB
{
@@ -13,11 +14,11 @@ extern const int LOGICAL_ERROR;
} // namespace ErrorCodes

DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_,
FieldTpAndFlags && field_tp_and_flags_, Block header_)
std::vector<tipb::FieldType> && result_field_types_, Block header_)
: dag_response(dag_response_),
records_per_chunk(records_per_chunk_),
encodeType(encodeType_),
field_tp_and_flags(field_tp_and_flags_),
result_field_types(result_field_types_),
header(header_)
{
if (encodeType == tipb::EncodeType::TypeArrow)
Expand Down Expand Up @@ -46,7 +47,7 @@ void DAGBlockOutputStream::writeSuffix()

void DAGBlockOutputStream::write(const Block & block)
{
if (block.columns() != field_tp_and_flags.size())
if (block.columns() != result_field_types.size())
throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR);

// TODO: Check compatibility between field_tp_and_flags and block column types.
@@ -69,7 +70,7 @@ void DAGBlockOutputStream::write(const Block & block)
for (size_t j = 0; j < block.columns(); j++)
{
auto field = (*block.getByPosition(j).column.get())[i];
EncodeDatum(field, field_tp_and_flags[j].getCodecFlag(), current_ss);
EncodeDatum(field, getCodecFlagByFieldType(result_field_types[j]), current_ss);
}
// Encode current row
records_per_chunk++;
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/DAGBlockOutputStream.h
@@ -19,7 +19,7 @@ class DAGBlockOutputStream : public IBlockOutputStream
{
public:
DAGBlockOutputStream(tipb::SelectResponse & response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_,
FieldTpAndFlags && field_tp_and_flags_, Block header_);
std::vector<tipb::FieldType> && result_field_types, Block header_);

Block getHeader() const override { return header; }
void write(const Block & block) override;
@@ -31,7 +31,7 @@ class DAGBlockOutputStream : public IBlockOutputStream

Int64 records_per_chunk;
tipb::EncodeType encodeType;
FieldTpAndFlags field_tp_and_flags;
std::vector<tipb::FieldType> result_field_types;

Block header;

17 changes: 17 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.h
@@ -0,0 +1,17 @@
#pragma once

#include <DataStreams/IBlockInputStream.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
{

class Context;

class DAGContext
{
public:
DAGContext(size_t profile_list_size) { profile_streams_list.resize(profile_list_size); };
std::vector<BlockInputStreams> profile_streams_list;
};
} // namespace DB
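
For context, the sketch below shows one way InterpreterDAG could record the input streams it builds into this new DAGContext so that DAGDriver can later read their profiling counters. The InterpreterDAG side is not part of this excerpt, so the helper name recordProfileStreams and its call site are assumptions made purely for illustration.

// Illustrative sketch only, not part of this commit. Assumes InterpreterDAG
// holds a reference to the DAGContext created in DAGDriver::execute and knows
// the index of the executor whose streams it is currently building.
#include <Flash/Coprocessor/DAGContext.h>

namespace DB
{
void recordProfileStreams(DAGContext & dag_context, size_t executor_index, const BlockInputStreams & streams)
{
    // profile_streams_list was sized with dag_request.executors_size(), so
    // each executor owns one slot of BlockInputStreams.
    auto & slot = dag_context.profile_streams_list[executor_index];
    slot.insert(slot.end(), streams.begin(), streams.end());
}
} // namespace DB

DAGDriver::execute (in the next file) then walks profile_streams_list and folds each IProfilingBlockInputStream's counters into one ExecutorExecutionSummary per slot.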
26 changes: 24 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
@@ -3,6 +3,7 @@
#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/DAGBlockOutputStream.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Interpreters/Context.h>
#include <Interpreters/DAGQuerySource.h>
@@ -30,7 +31,8 @@ void DAGDriver::execute()
{
context.setSetting("read_tso", UInt64(dag_request.start_ts()));

DAGQuerySource dag(context, region_id, region_version, region_conf_version, dag_request);
DAGContext dag_context(dag_request.executors_size());
DAGQuerySource dag(context, dag_context, region_id, region_version, region_conf_version, dag_request);
BlockIO streams;

String planner = context.getSettings().dag_planner;
@@ -55,8 +57,28 @@
throw Exception("DAG is not query.", ErrorCodes::LOGICAL_ERROR);

BlockOutputStreamPtr outputStreamPtr = std::make_shared<DAGBlockOutputStream>(dag_response, context.getSettings().dag_records_per_chunk,
dag_request.encode_type(), dag.getOutputFieldTpAndFlags(), streams.in->getHeader());
dag_request.encode_type(), dag.getResultFieldTypes(), streams.in->getHeader());
copyData(*streams.in, *outputStreamPtr);
// add ExecutorExecutionSummary info
for (auto & p_streams : dag_context.profile_streams_list)
{
auto * executeSummary = dag_response.add_execution_summaries();
UInt64 time_processed_ns = 0;
UInt64 num_produced_rows = 0;
UInt64 num_iterations = 0;
for (auto & streamPtr : p_streams)
{
if (auto * p_stream = dynamic_cast<IProfilingBlockInputStream *>(streamPtr.get()))
{
time_processed_ns += p_stream->getProfileInfo().total_stopwatch.elapsed();
num_produced_rows += p_stream->getProfileInfo().rows;
num_iterations += p_stream->getProfileInfo().blocks;
}
}
executeSummary->set_time_processed_ns(time_processed_ns);
executeSummary->set_num_produced_rows(num_produced_rows);
executeSummary->set_num_iterations(num_iterations);
}
}

} // namespace DB
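
As a rough client-side illustration of the summaries filled in above, the snippet below dumps them from the response. It assumes the protobuf getters generated alongside the set_* calls used in DAGDriver::execute and the tipb/select.pb.h header, neither of which appears in this excerpt.

// Illustration only, not part of this commit. Relies on the getters that
// protobuf generates for the execution_summaries field populated above.
#include <iostream>
#include <tipb/select.pb.h>

void dumpExecutionSummaries(const tipb::SelectResponse & dag_response)
{
    for (int i = 0; i < dag_response.execution_summaries_size(); i++)
    {
        const auto & summary = dag_response.execution_summaries(i);
        std::cout << "executor " << i
                  << ": time_processed_ns=" << summary.time_processed_ns()
                  << " num_produced_rows=" << summary.num_produced_rows()
                  << " num_iterations=" << summary.num_iterations() << std::endl;
    }
}

Each entry corresponds to one slot of profile_streams_list, i.e. one executor of the DAG request, in the same order.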
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/tests/cop_test.cpp
@@ -99,7 +99,7 @@ grpc::Status rpcTest()
col->set_val(ss.str());
value->set_tp(tipb::ExprType::Int64);
ss.str("");
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(888, ss);
DB::EncodeNumber<Int64, TiDB::CodecFlagInt>(10, ss);
value->set_val(std::string(ss.str()));

// agg: count(s) group by i;
19 changes: 13 additions & 6 deletions dbms/src/Interpreters/DAGExpressionAnalyzer.cpp
@@ -14,7 +14,7 @@ namespace DB

namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

static String genCastString(const String & org_name, const String & target_type_name)
@@ -151,9 +151,10 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
bool need_update_aggregated_columns = false;
NamesAndTypesList updated_aggregated_columns;
ExpressionActionsChain::Step step = chain.steps.back();
auto agg_col_names = aggregated_columns.getNames();
for (Int32 i = 0; i < aggregation.agg_func_size(); i++)
{
String & name = aggregated_columns.getNames()[i];
String & name = agg_col_names[i];
String updated_name = appendCastIfNeeded(aggregation.agg_func(i), step.actions, name);
if (name != updated_name)
{
@@ -170,7 +171,7 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons
}
for (Int32 i = 0; i < aggregation.group_by_size(); i++)
{
String & name = aggregated_columns.getNames()[i + aggregation.agg_func_size()];
String & name = agg_col_names[i + aggregation.agg_func_size()];
String updated_name = appendCastIfNeeded(aggregation.group_by(i), step.actions, name);
if (name != updated_name)
{
@@ -188,17 +189,23 @@ void DAGExpressionAnalyzer::appendAggSelect(ExpressionActionsChain & chain, cons

if (need_update_aggregated_columns)
{
auto updated_agg_col_names = updated_aggregated_columns.getNames();
auto updated_agg_col_types = updated_aggregated_columns.getTypes();
aggregated_columns.clear();
for (size_t i = 0; i < updated_aggregated_columns.size(); i++)
{
aggregated_columns.emplace_back(updated_aggregated_columns.getNames()[i], updated_aggregated_columns.getTypes()[i]);
aggregated_columns.emplace_back(updated_agg_col_names[i], updated_agg_col_types[i]);
}
}
}

String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name)
String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name)
{
if (expr.has_field_type() && isFunctionExpr(expr))
if (!expr.has_field_type())
{
throw Exception("Expression without field type", ErrorCodes::COP_BAD_DAG_REQUEST);
}
if (isFunctionExpr(expr))
{
DataTypePtr expected_type = getDataTypeByFieldType(expr.field_type());
DataTypePtr actual_type = actions->getSampleBlock().getByName(expr_name).type;
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/DAGExpressionAnalyzer.h
@@ -5,6 +5,7 @@
#include <tipb/executor.pb.h>
#pragma GCC diagnostic pop

#include <Flash/Coprocessor/DAGContext.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/DAGUtils.h>
#include <Interpreters/ExpressionActions.h>
@@ -34,7 +35,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
void appendAggregation(ExpressionActionsChain & chain, const tipb::Aggregation & agg, Names & aggregate_keys,
AggregateDescriptions & aggregate_descriptions);
void appendAggSelect(ExpressionActionsChain & chain, const tipb::Aggregation & agg);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String expr_name);
String appendCastIfNeeded(const tipb::Expr & expr, ExpressionActionsPtr & actions, const String & expr_name);
void initChain(ExpressionActionsChain & chain, const NamesAndTypesList & columns) const
{
if (chain.steps.empty())
84 changes: 69 additions & 15 deletions dbms/src/Interpreters/DAGQuerySource.cpp
@@ -9,6 +9,11 @@
namespace DB
{

namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
} // namespace ErrorCodes

const String DAGQuerySource::TS_NAME("tablescan");
const String DAGQuerySource::SEL_NAME("selection");
const String DAGQuerySource::AGG_NAME("aggregation");
@@ -24,9 +29,10 @@ static void assignOrThrowException(Int32 & index, Int32 value, const String & na
index = value;
}

DAGQuerySource::DAGQuerySource(
Context & context_, RegionID region_id_, UInt64 region_version_, UInt64 region_conf_version_, const tipb::DAGRequest & dag_request_)
DAGQuerySource::DAGQuerySource(Context & context_, DAGContext & dag_context_, RegionID region_id_, UInt64 region_version_,
UInt64 region_conf_version_, const tipb::DAGRequest & dag_request_)
: context(context_),
dag_context(dag_context_),
region_id(region_id_),
region_version(region_version_),
region_conf_version(region_conf_version_),
@@ -48,6 +54,7 @@ DAGQuerySource::DAGQuerySource(
break;
case tipb::ExecType::TypeTopN:
assignOrThrowException(order_index, i, TOPN_NAME);
assignOrThrowException(limit_index, i, TOPN_NAME);
break;
case tipb::ExecType::TypeLimit:
assignOrThrowException(limit_index, i, LIMIT_NAME);
@@ -78,23 +85,70 @@ std::unique_ptr<IInterpreter> DAGQuerySource::interpreter(Context &, QueryProces
return std::make_unique<InterpreterDAG>(context, *this);
}

FieldTpAndFlags DAGQuerySource::getOutputFieldTpAndFlags() const
bool fillExecutorOutputFieldTypes(const tipb::Executor & executor, std::vector<tipb::FieldType> & output_field_types)
{
FieldTpAndFlags output;

const auto & ts = getTS();
const auto & column_infos = ts.columns();
for (auto i : dag_request.output_offsets())
tipb::FieldType field_type;
switch (executor.tp())
{
// TODO: Checking bound.
auto & column_info = column_infos[i];
output.emplace_back(FieldTpAndFlag{static_cast<TiDB::TP>(column_info.tp()), static_cast<UInt32>(column_info.flag())});
case tipb::ExecType::TypeTableScan:
for (auto ci : executor.tbl_scan().columns())
{
field_type.set_tp(ci.tp());
field_type.set_flag(ci.flag());
output_field_types.push_back(field_type);
}
return true;
case tipb::ExecType::TypeStreamAgg:
case tipb::ExecType::TypeAggregation:
for (auto & expr : executor.aggregation().agg_func())
{
if (!expr.has_field_type())
{
throw Exception("Agg expression without field type", ErrorCodes::COP_BAD_DAG_REQUEST);
}
output_field_types.push_back(expr.field_type());
}
for (auto & expr : executor.aggregation().group_by())
{
if (!expr.has_field_type())
{
throw Exception("Group by expression without field type", ErrorCodes::COP_BAD_DAG_REQUEST);
}
output_field_types.push_back(expr.field_type());
}
return true;
default:
return false;
}
}

// TODO: Add aggregation columns.
// We either write our own code to infer types that follows the convention between TiDB and TiKV, or ask TiDB to push down aggregation field types.

return output;
std::vector<tipb::FieldType> DAGQuerySource::getResultFieldTypes() const
{
std::vector<tipb::FieldType> executor_output;
for (int i = dag_request.executors_size() - 1; i >= 0; i--)
{
if (fillExecutorOutputFieldTypes(dag_request.executors(i), executor_output))
{
break;
}
}
if (executor_output.empty())
{
throw Exception("Do not found result field type for current dag request", ErrorCodes::COP_BAD_DAG_REQUEST);
}
// TiSpark assumes that if there is an agg, the output offset is
// ignored and the request output is the same as the agg's output.
// TODO: should always use output offsets to re-construct the output field types
if (hasAggregation())
{
return executor_output;
}
std::vector<tipb::FieldType> ret;
for (int i : dag_request.output_offsets())
{
ret.push_back(executor_output[i]);
}
return ret;
}

} // namespace DB
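
To make the rule in getResultFieldTypes concrete, the sketch below builds a minimal request with a TableScan followed by an Aggregation. The protobuf mutators (add_executors, mutable_tbl_scan, add_agg_func, add_output_offsets, ...) are assumed from the accessors used in this diff, and the columns are deliberately left without real type codes.

// Illustration only, not part of this commit. With an aggregation present,
// getResultFieldTypes ignores output_offsets and returns the aggregation's
// own output: agg functions first, then group-by expressions.
#include <tipb/executor.pb.h>
#include <tipb/select.pb.h>

tipb::DAGRequest buildSampleRequest()
{
    tipb::DAGRequest req;

    auto * ts = req.add_executors();
    ts->set_tp(tipb::ExecType::TypeTableScan);
    ts->mutable_tbl_scan()->add_columns(); // column a
    ts->mutable_tbl_scan()->add_columns(); // column b

    auto * agg = req.add_executors();
    agg->set_tp(tipb::ExecType::TypeAggregation);
    agg->mutable_aggregation()->add_agg_func()->mutable_field_type(); // count(a)
    agg->mutable_aggregation()->add_group_by()->mutable_field_type(); // group by b

    // Ignored by getResultFieldTypes because an aggregation exists; the
    // result field types would be [type of count(a), type of b].
    req.add_output_offsets(1);
    req.add_output_offsets(0);
    return req;
}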
