Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support udf in #175

Merged
merged 37 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
71c09fb
fix cop test regression
windtalker Aug 6, 2019
6b8a054
address comments
windtalker Aug 6, 2019
6f32efd
format code
windtalker Aug 6, 2019
11b3e09
fix npe for dag execute
windtalker Aug 6, 2019
64fef5c
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 6, 2019
f96fcf4
format code
windtalker Aug 6, 2019
324b64d
address comment
windtalker Aug 6, 2019
6b06122
add some comments
windtalker Aug 6, 2019
2327e9f
throw exception when meet error duing cop request handling
windtalker Aug 6, 2019
72d11ad
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 6, 2019
428459a
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 7, 2019
f3eb6e5
address comments
windtalker Aug 7, 2019
d8bb7d9
add error code
windtalker Aug 7, 2019
b6eaa3b
throw exception when meet error duing cop request handling
windtalker Aug 7, 2019
fe7916e
address comments
windtalker Aug 7, 2019
f1d0bfe
add DAGContext so InterpreterDAG can exchange information with DAGDriver
windtalker Aug 8, 2019
dde6dab
merge pingcap/tics cop
windtalker Aug 8, 2019
3c29365
fix bug
windtalker Aug 8, 2019
b984cb6
1. refine code, 2. address comments
windtalker Aug 8, 2019
ddf64e6
update comments
windtalker Aug 8, 2019
d9c4a0d
columnref index is based on executor output schema
windtalker Aug 8, 2019
947606a
merge pingcap/tics cop
windtalker Aug 8, 2019
85ae8b9
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 9, 2019
aea80d6
handle error in coprocessor request
windtalker Aug 9, 2019
2ed69ef
merge pingcap/tics cop
windtalker Aug 9, 2019
353a2b1
refine code
windtalker Aug 9, 2019
f406893
use Clear to clear a protobuf message completely
windtalker Aug 9, 2019
021e4c3
refine code
windtalker Aug 12, 2019
8994597
code refine && several minor bug fix
windtalker Aug 12, 2019
f959a9e
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 12, 2019
1550ede
address comments
windtalker Aug 12, 2019
7336bea
address comments
windtalker Aug 12, 2019
f93081b
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 12, 2019
3ac2e8d
support udf in
windtalker Aug 12, 2019
883de31
refine code
windtalker Aug 12, 2019
6e8f9c2
address comments
windtalker Aug 13, 2019
c11167b
address comments
windtalker Aug 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 52 additions & 8 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>

#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Columns/ColumnSet.h>
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/FieldToDataType.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/Set.h>
#include <Interpreters/convertFieldToType.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TypeMapping.h>
Expand Down Expand Up @@ -251,6 +254,32 @@ String DAGExpressionAnalyzer::appendCastIfNeeded(const tipb::Expr & expr, Expres
return expr_name;
}

/// Builds the Set backing an IN-style expression and caches it in prepared_sets,
/// keyed by the address of `expr`. Does nothing when a set for `expr` is already cached.
///
/// expr               - expression handed to Set::createFromDAGExpr; its address is the cache key
/// sample_block       - block used to look up the type of the left-hand argument
/// create_ordered_set - forwarded unchanged to Set::createFromDAGExpr
/// left_arg_name      - name of the left-hand argument column inside sample_block
void DAGExpressionAnalyzer::makeExplicitSet(
    const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name)
{
    if (prepared_sets.find(&expr) != prepared_sets.end())
        return;

    // TODO: support tuple IN, i.e. (a,b) in ((1,2), (3,4)). Currently TiDB rewrites tuple IN into a series of
    // or/and/eq exprs, so tuple IN is never pushed down to the coprocessor, which is quite inefficient.
    DataTypes element_types;
    element_types.push_back(sample_block.getByName(left_arg_name).type);

    // TODO: if this is a single-value IN, convert it to an equality expr.
    SetPtr new_set = std::make_shared<Set>(
        SizeLimits(settings.max_rows_in_set, settings.max_bytes_in_set, settings.set_overflow_mode));
    new_set->createFromDAGExpr(element_types, expr, create_ordered_set);

    // The key is known to be absent here (checked above), so emplace always inserts.
    prepared_sets.emplace(&expr, std::move(new_set));
}

/// Returns the first name of the form prefix + N (N = 1, 2, 3, ...) that `block` does not already contain.
static String getUniqueName(const Block & block, const String & prefix)
{
    for (int suffix = 1;; ++suffix)
    {
        String candidate = prefix + toString(suffix);
        if (!block.has(candidate))
            return candidate;
    }
}

String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions)
{
String expr_name = getName(expr, getCurrentInputColumns());
Expand Down Expand Up @@ -288,20 +317,35 @@ String DAGExpressionAnalyzer::getActions(const tipb::Expr & expr, ExpressionActi
throw Exception("agg function is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
}
const String & func_name = getFunctionName(expr);
if (func_name == "in" || func_name == "notIn" || func_name == "globalIn" || func_name == "globalNotIn")
{
// todo support in
throw Exception(func_name + " is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
}

const FunctionBuilderPtr & function_builder = FunctionFactory::instance().get(func_name, context);
Names argument_names;
DataTypes argument_types;
for (auto & child : expr.children())

if (isInOrGlobalInOperator(func_name))
{
String name = getActions(child, actions);
String name = getActions(expr.children(0), actions);
argument_names.push_back(name);
argument_types.push_back(actions->getSampleBlock().getByName(name).type);
makeExplicitSet(expr, actions->getSampleBlock(), false, name);
ColumnWithTypeAndName column;
column.type = std::make_shared<DataTypeSet>();

const SetPtr & set = prepared_sets[&expr];

column.name = getUniqueName(actions->getSampleBlock(), "___set");
column.column = ColumnSet::create(1, set);
actions->add(ExpressionAction::addColumn(column));
argument_names.push_back(column.name);
argument_types.push_back(column.type);
}
else
{
for (auto & child : expr.children())
{
String name = getActions(child, actions);
argument_names.push_back(name);
argument_types.push_back(actions->getSampleBlock().getByName(name).type);
}
}

// re-construct expr_name, because expr_name generated previously is based on expr tree,
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
namespace DB
{

class Set;
using SetPtr = std::shared_ptr<Set>;
using DAGPreparedSets = std::unordered_map<const tipb::Expr *, SetPtr>;

/** Transforms an expression from DAG expression into a sequence of actions to execute it.
*
*/
Expand All @@ -24,6 +28,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
NamesAndTypesList source_columns;
// all columns after aggregation
NamesAndTypesList aggregated_columns;
DAGPreparedSets prepared_sets;
Settings settings;
const Context & context;
bool after_agg;
Expand All @@ -47,6 +52,7 @@ class DAGExpressionAnalyzer : private boost::noncopyable
void appendFinalProject(ExpressionActionsChain & chain, const NamesWithAliases & final_project);
String getActions(const tipb::Expr & expr, ExpressionActionsPtr & actions);
const NamesAndTypesList & getCurrentInputColumns();
void makeExplicitSet(const tipb::Expr & expr, const Block & sample_block, bool create_ordered_set, const String & left_arg_name);
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGStringConverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void DAGStringConverter::buildTSString(const tipb::TableScan & ts, std::stringst
String name = merge_tree->getTableInfo().columns[cid - 1].name;
output_from_ts.push_back(std::move(name));
}
ss << "FROM " << merge_tree->getTableInfo().db_name << "." << merge_tree->getTableInfo().name << " ";
ss << "FROM " << storage->getDatabaseName() << "." << storage->getTableName() << " ";
}

void DAGStringConverter::buildSelString(const tipb::Selection & sel, std::stringstream & ss)
Expand Down
61 changes: 33 additions & 28 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
namespace DB
{

/// Error codes used by the exceptions thrown in this file.
/// They are only declared here (extern); the definitions live elsewhere.
namespace ErrorCodes
{
extern const int COP_BAD_DAG_REQUEST;
extern const int UNSUPPORTED_METHOD;
} // namespace ErrorCodes

bool isFunctionExpr(const tipb::Expr & expr)
{
switch (expr.tp())
Expand Down Expand Up @@ -43,7 +49,7 @@ const String & getAggFunctionName(const tipb::Expr & expr)
{
if (!aggFunMap.count(expr.tp()))
{
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported.");
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported.", ErrorCodes::UNSUPPORTED_METHOD);
}
return aggFunMap[expr.tp()];
}
Expand All @@ -54,21 +60,21 @@ const String & getFunctionName(const tipb::Expr & expr)
{
if (!aggFunMap.count(expr.tp()))
{
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported.");
throw Exception(tipb::ExprType_Name(expr.tp()) + " is not supported.", ErrorCodes::UNSUPPORTED_METHOD);
}
return aggFunMap[expr.tp()];
}
else
{
if (!scalarFunMap.count(expr.sig()))
{
throw Exception(tipb::ScalarFuncSig_Name(expr.sig()) + " is not supported.");
throw Exception(tipb::ScalarFuncSig_Name(expr.sig()) + " is not supported.", ErrorCodes::UNSUPPORTED_METHOD);
}
return scalarFunMap[expr.sig()];
}
}

String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col)
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser)
{
std::stringstream ss;
size_t cursor = 1;
Expand All @@ -94,7 +100,7 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
columnId = DecodeInt<Int64>(cursor, expr.val());
if (columnId < 0 || columnId >= (ColumnID)input_col.size())
{
throw Exception("out of bound");
throw Exception("Column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
}
return input_col.getNames()[columnId];
case tipb::ExprType::Count:
Expand All @@ -105,53 +111,50 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
case tipb::ExprType::First:
if (!aggFunMap.count(expr.tp()))
{
throw Exception("not supported");
throw Exception(tipb::ExprType_Name(expr.tp()) + "not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
func_name = aggFunMap.find(expr.tp())->second;
break;
case tipb::ExprType::ScalarFunc:
if (!scalarFunMap.count(expr.sig()))
{
throw Exception("not supported");
throw Exception(tipb::ScalarFuncSig_Name(expr.sig()) + "not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
func_name = scalarFunMap.find(expr.sig())->second;
break;
default:
throw Exception("not supported");
throw Exception(tipb::ExprType_Name(expr.tp()) + "not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
// build function expr
if (func_name == "in")
if (isInOrGlobalInOperator(func_name) && for_parser)
{
// for in, we could not represent the function expr using func_name(param1, param2, ...)
throw Exception("not supported");
throw Exception("Function " + func_name + " not supported", ErrorCodes::UNSUPPORTED_METHOD);
}
else
ss << func_name << "(";
bool first = true;
for (const tipb::Expr & child : expr.children())
{
ss << func_name << "(";
bool first = true;
for (const tipb::Expr & child : expr.children())
String s = exprToString(child, input_col, for_parser);
if (first)
{
String s = exprToString(child, input_col);
if (first)
{
first = false;
}
else
{
ss << ", ";
}
ss << s;
first = false;
}
ss << ") ";
return ss.str();
else
{
ss << ", ";
}
ss << s;
}
ss << ") ";
return ss.str();
}

/// Returns the protobuf enum name of the expression's type, i.e. tipb::ExprType_Name(expr.tp()).
const String & getTypeName(const tipb::Expr & expr)
{
    return tipb::ExprType_Name(expr.tp());
}

/// Returns the name used for `expr`, built by exprToString over `current_input_columns`.
///
/// for_parser is passed as false on purpose: with for_parser == true, exprToString throws
/// UNSUPPORTED_METHOD for the in / notIn / globalIn / globalNotIn functions, and a name is
/// needed for those expressions too. The earlier two-argument call (which picked up the
/// default for_parser = true and made this call unreachable) is removed.
String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns)
{
    return exprToString(expr, current_input_columns, false);
}

bool isAggFunctionExpr(const tipb::Expr & expr)
Expand Down Expand Up @@ -225,7 +228,7 @@ Field decodeLiteral(const tipb::Expr & expr)
case tipb::ExprType::MysqlTime:
case tipb::ExprType::MysqlJson:
case tipb::ExprType::ValueList:
throw Exception("mysql type literal is not supported yet");
throw Exception(tipb::ExprType_Name(expr.tp()) + "is not supported yet", ErrorCodes::UNSUPPORTED_METHOD);
default:
return DecodeDatum(cursor, expr.val());
}
Expand All @@ -237,6 +240,8 @@ ColumnID getColumnID(const tipb::Expr & expr)
return DecodeInt<Int64>(cursor, expr.val());
}

/// Returns true when `name` is exactly one of the set-membership functions:
/// "in", "notIn", "globalIn" or "globalNotIn" (case-sensitive).
bool isInOrGlobalInOperator(const String & name)
{
    for (const char * candidate : {"in", "notIn", "globalIn", "globalNotIn"})
    {
        if (name == candidate)
            return true;
    }
    return false;
}

std::unordered_map<tipb::ExprType, String> aggFunMap({
{tipb::ExprType::Count, "count"}, {tipb::ExprType::Sum, "sum"}, {tipb::ExprType::Avg, "avg"}, {tipb::ExprType::Min, "min"},
{tipb::ExprType::Max, "max"}, {tipb::ExprType::First, "any"},
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ bool isColumnExpr(const tipb::Expr & expr);
ColumnID getColumnID(const tipb::Expr & expr);
String getName(const tipb::Expr & expr, const NamesAndTypesList & current_input_columns);
const String & getTypeName(const tipb::Expr & expr);
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col);
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser = true);
bool isInOrGlobalInOperator(const String & name);
extern std::unordered_map<tipb::ExprType, String> aggFunMap;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalarFunMap;

Expand Down
15 changes: 8 additions & 7 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ void InterpreterDAG::executeTS(const tipb::TableScan & ts, Pipeline & pipeline)
throw Exception("No column is selected in table scan executor", ErrorCodes::COP_BAD_DAG_REQUEST);
}

analyzer = std::make_unique<DAGExpressionAnalyzer>(source_columns, context);

if (!dag.hasAggregation())
{
// if the dag request does not contain agg, then the final output is
Expand Down Expand Up @@ -175,28 +177,27 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
{
AnalysisResult res;
ExpressionActionsChain chain;
DAGExpressionAnalyzer analyzer(source_columns, context);
if (dag.hasSelection())
{
analyzer.appendWhere(chain, dag.getSelection(), res.filter_column_name);
analyzer->appendWhere(chain, dag.getSelection(), res.filter_column_name);
res.has_where = true;
res.before_where = chain.getLastActions();
chain.addStep();
}
// There will be either Agg...
if (dag.hasAggregation())
{
analyzer.appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
analyzer->appendAggregation(chain, dag.getAggregation(), res.aggregation_keys, res.aggregate_descriptions);
res.need_aggregate = true;
res.before_aggregation = chain.getLastActions();

chain.finalize();
chain.clear();

// add cast if type is not match
analyzer.appendAggSelect(chain, dag.getAggregation());
analyzer->appendAggSelect(chain, dag.getAggregation());
//todo use output_offset to reconstruct the final project columns
for (auto element : analyzer.getCurrentInputColumns())
for (auto element : analyzer->getCurrentInputColumns())
{
final_project.emplace_back(element.name, "");
}
Expand All @@ -205,10 +206,10 @@ InterpreterDAG::AnalysisResult InterpreterDAG::analyzeExpressions()
if (dag.hasTopN())
{
res.has_order_by = true;
analyzer.appendOrderBy(chain, dag.getTopN(), res.order_column_names);
analyzer->appendOrderBy(chain, dag.getTopN(), res.order_column_names);
}
// Append final project results if needed.
analyzer.appendFinalProject(chain, final_project);
analyzer->appendFinalProject(chain, final_project);
res.before_order_and_select = chain.getLastActions();
chain.finalize();
chain.clear();
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#pragma GCC diagnostic pop

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Interpreters/AggregateDescription.h>
Expand Down Expand Up @@ -98,6 +99,8 @@ class InterpreterDAG : public IInterpreter
TMTStoragePtr storage;
TableStructureReadLockPtr table_lock;

std::unique_ptr<DAGExpressionAnalyzer> analyzer;

Poco::Logger * log;
};
} // namespace DB
Loading