From cbb6492610279c249aff7c61620dae776af4a1bf Mon Sep 17 00:00:00 2001 From: xufei Date: Fri, 25 Oct 2019 16:24:11 +0800 Subject: [PATCH] Flash-481 Arrow encode (#279) * basic framework for coprocessor support in tiflash * basic support for InterpreterDagRequestV2 * code refine * tipb submodule use tipb master branch * rewrite build flow in InterpreterDagRequest * rename Dag to DAG * Update tipb submodule * basic support for selection/limit/topn executor in InterpreterDAGRequest * basic support for selection/limit/topn executor in InterpreterDAGRequest (#150) * merge pingcap/cop branch * Code reorg * Format * merge pingcap/cop * Refine code * basic support for dag agg executor * Code refine * Refine code * Another way of getting codec flag * fix cop test regression (#157) * fix cop test regression * address comments * format code * fix npe during dag execute (#160) * fix cop test regression * address comments * format code * fix npe for dag execute * format code * address comment * add some comments * Add tipb cpp gen in build script * Fix build error and adjust some formats * Fix build error * Fix build error * Update flash configs * Format * throw exception when meet error duing cop request handling (#162) * fix cop test regression * address comments * format code * fix npe for dag execute * format code * address comment * add some comments * throw exception when meet error duing cop request handling * address comments * add error code * throw exception when meet error duing cop request handling * address comments * add DAGContext so InterpreterDAG can exchange information with DAGDriver (#166) * fix cop test regression * address comments * format code * fix npe for dag execute * format code * address comment * add some comments * throw exception when meet error duing cop request handling * address comments * add error code * throw exception when meet error duing cop request handling * address comments * add DAGContext so InterpreterDAG can exchange information with DAGDriver * fix bug * 1. refine code, 2. address comments * update comments * columnref index is based on executor output schema (#167) * fix cop test regression * address comments * format code * fix npe for dag execute * format code * address comment * add some comments * throw exception when meet error duing cop request handling * address comments * add error code * throw exception when meet error duing cop request handling * address comments * add DAGContext so InterpreterDAG can exchange information with DAGDriver * fix bug * 1. refine code, 2. address comments * update comments * columnref index is based on executor output schema * Move flash/cop/dag to individual library * DAG planner fix and mock dag request (#169) * Enhance dbg invoke and add dag as schemaful function * Add basic sql parse to dag * Column id starts from 1 * Fix value to ref * Add basic dag test * Fix dag bugs and pass 1st mock test * Make dag go normal routine and add mock dag * Add todo * Add comment * Fix gcc compile error * Enhance dag test * Address comments * Fix DAG get and lock storage * handle error in cop request (#171) * fix cop test regression * address comments * format code * fix npe for dag execute * format code * address comment * add some comments * throw exception when meet error duing cop request handling * address comments * add error code * throw exception when meet error duing cop request handling * address comments * add DAGContext so InterpreterDAG can exchange information with DAGDriver * fix bug * 1. refine code, 2. address comments * update comments * columnref index is based on executor output schema * handle error in coprocessor request * refine code * use Clear to clear a protobuf message completely * refine code * code refine && several minor bug fix (#174) * fix cop test regression * address comments * format code * fix npe for dag execute * format code * address comment * add some comments * throw exception when meet error duing cop request handling * address comments * add error code * throw exception when meet error duing cop request handling * address comments * add DAGContext so InterpreterDAG can exchange information with DAGDriver * fix bug * 1. refine code, 2. address comments * update comments * columnref index is based on executor output schema * handle error in coprocessor request * refine code * use Clear to clear a protobuf message completely * refine code * code refine && several minor bug fix * address comments * address comments * Fix region id in mock dag * support udf in (#175) * fix cop test regression * address comments * format code * fix npe for dag execute * format code * address comment * add some comments * throw exception when meet error duing cop request handling * address comments * add error code * throw exception when meet error duing cop request handling * address comments * add DAGContext so InterpreterDAG can exchange information with DAGDriver * fix bug * 1. refine code, 2. address comments * update comments * columnref index is based on executor output schema * handle error in coprocessor request * refine code * use Clear to clear a protobuf message completely * refine code * code refine && several minor bug fix * address comments * address comments * support udf in * refine code * address comments * address comments * 1. fix decode literal expr error, 2. add all scalar function sig in scalar_func_map (#177) * add all scalar function sig in scalarFunMap * fix literal expr decode * enable ltrim && rtrim * code refine * use throw instead of rethrow in DAGDriver.cpp * some bug fix (#179) * add all scalar function sig in scalarFunMap * fix literal expr decode * enable ltrim && rtrim * code refine * use throw instead of rethrow in DAGDriver.cpp * 1. fix decode UInt literal error, 2. support mysqlDecimal type * format code * Support all DAG operator types in mock SQL -> DAG parser (#176) * Enhance dbg invoke and add dag as schemaful function * Add basic sql parse to dag * Column id starts from 1 * Fix value to ref * Add basic dag test * Fix dag bugs and pass 1st mock test * Make dag go normal routine and add mock dag * Add todo * Add comment * Fix gcc compile error * Enhance dag test * Address comments * Enhance mock sql -> dag compiler and add project test * Mock sql dag compiler support more expression types and add filter test * Add topn and limit test * Add agg for sql -> dag parser and agg test * Add dag specific codec * type * Update codec accordingly * Remove cop-test * filter column must be uint8 in tiflash (#180) * add all scalar function sig in scalarFunMap * fix literal expr decode * enable ltrim && rtrim * code refine * use throw instead of rethrow in DAGDriver.cpp * 1. fix decode UInt literal error, 2. support mysqlDecimal type * format code * filter column must be uint8 in tiflash * address comments * address comments * address comments * remove useless include * 1. fix encode null error, 2. fix empty field type generated by TiFlash (#182) * add all scalar function sig in scalarFunMap * fix literal expr decode * enable ltrim && rtrim * code refine * use throw instead of rethrow in DAGDriver.cpp * 1. fix decode UInt literal error, 2. support mysqlDecimal type * format code * filter column must be uint8 in tiflash * address comments * address comments * address comments * remove useless include * 1. fix encode null error, 2. fix empty field type generated by TiFlash * check validation of dag exprs field type (#183) * check validation of dag exprs field type * format code * address comments * add more coprocessor mock tests (#185) * check validation of dag exprs field type * format code * address comments * add more filter test * add data type tests * remove useless comment * disable decimal test * add some log about implicit cast (#188) * check validation of dag exprs field type * format code * address comments * add more filter test * add data type tests * remove useless comment * disable decimal test * add some log about implicit cast * address comment * Pass DAG tests after merging master (#199) * Enhance dbg invoke and add dag as schemaful function * Add basic sql parse to dag * Column id starts from 1 * Fix value to ref * Add basic dag test * Fix dag bugs and pass 1st mock test * Make dag go normal routine and add mock dag * Add todo * Add comment * Fix gcc compile error * Enhance dag test * Address comments * Enhance mock sql -> dag compiler and add project test * Mock sql dag compiler support more expression types and add filter test * Add topn and limit test * Add agg for sql -> dag parser and agg test * Add dag specific codec * type * Update codec accordingly * Remove cop-test * Pass tests after merging master * Fix date/datetime/bit encode error (#200) * Enhance dbg invoke and add dag as schemaful function * Add basic sql parse to dag * Column id starts from 1 * Fix value to ref * Add basic dag test * Fix dag bugs and pass 1st mock test * Make dag go normal routine and add mock dag * Add todo * Add comment * Fix gcc compile error * Enhance dag test * Address comments * Enhance mock sql -> dag compiler and add project test * Mock sql dag compiler support more expression types and add filter test * Add topn and limit test * Add agg for sql -> dag parser and agg test * Add dag specific codec * type * Update codec accordingly * Remove cop-test * Pass tests after merging master * Copy some changes from xufei * Enable date/datetime test * Enable date/datetime test * Refine code * Adjust date/datetime tiflash rep to UInt * Fix datetime to Int * Typo * improve dag execution time collection (#202) * improve dag execution time collection * address comment * update comments * update comment * update comment * column id in table scan operator may be -1 (#205) * improve dag execution time collection * address comment * update comments * update comment * update comment * column id in table scan operator may be -1 * column id in table scan operator may be -1 * quick fix for decimal encode (#210) * quick fix for decimal encode * address comments * update comments * support udf like with 3 arguments (#212) * support udf like with 3 arguments * address comments * add some comments * Flash-473 optimize date and datetime comparison (#221) * support udf like with 3 arguments * address comments * add some comments * Flash-473 optimize date and datetime comparison * address comments * FLASH-479 select from empty table throw error in tiflash (#223) * 1. select from empty table throw error in tiflash, 2. add some logs, 3. disable timestamp literal in DAG request * revert unrelated change * Update flash service port * fix bug in DAGBlockOutputStream * fix bug in DAGBlockOutputStream (#230) * FLASH-475: Support BATCH COMMANDS in flash service (#232) * Initial batch command support * Add config to control thread pool size * Address comments * init change for array encode * FLASH-483: Combine raft service and flash service (#235) * Combine raft service and flash service * Address comment and fix build error * Update configs * Fix build error * Fix test regression * Fix null value bug in datum * FLASH-490: Fix table scan with -1 column ID and no agg (#240) * Fix table scan with -1 column ID and no agg * Add break * Remove useless includes * Use dag context to store void ft instead of dag query source * Fix decimal type reverse get * Change adding smallest column to adding handle column, address comments * throw error if the cop request is not based on full region scan (#247) * throw error if the cop request is not based on full region scan * format code * FLASH-437 Support time zone in coprocessor (#259) * do not allow timestamp literal in DAG request * refine code * fix cop date type encode error * support tz info in DAG request * address comments * Address comment * use the new date implementation * FLASH-489 support key condition for coprocessor query (#261) * support key condition for coprocessor query * add tests * remove useless code * check validation when build RPNElement for function in/notIn * address comments * address comments * only return execute summaies if requested (#264) * refine code * Refine service init (#265) * fix bug * fix bug * FLASH-554 cop check range should be based on region range (#270) * only return execute summaies if requested * cop check range should be based on region range * address comments * add tests * minor improve * add ut for arrow encode * minor improve (#273) * update tipb * Fix mutex on timezone retrieval (#276) * fix mutex contention * add const ref * Fix race condition of batch command handling (#277) * update tipb version * set default record_per_chunk to 1024 * address comment * address comments * refine code * refine code * add mock_dag test * code refine * code refine * address comments * Fix NULL order for dag (#281) * refine get actions in DAGExpressionAnalyzer, fix bug in dbgFuncCoprocessor (#282) * remove duplicate agg funcs (#283) * 1. remove duplicate agg funcs, 2. for column ref expr, change column_id to column_index since the value stored in column ref expr is not column id * bug fix * refine code * remove useless code * address comments * remove uselss include * address comments * refine code * address comments * format code * fix typo * Update dbms/src/Flash/BatchCommandsHandler.cpp Co-Authored-By: JaySon * revert unnecessary changes * refine code * fix build error * refine code * address comments * refine code * address comments --- contrib/tipb | 2 +- dbms/src/Core/Defines.h | 2 +- dbms/src/DataTypes/DataTypeDecimal.h | 1 + dbms/src/DataTypes/IDataType.h | 2 + dbms/src/Debug/dbgFuncCoprocessor.cpp | 131 +++-- .../src/Flash/Coprocessor/ArrowChunkCodec.cpp | 84 +++ dbms/src/Flash/Coprocessor/ArrowChunkCodec.h | 18 + dbms/src/Flash/Coprocessor/ArrowColCodec.cpp | 546 ++++++++++++++++++ dbms/src/Flash/Coprocessor/ArrowColCodec.h | 13 + dbms/src/Flash/Coprocessor/ChunkCodec.h | 37 ++ .../Coprocessor/DAGBlockOutputStream.cpp | 78 ++- .../Flash/Coprocessor/DAGBlockOutputStream.h | 11 +- dbms/src/Flash/Coprocessor/DAGDriver.cpp | 2 +- dbms/src/Flash/Coprocessor/DAGUtils.cpp | 4 +- .../Flash/Coprocessor/DefaultChunkCodec.cpp | 80 +++ .../src/Flash/Coprocessor/DefaultChunkCodec.h | 17 + dbms/src/Flash/Coprocessor/TiDBChunk.cpp | 37 ++ dbms/src/Flash/Coprocessor/TiDBChunk.h | 39 ++ dbms/src/Flash/Coprocessor/TiDBColumn.cpp | 171 ++++++ dbms/src/Flash/Coprocessor/TiDBColumn.h | 81 +++ dbms/src/Flash/Coprocessor/TiDBDecimal.cpp | 81 +++ dbms/src/Flash/Coprocessor/TiDBDecimal.h | 25 + dbms/src/Flash/Coprocessor/TiDBTime.h | 26 + dbms/src/Flash/CoprocessorHandler.cpp | 2 +- tests/mutable-test/txn_dag/arrow_encode.test | 45 ++ .../txn_dag/data_type_number.test | 4 + .../mutable-test/txn_dag/data_type_time.test | 4 + tests/mutable-test/txn_dag/time_zone.test | 21 +- 28 files changed, 1439 insertions(+), 125 deletions(-) create mode 100644 dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp create mode 100644 dbms/src/Flash/Coprocessor/ArrowChunkCodec.h create mode 100644 dbms/src/Flash/Coprocessor/ArrowColCodec.cpp create mode 100644 dbms/src/Flash/Coprocessor/ArrowColCodec.h create mode 100644 dbms/src/Flash/Coprocessor/ChunkCodec.h create mode 100644 dbms/src/Flash/Coprocessor/DefaultChunkCodec.cpp create mode 100644 dbms/src/Flash/Coprocessor/DefaultChunkCodec.h create mode 100644 dbms/src/Flash/Coprocessor/TiDBChunk.cpp create mode 100644 dbms/src/Flash/Coprocessor/TiDBChunk.h create mode 100644 dbms/src/Flash/Coprocessor/TiDBColumn.cpp create mode 100644 dbms/src/Flash/Coprocessor/TiDBColumn.h create mode 100644 dbms/src/Flash/Coprocessor/TiDBDecimal.cpp create mode 100644 dbms/src/Flash/Coprocessor/TiDBDecimal.h create mode 100644 dbms/src/Flash/Coprocessor/TiDBTime.h create mode 100644 tests/mutable-test/txn_dag/arrow_encode.test diff --git a/contrib/tipb b/contrib/tipb index b2d318af5e8..018b2fadf41 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit b2d318af5e8af28f54a2c6422bc18631f65a8506 +Subproject commit 018b2fadf4145fbb7d6acd78b8a4b7e66d7d205d diff --git a/dbms/src/Core/Defines.h b/dbms/src/Core/Defines.h index acb722ba567..369206664db 100644 --- a/dbms/src/Core/Defines.h +++ b/dbms/src/Core/Defines.h @@ -29,7 +29,7 @@ #define DEFAULT_MAX_READ_TSO 0xFFFFFFFFFFFFFFFF #define DEFAULT_UNSPECIFIED_SCHEMA_VERSION -1 -#define DEFAULT_DAG_RECORDS_PER_CHUNK 64L +#define DEFAULT_DAG_RECORDS_PER_CHUNK 1024L /** Which blocks by default read the data (by number of rows). * Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query. diff --git a/dbms/src/DataTypes/DataTypeDecimal.h b/dbms/src/DataTypes/DataTypeDecimal.h index 492cd1ea669..8a99ae1b8d2 100644 --- a/dbms/src/DataTypes/DataTypeDecimal.h +++ b/dbms/src/DataTypes/DataTypeDecimal.h @@ -147,6 +147,7 @@ class DataTypeDecimal : public IDataType size_t getSizeOfValueInMemory() const override { return sizeof(T); } bool isCategorial() const override { return true; } bool canBeInsideNullable() const override { return true; } + bool isDecimal() const override { return true; } }; using DataTypeDecimal32 = DataTypeDecimal; diff --git a/dbms/src/DataTypes/IDataType.h b/dbms/src/DataTypes/IDataType.h index 2850b751535..7fabd290d5e 100644 --- a/dbms/src/DataTypes/IDataType.h +++ b/dbms/src/DataTypes/IDataType.h @@ -343,6 +343,8 @@ class IDataType : private boost::noncopyable virtual bool isDateOrDateTime() const { return false; }; + virtual bool isDecimal() const { return false; }; + /** Numbers, Enums, Date, DateTime. Not nullable. */ virtual bool isValueRepresentedByNumber() const { return false; }; diff --git a/dbms/src/Debug/dbgFuncCoprocessor.cpp b/dbms/src/Debug/dbgFuncCoprocessor.cpp index 962ad335a1c..799c9fbc0de 100644 --- a/dbms/src/Debug/dbgFuncCoprocessor.cpp +++ b/dbms/src/Debug/dbgFuncCoprocessor.cpp @@ -3,9 +3,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -37,29 +39,30 @@ using TiDB::TableInfo; using DAGColumnInfo = std::pair; using DAGSchema = std::vector; using SchemaFetcher = std::function; -std::tuple compileQuery( - Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts, - Int64 tz_offset, const String & tz_name); -tipb::SelectResponse executeDAGRequest( - Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, +std::tuple compileQuery(Context & context, const String & query, SchemaFetcher schema_fetcher, + Timestamp start_ts, Int64 tz_offset, const String & tz_name, const String & encode_type); +tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version, std::vector> & key_ranges); BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response); BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) { - if (args.size() < 1 || args.size() > 4) - throw Exception("Args not matched, should be: query[, region-id, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS); + if (args.size() < 1 || args.size() > 5) + throw Exception("Args not matched, should be: query[, region-id, encode_type, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS); String query = safeGet(typeid_cast(*args[0]).value); RegionID region_id = InvalidRegionID; if (args.size() >= 2) region_id = safeGet(typeid_cast(*args[1]).value); + String encode_type = ""; + if (args.size() >= 3) + encode_type = safeGet(typeid_cast(*args[2]).value); Int64 tz_offset = 0; String tz_name = ""; - if (args.size() >= 3) - tz_offset = get(typeid_cast(*args[2]).value); if (args.size() >= 4) - tz_name = safeGet(typeid_cast(*args[3]).value); + tz_offset = get(typeid_cast(*args[3]).value); + if (args.size() >= 5) + tz_name = safeGet(typeid_cast(*args[4]).value); Timestamp start_ts = context.getTMTContext().getPDClient()->getTS(); auto [table_id, schema, dag_request] = compileQuery( @@ -71,7 +74,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS); return mmt->getTableInfo(); }, - start_ts, tz_offset, tz_name); + start_ts, tz_offset, tz_name, encode_type); RegionPtr region; if (region_id == InvalidRegionID) @@ -93,16 +96,17 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args) DecodedTiKVKey start_key = RecordKVFormat::genRawKey(table_id, handle_range.first.handle_id); DecodedTiKVKey end_key = RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id); key_ranges.emplace_back(std::make_pair(std::move(start_key), std::move(end_key))); - tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region->id(), region->version(), - region->confVer(), key_ranges); + tipb::SelectResponse dag_response + = executeDAGRequest(context, dag_request, region->id(), region->version(), region->confVer(), key_ranges); return outputDAGResponse(context, schema, dag_response); } BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args) { - if (args.size() < 2 || args.size() > 5) - throw Exception("Args not matched, should be: query, region-id[, start-ts, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS); + if (args.size() < 2 || args.size() > 6) + throw Exception( + "Args not matched, should be: query, region-id[, start-ts, encode_type, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS); String query = safeGet(typeid_cast(*args[0]).value); RegionID region_id = safeGet(typeid_cast(*args[1]).value); @@ -111,19 +115,22 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args) start_ts = safeGet(typeid_cast(*args[2]).value); if (start_ts == 0) start_ts = context.getTMTContext().getPDClient()->getTS(); + String encode_type = ""; + if (args.size() >= 4) + encode_type = safeGet(typeid_cast(*args[3]).value); Int64 tz_offset = 0; String tz_name = ""; - if (args.size() >= 3) - tz_offset = safeGet(typeid_cast(*args[2]).value); - if (args.size() >= 4) - tz_name = safeGet(typeid_cast(*args[3]).value); + if (args.size() >= 5) + tz_offset = safeGet(typeid_cast(*args[4]).value); + if (args.size() >= 6) + tz_name = safeGet(typeid_cast(*args[5]).value); auto [table_id, schema, dag_request] = compileQuery( context, query, [&](const String & database_name, const String & table_name) { return MockTiDB::instance().getTableByName(database_name, table_name)->table_info; }, - start_ts, tz_offset, tz_name); + start_ts, tz_offset, tz_name, encode_type); std::ignore = table_id; RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id); @@ -132,8 +139,8 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args) DecodedTiKVKey start_key = RecordKVFormat::genRawKey(table_id, handle_range.first.handle_id); DecodedTiKVKey end_key = RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id); key_ranges.emplace_back(std::make_pair(std::move(start_key), std::move(end_key))); - tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(), - region->confVer(), key_ranges); + tipb::SelectResponse dag_response + = executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer(), key_ranges); return outputDAGResponse(context, schema, dag_response); } @@ -206,7 +213,7 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un else if (func_name_lowercase == "greaterorequals") { expr->set_sig(tipb::ScalarFuncSig::GEInt); - auto *ft = expr->mutable_field_type(); + auto * ft = expr->mutable_field_type(); ft->set_tp(TiDB::TypeLongLong); ft->set_flag(TiDB::ColumnFlagUnsigned); } @@ -292,9 +299,8 @@ void compileFilter(const DAGSchema & input, ASTPtr ast, tipb::Selection * filter compileExpr(input, ast, cond, referred_columns, col_ref_map); } -std::tuple compileQuery( - Context & context, const String & query, SchemaFetcher schema_fetcher, - Timestamp start_ts, Int64 tz_offset, const String & tz_name) +std::tuple compileQuery(Context & context, const String & query, SchemaFetcher schema_fetcher, + Timestamp start_ts, Int64 tz_offset, const String & tz_name, const String & encode_type) { DAGSchema schema; tipb::DAGRequest dag_request; @@ -302,6 +308,10 @@ std::tuple compileQuery( dag_request.set_time_zone_offset(tz_offset); dag_request.set_start_ts(start_ts); + if (encode_type == "arrow") + dag_request.set_encode_type(tipb::EncodeType::TypeArrow); + else + dag_request.set_encode_type(tipb::EncodeType::TypeDefault); ParserSelectQuery parser; ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "from DAG compiler", 0); @@ -355,7 +365,8 @@ std::tuple compileQuery( ci.tp = TiDB::TypeTimestamp; ts_output.emplace_back(std::make_pair(column_info.name, std::move(ci))); } - executor_ctx_map.emplace(ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map>{}}); + executor_ctx_map.emplace( + ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map>{}}); last_executor = ts_exec; } @@ -400,8 +411,8 @@ std::tuple compileQuery( tipb::Limit * limit = limit_exec->mutable_limit(); auto limit_length = safeGet(typeid_cast(*ast_query.limit_length).value); limit->set_limit(limit_length); - executor_ctx_map.emplace( - limit_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::unordered_map>{}}); + executor_ctx_map.emplace(limit_exec, + ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::unordered_map>{}}); last_executor = limit_exec; } @@ -593,8 +604,7 @@ std::tuple compileQuery( return std::make_tuple(table_info.id, std::move(schema), std::move(dag_request)); } -tipb::SelectResponse executeDAGRequest( - Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, +tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version, UInt64 region_conf_version, std::vector> & key_ranges) { static Logger * log = &Logger::get("MockDAG"); @@ -607,49 +617,38 @@ tipb::SelectResponse executeDAGRequest( return dag_response; } +void arrowChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks) +{ + ArrowChunkCodec codec; + for (const auto & chunk : dag_response.chunks()) + { + blocks.emplace_back(codec.decode(chunk, schema)); + } +} + +void defaultChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks) +{ + DefaultChunkCodec codec; + for (const auto & chunk : dag_response.chunks()) + { + blocks.emplace_back(codec.decode(chunk, schema)); + } +} + BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response) { if (dag_response.has_error()) throw Exception(dag_response.error().msg(), dag_response.error().code()); BlocksList blocks; - for (const auto & chunk : dag_response.chunks()) + if (dag_response.encode_type() == tipb::EncodeType::TypeArrow) { - std::vector> rows; - std::vector curr_row; - const std::string & data = chunk.rows_data(); - size_t cursor = 0; - while (cursor < data.size()) - { - curr_row.push_back(DecodeDatum(cursor, data)); - if (curr_row.size() == schema.size()) - { - rows.emplace_back(std::move(curr_row)); - curr_row.clear(); - } - } - - ColumnsWithTypeAndName columns; - for (auto & field : schema) - { - const auto & name = field.first; - auto data_type = getDataTypeByColumnInfo(field.second); - ColumnWithTypeAndName col(data_type, name); - col.column->assumeMutable()->reserve(rows.size()); - columns.emplace_back(std::move(col)); - } - for (const auto & row : rows) - { - for (size_t i = 0; i < row.size(); i++) - { - const Field & field = row[i]; - columns[i].column->assumeMutable()->insert(DatumFlat(field, schema[i].second.tp).field()); - } - } - - blocks.emplace_back(Block(columns)); + arrowChunkToBlocks(schema, dag_response, blocks); + } + else + { + defaultChunkToBlocks(schema, dag_response, blocks); } - return std::make_shared(std::move(blocks)); } diff --git a/dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp b/dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp new file mode 100644 index 00000000000..bc4695906c2 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp @@ -0,0 +1,84 @@ +#include + +#include +#include + +namespace DB +{ + +class ArrowChunkCodecStream : public ChunkCodecStream +{ +public: + explicit ArrowChunkCodecStream(const std::vector & field_types) : ChunkCodecStream(field_types) + { + ti_chunk = std::make_unique(field_types); + } + + String getString() override + { + std::stringstream ss; + ti_chunk->encodeChunk(ss); + return ss.str(); + } + void clear() override { ti_chunk->clear(); } + void encode(const Block & block, size_t start, size_t end) override; + std::unique_ptr ti_chunk; +}; + +void ArrowChunkCodecStream::encode(const Block & block, size_t start, size_t end) +{ + // Encode data in chunk by arrow encode + ti_chunk->buildDAGChunkFromBlock(block, field_types, start, end); +} + +Block ArrowChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & schema) +{ + const String & row_data = chunk.rows_data(); + const char * start = row_data.c_str(); + const char * pos = start; + int column_index = 0; + ColumnsWithTypeAndName columns; + while (pos < start + row_data.size()) + { + UInt32 length = toLittleEndian(*(reinterpret_cast(pos))); + pos += 4; + UInt32 null_count = toLittleEndian(*(reinterpret_cast(pos))); + pos += 4; + std::vector null_bitmap; + const auto & field = schema[column_index]; + const auto & name = field.first; + auto data_type = getDataTypeByColumnInfo(field.second); + if (null_count > 0) + { + auto bit_map_length = (length + 7) / 8; + for (UInt32 i = 0; i < bit_map_length; i++) + { + null_bitmap.push_back(*pos); + pos++; + } + } + Int8 field_length = getFieldLength(field.second.tp); + std::vector offsets; + if (field_length == VAR_SIZE) + { + for (UInt32 i = 0; i <= length; i++) + { + offsets.push_back(toLittleEndian(*(reinterpret_cast(pos)))); + pos += 8; + } + } + ColumnWithTypeAndName col(data_type, name); + col.column->assumeMutable()->reserve(length); + pos = arrowColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, col, field.second, length); + columns.emplace_back(std::move(col)); + column_index++; + } + return Block(columns); +} + +std::unique_ptr ArrowChunkCodec::newCodecStream(const std::vector & field_types) +{ + return std::make_unique(field_types); +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ArrowChunkCodec.h b/dbms/src/Flash/Coprocessor/ArrowChunkCodec.h new file mode 100644 index 00000000000..8c482d0264c --- /dev/null +++ b/dbms/src/Flash/Coprocessor/ArrowChunkCodec.h @@ -0,0 +1,18 @@ +#pragma once + +#include + +#include + +namespace DB +{ + +class ArrowChunkCodec : public ChunkCodec +{ +public: + ArrowChunkCodec() = default; + Block decode(const tipb::Chunk & chunk, const DAGSchema & schema) override; + std::unique_ptr newCodecStream(const std::vector & field_types) override; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ArrowColCodec.cpp b/dbms/src/Flash/Coprocessor/ArrowColCodec.cpp new file mode 100644 index 00000000000..c374ff372e0 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/ArrowColCodec.cpp @@ -0,0 +1,546 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ +extern const int LOGICAL_ERROR; +extern const int UNKNOWN_EXCEPTION; +} // namespace ErrorCodes + +const IColumn * getNestedCol(const IColumn * flash_col) +{ + if (flash_col->isColumnNullable()) + return dynamic_cast(flash_col)->getNestedColumnPtr().get(); + else + return flash_col; +} + +template +void decimalToVector(T dec, std::vector & vec, UInt32 scale) +{ + Int256 value = dec.value; + if (value < 0) + { + value = -value; + } + while (value != 0) + { + vec.push_back(static_cast(value % 10)); + value = value / 10; + } + while (vec.size() < scale) + { + vec.push_back(0); + } +} + +template +bool flashDecimalColToArrowColInternal( + TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index, const IDataType * data_type) +{ + const IColumn * nested_col = getNestedCol(flash_col_untyped); + if (checkColumn>(nested_col) && checkDataType>(data_type)) + { + const ColumnDecimal * flash_col = checkAndGetColumn>(nested_col); + const DataTypeDecimal * type = checkAndGetDataType>(data_type); + for (size_t i = start_index; i < end_index; i++) + { + if constexpr (is_nullable) + { + if (flash_col_untyped->isNullAt(i)) + { + dag_column.appendNull(); + continue; + } + } + const T & dec = flash_col->getElement(i); + std::vector digits; + UInt32 scale = type->getScale(); + decimalToVector(dec, digits, scale); + TiDBDecimal tiDecimal(scale, digits, dec.value < 0); + dag_column.append(tiDecimal); + } + return true; + } + return false; +} + +template +void flashDecimalColToArrowCol( + TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index, const IDataType * data_type) +{ + if (!(flashDecimalColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index, data_type) + || flashDecimalColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index, data_type) + || flashDecimalColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index, data_type) + || flashDecimalColToArrowColInternal( + dag_column, flash_col_untyped, start_index, end_index, data_type))) + throw Exception("Error while trying to convert flash col to DAG col, " + "column name " + + flash_col_untyped->getName(), + ErrorCodes::UNKNOWN_EXCEPTION); +} + +template +bool flashIntegerColToArrowColInternal(TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index) +{ + const IColumn * nested_col = getNestedCol(flash_col_untyped); + if (const ColumnVector * flash_col = checkAndGetColumn>(nested_col)) + { + constexpr bool is_unsigned = std::is_unsigned_v; + for (size_t i = start_index; i < end_index; i++) + { + if constexpr (is_nullable) + { + if (flash_col_untyped->isNullAt(i)) + { + dag_column.appendNull(); + continue; + } + } + if constexpr (is_unsigned) + dag_column.append((UInt64)flash_col->getElement(i)); + else + dag_column.append((Int64)flash_col->getElement(i)); + } + return true; + } + return false; +} + +template +void flashDoubleColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index) +{ + const IColumn * nested_col = getNestedCol(flash_col_untyped); + if (const ColumnVector * flash_col = checkAndGetColumn>(nested_col)) + { + for (size_t i = start_index; i < end_index; i++) + { + if constexpr (is_nullable) + { + if (flash_col_untyped->isNullAt(i)) + { + dag_column.appendNull(); + continue; + } + } + dag_column.append((T)flash_col->getElement(i)); + } + return; + } + throw Exception("Error while trying to convert flash col to DAG col, " + "column name " + + flash_col_untyped->getName(), + ErrorCodes::UNKNOWN_EXCEPTION); +} + +template +void flashIntegerColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index) +{ + if (!(flashIntegerColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index) + || flashIntegerColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index) + || flashIntegerColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index) + || flashIntegerColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index) + || flashIntegerColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index) + || flashIntegerColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index) + || flashIntegerColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index) + || flashIntegerColToArrowColInternal(dag_column, flash_col_untyped, start_index, end_index))) + throw Exception("Error while trying to convert flash col to DAG col, " + "column name " + + flash_col_untyped->getName(), + ErrorCodes::UNKNOWN_EXCEPTION); +} + + +template +void flashDateOrDateTimeColToArrowCol( + TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index, const tipb::FieldType & field_type) +{ + const IColumn * nested_col = getNestedCol(flash_col_untyped); + using DateFieldType = DataTypeMyTimeBase::FieldType; + auto * flash_col = checkAndGetColumn>(nested_col); + for (size_t i = start_index; i < end_index; i++) + { + if constexpr (is_nullable) + { + if (flash_col_untyped->isNullAt(i)) + { + dag_column.appendNull(); + continue; + } + } + TiDBTime time = TiDBTime(flash_col->getElement(i), field_type); + dag_column.append(time); + } +} + +template +void flashStringColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_col_untyped, size_t start_index, size_t end_index) +{ + const IColumn * nested_col = getNestedCol(flash_col_untyped); + // columnFixedString is not used so do not check it + auto * flash_col = checkAndGetColumn(nested_col); + for (size_t i = start_index; i < end_index; i++) + { + // todo check if we can convert flash_col to DAG col directly since the internal representation is almost the same + if constexpr (is_nullable) + { + if (flash_col_untyped->isNullAt(i)) + { + dag_column.appendNull(); + continue; + } + } + dag_column.append(flash_col->getDataAt(i)); + } +} + +void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & flash_col, const tipb::FieldType & field_type, + size_t start_index, size_t end_index) +{ + const IColumn * col = flash_col.column.get(); + const IDataType * type = flash_col.type.get(); + const TiDB::ColumnInfo tidb_column_info = fieldTypeToColumnInfo(field_type); + + if (type->isNullable() && tidb_column_info.hasNotNullFlag()) + { + throw Exception("Flash column and TiDB column has different not null flag", ErrorCodes::LOGICAL_ERROR); + } + if (type->isNullable()) + type = dynamic_cast(type)->getNestedType().get(); + + switch (tidb_column_info.tp) + { + case TiDB::TypeTiny: + case TiDB::TypeShort: + case TiDB::TypeInt24: + case TiDB::TypeLong: + case TiDB::TypeLongLong: + case TiDB::TypeYear: + if (!type->isInteger()) + throw Exception("Type un-matched during arrow encode, target col type is integer and source column" + " type is " + + type->getName(), + ErrorCodes::LOGICAL_ERROR); + if (type->isUnsignedInteger() != tidb_column_info.hasUnsignedFlag()) + throw Exception("Flash column and TiDB column has different unsigned flag", ErrorCodes::LOGICAL_ERROR); + if (tidb_column_info.hasNotNullFlag()) + flashIntegerColToArrowCol(dag_column, col, start_index, end_index); + else + flashIntegerColToArrowCol(dag_column, col, start_index, end_index); + break; + case TiDB::TypeFloat: + if (!checkDataType(type)) + throw Exception("Type un-matched during arrow encode, target col type is float32 and source column" + " type is " + + type->getName(), + ErrorCodes::LOGICAL_ERROR); + if (tidb_column_info.hasNotNullFlag()) + flashDoubleColToArrowCol(dag_column, col, start_index, end_index); + else + flashDoubleColToArrowCol(dag_column, col, start_index, end_index); + break; + case TiDB::TypeDouble: + if (!checkDataType(type)) + throw Exception("Type un-matched during arrow encode, target col type is float64 and source column" + " type is " + + type->getName(), + ErrorCodes::LOGICAL_ERROR); + if (tidb_column_info.hasNotNullFlag()) + flashDoubleColToArrowCol(dag_column, col, start_index, end_index); + else + flashDoubleColToArrowCol(dag_column, col, start_index, end_index); + break; + case TiDB::TypeDate: + case TiDB::TypeDatetime: + case TiDB::TypeTimestamp: + if (!type->isDateOrDateTime()) + throw Exception("Type un-matched during arrow encode, target col type is datetime and source column" + " type is " + + type->getName(), + ErrorCodes::LOGICAL_ERROR); + if (tidb_column_info.hasNotNullFlag()) + flashDateOrDateTimeColToArrowCol(dag_column, col, start_index, end_index, field_type); + else + flashDateOrDateTimeColToArrowCol(dag_column, col, start_index, end_index, field_type); + break; + case TiDB::TypeNewDecimal: + if (!type->isDecimal()) + throw Exception("Type un-matched during arrow encode, target col type is datetime and source column" + " type is " + + type->getName(), + ErrorCodes::LOGICAL_ERROR); + if (tidb_column_info.hasNotNullFlag()) + flashDecimalColToArrowCol(dag_column, col, start_index, end_index, type); + else + flashDecimalColToArrowCol(dag_column, col, start_index, end_index, type); + break; + case TiDB::TypeVarchar: + case TiDB::TypeVarString: + case TiDB::TypeString: + case TiDB::TypeBlob: + case TiDB::TypeLongBlob: + case TiDB::TypeMediumBlob: + case TiDB::TypeTinyBlob: + if (!checkDataType(type)) + throw Exception("Type un-matched during arrow encode, target col type is string and source column" + " type is " + + type->getName(), + ErrorCodes::LOGICAL_ERROR); + if (tidb_column_info.hasNotNullFlag()) + flashStringColToArrowCol(dag_column, col, start_index, end_index); + else + flashStringColToArrowCol(dag_column, col, start_index, end_index); + break; + default: + throw Exception("Unsupported field type " + field_type.DebugString() + " when try to convert flash col to DAG col", + ErrorCodes::NOT_IMPLEMENTED); + } +} + +bool checkNull(UInt32 i, UInt32 null_count, const std::vector & null_bitmap, const ColumnWithTypeAndName & col) +{ + if (null_count > 0) + { + size_t index = i >> 3; + size_t p = i & 7; + if (!(null_bitmap[index] & (1 << p))) + { + col.column->assumeMutable()->insert(Field()); + return true; + } + } + return false; +} + +const char * arrowStringColToFlashCol(const char * pos, UInt8, UInt32 null_count, const std::vector & null_bitmap, + const std::vector & offsets, const ColumnWithTypeAndName & col, const ColumnInfo &, UInt32 length) +{ + for (UInt32 i = 0; i < length; i++) + { + if (checkNull(i, null_count, null_bitmap, col)) + continue; + const String value = String(pos + offsets[i], pos + offsets[i + 1]); + col.column->assumeMutable()->insert(Field(value)); + } + return pos + offsets[length]; +} + +template +T toCHDecimal(UInt8 digits_int, UInt8 digits_frac, bool negative, const Int32 * word_buf) +{ + static_assert(IsDecimal); + + UInt8 word_int = (digits_int + DIGITS_PER_WORD - 1) / DIGITS_PER_WORD; + UInt8 word_frac = digits_frac / DIGITS_PER_WORD; + UInt8 tailing_digit = digits_frac % DIGITS_PER_WORD; + + typename T::NativeType value = 0; + const int word_max = int(1e9); + for (int i = 0; i < word_int; i++) + { + value = value * word_max + word_buf[i]; + } + for (int i = 0; i < word_frac; i++) + { + value = value * word_max + word_buf[i + word_int]; + } + if (tailing_digit > 0) + { + Int32 tail = word_buf[word_int + word_frac]; + for (int i = 0; i < DIGITS_PER_WORD - tailing_digit; i++) + { + tail /= 10; + } + for (int i = 0; i < tailing_digit; i++) + { + value *= 10; + } + value += tail; + } + return negative ? -value : value; +} + +const char * arrowDecimalColToFlashCol(const char * pos, UInt8 field_length, UInt32 null_count, const std::vector & null_bitmap, + const std::vector &, const ColumnWithTypeAndName & col, const ColumnInfo &, UInt32 length) +{ + for (UInt32 i = 0; i < length; i++) + { + if (checkNull(i, null_count, null_bitmap, col)) + { + pos += field_length; + continue; + } + UInt8 digits_int = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + UInt8 digits_frac = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + //UInt8 result_frac = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + UInt8 negative = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + Int32 word_buf[MAX_WORD_BUF_LEN]; + const DataTypePtr decimal_type + = col.type->isNullable() ? dynamic_cast(col.type.get())->getNestedType() : col.type; + for (int j = 0; j < MAX_WORD_BUF_LEN; j++) + { + word_buf[j] = toLittleEndian(*(reinterpret_cast(pos))); + pos += 4; + } + if (auto * type32 = checkDecimal(*decimal_type)) + { + auto res = toCHDecimal(digits_int, digits_frac, negative, word_buf); + col.column->assumeMutable()->insert(DecimalField(res, type32->getScale())); + } + else if (auto * type64 = checkDecimal(*decimal_type)) + { + auto res = toCHDecimal(digits_int, digits_frac, negative, word_buf); + col.column->assumeMutable()->insert(DecimalField(res, type64->getScale())); + } + else if (auto * type128 = checkDecimal(*decimal_type)) + { + auto res = toCHDecimal(digits_int, digits_frac, negative, word_buf); + col.column->assumeMutable()->insert(DecimalField(res, type128->getScale())); + } + else if (auto * type256 = checkDecimal(*decimal_type)) + { + auto res = toCHDecimal(digits_int, digits_frac, negative, word_buf); + col.column->assumeMutable()->insert(DecimalField(res, type256->getScale())); + } + } + return pos; +} + +const char * arrowDateColToFlashCol(const char * pos, UInt8 field_length, UInt32 null_count, const std::vector & null_bitmap, + const std::vector &, const ColumnWithTypeAndName & col, const ColumnInfo &, UInt32 length) +{ + for (UInt32 i = 0; i < length; i++) + { + if (checkNull(i, null_count, null_bitmap, col)) + { + pos += field_length; + continue; + } + UInt32 hour = toLittleEndian(*(reinterpret_cast(pos))); + pos += 4; + UInt32 micro_second = toLittleEndian(*(reinterpret_cast(pos))); + pos += 4; + UInt16 year = toLittleEndian(*(reinterpret_cast(pos))); + pos += 2; + UInt8 month = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + UInt8 day = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + UInt8 minute = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + UInt8 second = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + pos += 2; + //UInt8 time_type = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + //UInt8 fsp = toLittleEndian(*(reinterpret_cast(pos))); + pos += 1; + pos += 2; + MyDateTime mt(year, month, day, hour, minute, second, micro_second); + col.column->assumeMutable()->insert(Field(mt.toPackedUInt())); + } + return pos; +} + +const char * arrowNumColToFlashCol(const char * pos, UInt8 field_length, UInt32 null_count, const std::vector & null_bitmap, + const std::vector &, const ColumnWithTypeAndName & col, const ColumnInfo & col_info, UInt32 length) +{ + for (UInt32 i = 0; i < length; i++, pos += field_length) + { + if (checkNull(i, null_count, null_bitmap, col)) + continue; + UInt64 u64; + Int64 i64; + UInt32 u32; + Float32 f32; + Float64 f64; + switch (col_info.tp) + { + case TiDB::TypeTiny: + case TiDB::TypeShort: + case TiDB::TypeInt24: + case TiDB::TypeLong: + case TiDB::TypeLongLong: + case TiDB::TypeYear: + if (col_info.flag & TiDB::ColumnFlagUnsigned) + { + u64 = toLittleEndian(*(reinterpret_cast(pos))); + col.column->assumeMutable()->insert(Field(u64)); + } + else + { + i64 = toLittleEndian(*(reinterpret_cast(pos))); + col.column->assumeMutable()->insert(Field(i64)); + } + break; + case TiDB::TypeFloat: + u32 = toLittleEndian(*(reinterpret_cast(pos))); + std::memcpy(&f32, &u32, sizeof(Float32)); + col.column->assumeMutable()->insert(Field((Float64)f32)); + break; + case TiDB::TypeDouble: + u64 = toLittleEndian(*(reinterpret_cast(pos))); + std::memcpy(&f64, &u64, sizeof(Float64)); + col.column->assumeMutable()->insert(Field(f64)); + break; + default: + throw Exception("Should not reach here", ErrorCodes::LOGICAL_ERROR); + } + } + return pos; +} + +const char * arrowColToFlashCol(const char * pos, UInt8 field_length, UInt32 null_count, const std::vector & null_bitmap, + const std::vector & offsets, const ColumnWithTypeAndName & flash_col, const ColumnInfo & col_info, UInt32 length) +{ + switch (col_info.tp) + { + case TiDB::TypeTiny: + case TiDB::TypeShort: + case TiDB::TypeInt24: + case TiDB::TypeLong: + case TiDB::TypeLongLong: + case TiDB::TypeYear: + case TiDB::TypeFloat: + case TiDB::TypeDouble: + return arrowNumColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, flash_col, col_info, length); + case TiDB::TypeDatetime: + case TiDB::TypeDate: + case TiDB::TypeTimestamp: + return arrowDateColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, flash_col, col_info, length); + case TiDB::TypeNewDecimal: + return arrowDecimalColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, flash_col, col_info, length); + case TiDB::TypeVarString: + case TiDB::TypeVarchar: + case TiDB::TypeBlob: + case TiDB::TypeString: + case TiDB::TypeTinyBlob: + case TiDB::TypeMediumBlob: + case TiDB::TypeLongBlob: + return arrowStringColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, flash_col, col_info, length); + default: + throw Exception("Not supported yet: field tp = " + std::to_string(col_info.tp)); + } +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ArrowColCodec.h b/dbms/src/Flash/Coprocessor/ArrowColCodec.h new file mode 100644 index 00000000000..4996e4312ad --- /dev/null +++ b/dbms/src/Flash/Coprocessor/ArrowColCodec.h @@ -0,0 +1,13 @@ +#pragma once + +#include +#include + +namespace DB +{ +void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & flash_col, const tipb::FieldType & field_type, + size_t start_index, size_t end_index); +const char * arrowColToFlashCol(const char * pos, UInt8 field_length, UInt32 null_count, const std::vector & null_bitmap, + const std::vector & offsets, const ColumnWithTypeAndName & flash_col, const ColumnInfo & col_info, UInt32 length); + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/ChunkCodec.h b/dbms/src/Flash/Coprocessor/ChunkCodec.h new file mode 100644 index 00000000000..cf78733263a --- /dev/null +++ b/dbms/src/Flash/Coprocessor/ChunkCodec.h @@ -0,0 +1,37 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +using DAGColumnInfo = std::pair; +using DAGSchema = std::vector; + +class ChunkCodecStream +{ +public: + explicit ChunkCodecStream(const std::vector & field_types_) : field_types(field_types_) {} + virtual String getString() = 0; + virtual void clear() = 0; + virtual void encode(const Block & block, size_t start, size_t end) = 0; + virtual ~ChunkCodecStream() = default; + +protected: + const std::vector & field_types; +}; + +class ChunkCodec +{ +public: + ChunkCodec() = default; + virtual Block decode(const tipb::Chunk & chunk, const DAGSchema & schema) = 0; + + virtual std::unique_ptr newCodecStream(const std::vector & result_field_types) = 0; + + virtual ~ChunkCodec() = default; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp index 3514c1d006d..fca255d6423 100644 --- a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp +++ b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.cpp @@ -1,9 +1,7 @@ #include -#include -#include -#include -#include +#include +#include namespace DB { @@ -14,23 +12,27 @@ extern const int UNSUPPORTED_PARAMETER; extern const int LOGICAL_ERROR; } // namespace ErrorCodes -using TiDB::DatumBumpy; -using TiDB::TP; - -DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_, Int64 records_per_chunk_, tipb::EncodeType encodeType_, +DAGBlockOutputStream::DAGBlockOutputStream(tipb::SelectResponse & dag_response_, Int64 records_per_chunk_, tipb::EncodeType encodeType, std::vector && result_field_types_, Block header_) : dag_response(dag_response_), - records_per_chunk(records_per_chunk_), - encodeType(encodeType_), result_field_types(result_field_types_), - header(header_) + header(std::move(header_)), + records_per_chunk(records_per_chunk_), + current_records_num(0) { - if (encodeType == tipb::EncodeType::TypeArrow) + if (encodeType == tipb::EncodeType::TypeDefault) { - throw Exception("Encode type TypeArrow is not supported yet in DAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER); + chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); } - current_chunk = nullptr; - current_records_num = 0; + else if (encodeType == tipb::EncodeType::TypeArrow) + { + chunk_codec_stream = std::make_unique()->newCodecStream(result_field_types); + } + else + { + throw Exception("Only Default and Arrow encode type is supported in DAGBlockOutputStream.", ErrorCodes::UNSUPPORTED_PARAMETER); + } + dag_response.set_encode_type(encodeType); } @@ -39,13 +41,21 @@ void DAGBlockOutputStream::writePrefix() //something to do here? } +void DAGBlockOutputStream::encodeChunkToDAGResponse() +{ + auto dag_chunk = dag_response.add_chunks(); + dag_chunk->set_rows_data(chunk_codec_stream->getString()); + chunk_codec_stream->clear(); + dag_response.add_output_counts(current_records_num); + current_records_num = 0; +} + void DAGBlockOutputStream::writeSuffix() { - // error handle, - if (current_chunk != nullptr && current_records_num > 0) + // todo error handle + if (current_records_num > 0) { - current_chunk->set_rows_data(current_ss.str()); - dag_response.add_output_counts(current_records_num); + encodeChunkToDAGResponse(); } } @@ -53,33 +63,17 @@ void DAGBlockOutputStream::write(const Block & block) { if (block.columns() != result_field_types.size()) throw Exception("Output column size mismatch with field type size", ErrorCodes::LOGICAL_ERROR); - - // TODO: Check compatibility between field_tp_and_flags and block column types. - - // Encode data to chunk size_t rows = block.rows(); - for (size_t i = 0; i < rows; i++) + for (size_t row_index = 0; row_index < rows;) { - if (current_chunk == nullptr || current_records_num >= records_per_chunk) - { - if (current_chunk) - { - // set the current ss to current chunk - current_chunk->set_rows_data(current_ss.str()); - dag_response.add_output_counts(current_records_num); - } - current_chunk = dag_response.add_chunks(); - current_ss.str(""); - current_records_num = 0; - } - for (size_t j = 0; j < block.columns(); j++) + if (current_records_num >= records_per_chunk) { - const auto & field = (*block.getByPosition(j).column.get())[i]; - DatumBumpy datum(field, static_cast(result_field_types[j].tp())); - EncodeDatum(datum.field(), getCodecFlagByFieldType(result_field_types[j]), current_ss); + encodeChunkToDAGResponse(); } - // Encode current row - current_records_num++; + const size_t upper = std::min(row_index + (records_per_chunk - current_records_num), rows); + chunk_codec_stream->encode(block, row_index, upper); + current_records_num += (upper - row_index); + row_index = upper; } } diff --git a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h index 9ac6c5495fa..3c7cb73c62b 100644 --- a/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h +++ b/dbms/src/Flash/Coprocessor/DAGBlockOutputStream.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -25,19 +26,15 @@ class DAGBlockOutputStream : public IBlockOutputStream void write(const Block & block) override; void writePrefix() override; void writeSuffix() override; + void encodeChunkToDAGResponse(); private: tipb::SelectResponse & dag_response; - - const Int64 records_per_chunk; - tipb::EncodeType encodeType; std::vector result_field_types; - Block header; - - tipb::Chunk * current_chunk; + const Int64 records_per_chunk; + std::unique_ptr chunk_codec_stream; Int64 current_records_num; - std::stringstream current_ss; }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index a0499693def..164a35b23ea 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -109,7 +109,7 @@ catch (const Exception & e) } catch (const std::exception & e) { - LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what()); + LOG_ERROR(log, __PRETTY_FUNCTION__ << ": std exception: " << e.what()); recordError(ErrorCodes::UNKNOWN_EXCEPTION, e.what()); } diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp index 0f196df8db1..5526fdf0152 100644 --- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp +++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp @@ -499,7 +499,9 @@ std::unordered_map scalar_func_map({ {tipb::ScalarFuncSig::LogicalAnd, "and"}, {tipb::ScalarFuncSig::LogicalOr, "or"}, {tipb::ScalarFuncSig::LogicalXor, "xor"}, - {tipb::ScalarFuncSig::UnaryNot, "not"}, + {tipb::ScalarFuncSig::UnaryNotDecimal, "not"}, + {tipb::ScalarFuncSig::UnaryNotInt, "not"}, + {tipb::ScalarFuncSig::UnaryNotReal, "not"}, {tipb::ScalarFuncSig::UnaryMinusInt, "negate"}, {tipb::ScalarFuncSig::UnaryMinusReal, "negate"}, {tipb::ScalarFuncSig::UnaryMinusDecimal, "negate"}, diff --git a/dbms/src/Flash/Coprocessor/DefaultChunkCodec.cpp b/dbms/src/Flash/Coprocessor/DefaultChunkCodec.cpp new file mode 100644 index 00000000000..7bb402f8739 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DefaultChunkCodec.cpp @@ -0,0 +1,80 @@ +#include + +#include +#include +#include +#include + +using TiDB::DatumBumpy; +using TiDB::DatumFlat; +using TiDB::TP; + +namespace DB +{ +class DefaultChunkCodecStream : public ChunkCodecStream +{ +public: + explicit DefaultChunkCodecStream(const std::vector & field_types) : ChunkCodecStream(field_types) {} + std::stringstream ss; + String getString() override { return ss.str(); } + void encode(const Block & block, size_t start, size_t end) override; + void clear() override { ss.str(""); } +}; + +void DefaultChunkCodecStream::encode(const Block & block, size_t start, size_t end) +{ + // TODO: Check compatibility between field_tp_and_flags and block column types. + // Encode data to chunk by default encode + for (size_t i = start; i < end; i++) + { + for (size_t j = 0; j < block.columns(); j++) + { + const auto & field = (*block.getByPosition(j).column.get())[i]; + DatumBumpy datum(field, static_cast(field_types[j].tp())); + EncodeDatum(datum.field(), getCodecFlagByFieldType(field_types[j]), ss); + } + } +} + +Block DefaultChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & schema) +{ + std::vector> rows; + std::vector curr_row; + const std::string & data = chunk.rows_data(); + size_t cursor = 0; + while (cursor < data.size()) + { + curr_row.push_back(DecodeDatum(cursor, data)); + if (curr_row.size() == schema.size()) + { + rows.emplace_back(std::move(curr_row)); + curr_row.clear(); + } + } + + ColumnsWithTypeAndName columns; + for (auto & field : schema) + { + const auto & name = field.first; + auto data_type = getDataTypeByColumnInfo(field.second); + ColumnWithTypeAndName col(data_type, name); + col.column->assumeMutable()->reserve(rows.size()); + columns.emplace_back(std::move(col)); + } + for (const auto & row : rows) + { + for (size_t i = 0; i < row.size(); i++) + { + const Field & field = row[i]; + columns[i].column->assumeMutable()->insert(DatumFlat(field, schema[i].second.tp).field()); + } + } + return Block(columns); +} + +std::unique_ptr DefaultChunkCodec::newCodecStream(const std::vector & field_types) +{ + return std::make_unique(field_types); +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DefaultChunkCodec.h b/dbms/src/Flash/Coprocessor/DefaultChunkCodec.h new file mode 100644 index 00000000000..b22222acfe0 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/DefaultChunkCodec.h @@ -0,0 +1,17 @@ +#pragma once + +#include + +namespace DB +{ + +class DefaultChunkCodec : public ChunkCodec +{ +public: + DefaultChunkCodec() = default; + + Block decode(const tipb::Chunk & chunk, const DAGSchema & schema) override; + std::unique_ptr newCodecStream(const std::vector & field_types) override; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TiDBChunk.cpp b/dbms/src/Flash/Coprocessor/TiDBChunk.cpp new file mode 100644 index 00000000000..75794a3334d --- /dev/null +++ b/dbms/src/Flash/Coprocessor/TiDBChunk.cpp @@ -0,0 +1,37 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +TiDBChunk::TiDBChunk(const std::vector & field_types) +{ + for (auto & type : field_types) + { + columns.emplace_back(getFieldLength(type.tp())); + } +} + +void TiDBChunk::buildDAGChunkFromBlock( + const Block & block, const std::vector & field_types, size_t start_index, size_t end_index) +{ + for (size_t i = 0; i < block.columns(); i++) + { + flashColToArrowCol(columns[i], block.getByPosition(i), field_types[i], start_index, end_index); + } +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TiDBChunk.h b/dbms/src/Flash/Coprocessor/TiDBChunk.h new file mode 100644 index 00000000000..62c3cbfdf19 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/TiDBChunk.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +// `TiDBChunk` stores multiple rows of data in Apache Arrow format. +// See https://arrow.apache.org/docs/memory_layout.html +// Values are appended in compact format and can be directly accessed without decoding. +// When the chunk is done processing, we can reuse the allocated memory by resetting it. +class TiDBChunk +{ +public: + TiDBChunk(const std::vector & field_types); + + void encodeChunk(std::stringstream & ss) + { + for (auto & c : columns) + c.encodeColumn(ss); + } + + void buildDAGChunkFromBlock( + const Block & block, const std::vector & field_types, size_t start_index, size_t end_index); + + TiDBColumn & getColumn(int index) { return columns[index]; }; + void clear() + { + for (auto & c : columns) + c.clear(); + } + +private: + std::vector columns; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TiDBColumn.cpp b/dbms/src/Flash/Coprocessor/TiDBColumn.cpp new file mode 100644 index 00000000000..3e5b785270f --- /dev/null +++ b/dbms/src/Flash/Coprocessor/TiDBColumn.cpp @@ -0,0 +1,171 @@ +#include + +#include +#include + +namespace DB +{ + +template +void encodeLittleEndian(const T & value, std::stringstream & ss) +{ + auto v = toLittleEndian(value); + ss.write(reinterpret_cast(&v), sizeof(v)); +} + +TiDBColumn::TiDBColumn(Int8 element_len_) : length(0), null_cnt(0), current_data_size(0), fixed_size(element_len_) +{ + if (fixed_size != VAR_SIZE) + default_value = String(fixed_size, '0'); + var_offsets.push_back(0); +} + +void TiDBColumn::clear() +{ + length = 0; + null_cnt = 0; + null_bitmap.clear(); + var_offsets.clear(); + var_offsets.push_back(0); + data.str(""); + current_data_size = 0; +} + +void TiDBColumn::appendNullBitMap(bool value) +{ + size_t index = length >> 3; + if (index >= null_bitmap.size()) + { + null_bitmap.push_back(0); + } + if (value) + { + size_t pos = length & 7; + null_bitmap[index] |= (1 << pos); + } + else + { + null_cnt++; + } +} + +void TiDBColumn::finishAppendFixed() +{ + current_data_size += fixed_size; + appendNullBitMap(true); + length++; +} + +void TiDBColumn::finishAppendVar(UInt32 size) +{ + current_data_size += size; + appendNullBitMap(true); + var_offsets.push_back(current_data_size); + length++; +} + +void TiDBColumn::appendNull() +{ + // todo try to decoupling the logic of appendNullBitMap and appendData + appendNullBitMap(false); + if (isFixed()) + { + data << default_value; + } + else + { + auto offset = var_offsets[length]; + var_offsets.push_back(offset); + } + length++; +} + +void TiDBColumn::append(Int64 value) +{ + encodeLittleEndian(value, data); + finishAppendFixed(); +} + +void TiDBColumn::append(UInt64 value) +{ + encodeLittleEndian(value, data); + finishAppendFixed(); +} + +void TiDBColumn::append(const TiDBTime & time) +{ + encodeLittleEndian(time.my_date_time.hour, data); + encodeLittleEndian(time.my_date_time.micro_second, data); + encodeLittleEndian(time.my_date_time.year, data); + encodeLittleEndian(time.my_date_time.month, data); + encodeLittleEndian(time.my_date_time.day, data); + encodeLittleEndian(time.my_date_time.minute, data); + encodeLittleEndian(time.my_date_time.second, data); + // Encode an useless u16 to make byte alignment 16 bytes. + encodeLittleEndian(0, data); + encodeLittleEndian(time.time_type, data); + encodeLittleEndian(time.fsp, data); + // Encode an useless u16 to make byte alignment 20 bytes. + encodeLittleEndian(0, data); + finishAppendFixed(); +} + +void TiDBColumn::append(const TiDBDecimal & decimal) +{ + encodeLittleEndian(decimal.digits_int, data); + encodeLittleEndian(decimal.digits_frac, data); + encodeLittleEndian(decimal.result_frac, data); + encodeLittleEndian((UInt8)decimal.negative, data); + for (int i = 0; i < MAX_WORD_BUF_LEN; i++) + { + encodeLittleEndian(decimal.word_buf[i], data); + } + finishAppendFixed(); +} + +void TiDBColumn::append(const StringRef & value) +{ + data.write(value.data, value.size); + finishAppendVar(value.size); +} + +void TiDBColumn::append(DB::Float32 value) +{ + // use memcpy to avoid breaking strict-aliasing rules + UInt32 u; + std::memcpy(&u, &value, sizeof(value)); + encodeLittleEndian(u, data); + finishAppendFixed(); +} + +void TiDBColumn::append(DB::Float64 value) +{ + // use memcpy to avoid breaking strict-aliasing rules + UInt64 u; + std::memcpy(&u, &value, sizeof(value)); + encodeLittleEndian(u, data); + finishAppendFixed(); +} + +void TiDBColumn::encodeColumn(std::stringstream & ss) +{ + encodeLittleEndian(length, ss); + encodeLittleEndian(null_cnt, ss); + if (null_cnt > 0) + { + for (auto c : null_bitmap) + { + encodeLittleEndian(c, ss); + } + } + if (!isFixed()) + { + for (auto c : var_offsets) + { + encodeLittleEndian(c, ss); + } + } + ss << data.str(); +} + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TiDBColumn.h b/dbms/src/Flash/Coprocessor/TiDBColumn.h new file mode 100644 index 00000000000..d8baef39f7a --- /dev/null +++ b/dbms/src/Flash/Coprocessor/TiDBColumn.h @@ -0,0 +1,81 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +const Int8 VAR_SIZE = 0; +inline UInt8 getFieldLength(Int32 tp) +{ + switch (tp) + { + case TiDB::TypeTiny: + case TiDB::TypeShort: + case TiDB::TypeInt24: + case TiDB::TypeLong: + case TiDB::TypeLongLong: + case TiDB::TypeYear: + case TiDB::TypeDouble: + return 8; + case TiDB::TypeFloat: + return 4; + case TiDB::TypeDecimal: + case TiDB::TypeNewDecimal: + return 40; + case TiDB::TypeDate: + case TiDB::TypeDatetime: + case TiDB::TypeNewDate: + case TiDB::TypeTimestamp: + return 20; + case TiDB::TypeVarchar: + case TiDB::TypeVarString: + case TiDB::TypeString: + case TiDB::TypeBlob: + case TiDB::TypeTinyBlob: + case TiDB::TypeMediumBlob: + case TiDB::TypeLongBlob: + return VAR_SIZE; + default: + throw Exception("not supported field type in arrow encode: " + std::to_string(tp)); + } +} + +class TiDBColumn +{ +public: + TiDBColumn(Int8 element_len); + + void appendNull(); + void append(Int64 value); + void append(UInt64 value); + void append(const StringRef & value); + void append(Float64 value); + void append(Float32 value); + //void appendDuration(); + void append(const TiDBTime & time); + //void appendJson(); + void append(const TiDBDecimal & decimal); + void encodeColumn(std::stringstream & ss); + void clear(); + +private: + bool isFixed() { return fixed_size != VAR_SIZE; }; + void finishAppendFixed(); + void finishAppendVar(UInt32 size); + void appendNullBitMap(bool value); + + UInt32 length; + UInt32 null_cnt; + std::vector null_bitmap; + std::vector var_offsets; + std::stringstream data; + std::string default_value; + UInt64 current_data_size; + Int8 fixed_size; +}; + +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TiDBDecimal.cpp b/dbms/src/Flash/Coprocessor/TiDBDecimal.cpp new file mode 100644 index 00000000000..a4a33aa3276 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/TiDBDecimal.cpp @@ -0,0 +1,81 @@ +#include + +namespace DB +{ + +Int32 vectorToInt(int start_index, int end_index, const std::vector & vec) +{ + Int32 ret = 0; + for (int i = end_index - 1; i >= start_index; i--) + { + ret = ret * 10 + vec[i]; + } + return ret; +} + +TiDBDecimal::TiDBDecimal(UInt32 scale, const std::vector & digits, bool neg) : negative(neg) +{ + UInt32 prec = digits.size(); + if (prec == 0) + { + // zero decimal + digits_int = digits_frac = result_frac = 0; + } + else + { + digits_int = prec - scale; + digits_frac = scale; + result_frac = scale; + + int word_int = digits_int / DIGITS_PER_WORD; + int leading_digit = digits_int % DIGITS_PER_WORD; + + int word_frac = digits_frac / DIGITS_PER_WORD; + int tailing_digit = digits_frac % DIGITS_PER_WORD; + + int word_index = 0; + Int32 value = 0; + int vector_index = digits.size(); + + // fill the int part + if (leading_digit > 0) + { + value = vectorToInt(vector_index - leading_digit, vector_index, digits); + vector_index -= leading_digit; + if (value > 0) + { + word_buf[word_index++] = value; + } + else + { + digits_int -= leading_digit; + } + } + for (int i = 0; i < word_int; i++, vector_index -= DIGITS_PER_WORD) + { + value = vectorToInt(vector_index - DIGITS_PER_WORD, vector_index, digits); + if (word_index > 0 || value > 0) + { + word_buf[word_index++] = value; + } + else + { + digits_int -= DIGITS_PER_WORD; + } + } + + // fill the frac part + for (int i = 0; i < word_frac; i++, vector_index -= DIGITS_PER_WORD) + { + value = vectorToInt(vector_index - DIGITS_PER_WORD, vector_index, digits); + word_buf[word_index++] = value; + } + + if (tailing_digit > 0) + { + value = vectorToInt(vector_index - tailing_digit, vector_index, digits); + word_buf[word_index++] = value * POWERS10[DIGITS_PER_WORD - tailing_digit]; + } + } +} +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TiDBDecimal.h b/dbms/src/Flash/Coprocessor/TiDBDecimal.h new file mode 100644 index 00000000000..3c88d3d715b --- /dev/null +++ b/dbms/src/Flash/Coprocessor/TiDBDecimal.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +const static Int32 MAX_WORD_BUF_LEN = 9; +const static Int32 DIGITS_PER_WORD = 9; +const static Int32 POWERS10[] = {1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000}; + +class TiDBDecimal +{ +public: + TiDBDecimal(UInt32 scale, const std::vector & digits, bool neg); + + Int8 digits_int; + Int8 digits_frac; + Int8 result_frac; + bool negative; + Int32 word_buf[MAX_WORD_BUF_LEN] = {0}; +}; +} // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TiDBTime.h b/dbms/src/Flash/Coprocessor/TiDBTime.h new file mode 100644 index 00000000000..1ba01908d4b --- /dev/null +++ b/dbms/src/Flash/Coprocessor/TiDBTime.h @@ -0,0 +1,26 @@ +#pragma once + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include +#pragma GCC diagnostic pop + +#include +#include + +namespace DB +{ + +class TiDBTime +{ +public: + TiDBTime(UInt64 packed, const tipb::FieldType & field_type) : my_date_time(packed) + { + time_type = field_type.tp(); + fsp = field_type.decimal(); + } + MyDateTime my_date_time; + UInt8 time_type; + Int8 fsp; +}; +} // namespace DB diff --git a/dbms/src/Flash/CoprocessorHandler.cpp b/dbms/src/Flash/CoprocessorHandler.cpp index c45890e29ce..f6db2272f3d 100644 --- a/dbms/src/Flash/CoprocessorHandler.cpp +++ b/dbms/src/Flash/CoprocessorHandler.cpp @@ -108,7 +108,7 @@ catch (const Exception & e) } catch (const std::exception & e) { - LOG_ERROR(log, __PRETTY_FUNCTION__ << ": Exception: " << e.what()); + LOG_ERROR(log, __PRETTY_FUNCTION__ << ": std exception: " << e.what()); cop_response->Clear(); cop_response->set_other_error(e.what()); return grpc::Status(grpc::StatusCode::INTERNAL, e.what()); diff --git a/tests/mutable-test/txn_dag/arrow_encode.test b/tests/mutable-test/txn_dag/arrow_encode.test new file mode 100644 index 00000000000..18a9da02e6f --- /dev/null +++ b/tests/mutable-test/txn_dag/arrow_encode.test @@ -0,0 +1,45 @@ +# Preparation. +=> DBGInvoke __enable_schema_sync_service('true') + +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test + +=> DBGInvoke __set_flush_threshold(1000000, 1000000) + +# Data. +=> DBGInvoke __mock_tidb_table(default, test, 'col_1 Int8, col_2 UInt8, col_3 Int16, col_4 Nullable(UInt16), col_5 Int32, col_6 UInt32, col_7 Int64, col_8 UInt64, col_9 Nullable(Float32), col_10 Float64, col_11 MyDate, col_12 Nullable(MyDateTime), col_13 Nullable(String), col_14 Nullable(Decimal(8,2))') +=> DBGInvoke __refresh_schemas() +=> DBGInvoke __put_region(4, 0, 100, default, test) +=> DBGInvoke __raft_insert_row(default, test, 4, 50, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode test', 12.12) +=> DBGInvoke __raft_insert_row(default, test, 4, 51, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode',123.23) +=> DBGInvoke __raft_insert_row(default, test, 4, 52, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'encode test',0.00) +=> DBGInvoke __raft_insert_row(default, test, 4, 53, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', null,12345.10) +=> DBGInvoke __raft_insert_row(default, test, 4, 54, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', null, 'arrow encode test',null) +=> DBGInvoke __raft_insert_row(default, test, 4, 55, -128, 255, -32768, 65535, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, null, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode test',11.11) +=> DBGInvoke __raft_insert_row(default, test, 4, 56, -128, 255, -32768, null, -2147483648, 4294967295, -9223372036854775808, 18446744073709551615, 12345.6789, 1234567.890123, '2010-01-01', '2010-01-01 11:11:11', 'arrow encode test',22.22) + +=> DBGInvoke dag('select * from default.test',4,'arrow') " --dag_planner="optree +┌─col_1─┬─col_2─┬──col_3─┬─col_4─┬───────col_5─┬──────col_6─┬────────────────col_7─┬────────────────col_8─┬─────col_9─┬─────────col_10─┬─────col_11─┬──────────────col_12─┬─col_13────────────┬─col_14───┐ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 12.12 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode │ 123.23 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ encode test │ 0.00 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ \N │ 12345.10 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ \N │ arrow encode test │ \N │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ \N │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 11.11 │ +│ -128 │ 255 │ -32768 │ \N │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 22.22 │ +└───────┴───────┴────────┴───────┴─────────────┴────────────┴──────────────────────┴──────────────────────┴───────────┴────────────────┴────────────┴─────────────────────┴───────────────────┴──────────┘ + +=> DBGInvoke mock_dag('select * from default.test',4,0,'arrow') " --dag_planner="optree +┌─col_1─┬─col_2─┬──col_3─┬─col_4─┬───────col_5─┬──────col_6─┬────────────────col_7─┬────────────────col_8─┬─────col_9─┬─────────col_10─┬─────col_11─┬──────────────col_12─┬─col_13────────────┬─col_14───┐ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 12.12 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode │ 123.23 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ encode test │ 0.00 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ \N │ 12345.10 │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ \N │ arrow encode test │ \N │ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ \N │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 11.11 │ +│ -128 │ 255 │ -32768 │ \N │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ 2010-01-01 │ 2010-01-01 11:11:11 │ arrow encode test │ 22.22 │ +└───────┴───────┴────────┴───────┴─────────────┴────────────┴──────────────────────┴──────────────────────┴───────────┴────────────────┴────────────┴─────────────────────┴───────────────────┴──────────┘ + +# Clean up. +=> DBGInvoke __drop_tidb_table(default, test) +=> drop table if exists default.test diff --git a/tests/mutable-test/txn_dag/data_type_number.test b/tests/mutable-test/txn_dag/data_type_number.test index 95ec49b10f6..4231d8d955f 100644 --- a/tests/mutable-test/txn_dag/data_type_number.test +++ b/tests/mutable-test/txn_dag/data_type_number.test @@ -19,6 +19,10 @@ ┌─col_1─┬─col_2─┬──col_3─┬─col_4─┬───────col_5─┬──────col_6─┬────────────────col_7─┬────────────────col_8─┬─────col_9─┬─────────col_10─┐ │ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ └───────┴───────┴────────┴───────┴─────────────┴────────────┴──────────────────────┴──────────────────────┴───────────┴────────────────┘ +=> DBGInvoke dag('select * from default.test',4,'arrow') " --dag_planner="optree +┌─col_1─┬─col_2─┬──col_3─┬─col_4─┬───────col_5─┬──────col_6─┬────────────────col_7─┬────────────────col_8─┬─────col_9─┬─────────col_10─┐ +│ -128 │ 255 │ -32768 │ 65535 │ -2147483648 │ 4294967295 │ -9223372036854775808 │ 18446744073709551615 │ 12345.679 │ 1234567.890123 │ +└───────┴───────┴────────┴───────┴─────────────┴────────────┴──────────────────────┴──────────────────────┴───────────┴────────────────┘ # DAG read filter by Int8 column => DBGInvoke dag('select * from default.test where col_1 = -128') " --dag_planner="optree diff --git a/tests/mutable-test/txn_dag/data_type_time.test b/tests/mutable-test/txn_dag/data_type_time.test index de813bbb71a..4ec7efb05cb 100644 --- a/tests/mutable-test/txn_dag/data_type_time.test +++ b/tests/mutable-test/txn_dag/data_type_time.test @@ -16,6 +16,10 @@ ┌──────col_1─┬───────────────col_2─┐ │ 2019-06-10 │ 2019-06-10 09:00:00 │ └────────────┴─────────────────────┘ +=> DBGInvoke dag('select * from default.test',4,'arrow') " --dag_planner="optree +┌──────col_1─┬───────────────col_2─┐ +│ 2019-06-10 │ 2019-06-10 09:00:00 │ +└────────────┴─────────────────────┘ # Mock DAG doesn't support date/datetime comparison with string, may need type inference and do implicit conversion to literal. # => DBGInvoke dag('select * from default.test where col_1 = \'2019-06-06\' and col_2 = \'2019-06-10 09:00:00\'') " --dag_planner="optree diff --git a/tests/mutable-test/txn_dag/time_zone.test b/tests/mutable-test/txn_dag/time_zone.test index 9806a8bcae5..73a863737b1 100644 --- a/tests/mutable-test/txn_dag/time_zone.test +++ b/tests/mutable-test/txn_dag/time_zone.test @@ -24,7 +24,7 @@ └────────────┴───────────────────────────┴─────────────────────┘ # use tz_offset, result is the same since cop will convert the timestamp value to utc timestamp when returing to tidb -=> DBGInvoke dag('select * from default.test',4,28800) " --dag_planner="optree +=> DBGInvoke dag('select * from default.test',4,'default',28800) " --dag_planner="optree ┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐ │ 2019-06-10 │ 2019-06-10 09:00:00.00000 │ 2019-06-10 09:00:00 │ │ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │ @@ -34,7 +34,7 @@ => DBGInvoke dag('select * from default.test where col_2 > col_3') " --dag_planner="optree -=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,28800) " --dag_planner="optree +=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,'default',28800) " --dag_planner="optree ┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐ │ 2019-06-10 │ 2019-06-10 09:00:00.00000 │ 2019-06-10 09:00:00 │ │ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │ @@ -42,17 +42,28 @@ │ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │ └────────────┴───────────────────────────┴─────────────────────┘ +=> DBGInvoke dag('select * from default.test where col_2 = col_3',4,'default',3600) " --dag_planner="optree +┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐ +│ 2019-06-11 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │ +│ 2019-06-12 │ 2019-06-11 08:00:00.00000 │ 2019-06-11 09:00:00 │ +└────────────┴───────────────────────────┴─────────────────────┘ + +=> DBGInvoke dag('select * from default.test where col_2 = col_3',4,'default',7200) " --dag_planner="optree +┌──────col_1─┬─────────────────────col_2─┬───────────────col_3─┐ +│ 2019-06-11 │ 2019-06-11 07:00:00.00000 │ 2019-06-11 09:00:00 │ +└────────────┴───────────────────────────┴─────────────────────┘ + # tz_name overwrite tz_offset -=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,28800,'UTC') " --dag_planner="optree +=> DBGInvoke dag('select * from default.test where col_2 > col_3',4,'default',28800,'UTC') " --dag_planner="optree # ts_col in group by clause -=> DBGInvoke dag('select count(1) from default.test where col_2 > \'2019-06-11 15:00:00\' group by col_2',4,28800) " --dag_planner="optree +=> DBGInvoke dag('select count(1) from default.test where col_2 > \'2019-06-11 15:00:00\' group by col_2',4,'default',28800) " --dag_planner="optree ┌─count(1)─┬─────────────────────col_2─┐ │ 2 │ 2019-06-11 08:00:00.00000 │ └──────────┴───────────────────────────┘ # ts_col in agg clause -=> DBGInvoke dag('select max(col_2) from default.test group by col_1',4,28800) " --dag_planner="optree +=> DBGInvoke dag('select max(col_2) from default.test group by col_1',4,'default',28800) " --dag_planner="optree ┌──────────max(col_2)─┬──────col_1─┐ │ 2019-06-11 08:00:00 │ 2019-06-12 │ │ 2019-06-11 08:00:00 │ 2019-06-11 │