Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add coprocessor support in TiFlash #161

Merged
merged 105 commits into from
Oct 17, 2019
Merged
Show file tree
Hide file tree
Changes from 99 commits
Commits
Show all changes
105 commits
Select commit Hold shift + click to select a range
aa92f4e
basic framework for coprocessor support in tiflash
windtalker Jul 30, 2019
4f37218
basic support for InterpreterDagRequestV2
windtalker Jul 30, 2019
85bfd5c
code refine
windtalker Jul 30, 2019
e1700c3
tipb submodule use tipb master branch
windtalker Jul 31, 2019
0f82665
rewrite build flow in InterpreterDagRequest
windtalker Jul 31, 2019
a7655bc
rename Dag to DAG
windtalker Jul 31, 2019
f516f00
Update tipb submodule
zanmato1984 Aug 1, 2019
3b520c9
basic support for selection/limit/topn executor in InterpreterDAGRequest
windtalker Aug 2, 2019
9591d26
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 2, 2019
ead9609
basic support for selection/limit/topn executor in InterpreterDAGRequ…
windtalker Aug 2, 2019
bed0bd4
merge pingcap/cop branch
windtalker Aug 2, 2019
526cad9
Code reorg
zanmato1984 Aug 4, 2019
be4d80c
Format
zanmato1984 Aug 4, 2019
64a45a9
merge pingcap/cop
windtalker Aug 5, 2019
a76fdb3
merge pingcap/cop
windtalker Aug 5, 2019
0cfe045
Refine code
zanmato1984 Aug 5, 2019
e9b216c
Merge branch 'cop' of https://github.com/pingcap/tics into cop
windtalker Aug 5, 2019
3617a87
basic support for dag agg executor
windtalker Aug 5, 2019
cb55df4
Code refine
zanmato1984 Aug 5, 2019
ed41c93
Merge master into cop
zanmato1984 Aug 5, 2019
08b7142
Refine code
zanmato1984 Aug 5, 2019
bc25942
Another way of getting codec flag
zanmato1984 Aug 5, 2019
059f267
fix cop test regression (#157)
windtalker Aug 6, 2019
e59e8f3
fix npe during dag execute (#160)
windtalker Aug 6, 2019
a618cb5
Add tipb cpp gen in build script
zanmato1984 Aug 6, 2019
4f797fe
Merge branch 'master' into cop
zanmato1984 Aug 6, 2019
bb51749
Fix build error and adjust some formats
zanmato1984 Aug 6, 2019
da1cb0e
Fix build error
zanmato1984 Aug 6, 2019
816ef4b
Fix build error
zanmato1984 Aug 6, 2019
f18fcdd
Update flash configs
zanmato1984 Aug 6, 2019
2ade1cb
Format
zanmato1984 Aug 6, 2019
3870d93
Merge branch 'master' into cop
zanmato1984 Aug 7, 2019
7cb9e71
throw exception when meet error duing cop request handling (#162)
windtalker Aug 7, 2019
5fe66ee
Merge branch 'master' into cop
zanmato1984 Aug 8, 2019
0174b7e
add DAGContext so InterpreterDAG can exchange information with DAGDri…
windtalker Aug 8, 2019
9a1dd23
columnref index is based on executor output schema (#167)
windtalker Aug 8, 2019
26e20d5
Move flash/cop/dag to individual library
zanmato1984 Aug 8, 2019
bf67d9d
Merge cop lib
zanmato1984 Aug 8, 2019
62ced38
DAG planner fix and mock dag request (#169)
zanmato1984 Aug 9, 2019
b346a24
Merge branch 'master' into cop
zanmato1984 Aug 9, 2019
57cd382
Fix DAG get and lock storage
zanmato1984 Aug 9, 2019
4a76e91
handle error in cop request (#171)
windtalker Aug 12, 2019
2d093a8
code refine && several minor bug fix (#174)
windtalker Aug 12, 2019
c8cd3d7
Fix region id in mock dag
zanmato1984 Aug 12, 2019
0492af6
support udf in (#175)
windtalker Aug 14, 2019
4a6bad8
Merge branch 'master' into cop
zanmato1984 Aug 14, 2019
8713ff2
1. fix decode literal expr error, 2. add all scalar function sig in s…
windtalker Aug 14, 2019
7759af1
Merge branch 'master' into cop
zanmato1984 Aug 15, 2019
b25d1cc
some bug fix (#179)
windtalker Aug 15, 2019
3d38b7b
Support all DAG operator types in mock SQL -> DAG parser (#176)
zanmato1984 Aug 15, 2019
cbcfdb0
filter column must be uint8 in tiflash (#180)
windtalker Aug 16, 2019
d87e2d5
1. fix encode null error, 2. fix empty field type generated by TiFlas…
windtalker Aug 16, 2019
17f7fcb
Merge branch 'master' into cop
zanmato1984 Aug 16, 2019
5853b91
check validation of dag exprs field type (#183)
windtalker Aug 19, 2019
0a6767a
Merge branch 'master' into cop
zanmato1984 Aug 19, 2019
d53ca34
Merge branch 'master' into cop
zanmato1984 Aug 20, 2019
5de0ec6
add more coprocessor mock tests (#185)
windtalker Aug 20, 2019
6196171
add some log about implicit cast (#188)
windtalker Aug 21, 2019
960cc56
Merge branch 'master' into cop
zanmato1984 Aug 24, 2019
08bacd7
Pass DAG tests after merging master (#199)
zanmato1984 Aug 24, 2019
e8b4198
Fix date/datetime/bit encode error (#200)
zanmato1984 Aug 26, 2019
61cdc8f
improve dag execution time collection (#202)
windtalker Aug 26, 2019
53dcd1f
Merge branch 'master' into cop
zanmato1984 Aug 27, 2019
10e3883
column id in table scan operator may be -1 (#205)
windtalker Aug 27, 2019
39d1994
quick fix for decimal encode (#210)
windtalker Aug 30, 2019
8a0fb66
support udf like with 3 arguments (#212)
windtalker Sep 2, 2019
ff9a1de
Flash-473 optimize date and datetime comparison (#221)
windtalker Sep 5, 2019
17aacde
Merge master
zanmato1984 Sep 5, 2019
6b14b38
FLASH-479 select from empty table throw error in tiflash (#223)
windtalker Sep 6, 2019
548e519
Update flash service port
zanmato1984 Sep 6, 2019
fce3676
fix bug in DAGBlockOutputStream (#230)
windtalker Sep 10, 2019
a9f9b48
FLASH-475: Support BATCH COMMANDS in flash service (#232)
zanmato1984 Sep 12, 2019
1ccfbd4
Merge branch 'master' into cop
zhexuany Sep 12, 2019
df07939
FLASH-483: Combine raft service and flash service (#235)
zanmato1984 Sep 16, 2019
99f26c0
Merge master
zanmato1984 Sep 16, 2019
0bb7991
Fix build error
zanmato1984 Sep 16, 2019
f41f853
Fix test regression
zanmato1984 Sep 16, 2019
259ec77
Fix null value bug in datum
zanmato1984 Sep 17, 2019
ef65514
Merge branch 'master' into cop
zanmato1984 Sep 17, 2019
708d52f
FLASH-490: Fix table scan with -1 column ID and no agg (#240)
zanmato1984 Sep 23, 2019
3656a95
Merge branch 'master' into cop
zanmato1984 Sep 23, 2019
a4c1074
throw error if the cop request is not based on full region scan (#247)
windtalker Sep 24, 2019
b57656c
Merge branch 'master' into cop
zanmato1984 Sep 25, 2019
3a43942
FLASH-437 Support time zone in coprocessor (#259)
windtalker Sep 27, 2019
01caa55
Merge branch 'master' into cop
zanmato1984 Sep 27, 2019
8d2576e
Address comment
zanmato1984 Sep 29, 2019
d33a278
FLASH-489 support key condition for coprocessor query (#261)
windtalker Sep 30, 2019
087faee
Merge branch 'master' into cop
zanmato1984 Sep 30, 2019
4aa2b58
only return execute summaies if requested (#264)
windtalker Sep 30, 2019
80f6f35
Refine service init (#265)
zanmato1984 Oct 8, 2019
f255362
FLASH-554 cop check range should be based on region range (#270)
windtalker Oct 10, 2019
7fc53ad
minor improve (#273)
windtalker Oct 11, 2019
22ad2d3
Merge branch 'master' into cop
zanmato1984 Oct 11, 2019
a1304ae
Fix mutex on timezone retrieval (#276)
ilovesoup2000 Oct 11, 2019
687dcbe
Fix race condition of batch command handling (#277)
zanmato1984 Oct 12, 2019
939b8cf
address comment
windtalker Oct 14, 2019
d25dadc
address comments
windtalker Oct 14, 2019
4080fba
address comments
windtalker Oct 14, 2019
d2890e3
Fix NULL order for dag (#281)
zanmato1984 Oct 14, 2019
bc075c5
refine get actions in DAGExpressionAnalyzer, fix bug in dbgFuncCoproc…
windtalker Oct 15, 2019
fbcbdc0
remove duplicate agg funcs (#283)
windtalker Oct 15, 2019
d968c09
address comments
windtalker Oct 16, 2019
4f58878
Update dbms/src/Flash/BatchCommandsHandler.cpp
zanmato1984 Oct 17, 2019
92c16c2
revert unnecessary changes
windtalker Oct 17, 2019
e8b92b4
Merge branch 'master' into cop
zanmato1984 Oct 17, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
[submodule "contrib/kvproto"]
path = contrib/kvproto
url = https://github.com/pingcap/kvproto.git
[submodule "contrib/tipb"]
path = contrib/tipb
url = https://github.com/pingcap/tipb.git
branch = master
[submodule "contrib/client-c"]
path = contrib/client-c
url = git@github.com:tikv/client-c.git
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ include (cmake/find_capnp.cmake)
include (cmake/find_llvm.cmake)
include (cmake/find_grpc.cmake)
include (cmake/find_kvproto.cmake)
include (cmake/find_tipb.cmake)


include (cmake/find_contrib_lib.cmake)
Expand Down
10 changes: 10 additions & 0 deletions cmake/find_tipb.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/tipb/cpp/tipb/select.pb.h")
if (EXISTS "${ClickHouse_SOURCE_DIR}/contrib/tipb/proto/select.proto")
message (FATAL_ERROR "tipb cpp files in contrib/tipb is missing. Try go to contrib/tipb, and run ./generate_cpp.sh")
else()
message (FATAL_ERROR "tipb submodule in contrib/tipb is missing. Try run 'git submodule update --init --recursive', and go to contrib/tipb, and run ./generate_cpp.sh")
endif()
endif ()

message(STATUS "Using tipb: ${ClickHouse_SOURCE_DIR}/contrib/tipb/cpp")
1 change: 1 addition & 0 deletions contrib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
add_subdirectory (kvproto/cpp)
add_subdirectory (client-c)
add_subdirectory (tipb/cpp)

if (NOT MSVC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast")
Expand Down
1 change: 1 addition & 0 deletions contrib/tipb
Submodule tipb added at b2d318
3 changes: 3 additions & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ add_headers_and_sources(dbms src/Storages/Page)
add_headers_and_sources(dbms src/Raft)
add_headers_and_sources(dbms src/TiDB)
add_headers_and_sources(dbms src/Client)
add_headers_only(dbms src/Flash/Coprocessor)
add_headers_only(dbms src/Server)

list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
Expand Down Expand Up @@ -149,8 +150,10 @@ target_link_libraries (dbms
clickhouse_parsers
clickhouse_common_config
clickhouse_common_io
flash_service
kvproto
kv_client
tipb
${Protobuf_LIBRARIES}
gRPC::grpc++_unsecure
${CURL_LIBRARIES}
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ add_subdirectory (AggregateFunctions)
add_subdirectory (Server)
add_subdirectory (Client)
add_subdirectory (TableFunctions)
add_subdirectory (Analyzers)
add_subdirectory (Analyzers)
add_subdirectory (Flash)
1 change: 1 addition & 0 deletions dbms/src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ namespace ErrorCodes
extern const int SCHEMA_SYNC_ERROR = 10003;
extern const int SCHEMA_VERSION_ERROR = 10004;
extern const int DDL_ERROR = 10005;
extern const int COP_BAD_DAG_REQUEST = 10006;
}

}
11 changes: 11 additions & 0 deletions dbms/src/Common/MyTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -473,4 +473,15 @@ void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & tim
to_time = to_my_time.toPackedUInt();
}

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone)
{
MyDateTime from_my_time(from_time);
time_t epoch = time_zone.makeDateTime(
from_my_time.year, from_my_time.month, from_my_time.day, from_my_time.hour, from_my_time.minute, from_my_time.second);
epoch += offset;
MyDateTime to_my_time(time_zone.toYear(epoch), time_zone.toMonth(epoch), time_zone.toDayOfMonth(epoch),
time_zone.toHour(epoch), time_zone.toMinute(epoch), time_zone.toSecond(epoch), from_my_time.micro_second);
to_time = to_my_time.toPackedUInt();
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Common/MyTime.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ Field parseMyDateTime(const String & str);

void convertTimeZone(UInt64 from_time, UInt64 & to_time, const DateLUTImpl & time_zone_from, const DateLUTImpl & time_zone_to);

void convertTimeZoneByOffset(UInt64 from_time, UInt64 & to_time, Int64 offset, const DateLUTImpl & time_zone);

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Core/Defines.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
#define DEFAULT_MAX_READ_TSO 0xFFFFFFFFFFFFFFFF
#define DEFAULT_UNSPECIFIED_SCHEMA_VERSION -1

#define DEFAULT_DAG_RECORDS_PER_CHUNK 64L

/** Which blocks by default read the data (by number of rows).
* Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query.
*/
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/DataStreams/BlockStreamProfileInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ struct BlockStreamProfileInfo
size_t rows = 0;
size_t blocks = 0;
size_t bytes = 0;
// execution time is the total time spent on current stream and all its children streams
// note that it is different from total_stopwatch.elapsed(), which includes not only the
// time spent on current stream and all its children streams, but also the time of its
// parent streams
UInt64 execution_time = 0;

using BlockStreamProfileInfos = std::vector<const BlockStreamProfileInfo *>;

Expand All @@ -45,6 +50,8 @@ struct BlockStreamProfileInfo

void update(Block & block);

void updateExecutionTime(UInt64 time) { execution_time += time; }

/// Binary serialization and deserialization of main fields.
/// Writes only main fields i.e. fields that required by internal transmission protocol.
void read(ReadBuffer & in);
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/DataStreams/IProfilingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ Block IProfilingBlockInputStream::read()
if (isCancelledOrThrowIfKilled())
return res;

auto start_time = info.total_stopwatch.elapsed();

if (!checkTimeLimit())
limit_exceeded_need_break = true;

Expand Down Expand Up @@ -83,6 +85,7 @@ Block IProfilingBlockInputStream::read()
}
#endif

info.updateExecutionTime(info.total_stopwatch.elapsed() - start_time);
return res;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/DataTypes/DataTypeMyDateTime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ DataTypeMyDateTime::DataTypeMyDateTime(int fraction_)
{
fraction = fraction_;
if (fraction < 0 || fraction > 6)
throw Exception("fraction must >= 0 and < 6", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
throw Exception("fraction must >= 0 and <= 6", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}

void DataTypeMyDateTime::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr) const
Expand Down
115 changes: 69 additions & 46 deletions dbms/src/Debug/DBGInvoker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <DataStreams/StringStreamBlockInputStream.h>
#include <Debug/ClusterManage.h>
#include <Debug/DBGInvoker.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Debug/dbgFuncMockTiDBData.h>
#include <Debug/dbgFuncMockTiDBTable.h>
#include <Debug/dbgFuncRegion.h>
Expand All @@ -30,48 +31,50 @@ void dbgFuncSleep(Context &, const ASTs & args, DBGInvoker::Printer output)

DBGInvoker::DBGInvoker()
{
regFunc("echo", dbgFuncEcho);
regSchemalessFunc("echo", dbgFuncEcho);
// TODO: remove this, use sleep in bash script
regFunc("sleep", dbgFuncSleep);

regFunc("mock_tidb_table", MockTiDBTable::dbgFuncMockTiDBTable);
regFunc("mock_tidb_db", MockTiDBTable::dbgFuncMockTiDBDB);
regFunc("mock_tidb_partition", MockTiDBTable::dbgFuncMockTiDBPartition);
regFunc("drop_tidb_partition", MockTiDBTable::dbgFuncDropTiDBPartition);
regFunc("drop_tidb_table", MockTiDBTable::dbgFuncDropTiDBTable);
regFunc("drop_tidb_db", MockTiDBTable::dbgFuncDropTiDBDB);
regFunc("add_column_to_tidb_table", MockTiDBTable::dbgFuncAddColumnToTiDBTable);
regFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable);
regFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable);
regFunc("rename_column_in_tidb_table", MockTiDBTable::dbgFuncRenameColumnInTiDBTable);
regFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable);
regFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable);

regFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

regFunc("raft_insert_row", dbgFuncRaftInsertRow);
regFunc("raft_insert_row_full", dbgFuncRaftInsertRowFull);
regFunc("raft_insert_rows", dbgFuncRaftInsertRows);
regFunc("raft_update_rows", dbgFuncRaftUpdateRows);
regFunc("raft_delete_rows", dbgFuncRaftDelRows);
regFunc("raft_delete_row", dbgFuncRaftDeleteRow);

regFunc("put_region", dbgFuncPutRegion);
regFunc("region_snapshot", dbgFuncRegionSnapshot);
regFunc("region_snapshot_data", dbgFuncRegionSnapshotWithData);

regFunc("try_flush", dbgFuncTryFlush);
regFunc("try_flush_region", dbgFuncTryFlushRegion);

regFunc("dump_all_region", dbgFuncDumpAllRegion);
regFunc("dump_all_mock_region", dbgFuncDumpAllMockRegion);

regFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regFunc("refresh_schemas", dbgFuncRefreshSchemas);
regFunc("reset_schemas", dbgFuncResetSchemas);

regFunc("dump_region_table", ClusterManage::dumpRegionTable);
regFunc("find_region_by_range", ClusterManage::findRegionByRange);
regSchemalessFunc("sleep", dbgFuncSleep);

regSchemalessFunc("mock_tidb_table", MockTiDBTable::dbgFuncMockTiDBTable);
regSchemalessFunc("mock_tidb_db", MockTiDBTable::dbgFuncMockTiDBDB);
regSchemalessFunc("mock_tidb_partition", MockTiDBTable::dbgFuncMockTiDBPartition);
regSchemalessFunc("drop_tidb_table", MockTiDBTable::dbgFuncDropTiDBTable);
regSchemalessFunc("drop_tidb_db", MockTiDBTable::dbgFuncDropTiDBDB);
regSchemalessFunc("drop_tidb_partition", MockTiDBTable::dbgFuncDropTiDBPartition);
regSchemalessFunc("add_column_to_tidb_table", MockTiDBTable::dbgFuncAddColumnToTiDBTable);
regSchemalessFunc("drop_column_from_tidb_table", MockTiDBTable::dbgFuncDropColumnFromTiDBTable);
regSchemalessFunc("modify_column_in_tidb_table", MockTiDBTable::dbgFuncModifyColumnInTiDBTable);
regSchemalessFunc("rename_column_in_tidb_table", MockTiDBTable::dbgFuncRenameColumnInTiDBTable);
regSchemalessFunc("rename_tidb_table", MockTiDBTable::dbgFuncRenameTiDBTable);
regSchemalessFunc("truncate_tidb_table", MockTiDBTable::dbgFuncTruncateTiDBTable);

regSchemalessFunc("set_flush_threshold", dbgFuncSetFlushThreshold);

regSchemalessFunc("raft_insert_row", dbgFuncRaftInsertRow);
regSchemalessFunc("raft_insert_row_full", dbgFuncRaftInsertRowFull);
regSchemalessFunc("raft_insert_rows", dbgFuncRaftInsertRows);
regSchemalessFunc("raft_update_rows", dbgFuncRaftUpdateRows);
regSchemalessFunc("raft_delete_rows", dbgFuncRaftDelRows);
regSchemalessFunc("raft_delete_row", dbgFuncRaftDeleteRow);

regSchemalessFunc("put_region", dbgFuncPutRegion);
regSchemalessFunc("region_snapshot", dbgFuncRegionSnapshot);
regSchemalessFunc("region_snapshot_data", dbgFuncRegionSnapshotWithData);

regSchemalessFunc("try_flush", dbgFuncTryFlush);
regSchemalessFunc("try_flush_region", dbgFuncTryFlushRegion);

regSchemalessFunc("dump_all_region", dbgFuncDumpAllRegion);
regSchemalessFunc("dump_all_mock_region", dbgFuncDumpAllMockRegion);
regSchemalessFunc("dump_region_table", ClusterManage::dumpRegionTable);
regSchemalessFunc("find_region_by_range", ClusterManage::findRegionByRange);

regSchemalessFunc("enable_schema_sync_service", dbgFuncEnableSchemaSyncService);
regSchemalessFunc("refresh_schemas", dbgFuncRefreshSchemas);
regSchemalessFunc("reset_schemas", dbgFuncResetSchemas);

regSchemafulFunc("dag", dbgFuncDAG);
regSchemafulFunc("mock_dag", dbgFuncMockDAG);
}

void replaceSubstr(std::string & str, const std::string & target, const std::string & replacement)
Expand Down Expand Up @@ -103,10 +106,25 @@ BlockInputStreamPtr DBGInvoker::invoke(Context & context, const std::string & or
name = ori_name.substr(prefix_not_print_res.size(), ori_name.size() - prefix_not_print_res.size());
}

auto it = funcs.find(name);
if (it == funcs.end())
throw Exception("DBG function not found", ErrorCodes::BAD_ARGUMENTS);
BlockInputStreamPtr res;
auto it_schemaless = schemaless_funcs.find(name);
if (it_schemaless != schemaless_funcs.end())
res = invokeSchemaless(context, name, it_schemaless->second, args);
else
{
auto it_schemaful = schemaful_funcs.find(name);
if (it_schemaful != schemaful_funcs.end())
res = invokeSchemaful(context, name, it_schemaful->second, args);
else
throw Exception("DBG function not found", ErrorCodes::BAD_ARGUMENTS);
}

return print_res ? res : std::shared_ptr<StringStreamBlockInputStream>();
}

BlockInputStreamPtr DBGInvoker::invokeSchemaless(
Context & context, const std::string & name, const SchemalessDBGFunc & func, const ASTs & args)
{
std::stringstream col_name;
col_name << name << "(";
for (size_t i = 0; i < args.size(); ++i)
Expand All @@ -119,9 +137,14 @@ BlockInputStreamPtr DBGInvoker::invoke(Context & context, const std::string & or
std::shared_ptr<StringStreamBlockInputStream> res = std::make_shared<StringStreamBlockInputStream>(col_name.str());
Printer printer = [&](const std::string & s) { res->append(s); };

(it->second)(context, args, printer);
func(context, args, printer);

return print_res ? res : std::shared_ptr<StringStreamBlockInputStream>();
return res;
}

BlockInputStreamPtr DBGInvoker::invokeSchemaful(Context & context, const std::string &, const SchemafulDBGFunc & func, const ASTs & args)
{
return func(context, args);
}

} // namespace DB
11 changes: 8 additions & 3 deletions dbms/src/Debug/DBGInvoker.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,21 @@ class DBGInvoker
{
public:
using Printer = std::function<void(const std::string &)>;
using DBGFunc = std::function<void(Context & context, const ASTs & args, Printer printer)>;
using SchemalessDBGFunc = std::function<void(Context & context, const ASTs & args, Printer printer)>;
using SchemafulDBGFunc = std::function<BlockInputStreamPtr(Context & context, const ASTs & args)>;

DBGInvoker();

void regFunc(const std::string & name, DBGFunc func) { funcs[name] = func; }
void regSchemalessFunc(const std::string & name, SchemalessDBGFunc func) { schemaless_funcs[name] = func; }
void regSchemafulFunc(const std::string & name, SchemafulDBGFunc func) { schemaful_funcs[name] = func; }

BlockInputStreamPtr invoke(Context & context, const std::string & ori_name, const ASTs & args);
BlockInputStreamPtr invokeSchemaless(Context & context, const std::string & name, const SchemalessDBGFunc & func, const ASTs & args);
BlockInputStreamPtr invokeSchemaful(Context & context, const std::string & name, const SchemafulDBGFunc & func, const ASTs & args);

private:
std::unordered_map<std::string, DBGFunc> funcs;
std::unordered_map<std::string, SchemalessDBGFunc> schemaless_funcs;
std::unordered_map<std::string, SchemafulDBGFunc> schemaful_funcs;
};

} // namespace DB
Loading