Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support enum type in mpp mode. #1534

Merged
merged 10 commits into from
Mar 13, 2021
2 changes: 1 addition & 1 deletion contrib/tipb
34 changes: 13 additions & 21 deletions dbms/src/Debug/dbgFuncCoprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,10 @@ BlockInputStreamPtr executeQuery(Context & context, RegionID region_id, const DA
}
for (auto & field : root_task_schema)
{
auto tipb_type = TiDB::columnInfoToFieldType(field.second);
tipb_type.set_collate(properties.collator);
auto * field_type = tipb_exchange_receiver.add_field_types();
field_type->set_tp(field.second.tp);
field_type->set_flag(field.second.flag);
field_type->set_flen(field.second.flen);
field_type->set_decimal(field.second.decimal);
*field_type = tipb_type;
}
mpp::TaskMeta root_tm;
root_tm.set_start_ts(properties.start_ts);
Expand Down Expand Up @@ -854,12 +853,11 @@ struct ExchangeReceiver : Executor
tipb::ExchangeReceiver * exchange_receiver = tipb_executor->mutable_exchange_receiver();
for (auto & field : output_schema)
{
auto tipb_type = TiDB::columnInfoToFieldType(field.second);
tipb_type.set_collate(collator_id);

auto * field_type = exchange_receiver->add_field_types();
field_type->set_tp(field.second.tp);
field_type->set_flag(field.second.flag);
field_type->set_flen(field.second.flen);
field_type->set_decimal(field.second.decimal);
field_type->set_collate(collator_id);
*field_type = tipb_type;
}
auto it = mpp_info.receiver_source_task_ids_map.find(name);
if (it == mpp_info.receiver_source_task_ids_map.end())
Expand Down Expand Up @@ -1319,22 +1317,16 @@ struct Join : Executor
auto & field = schema[index];
if (splitQualifiedName(field.first).second == identifier->getColumnName())
{
auto tipb_type = TiDB::columnInfoToFieldType(field.second);
tipb_type.set_collate(collator_id);

tipb_key->set_tp(tipb::ColumnRef);
std::stringstream ss;
encodeDAGInt64(index, ss);
tipb_key->set_val(ss.str());
auto * key_type = tipb_key->mutable_field_type();
key_type->set_tp(field.second.tp);
key_type->set_flag(field.second.flag);
key_type->set_flen(field.second.flen);
key_type->set_decimal(field.second.decimal);
key_type->set_collate(collator_id);

tipb_field_type->set_tp(field.second.tp);
tipb_field_type->set_flag(field.second.flag);
tipb_field_type->set_flen(field.second.flen);
tipb_field_type->set_decimal(field.second.decimal);
tipb_field_type->set_collate(collator_id);
*tipb_key->mutable_field_type() = tipb_type;

*tipb_field_type = tipb_type;
break;
}
}
Expand Down
15 changes: 6 additions & 9 deletions dbms/src/Flash/Coprocessor/ArrowColCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ void flashDoubleColToArrowCol(TiDBColumn & dag_column, const IColumn * flash_col
}
return;
}
throw TiFlashException("Error while trying to convert flash col to DAG col, column name " + flash_col_untyped->getName(),
Errors::Coprocessor::Internal);
throw TiFlashException(
"Error while trying to convert flash col to DAG col, column name " + flash_col_untyped->getName(), Errors::Coprocessor::Internal);
}

template <bool is_nullable>
Expand Down Expand Up @@ -258,12 +258,11 @@ void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & f
{
const IColumn * col = flash_col.column.get();
const IDataType * type = flash_col.type.get();
const TiDB::ColumnInfo tidb_column_info = fieldTypeToColumnInfo(field_type);
const TiDB::ColumnInfo tidb_column_info = TiDB::fieldTypeToColumnInfo(field_type);

if (type->isNullable() && tidb_column_info.hasNotNullFlag())
{
throw TiFlashException(
"Flash column and TiDB column has different not null flag", Errors::Coprocessor::Internal);
throw TiFlashException("Flash column and TiDB column has different not null flag", Errors::Coprocessor::Internal);
}
if (type->isNullable())
type = dynamic_cast<const DataTypeNullable *>(type)->getNestedType().get();
Expand All @@ -282,8 +281,7 @@ void flashColToArrowCol(TiDBColumn & dag_column, const ColumnWithTypeAndName & f
"Type un-matched during arrow encode, target col type is integer and source column type is " + type->getName(),
Errors::Coprocessor::Internal);
if (type->isUnsignedInteger() != tidb_column_info.hasUnsignedFlag())
throw TiFlashException(
"Flash column and TiDB column has different unsigned flag", Errors::Coprocessor::Internal);
throw TiFlashException("Flash column and TiDB column has different unsigned flag", Errors::Coprocessor::Internal);
if (tidb_column_info.hasNotNullFlag())
flashIntegerColToArrowCol<false>(dag_column, col, start_index, end_index);
else
Expand Down Expand Up @@ -632,8 +630,7 @@ const char * arrowColToFlashCol(const char * pos, UInt8 field_length, UInt32 nul
case TiDB::TypeEnum:
return arrowEnumColToFlashCol(pos, field_length, null_count, null_bitmap, offsets, flash_col, col_info, length);
default:
throw TiFlashException(
"Not supported yet: field tp = " + std::to_string(col_info.tp), Errors::Coprocessor::Unimplemented);
throw TiFlashException("Not supported yet: field tp = " + std::to_string(col_info.tp), Errors::Coprocessor::Unimplemented);
}
}

Expand Down
20 changes: 1 addition & 19 deletions dbms/src/Flash/Coprocessor/CHBlockChunkCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,7 @@ class CHBlockChunkCodecStream : public ChunkCodecStream
output = std::make_unique<WriteBufferFromOwnString>();
for (size_t i = 0; i < field_types.size(); i++)
{
if (field_types[i].tp() == TiDB::TypeEnum)
{
ColumnInfo ci;
auto & field_type = field_types[i];
ci.tp = static_cast<TiDB::TP>(field_type.tp());
ci.flag = field_type.flag();
ci.flen = field_type.flen();
ci.decimal = field_type.decimal();
/// this is a workaround, since tidb does not push down
/// the element of Enum type, should remove if
/// https://github.com/pingcap/tics/issues/1489 is fixed
ci.elems.emplace_back("a", 1);
ci.elems.emplace_back("b", 2);
expected_types.emplace_back(getDataTypeByColumnInfo(ci));
}
else
{
expected_types.emplace_back(getDataTypeByFieldType(field_types[i]));
}
expected_types.emplace_back(getDataTypeByFieldType(field_types[i]));
}
}

Expand Down
28 changes: 13 additions & 15 deletions dbms/src/Flash/Coprocessor/DAGQueryBlock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,6 @@ DAGQueryBlock::DAGQueryBlock(UInt32 id_, const ::google::protobuf::RepeatedPtrFi
fillOutputFieldTypes();
}

void DAGQueryBlock::insertOutputFiledType(int32_t tp, int32_t flag, int32_t flen, int32_t dec)
{
tipb::FieldType field_type;
field_type.set_tp(tp);
field_type.set_flag(flag);
field_type.set_flen(flen);
field_type.set_decimal(dec);
output_field_types.push_back(field_type);
}

void DAGQueryBlock::fillOutputFieldTypes()
{
if (!output_field_types.empty())
Expand Down Expand Up @@ -258,24 +248,32 @@ void DAGQueryBlock::fillOutputFieldTypes()
}
else if (source->tp() == tipb::ExecType::TypeExchangeReceiver)
{
for (auto & ci : source->exchange_receiver().field_types())
for (auto & field_type : source->exchange_receiver().field_types())
{
insertOutputFiledType(ci.tp(), ci.flag(), ci.flen(), ci.decimal());
output_field_types.push_back(field_type);
}
}
else if (source->tp() == tipb::ExecType::TypeProjection)
{
for (auto & expr : source->projection().exprs())
{
auto & ci = expr.field_type();
insertOutputFiledType(ci.tp(), ci.flag(), ci.flen(), ci.decimal());
output_field_types.push_back(expr.field_type());
}
}
else
{
for (auto & ci : source->tbl_scan().columns())
{
insertOutputFiledType(ci.tp(), ci.flag(), ci.columnlen(), ci.decimal());
tipb::FieldType field_type;
field_type.set_tp(ci.tp());
field_type.set_flag(ci.flag());
field_type.set_flen(ci.columnlen());
field_type.set_decimal(ci.decimal());
for (const auto & elem : ci.elems())
{
field_type.add_elems(elem);
}
output_field_types.push_back(field_type);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlock.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ class DAGQueryBlock
{
return source->tp() == tipb::ExecType::TypeTableScan && source->tbl_scan().next_read_engine() != tipb::EngineType::Local;
}
void insertOutputFiledType(int32_t tp, int32_t flag, int32_t flen, int32_t dec);
};

} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,7 @@ void DAGQueryBlockInterpreter::executeRemoteQuery(Pipeline & pipeline)
for (int i = 0; i < (int)query_block.output_field_types.size(); i++)
{
dag_req.add_output_offsets(i);
ColumnInfo info = fieldTypeToColumnInfo(query_block.output_field_types[i]);
ColumnInfo info = TiDB::fieldTypeToColumnInfo(query_block.output_field_types[i]);
String col_name = query_block.qb_column_prefix + "col_" + std::to_string(i);
schema.push_back(std::make_pair(col_name, info));
is_ts_column.push_back(query_block.output_field_types[i].tp() == TiDB::TypeTimestamp);
Expand Down
32 changes: 2 additions & 30 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,8 @@ const String & getAggFunctionName(const tipb::Expr & expr)
}
}

const auto errmsg = tipb::ExprType_Name(expr.tp())
+ "(distinct=" + (expr.has_distinct() ? "true" : "false") + ")"
+ " is not supported.";
const auto errmsg
= tipb::ExprType_Name(expr.tp()) + "(distinct=" + (expr.has_distinct() ? "true" : "false") + ")" + " is not supported.";
throw TiFlashException(errmsg, Errors::Coprocessor::Unimplemented);
}

Expand Down Expand Up @@ -423,13 +422,6 @@ void assertBlockSchema(const DataTypes & expected_types, const Block & block, co

if (!expected->equals(*actual))
{
/// This is a workaround for Enum type: because TiDB does not push down enough
/// information about enum type, we only check the nullability if both type is
/// Enum, should be removed if https://github.com/pingcap/tics/issues/1489 is fixed
if (expected->isEnum() && actual->isEnum())
continue;
if (expected->isNullable() && removeNullable(expected)->isEnum() && actual->isNullable() && removeNullable(actual)->isEnum())
continue;
throw Exception("Block schema mismatch in " + context_description + ": different types: expected " + expected->getName()
+ ", got " + actual->getName());
}
Expand Down Expand Up @@ -962,24 +954,4 @@ std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map({
//{tipb::ScalarFuncSig::CharLength, "upper"},
});

tipb::FieldType columnInfoToFieldType(const TiDB::ColumnInfo & ci)
{
tipb::FieldType ret;
ret.set_tp(ci.tp);
ret.set_flag(ci.flag);
ret.set_flen(ci.flen);
ret.set_decimal(ci.decimal);
return ret;
}

TiDB::ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type)
{
TiDB::ColumnInfo ret;
ret.tp = static_cast<TiDB::TP>(field_type.tp());
ret.flag = field_type.flag();
ret.flen = field_type.flen();
ret.decimal = field_type.decimal();
return ret;
}

} // namespace DB
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ extern std::unordered_map<tipb::ExprType, String> distinct_agg_func_map;
extern std::unordered_map<tipb::ScalarFuncSig, String> scalar_func_map;
extern const Int8 VAR_SIZE;

tipb::FieldType columnInfoToFieldType(const TiDB::ColumnInfo & ci);
TiDB::ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type);
UInt8 getFieldLengthForArrowEncode(Int32 tp);
bool isUnsupportedEncodeType(const std::vector<tipb::FieldType> & types, tipb::EncodeType encode_type);
std::shared_ptr<TiDB::ITiDBCollator> getCollatorFromExpr(const tipb::Expr & expr);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Mpp/ExchangeReceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class ExchangeReceiver
for (int i = 0; i < exc.field_types_size(); i++)
{
String name = "exchange_receiver_" + std::to_string(i);
ColumnInfo info = fieldTypeToColumnInfo(exc.field_types(i));
ColumnInfo info = TiDB::fieldTypeToColumnInfo(exc.field_types(i));
schema.push_back(std::make_pair(name, info));
}
setUpConnection();
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/Transaction/TiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -935,4 +935,32 @@ String genJsonNull()
return null;
}

tipb::FieldType columnInfoToFieldType(const ColumnInfo & ci)
{
tipb::FieldType ret;
ret.set_tp(ci.tp);
ret.set_flag(ci.flag);
ret.set_flen(ci.flen);
ret.set_decimal(ci.decimal);
for (const auto & elem : ci.elems)
{
ret.add_elems(elem.first);
}
return ret;
}

ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type)
{
TiDB::ColumnInfo ret;
ret.tp = static_cast<TiDB::TP>(field_type.tp());
ret.flag = field_type.flag();
ret.flen = field_type.flen();
ret.decimal = field_type.decimal();
for (int i = 0; i < field_type.elems_size(); i++)
{
ret.elems.emplace_back(field_type.elems(i), i + 1);
}
return ret;
}

} // namespace TiDB
4 changes: 4 additions & 0 deletions dbms/src/Storages/Transaction/TiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <tipb/expression.pb.h>
#pragma GCC diagnostic pop

namespace DB::ErrorCodes
Expand Down Expand Up @@ -376,4 +377,7 @@ using DBInfoPtr = std::shared_ptr<DBInfo>;

String genJsonNull();

tipb::FieldType columnInfoToFieldType(const ColumnInfo & ci);
ColumnInfo fieldTypeToColumnInfo(const tipb::FieldType & field_type);

} // namespace TiDB
17 changes: 4 additions & 13 deletions dbms/src/Storages/Transaction/TypeMapping.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <type_traits>

#include <Common/FieldVisitors.h>
#include <Common/typeid_cast.h>
#include <Core/NamesAndTypes.h>
Expand All @@ -21,6 +19,8 @@
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/TypeMapping.h>

#include <type_traits>

namespace DB
{

Expand Down Expand Up @@ -195,22 +195,13 @@ DataTypePtr getDataTypeByColumnInfo(const ColumnInfo & column_info)

DataTypePtr getDataTypeByFieldType(const tipb::FieldType & field_type)
{
ColumnInfo ci;
ci.tp = static_cast<TiDB::TP>(field_type.tp());
ci.flag = field_type.flag();
ci.flen = field_type.flen();
ci.decimal = field_type.decimal();
// TODO: Enum's elems?
ColumnInfo ci = TiDB::fieldTypeToColumnInfo(field_type);
return getDataTypeByColumnInfo(ci);
}

TiDB::CodecFlag getCodecFlagByFieldType(const tipb::FieldType & field_type)
{
ColumnInfo ci;
ci.tp = static_cast<TiDB::TP>(field_type.tp());
ci.flag = field_type.flag();
ci.flen = field_type.flen();
ci.decimal = field_type.decimal();
ColumnInfo ci = TiDB::fieldTypeToColumnInfo(field_type);
return ci.getCodecFlag();
}

Expand Down
Loading