Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support all DAG operator types in mock SQL -> DAG parser (#176)
Browse files Browse the repository at this point in the history
* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Enhance mock sql -> dag compiler and add project test

* Mock sql dag compiler support more expression types and add filter test

* Add topn and limit test

* Add agg for sql -> dag parser and agg test

* Add dag specific codec

* type

* Update codec accordingly

* Remove cop-test
zanmato1984 authored and windtalker committed Aug 15, 2019
1 parent b25d1cc commit 3d38b7b
Showing 13 changed files with 634 additions and 385 deletions.
378 changes: 345 additions & 33 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp

Large diffs are not rendered by default.

65 changes: 65 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGCodec.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include <Flash/Coprocessor/DAGCodec.h>

#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TiKVRecordFormat.h>

namespace DB
{

void encodeDAGInt64(Int64 i, std::stringstream & ss)
{
auto u = RecordKVFormat::encodeInt64(i);
ss.write(reinterpret_cast<const char *>(&u), sizeof(u));
}

void encodeDAGUInt64(UInt64 i, std::stringstream & ss)
{
auto u = RecordKVFormat::encodeUInt64(i);
ss.write(reinterpret_cast<const char *>(&u), sizeof(u));
}

void encodeDAGFloat32(Float32 f, std::stringstream & ss) { EncodeFloat64(f, ss); }

void encodeDAGFloat64(Float64 f, std::stringstream & ss) { EncodeFloat64(f, ss); }

void encodeDAGString(const String & s, std::stringstream & ss) { ss << s; }

void encodeDAGBytes(const String & bytes, std::stringstream & ss) { ss << bytes; }

void encodeDAGDecimal(const Decimal & d, std::stringstream & ss) { EncodeDecimal(d, ss); }

Int64 decodeDAGInt64(const String & s)
{
auto u = *(reinterpret_cast<const UInt64 *>(s.data()));
return RecordKVFormat::decodeInt64(u);
}

UInt64 decodeDAGUInt64(const String & s)
{
auto u = *(reinterpret_cast<const UInt64 *>(s.data()));
return RecordKVFormat::decodeUInt64(u);
}

Float32 decodeDAGFloat32(const String & s)
{
size_t cursor = 0;
return DecodeFloat64(cursor, s);
}

Float64 decodeDAGFloat64(const String & s)
{
size_t cursor = 0;
return DecodeFloat64(cursor, s);
}

String decodeDAGString(const String & s) { return s; }

String decodeDAGBytes(const String & s) { return s; }

Decimal decodeDAGDecimal(const String & s)
{
size_t cursor = 0;
return DecodeDecimal(cursor, s);
}

} // namespace DB
25 changes: 25 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGCodec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <Common/Decimal.h>
#include <Core/Field.h>

namespace DB
{

void encodeDAGInt64(Int64, std::stringstream &);
void encodeDAGUInt64(UInt64, std::stringstream &);
void encodeDAGFloat32(Float32, std::stringstream &);
void encodeDAGFloat64(Float64, std::stringstream &);
void encodeDAGString(const String &, std::stringstream &);
void encodeDAGBytes(const String &, std::stringstream &);
void encodeDAGDecimal(const Decimal &, std::stringstream &);

Int64 decodeDAGInt64(const String &);
UInt64 decodeDAGUInt64(const String &);
Float32 decodeDAGFloat32(const String &);
Float64 decodeDAGFloat64(const String &);
String decodeDAGString(const String &);
String decodeDAGBytes(const String &);
Decimal decodeDAGDecimal(const String &);

} // namespace DB
33 changes: 17 additions & 16 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#include <Flash/Coprocessor/DAGUtils.h>

#include <Core/Types.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Interpreters/Context.h>
#include <Storages/Transaction/Codec.h>
#include <Storages/Transaction/TiKVRecordFormat.h>

#include <unordered_map>

@@ -51,7 +50,6 @@ const String & getFunctionName(const tipb::Expr & expr)
String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col, bool for_parser)
{
std::stringstream ss;
size_t cursor = 0;
Int64 column_id = 0;
String func_name;
Field f;
@@ -60,19 +58,21 @@ String exprToString(const tipb::Expr & expr, const NamesAndTypesList & input_col
case tipb::ExprType::Null:
return "NULL";
case tipb::ExprType::Int64:
return std::to_string(RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(expr.val().data())));
return std::to_string(decodeDAGInt64(expr.val()));
case tipb::ExprType::Uint64:
return std::to_string(DecodeInt<UInt64>(cursor, expr.val()));
return std::to_string(decodeDAGUInt64(expr.val()));
case tipb::ExprType::Float32:
return std::to_string(decodeDAGFloat32(expr.val()));
case tipb::ExprType::Float64:
return std::to_string(DecodeFloat64(cursor, expr.val()));
return std::to_string(decodeDAGFloat64(expr.val()));
case tipb::ExprType::String:
return decodeDAGString(expr.val());
case tipb::ExprType::Bytes:
return expr.val();
return decodeDAGBytes(expr.val());
case tipb::ExprType::MysqlDecimal:
return DecodeDecimal(cursor, expr.val()).toString();
return decodeDAGDecimal(expr.val()).toString();
case tipb::ExprType::ColumnRef:
column_id = RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(expr.val().data()));
column_id = decodeDAGInt64(expr.val());
if (column_id < 0 || column_id >= (ColumnID)input_col.size())
{
throw Exception("Column id out of bound", ErrorCodes::COP_BAD_DAG_REQUEST);
@@ -191,23 +191,24 @@ bool isColumnExpr(const tipb::Expr & expr) { return expr.tp() == tipb::ExprType:

Field decodeLiteral(const tipb::Expr & expr)
{
size_t cursor = 0;
switch (expr.tp())
{
case tipb::ExprType::Null:
return Field();
case tipb::ExprType::Int64:
return RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(expr.val().data()));
return decodeDAGInt64(expr.val());
case tipb::ExprType::Uint64:
return DecodeInt<UInt64>(cursor, expr.val());
return decodeDAGUInt64(expr.val());
case tipb::ExprType::Float32:
return Float64(decodeDAGFloat32(expr.val()));
case tipb::ExprType::Float64:
return DecodeFloat64(cursor, expr.val());
return decodeDAGFloat64(expr.val());
case tipb::ExprType::String:
return decodeDAGString(expr.val());
case tipb::ExprType::Bytes:
return expr.val();
return decodeDAGBytes(expr.val());
case tipb::ExprType::MysqlDecimal:
return DecodeDecimal(cursor, expr.val());
return decodeDAGDecimal(expr.val());
case tipb::ExprType::MysqlBit:
case tipb::ExprType::MysqlDuration:
case tipb::ExprType::MysqlEnum:
@@ -224,7 +225,7 @@ Field decodeLiteral(const tipb::Expr & expr)

ColumnID getColumnID(const tipb::Expr & expr)
{
auto column_id = RecordKVFormat::decodeInt64(RecordKVFormat::read<UInt64>(expr.val().data()));
auto column_id = decodeDAGInt64(expr.val());
return column_id;
}

3 changes: 0 additions & 3 deletions dbms/src/Flash/Coprocessor/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
include_directories (${CMAKE_CURRENT_BINARY_DIR})

add_executable (cop_test cop_test.cpp)
target_link_libraries (cop_test dbms)
332 changes: 0 additions & 332 deletions dbms/src/Flash/Coprocessor/tests/cop_test.cpp

This file was deleted.

10 changes: 10 additions & 0 deletions dbms/src/Storages/Transaction/TiDB.h
Original file line number Diff line number Diff line change
@@ -97,6 +97,16 @@ enum TP
M(PartKey, (1 << 14)) \
M(Num, (1 << 15))

enum ColumnFlag
{
#ifdef M
#error "Please undefine macro M first."
#endif
#define M(cf, v) ColumnFlag##cf = v,
COLUMN_FLAGS(M)
#undef M
};

// Codec flags.
// In format: TiDB codec flag, int value.
#ifdef M
32 changes: 32 additions & 0 deletions tests/mutable-test/txn_dag/aggregation.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 52, 'test3', 777)

# DAG read by not specifying region id, group by.
=> DBGInvoke dag('select count(col_1) from default.test group by col_2')
┌─count(col_1)─┬─col_2─┐
│ 2 │ 666 │
│ 1 │ 777 │
└──────────────┴───────┘

# DAG read by explicitly specifying region id, where + group by.
=> DBGInvoke dag('select count(col_1) from default.test where col_2 = 666 group by col_2', 4)
┌─count(col_1)─┬─col_2─┐
│ 2 │ 666 │
└──────────────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
37 changes: 37 additions & 0 deletions tests/mutable-test/txn_dag/filter.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 777)

# DAG read by not specifying region id, where col_1 = 666.
=> DBGInvoke dag('select * from default.test where col_2 = 666')
┌─col_1─┬─col_2─┐
│ test1 │ 666 │
└───────┴───────┘

# DAG read by explicitly specifying region id, where col_2 = 'test2'.
=> DBGInvoke dag('select col_2 from default.test where col_1 = \'test2\'', 4)
┌─col_2─┐
│ 777 │
└───────┘

# Mock DAG read, where or.
=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_1 = \'test2\' or col_2 = 666', 4)
┌─col_2─┬─col_1─┬─col_2─┐
│ 666 │ test1 │ 666 │
│ 777 │ test2 │ 777 │
└───────┴───────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
31 changes: 31 additions & 0 deletions tests/mutable-test/txn_dag/limit.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test1', 666)

# DAG read by not specifying region id, order by col_2 limit 1.
=> DBGInvoke dag('select * from default.test')
┌─col_1─┬─col_2─┐
│ test1 │ 666 │
│ test1 │ 666 │
└───────┴───────┘

# Mock DAG read, where + topn.
=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_2 = 666 limit 1', 4)
┌─col_2─┬─col_1─┬─col_2─┐
│ 666 │ test1 │ 666 │
└───────┴───────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
41 changes: 41 additions & 0 deletions tests/mutable-test/txn_dag/project.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)

# DAG read by not specifying region id, select *.
=> DBGInvoke dag('select * from default.test') " --dag_planner="optree
┌─col_1─┬─col_2─┐
│ test1 │ 666 │
└───────┴───────┘

# DAG read by not specifying region id, select col_1.
=> DBGInvoke dag('select col_1 from default.test') " --dag_planner="optree
┌─col_1─┐
│ test1 │
└───────┘

# DAG read by explicitly specifying region id, select col_2.
=> DBGInvoke dag('select col_2 from default.test', 4) " --dag_planner="optree
┌─col_2─┐
│ 666 │
└───────┘

# Mock DAG read, select col_2, col_1, col_2.
=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test', 4) " --dag_planner="optree
┌─col_2─┬─col_1─┬─col_2─┐
│ 666 │ test1 │ 666 │
└───────┴───────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test
2 changes: 1 addition & 1 deletion tests/mutable-test/txn_dag/table_scan.test
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data
# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
30 changes: 30 additions & 0 deletions tests/mutable-test/txn_dag/topn.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Preparation.
=> DBGInvoke __enable_schema_sync_service('true')

=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

=> DBGInvoke __set_flush_threshold(1000000, 1000000)

# Data.
=> DBGInvoke __mock_tidb_table(default, test, 'col_1 String, col_2 Int64')
=> DBGInvoke __refresh_schemas()
=> DBGInvoke __put_region(4, 0, 100, default, test)
=> DBGInvoke __raft_insert_row(default, test, 4, 50, 'test1', 666)
=> DBGInvoke __raft_insert_row(default, test, 4, 51, 'test2', 777)

# DAG read by not specifying region id, order by col_2 limit 1.
=> DBGInvoke dag('select * from default.test order by col_2 limit 1')
┌─col_1─┬─col_2─┐
│ test1 │ 666 │
└───────┴───────┘

# Mock DAG read, where + topn.
=> DBGInvoke mock_dag('select col_2, col_1, col_2 from default.test where col_1 = \'test2\' or col_2 = 666 order by col_1 desc limit 1', 4)
┌─col_2─┬─col_1─┬─col_2─┐
│ 777 │ test2 │ 777 │
└───────┴───────┴───────┘

# Clean up.
=> DBGInvoke __drop_tidb_table(default, test)
=> drop table if exists default.test

0 comments on commit 3d38b7b

Please sign in to comment.