Flash-481 Arrow encode (#279)
* basic framework for coprocessor support in tiflash

* basic support for InterpreterDagRequestV2

* code refine

* tipb submodule use tipb master branch

* rewrite build flow in InterpreterDagRequest

* rename Dag to DAG

* Update tipb submodule

* basic support for selection/limit/topn executor in InterpreterDAGRequest

* basic support for selection/limit/topn executor in InterpreterDAGRequest (#150)

* merge pingcap/cop branch

* Code reorg

* Format

* merge pingcap/cop

* Refine code

* basic support for dag agg executor

* Code refine

* Refine code

* Another way of getting codec flag

* fix cop test regression (#157)

* fix cop test regression

* address comments

* format code

* fix npe during dag execute (#160)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* Add tipb cpp gen in build script

* Fix build error and adjust some formats

* Fix build error

* Fix build error

* Update flash configs

* Format

* throw exception when meet error during cop request handling (#162)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error during cop request handling

* address comments

* add error code

* throw exception when meet error during cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver (#166)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error during cop request handling

* address comments

* add error code

* throw exception when meet error during cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema (#167)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error during cop request handling

* address comments

* add error code

* throw exception when meet error during cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* Move flash/cop/dag to individual library

* DAG planner fix and mock dag request (#169)

* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Fix DAG get and lock storage

* handle error in cop request (#171)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error during cop request handling

* address comments

* add error code

* throw exception when meet error during cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* handle error in coprocessor request

* refine code

* use Clear to clear a protobuf message completely

* refine code

* code refine && several minor bug fix (#174)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error during cop request handling

* address comments

* add error code

* throw exception when meet error during cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* handle error in coprocessor request

* refine code

* use Clear to clear a protobuf message completely

* refine code

* code refine && several minor bug fix

* address comments

* address comments

* Fix region id in mock dag

* support udf in (#175)

* fix cop test regression

* address comments

* format code

* fix npe for dag execute

* format code

* address comment

* add some comments

* throw exception when meet error during cop request handling

* address comments

* add error code

* throw exception when meet error during cop request handling

* address comments

* add DAGContext so InterpreterDAG can exchange information with DAGDriver

* fix bug

* 1. refine code, 2. address comments

* update comments

* columnref index is based on executor output schema

* handle error in coprocessor request

* refine code

* use Clear to clear a protobuf message completely

* refine code

* code refine && several minor bug fix

* address comments

* address comments

* support udf in

* refine code

* address comments

* address comments

* 1. fix decode literal expr error, 2. add all scalar function sig in scalar_func_map (#177)

* add all scalar function sig in scalarFunMap

* fix literal expr decode

* enable ltrim && rtrim

* code refine

* use throw instead of rethrow in DAGDriver.cpp

* some bug fix (#179)

* add all scalar function sig in scalarFunMap

* fix literal expr decode

* enable ltrim && rtrim

* code refine

* use throw instead of rethrow in DAGDriver.cpp

* 1. fix decode UInt literal error, 2. support mysqlDecimal type

* format code

* Support all DAG operator types in mock SQL -> DAG parser (#176)

* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Enhance mock sql -> dag compiler and add project test

* Mock sql dag compiler support more expression types and add filter test

* Add topn and limit test

* Add agg for sql -> dag parser and agg test

* Add dag specific codec

* type

* Update codec accordingly

* Remove cop-test

* filter column must be uint8 in tiflash (#180)

* add all scalar function sig in scalarFunMap

* fix literal expr decode

* enable ltrim && rtrim

* code refine

* use throw instead of rethrow in DAGDriver.cpp

* 1. fix decode UInt literal error, 2. support mysqlDecimal type

* format code

* filter column must be uint8 in tiflash

* address comments

* address comments

* address comments

* remove useless include

* 1. fix encode null error, 2. fix empty field type generated by TiFlash (#182)

* add all scalar function sig in scalarFunMap

* fix literal expr decode

* enable ltrim && rtrim

* code refine

* use throw instead of rethrow in DAGDriver.cpp

* 1. fix decode UInt literal error, 2. support mysqlDecimal type

* format code

* filter column must be uint8 in tiflash

* address comments

* address comments

* address comments

* remove useless include

* 1. fix encode null error, 2. fix empty field type generated by TiFlash

* check validation of dag exprs field type (#183)

* check validation of dag exprs field type

* format code

* address comments

* add more coprocessor mock tests (#185)

* check validation of dag exprs field type

* format code

* address comments

* add more filter test

* add data type tests

* remove useless comment

* disable decimal test

* add some log about implicit cast (#188)

* check validation of dag exprs field type

* format code

* address comments

* add more filter test

* add data type tests

* remove useless comment

* disable decimal test

* add some log about implicit cast

* address comment

* Pass DAG tests after merging master (#199)

* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Enhance mock sql -> dag compiler and add project test

* Mock sql dag compiler support more expression types and add filter test

* Add topn and limit test

* Add agg for sql -> dag parser and agg test

* Add dag specific codec

* type

* Update codec accordingly

* Remove cop-test

* Pass tests after merging master

* Fix date/datetime/bit encode error (#200)

* Enhance dbg invoke and add dag as schemaful function

* Add basic sql parse to dag

* Column id starts from 1

* Fix value to ref

* Add basic dag test

* Fix dag bugs and pass 1st mock test

* Make dag go normal routine and add mock dag

* Add todo

* Add comment

* Fix gcc compile error

* Enhance dag test

* Address comments

* Enhance mock sql -> dag compiler and add project test

* Mock sql dag compiler support more expression types and add filter test

* Add topn and limit test

* Add agg for sql -> dag parser and agg test

* Add dag specific codec

* type

* Update codec accordingly

* Remove cop-test

* Pass tests after merging master

* Copy some changes from xufei

* Enable date/datetime test

* Enable date/datetime test

* Refine code

* Adjust date/datetime tiflash rep to UInt

* Fix datetime to Int

* Typo

* improve dag execution time collection (#202)

* improve dag execution time collection

* address comment

* update comments

* update comment

* update comment

* column id in table scan operator may be -1 (#205)

* improve dag execution time collection

* address comment

* update comments

* update comment

* update comment

* column id in table scan operator may be -1

* column id in table scan operator may be -1

* quick fix for decimal encode (#210)

* quick fix for decimal encode

* address comments

* update comments

* support udf like with 3 arguments (#212)

* support udf like with 3 arguments

* address comments

* add some comments

* Flash-473 optimize date and datetime comparison (#221)

* support udf like with 3 arguments

* address comments

* add some comments

* Flash-473 optimize date and datetime comparison

* address comments

* FLASH-479 select from empty table throw error in tiflash (#223)

* 1. select from empty table throw error in tiflash, 2. add some logs, 3. disable timestamp literal in DAG request

* revert unrelated change

* Update flash service port

* fix bug in DAGBlockOutputStream

* fix bug in DAGBlockOutputStream (#230)

* FLASH-475: Support BATCH COMMANDS in flash service (#232)

* Initial batch command support

* Add config to control thread pool size

* Address comments

* init change for array encode

* FLASH-483: Combine raft service and flash service (#235)

* Combine raft service and flash service

* Address comment and fix build error

* Update configs

* Fix build error

* Fix test regression

* Fix null value bug in datum

* FLASH-490: Fix table scan with -1 column ID and no agg (#240)

* Fix table scan with -1 column ID and no agg

* Add break

* Remove useless includes

* Use dag context to store void ft instead of dag query source

* Fix decimal type reverse get

* Change adding smallest column to adding handle column, address comments

* throw error if the cop request is not based on full region scan (#247)

* throw error if the cop request is not based on full region scan

* format code

* FLASH-437 Support time zone in coprocessor (#259)

* do not allow timestamp literal in DAG request

* refine code

* fix cop date type encode error

* support tz info in DAG request

* address comments

* Address comment

* use the new date implementation

* FLASH-489 support key condition for coprocessor query (#261)

* support key condition for coprocessor query

* add tests

* remove useless code

* check validation when build RPNElement for function in/notIn

* address comments

* address comments

* only return execute summaries if requested (#264)

* refine code

* Refine service init (#265)

* fix bug

* fix bug

* FLASH-554 cop check range should be based on region range (#270)

* only return execute summaries if requested

* cop check range should be based on region range

* address comments

* add tests

* minor improve

* add ut for arrow encode

* minor improve (#273)

* update tipb

* Fix mutex on timezone retrieval (#276)

* fix mutex contention

* add const ref

* Fix race condition of batch command handling (#277)

* update tipb version

* set default record_per_chunk to 1024

* address comment

* address comments

* refine code

* refine code

* add mock_dag test

* code refine

* code refine

* address comments

* Fix NULL order for dag (#281)

* refine get actions in DAGExpressionAnalyzer, fix bug in dbgFuncCoprocessor (#282)

* remove duplicate agg funcs (#283)

* 1. remove duplicate agg funcs, 2. for column ref expr, change column_id to column_index since the value stored in column ref expr is not column id

* bug fix

* refine code

* remove useless code

* address comments

* remove useless include

* address comments

* refine code

* address comments

* format code

* fix typo

* Update dbms/src/Flash/BatchCommandsHandler.cpp

Co-Authored-By: JaySon <jayson.hjs@gmail.com>

* revert unnecessary changes

* refine code

* fix build error

* refine code

* address comments

* refine code

* address comments
windtalker authored Oct 25, 2019
1 parent daee3ab commit cbb6492
Showing 28 changed files with 1,439 additions and 125 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
2 changes: 1 addition & 1 deletion dbms/src/Core/Defines.h
@@ -29,7 +29,7 @@
#define DEFAULT_MAX_READ_TSO 0xFFFFFFFFFFFFFFFF
#define DEFAULT_UNSPECIFIED_SCHEMA_VERSION -1

-#define DEFAULT_DAG_RECORDS_PER_CHUNK 64L
+#define DEFAULT_DAG_RECORDS_PER_CHUNK 1024L

/** Which blocks by default read the data (by number of rows).
* Smaller values give better cache locality, less consumption of RAM, but more overhead to process the query.
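
The Defines.h hunk above changes `DEFAULT_DAG_RECORDS_PER_CHUNK` from 64 to 1024. As a rough illustration of what a per-chunk record limit controls, here is a minimal sketch that splits a row count into `[start, end)` ranges, the same shape of range that `ChunkCodecStream::encode(block, start, end)` takes later in this commit. The helper `splitIntoChunks` is hypothetical and is not part of the commit; how TiFlash actually applies the limit is not shown in this diff.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper (not in the commit): split `total_rows` rows into
// half-open [start, end) ranges holding at most `records_per_chunk` rows each.
std::vector<std::pair<std::size_t, std::size_t>> splitIntoChunks(
    std::size_t total_rows, std::size_t records_per_chunk)
{
    assert(records_per_chunk > 0);
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    for (std::size_t start = 0; start < total_rows; start += records_per_chunk)
    {
        // The last range may be shorter than records_per_chunk.
        const std::size_t end = std::min(total_rows, start + records_per_chunk);
        ranges.emplace_back(start, end);
    }
    return ranges;
}
```

Under this sketch, 2500 rows produce 40 ranges with a limit of 64 and 3 ranges with a limit of 1024, so a larger limit means fewer chunks per response.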
1 change: 1 addition & 0 deletions dbms/src/DataTypes/DataTypeDecimal.h
@@ -147,6 +147,7 @@ class DataTypeDecimal : public IDataType
size_t getSizeOfValueInMemory() const override { return sizeof(T); }
bool isCategorial() const override { return true; }
bool canBeInsideNullable() const override { return true; }
bool isDecimal() const override { return true; }
};

using DataTypeDecimal32 = DataTypeDecimal<Decimal32>;
2 changes: 2 additions & 0 deletions dbms/src/DataTypes/IDataType.h
@@ -343,6 +343,8 @@ class IDataType : private boost::noncopyable

virtual bool isDateOrDateTime() const { return false; };

virtual bool isDecimal() const { return false; };

/** Numbers, Enums, Date, DateTime. Not nullable.
*/
virtual bool isValueRepresentedByNumber() const { return false; };
131 changes: 65 additions & 66 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
@@ -3,9 +3,11 @@
#include <DataStreams/BlocksListBlockInputStream.h>
#include <Debug/MockTiDB.h>
#include <Debug/dbgFuncCoprocessor.h>
#include <Flash/Coprocessor/ArrowChunkCodec.h>
#include <Flash/Coprocessor/DAGCodec.h>
#include <Flash/Coprocessor/DAGDriver.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/DefaultChunkCodec.h>
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@@ -37,29 +39,30 @@ using TiDB::TableInfo;
using DAGColumnInfo = std::pair<String, ColumnInfo>;
using DAGSchema = std::vector<DAGColumnInfo>;
using SchemaFetcher = std::function<TableInfo(const String &, const String &)>;
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher, Timestamp start_ts,
Int64 tz_offset, const String & tz_name);
tipb::SelectResponse executeDAGRequest(
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version,
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context, const String & query, SchemaFetcher schema_fetcher,
Timestamp start_ts, Int64 tz_offset, const String & tz_name, const String & encode_type);
tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version,
UInt64 region_conf_version, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges);
BlockInputStreamPtr outputDAGResponse(Context & context, const DAGSchema & schema, const tipb::SelectResponse & dag_response);

BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
{
if (args.size() < 1 || args.size() > 4)
throw Exception("Args not matched, should be: query[, region-id, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 1 || args.size() > 5)
throw Exception("Args not matched, should be: query[, region-id, encode_type, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = InvalidRegionID;
if (args.size() >= 2)
region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
String encode_type = "";
if (args.size() >= 3)
encode_type = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[2]).value);
Int64 tz_offset = 0;
String tz_name = "";
if (args.size() >= 3)
tz_offset = get<Int64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (args.size() >= 4)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);
tz_offset = get<Int64>(typeid_cast<const ASTLiteral &>(*args[3]).value);
if (args.size() >= 5)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[4]).value);
Timestamp start_ts = context.getTMTContext().getPDClient()->getTS();

auto [table_id, schema, dag_request] = compileQuery(
@@ -71,7 +74,7 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
throw Exception("Not TMT", ErrorCodes::BAD_ARGUMENTS);
return mmt->getTableInfo();
},
start_ts, tz_offset, tz_name);
start_ts, tz_offset, tz_name, encode_type);

RegionPtr region;
if (region_id == InvalidRegionID)
@@ -93,16 +96,17 @@ BlockInputStreamPtr dbgFuncDAG(Context & context, const ASTs & args)
DecodedTiKVKey start_key = RecordKVFormat::genRawKey(table_id, handle_range.first.handle_id);
DecodedTiKVKey end_key = RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id);
key_ranges.emplace_back(std::make_pair(std::move(start_key), std::move(end_key)));
tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region->id(), region->version(),
region->confVer(), key_ranges);
tipb::SelectResponse dag_response
= executeDAGRequest(context, dag_request, region->id(), region->version(), region->confVer(), key_ranges);

return outputDAGResponse(context, schema, dag_response);
}

BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
{
if (args.size() < 2 || args.size() > 5)
throw Exception("Args not matched, should be: query, region-id[, start-ts, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);
if (args.size() < 2 || args.size() > 6)
throw Exception(
"Args not matched, should be: query, region-id[, start-ts, encode_type, tz_offset, tz_name]", ErrorCodes::BAD_ARGUMENTS);

String query = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[0]).value);
RegionID region_id = safeGet<RegionID>(typeid_cast<const ASTLiteral &>(*args[1]).value);
@@ -111,19 +115,22 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
start_ts = safeGet<Timestamp>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (start_ts == 0)
start_ts = context.getTMTContext().getPDClient()->getTS();
String encode_type = "";
if (args.size() >= 4)
encode_type = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);
Int64 tz_offset = 0;
String tz_name = "";
if (args.size() >= 3)
tz_offset = safeGet<Int64>(typeid_cast<const ASTLiteral &>(*args[2]).value);
if (args.size() >= 4)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[3]).value);
if (args.size() >= 5)
tz_offset = safeGet<Int64>(typeid_cast<const ASTLiteral &>(*args[4]).value);
if (args.size() >= 6)
tz_name = safeGet<String>(typeid_cast<const ASTLiteral &>(*args[5]).value);

auto [table_id, schema, dag_request] = compileQuery(
context, query,
[&](const String & database_name, const String & table_name) {
return MockTiDB::instance().getTableByName(database_name, table_name)->table_info;
},
start_ts, tz_offset, tz_name);
start_ts, tz_offset, tz_name, encode_type);
std::ignore = table_id;

RegionPtr region = context.getTMTContext().getKVStore()->getRegion(region_id);
@@ -132,8 +139,8 @@ BlockInputStreamPtr dbgFuncMockDAG(Context & context, const ASTs & args)
DecodedTiKVKey start_key = RecordKVFormat::genRawKey(table_id, handle_range.first.handle_id);
DecodedTiKVKey end_key = RecordKVFormat::genRawKey(table_id, handle_range.second.handle_id);
key_ranges.emplace_back(std::make_pair(std::move(start_key), std::move(end_key)));
tipb::SelectResponse dag_response = executeDAGRequest(context, dag_request, region_id, region->version(),
region->confVer(), key_ranges);
tipb::SelectResponse dag_response
= executeDAGRequest(context, dag_request, region_id, region->version(), region->confVer(), key_ranges);

return outputDAGResponse(context, schema, dag_response);
}
@@ -206,7 +213,7 @@ void compileExpr(const DAGSchema & input, ASTPtr ast, tipb::Expr * expr, std::un
else if (func_name_lowercase == "greaterorequals")
{
expr->set_sig(tipb::ScalarFuncSig::GEInt);
auto *ft = expr->mutable_field_type();
auto * ft = expr->mutable_field_type();
ft->set_tp(TiDB::TypeLongLong);
ft->set_flag(TiDB::ColumnFlagUnsigned);
}
@@ -292,16 +299,19 @@ void compileFilter(const DAGSchema & input, ASTPtr ast, tipb::Selection * filter
compileExpr(input, ast, cond, referred_columns, col_ref_map);
}

std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
Context & context, const String & query, SchemaFetcher schema_fetcher,
Timestamp start_ts, Int64 tz_offset, const String & tz_name)
std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(Context & context, const String & query, SchemaFetcher schema_fetcher,
Timestamp start_ts, Int64 tz_offset, const String & tz_name, const String & encode_type)
{
DAGSchema schema;
tipb::DAGRequest dag_request;
dag_request.set_time_zone_name(tz_name);
dag_request.set_time_zone_offset(tz_offset);

dag_request.set_start_ts(start_ts);
if (encode_type == "arrow")
dag_request.set_encode_type(tipb::EncodeType::TypeArrow);
else
dag_request.set_encode_type(tipb::EncodeType::TypeDefault);

ParserSelectQuery parser;
ASTPtr ast = parseQuery(parser, query.data(), query.data() + query.size(), "from DAG compiler", 0);
@@ -355,7 +365,8 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
ci.tp = TiDB::TypeTimestamp;
ts_output.emplace_back(std::make_pair(column_info.name, std::move(ci)));
}
executor_ctx_map.emplace(ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map<String, std::vector<tipb::Expr *>>{}});
executor_ctx_map.emplace(
ts_exec, ExecutorCtx{nullptr, std::move(ts_output), std::unordered_map<String, std::vector<tipb::Expr *>>{}});
last_executor = ts_exec;
}

@@ -400,8 +411,8 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
tipb::Limit * limit = limit_exec->mutable_limit();
auto limit_length = safeGet<UInt64>(typeid_cast<ASTLiteral &>(*ast_query.limit_length).value);
limit->set_limit(limit_length);
executor_ctx_map.emplace(
limit_exec, ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::unordered_map<String, std::vector<tipb::Expr *>>{}});
executor_ctx_map.emplace(limit_exec,
ExecutorCtx{last_executor, executor_ctx_map[last_executor].output, std::unordered_map<String, std::vector<tipb::Expr *>>{}});
last_executor = limit_exec;
}

@@ -593,8 +604,7 @@ std::tuple<TableID, DAGSchema, tipb::DAGRequest> compileQuery(
return std::make_tuple(table_info.id, std::move(schema), std::move(dag_request));
}

tipb::SelectResponse executeDAGRequest(
Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version,
tipb::SelectResponse executeDAGRequest(Context & context, const tipb::DAGRequest & dag_request, RegionID region_id, UInt64 region_version,
UInt64 region_conf_version, std::vector<std::pair<DecodedTiKVKey, DecodedTiKVKey>> & key_ranges)
{
static Logger * log = &Logger::get("MockDAG");
@@ -607,49 +617,38 @@ tipb::SelectResponse executeDAGRequest(
return dag_response;
}

void arrowChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks)
{
ArrowChunkCodec codec;
for (const auto & chunk : dag_response.chunks())
{
blocks.emplace_back(codec.decode(chunk, schema));
}
}

void defaultChunkToBlocks(const DAGSchema & schema, const tipb::SelectResponse & dag_response, BlocksList & blocks)
{
DefaultChunkCodec codec;
for (const auto & chunk : dag_response.chunks())
{
blocks.emplace_back(codec.decode(chunk, schema));
}
}

BlockInputStreamPtr outputDAGResponse(Context &, const DAGSchema & schema, const tipb::SelectResponse & dag_response)
{
if (dag_response.has_error())
throw Exception(dag_response.error().msg(), dag_response.error().code());

BlocksList blocks;
for (const auto & chunk : dag_response.chunks())
if (dag_response.encode_type() == tipb::EncodeType::TypeArrow)
{
std::vector<std::vector<Field>> rows;
std::vector<Field> curr_row;
const std::string & data = chunk.rows_data();
size_t cursor = 0;
while (cursor < data.size())
{
curr_row.push_back(DecodeDatum(cursor, data));
if (curr_row.size() == schema.size())
{
rows.emplace_back(std::move(curr_row));
curr_row.clear();
}
}

ColumnsWithTypeAndName columns;
for (auto & field : schema)
{
const auto & name = field.first;
auto data_type = getDataTypeByColumnInfo(field.second);
ColumnWithTypeAndName col(data_type, name);
col.column->assumeMutable()->reserve(rows.size());
columns.emplace_back(std::move(col));
}
for (const auto & row : rows)
{
for (size_t i = 0; i < row.size(); i++)
{
const Field & field = row[i];
columns[i].column->assumeMutable()->insert(DatumFlat(field, schema[i].second.tp).field());
}
}

blocks.emplace_back(Block(columns));
arrowChunkToBlocks(schema, dag_response, blocks);
}
else
{
defaultChunkToBlocks(schema, dag_response, blocks);
}

return std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
}
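
The dbgFuncCoprocessor.cpp changes add an optional `encode_type` argument to `dbgFuncDAG` (now `query[, region-id, encode_type, tz_offset, tz_name]`) and map the string `"arrow"` to `tipb::EncodeType::TypeArrow`, with every other value falling back to `TypeDefault`. A minimal standalone sketch of that parsing follows. `EncodeType`, `DagArgs`, `parseEncodeType` and `parseDagArgs` are hypothetical stand-ins so the example compiles without tipb or the TiFlash AST types, and the region id default of 0 is a placeholder, not the real `InvalidRegionID`.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for tipb::EncodeType.
enum class EncodeType { Default, Arrow };

// Mirrors the branch added to compileQuery: only the exact string "arrow"
// selects the Arrow encoding; anything else, including "", uses the default.
EncodeType parseEncodeType(const std::string & name)
{
    return name == "arrow" ? EncodeType::Arrow : EncodeType::Default;
}

// Hypothetical holder for the positional arguments of the debug function.
struct DagArgs
{
    std::string query;
    std::uint64_t region_id = 0; // placeholder sentinel for this sketch only
    EncodeType encode_type = EncodeType::Default;
    std::int64_t tz_offset = 0;
    std::string tz_name;
};

// Positional order follows the new error message:
// query[, region-id, encode_type, tz_offset, tz_name]
DagArgs parseDagArgs(const std::vector<std::string> & args)
{
    if (args.size() < 1 || args.size() > 5)
        throw std::invalid_argument(
            "Args not matched, should be: query[, region-id, encode_type, tz_offset, tz_name]");
    assert(!args.empty()); // guaranteed by the size check above

    DagArgs out;
    out.query = args[0];
    if (args.size() >= 2)
        out.region_id = std::stoull(args[1]);
    if (args.size() >= 3)
        out.encode_type = parseEncodeType(args[2]);
    if (args.size() >= 4)
        out.tz_offset = std::stoll(args[3]);
    if (args.size() >= 5)
        out.tz_name = args[4];
    return out;
}
```

Because `encode_type` was inserted before `tz_offset`, callers that previously passed a time zone offset as the third argument need to shift it to the fourth position.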

84 changes: 84 additions & 0 deletions dbms/src/Flash/Coprocessor/ArrowChunkCodec.cpp
@@ -0,0 +1,84 @@
#include <Flash/Coprocessor/ArrowChunkCodec.h>

#include <Flash/Coprocessor/ArrowColCodec.h>
#include <IO/Endian.h>

namespace DB
{

class ArrowChunkCodecStream : public ChunkCodecStream
{
public:
explicit ArrowChunkCodecStream(const std::vector<tipb::FieldType> & field_types) : ChunkCodecStream(field_types)
{
ti_chunk = std::make_unique<TiDBChunk>(field_types);
}

String getString() override
{
std::stringstream ss;
ti_chunk->encodeChunk(ss);
return ss.str();
}
void clear() override { ti_chunk->clear(); }
void encode(const Block & block, size_t start, size_t end) override;
std::unique_ptr<TiDBChunk> ti_chunk;
};

void ArrowChunkCodecStream::encode(const Block & block, size_t start, size_t end)
{
// Encode data in chunk by arrow encode
ti_chunk->buildDAGChunkFromBlock(block, field_types, start, end);
}

Block ArrowChunkCodec::decode(const tipb::Chunk & chunk, const DAGSchema & schema)
{
const String & row_data = chunk.rows_data();
const char * start = row_data.c_str();
const char * pos = start;
int column_index = 0;
ColumnsWithTypeAndName columns;
while (pos < start + row_data.size())
{
UInt32 length = toLittleEndian(*(reinterpret_cast<const UInt32 *>(pos)));
pos += 4;
UInt32 null_count = toLittleEndian(*(reinterpret_cast<const UInt32 *>(pos)));
pos += 4;
std::vector<UInt8> null_bitmap;
const auto & field = schema[column_index];
const auto & name = field.first;
auto data_type = getDataTypeByColumnInfo(field.second);
if (null_count > 0)
{
auto bit_map_length = (length + 7) / 8;
for (UInt32 i = 0; i < bit_map_length; i++)
{
null_bitmap.push_back(*pos);
pos++;
}
}
Int8 field_length = getFieldLength(field.second.tp);
std::vector<UInt64> offsets;
if (field_length == VAR_SIZE)
{
for (UInt32 i = 0; i <= length; i++)
{
offsets.push_back(toLittleEndian(*(reinterpret_cast<const UInt64 *>(pos))));
pos += 8;
}
}
ColumnWithTypeAndName col(data_type, name);
col.column->assumeMutable()->reserve(length);
pos = arrowColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, col, field.second, length);
columns.emplace_back(std::move(col));
column_index++;
}
return Block(columns);
}

std::unique_ptr<ChunkCodecStream> ArrowChunkCodec::newCodecStream(const std::vector<tipb::FieldType> & field_types)
{
return std::make_unique<ArrowChunkCodecStream>(field_types);
}

} // namespace DB
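
`ArrowChunkCodec::decode` above reads, for each column, a 4-byte little-endian row count, a 4-byte null count, a null bitmap of `(length + 7) / 8` bytes that is present only when the null count is non-zero, and, for variable-size types, `length + 1` 8-byte offsets. The sketch below reproduces only that header walk on a plain byte buffer. `ColumnHeader`, `readLE` and `readColumnHeader` are hypothetical names for this illustration; unlike the commit, the sketch assembles integers byte by byte instead of using `reinterpret_cast`, and it adds bounds checks. It does not interpret the bitmap bits or the column payload.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical result type; the commit passes these pieces on to arrowColToFlashCol.
struct ColumnHeader
{
    std::uint32_t length = 0;
    std::uint32_t null_count = 0;
    std::vector<std::uint8_t> null_bitmap;
    std::vector<std::uint64_t> offsets;
    std::size_t bytes_consumed = 0;
};

// Read a little-endian unsigned integer without an unaligned pointer cast.
template <typename T>
T readLE(const std::uint8_t * data, std::size_t size, std::size_t & pos)
{
    assert(pos <= size);
    if (size - pos < sizeof(T))
        throw std::out_of_range("truncated column header");
    T value = 0;
    for (std::size_t i = 0; i < sizeof(T); ++i)
        value |= static_cast<T>(data[pos + i]) << (8 * i);
    pos += sizeof(T);
    return value;
}

// Parse the per-column header in the order used by ArrowChunkCodec::decode.
ColumnHeader readColumnHeader(const std::vector<std::uint8_t> & buf, bool var_size)
{
    ColumnHeader h;
    std::size_t pos = 0;
    h.length = readLE<std::uint32_t>(buf.data(), buf.size(), pos);
    h.null_count = readLE<std::uint32_t>(buf.data(), buf.size(), pos);
    if (h.null_count > 0)
    {
        // One bit per row, rounded up to whole bytes.
        const std::size_t bitmap_len = (static_cast<std::size_t>(h.length) + 7) / 8;
        if (buf.size() - pos < bitmap_len)
            throw std::out_of_range("truncated null bitmap");
        h.null_bitmap.assign(buf.data() + pos, buf.data() + pos + bitmap_len);
        pos += bitmap_len;
    }
    if (var_size)
    {
        // length + 1 offsets delimit `length` variable-size values.
        for (std::size_t i = 0; i <= h.length; ++i)
            h.offsets.push_back(readLE<std::uint64_t>(buf.data(), buf.size(), pos));
    }
    h.bytes_consumed = pos;
    return h;
}
```

For a variable-size column with 3 rows and one null, the header occupies 4 + 4 + 1 + 4 × 8 = 41 bytes before the value bytes begin.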
18 changes: 18 additions & 0 deletions dbms/src/Flash/Coprocessor/ArrowChunkCodec.h
@@ -0,0 +1,18 @@
#pragma once

#include <Flash/Coprocessor/ChunkCodec.h>

#include <Flash/Coprocessor/TiDBChunk.h>

namespace DB
{

class ArrowChunkCodec : public ChunkCodec
{
public:
ArrowChunkCodec() = default;
Block decode(const tipb::Chunk & chunk, const DAGSchema & schema) override;
std::unique_ptr<ChunkCodecStream> newCodecStream(const std::vector<tipb::FieldType> & field_types) override;
};

} // namespace DB
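
The header above exposes the codec through two entry points: `decode` on the codec itself, and `newCodecStream`, which returns a stream with `encode`, `getString` and `clear`. To show how one interface can sit in front of a row-oriented and a column-oriented encoding, here is a toy sketch. Everything in the `sketch` namespace is hypothetical: the text formats are invented for readability and are neither the TiDB datum encoding nor the Arrow chunk layout, and the real classes operate on `Block` and `tipb::FieldType` rather than on vectors of `int`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <vector>

namespace sketch
{
enum class EncodeType { Default, Arrow }; // stand-in for tipb::EncodeType
using Table = std::vector<std::vector<int>>; // rows of int cells

class CodecStream
{
public:
    virtual ~CodecStream() = default;
    // Append rows [start, end) of `rows` to the pending chunk.
    virtual void encode(const Table & rows, std::size_t start, std::size_t end) = 0;
    virtual std::string getString() const = 0;
    virtual void clear() = 0;
};

// Row-oriented toy format: cells of one row together, e.g. "1,10;2,20;".
class RowStream : public CodecStream
{
    std::string buf;

public:
    void encode(const Table & rows, std::size_t start, std::size_t end) override
    {
        assert(start <= end && end <= rows.size());
        for (std::size_t r = start; r < end; ++r)
        {
            for (std::size_t c = 0; c < rows[r].size(); ++c)
                buf += (c ? "," : "") + std::to_string(rows[r][c]);
            buf += ";";
        }
    }
    std::string getString() const override { return buf; }
    void clear() override { buf.clear(); }
};

// Column-oriented toy format: cells of one column together, e.g. "1,2;10,20;".
class ColumnStream : public CodecStream
{
    std::vector<std::vector<int>> cols;

public:
    void encode(const Table & rows, std::size_t start, std::size_t end) override
    {
        assert(start <= end && end <= rows.size());
        for (std::size_t r = start; r < end; ++r)
        {
            if (cols.size() < rows[r].size())
                cols.resize(rows[r].size());
            for (std::size_t c = 0; c < rows[r].size(); ++c)
                cols[c].push_back(rows[r][c]);
        }
    }
    std::string getString() const override
    {
        std::string out;
        for (const auto & col : cols)
        {
            for (std::size_t i = 0; i < col.size(); ++i)
                out += (i ? "," : "") + std::to_string(col[i]);
            out += ";";
        }
        return out;
    }
    void clear() override { cols.clear(); }
};

// Pick the stream implementation from the requested encode type.
std::unique_ptr<CodecStream> newCodecStream(EncodeType type)
{
    if (type == EncodeType::Arrow)
        return std::make_unique<ColumnStream>();
    return std::make_unique<RowStream>();
}
} // namespace sketch
```

The caller only holds a `CodecStream` pointer, so switching encodings is a matter of passing a different `EncodeType`, which is the same role `encode_type` plays in the DAG request in this commit.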