Skip to content

Commit

Permalink
refine appendFinalProjectFor(Non)RootQueryBlock in analyzer (#4052)
Browse files Browse the repository at this point in the history
ref #4118
  • Loading branch information
SeaRise authored Mar 21, 2022
1 parent 48c78f9 commit ef608ee
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 72 deletions.
188 changes: 116 additions & 72 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -808,44 +808,120 @@ void DAGExpressionAnalyzer::appendCastAfterAgg(
}
}

/// Builds the (column name, alias) pairs for the final projection of a non-root query block.
/// Every current input column gets one pair; its alias is `column_prefix + name`
/// passed through a UniqueNameGenerator.
NamesWithAliases DAGExpressionAnalyzer::genNonRootFinalProjectAliases(const String & column_prefix) const
{
    UniqueNameGenerator alias_generator;
    NamesWithAliases aliases;
    const auto & input_columns = getCurrentInputColumns();
    for (const auto & column : input_columns)
    {
        auto unique_alias = alias_generator.toUniqueName(column_prefix + column.name);
        aliases.emplace_back(column.name, std::move(unique_alias));
    }
    return aliases;
}

// Prepares the final projection of a non-root query block:
// the source column name of every generated (name, alias) pair is pushed onto the
// required_output of the step returned by initAndGetLastStep(chain), and the pairs are returned.
NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForNonRootQueryBlock(
ExpressionActionsChain & chain,
const String & column_prefix) const
{
// NOTE(review): this text comes from a diff view; the inline alias generation right below
// appears to be the pre-refactor code that the genNonRootFinalProjectAliases call replaces
// (both declare `final_project`) — confirm against the real file.
NamesWithAliases final_project;
UniqueNameGenerator unique_name_generator;
for (const auto & element : getCurrentInputColumns())
final_project.emplace_back(element.name, unique_name_generator.toUniqueName(column_prefix + element.name));
NamesWithAliases final_project = genNonRootFinalProjectAliases(column_prefix);

auto & step = initAndGetLastStep(chain);
// Only the original column names (pair.first) are registered as required output;
// the aliases are handed back to the caller.
for (const auto & name : final_project)
step.required_output.push_back(name.first);
return final_project;
}

// NOTE(review): this text comes from a diff view; the signature of the old
// appendFinalProjectForRootQueryBlock and that of the new genRootFinalProjectAliases are
// interleaved below, as are some of their locals. The comments describe the visible
// statements only — confirm which lines are live against the real file.
NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock(
ExpressionActionsChain & chain,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
NamesWithAliases DAGExpressionAnalyzer::genRootFinalProjectAliases(
const String & column_prefix,
bool keep_session_timezone_info)
const std::vector<Int32> & output_offsets) const
{
// An empty output_offsets is reported as a logical error.
if (unlikely(output_offsets.empty()))
throw Exception("Root Query block without output_offsets", ErrorCodes::LOGICAL_ERROR);

NamesWithAliases final_project;
NamesWithAliases final_project_aliases;
const auto & current_columns = getCurrentInputColumns();
UniqueNameGenerator unique_name_generator;
bool need_append_timezone_cast = !keep_session_timezone_info && !context.getTimezoneInfo().is_utc_timezone;
// One (name, alias) pair per requested output offset, in the order of output_offsets.
// The alias is column_prefix + name passed through unique_name_generator.toUniqueName.
for (auto i : output_offsets)
{
final_project_aliases.emplace_back(
current_columns[i].name,
unique_name_generator.toUniqueName(column_prefix + current_columns[i].name));
}
return final_project_aliases;
}

/// Appends the casts needed by the root final projection to `actions` and then
/// replaces `source_columns` with the column list that results from those casts.
///
/// For every entry of `output_offsets` (position `pos`, column offset `col_offset`):
///  - a timezone cast to UTC is appended when `need_append_timezone_cast` is set and
///    `require_schema[col_offset]` is a timestamp;
///  - a type cast to the type derived from `require_schema[col_offset]` is appended when
///    `need_append_type_cast_vec[pos]` is set.
/// A column name that was already casted is not casted again: the entry produced for its
/// first occurrence is copied instead.
void DAGExpressionAnalyzer::appendCastForRootFinalProjection(
    const ExpressionActionsPtr & actions,
    const std::vector<tipb::FieldType> & require_schema,
    const std::vector<Int32> & output_offsets,
    bool need_append_timezone_cast,
    const BoolVec & need_append_type_cast_vec)
{
    tipb::Expr tz_expr = constructTZExpr(context.getTimezoneInfo());
    /// name of the timezone column inside `actions`, created lazily on the first timezone cast
    String tz_column_name;
    String utc_cast_func_name;
    if (context.getTimezoneInfo().is_name_based)
        utc_cast_func_name = "ConvertTimeZoneToUTC";
    else
        utc_cast_func_name = "ConvertTimeZoneByOffsetToUTC";

    /// <original column name, offset at which that column was first casted>
    std::unordered_map<String, size_t> first_casted_offset;

    const auto & input_columns = getCurrentInputColumns();
    NamesAndTypes casted_columns = input_columns;

    for (size_t pos = 0; pos < output_offsets.size(); ++pos)
    {
        UInt32 col_offset = output_offsets[pos];
        assert(col_offset < input_columns.size());
        assert(col_offset < require_schema.size());
        assert(col_offset < casted_columns.size());

        /// timestamp columns that are returned must be converted to UTC based,
        /// refer to appendTimeZoneCastsAfterTS for more details
        const bool wants_timezone_cast = need_append_timezone_cast && require_schema[col_offset].tp() == TiDB::TypeTimestamp;
        if (!wants_timezone_cast && !need_append_type_cast_vec[pos])
            continue;

        const String & source_name = input_columns[col_offset].name;
        const auto found = first_casted_offset.find(source_name);
        if (found != first_casted_offset.end())
        {
            /// this column name has been casted before, reuse that result
            const size_t earlier_offset = found->second;
            assert(casted_columns.size() > earlier_offset);
            casted_columns[col_offset] = casted_columns[earlier_offset];
            continue;
        }

        String casted_name = source_name;
        auto casted_type = input_columns[col_offset].type;

        /// the timezone cast goes first ...
        if (wants_timezone_cast)
        {
            if (tz_column_name.empty())
                tz_column_name = getActions(tz_expr, actions);
            casted_name = appendTimeZoneCast(tz_column_name, casted_name, utc_cast_func_name, actions);
        }
        /// ... and the type cast is applied on top of it
        if (need_append_type_cast_vec[pos])
        {
            casted_type = getDataTypeByFieldTypeForComputingLayer(require_schema[col_offset]);
            casted_name = appendCast(casted_type, actions, casted_name);
        }

        first_casted_offset.emplace(source_name, col_offset);
        casted_columns[col_offset].name = casted_name;
        casted_columns[col_offset].type = casted_type;
    }

    source_columns = std::move(casted_columns);
}

// Compares, for every offset in output_offsets, the type name of the current input column
// with the type name derived from the required schema, and returns
// {need_append_type_cast, need_append_type_cast_vec}.
// NOTE(review): this text comes from a diff view; part of the loop body is hidden behind the
// "Expand All" hunk marker and the lines between the loop and the final return appear to be
// removed lines of the old appendFinalProjectForRootQueryBlock — confirm against the real file.
std::pair<bool, BoolVec> DAGExpressionAnalyzer::isCastRequiredForRootFinalProjection(
const std::vector<tipb::FieldType> & require_schema,
const std::vector<Int32> & output_offsets) const
{
/// TiDB can not guarantee that the field type in DAG request is accurate, so in order to make things work,
/// TiFlash will append extra type cast if needed.
const auto & current_columns = getCurrentInputColumns();
bool need_append_type_cast = false;
BoolVec need_append_type_cast_vec;
/// we need to append type cast for root block if necessary
/// we need to append type cast for root final projection if necessary
for (UInt32 i : output_offsets)
{
const auto & actual_type = current_columns[i].type;
auto expected_type = getDataTypeByFieldTypeForComputingLayer(schema[i]);
auto expected_type = getDataTypeByFieldTypeForComputingLayer(require_schema[i]);
// Types are compared by name; any mismatch sets the overall flag.
if (actual_type->getName() != expected_type->getName())
{
need_append_type_cast = true;
Expand All @@ -856,66 +932,34 @@ NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock(
need_append_type_cast_vec.push_back(false);
}
}
if (!need_append_timezone_cast && !need_append_type_cast)
{
for (auto i : output_offsets)
{
final_project.emplace_back(
current_columns[i].name,
unique_name_generator.toUniqueName(column_prefix + current_columns[i].name));
}
}
else
{
/// for all the columns that need to be returned, if the type is timestamp, then convert
/// the timestamp column to UTC based, refer to appendTimeZoneCastsAfterTS for more details
auto & step = initAndGetLastStep(chain);
// The per-column flags are moved into the returned pair.
return std::make_pair(need_append_type_cast, std::move(need_append_type_cast_vec));
}

NamesWithAliases DAGExpressionAnalyzer::appendFinalProjectForRootQueryBlock(
ExpressionActionsChain & chain,
const std::vector<tipb::FieldType> & schema,
const std::vector<Int32> & output_offsets,
const String & column_prefix,
bool keep_session_timezone_info)
{
if (unlikely(output_offsets.empty()))
throw Exception("Root Query block without output_offsets", ErrorCodes::LOGICAL_ERROR);

tipb::Expr tz_expr = constructTZExpr(context.getTimezoneInfo());
String tz_col;
String tz_cast_func_name = context.getTimezoneInfo().is_name_based ? "ConvertTimeZoneToUTC" : "ConvertTimeZoneByOffsetToUTC";
std::vector<Int32> casted(schema.size(), 0);
std::unordered_map<String, String> casted_name_map;
bool need_append_timezone_cast = !keep_session_timezone_info && !context.getTimezoneInfo().is_utc_timezone;
auto [need_append_type_cast, need_append_type_cast_vec] = isCastRequiredForRootFinalProjection(schema, output_offsets);
assert(need_append_type_cast_vec.size() == output_offsets.size());

for (size_t index = 0; index < output_offsets.size(); index++)
{
UInt32 i = output_offsets[index];
if ((need_append_timezone_cast && schema[i].tp() == TiDB::TypeTimestamp) || need_append_type_cast_vec[index])
{
const auto & it = casted_name_map.find(current_columns[i].name);
if (it == casted_name_map.end())
{
/// first add timestamp cast
String updated_name = current_columns[i].name;
if (need_append_timezone_cast && schema[i].tp() == TiDB::TypeTimestamp)
{
if (tz_col.length() == 0)
tz_col = getActions(tz_expr, step.actions);
updated_name = appendTimeZoneCast(tz_col, current_columns[i].name, tz_cast_func_name, step.actions);
}
/// then add type cast
if (need_append_type_cast_vec[index])
{
updated_name = appendCast(getDataTypeByFieldTypeForComputingLayer(schema[i]), step.actions, updated_name);
}
final_project.emplace_back(updated_name, unique_name_generator.toUniqueName(column_prefix + updated_name));
casted_name_map[current_columns[i].name] = updated_name;
}
else
{
final_project.emplace_back(it->second, unique_name_generator.toUniqueName(column_prefix + it->second));
}
}
else
{
final_project.emplace_back(
current_columns[i].name,
unique_name_generator.toUniqueName(column_prefix + current_columns[i].name));
}
}
auto & step = initAndGetLastStep(chain);

if (need_append_timezone_cast || need_append_type_cast)
{
// after appendCastForRootFinalProjection, source_columns has been modified.
appendCastForRootFinalProjection(step.actions, schema, output_offsets, need_append_timezone_cast, need_append_type_cast_vec);
}

auto & step = initAndGetLastStep(chain);
// generate project aliases from source_columns.
NamesWithAliases final_project = genRootFinalProjectAliases(column_prefix, output_offsets);

for (const auto & name : final_project)
{
step.required_output.push_back(name.first);
Expand Down
26 changes: 26 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,15 @@ class DAGExpressionAnalyzer : private boost::noncopyable
SubqueryForSet & join_query,
const NamesAndTypesList & columns_added_by_join) const;

// Generate a project action for non-root DAGQueryBlock,
// to keep the schema of Block and tidb-schema the same, and
// guarantee that left/right block of join don't have duplicated column names.
NamesWithAliases appendFinalProjectForNonRootQueryBlock(
ExpressionActionsChain & chain,
const String & column_prefix) const;

// Generate a project action for root DAGQueryBlock,
// to keep the schema of Block and tidb-schema the same.
NamesWithAliases appendFinalProjectForRootQueryBlock(
ExpressionActionsChain & chain,
const std::vector<tipb::FieldType> & schema,
Expand Down Expand Up @@ -258,6 +263,27 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const ExpressionActionsPtr & actions,
const std::vector<const tipb::Expr *> & conditions);

// Returns one (name, alias) pair per current input column.
// The alias is column_prefix + name, made unique through a UniqueNameGenerator.
NamesWithAliases genNonRootFinalProjectAliases(const String & column_prefix) const;

// Returns one (name, alias) pair per entry of output_offsets, in that order.
// Each entry indexes the current input columns; the alias is column_prefix + name,
// made unique through a UniqueNameGenerator.
NamesWithAliases genRootFinalProjectAliases(
const String & column_prefix,
const std::vector<Int32> & output_offsets) const;

// Appends timezone casts (for timestamp columns, when need_append_timezone_cast is set)
// and type casts (where need_append_type_cast_vec[k] is set for output_offsets[k]) to `actions`.
// Changes the source columns: source_columns is replaced by the column list after casting.
void appendCastForRootFinalProjection(
const ExpressionActionsPtr & actions,
const std::vector<tipb::FieldType> & require_schema,
const std::vector<Int32> & output_offsets,
bool need_append_timezone_cast,
const BoolVec & need_append_type_cast_vec);

// Returns {need_append_type_cast, need_append_type_cast_vec}.
// need_append_type_cast: set when the actual type name of an output column differs from
// the type name derived from require_schema.
// need_append_type_cast_vec: one flag per entry of output_offsets telling whether that
// column needs a type cast, so need_append_type_cast_vec.size() == output_offsets.size().
std::pair<bool, BoolVec> isCastRequiredForRootFinalProjection(
const std::vector<tipb::FieldType> & require_schema,
const std::vector<Int32> & output_offsets) const;

// all columns from table scan
NamesAndTypes source_columns;
DAGPreparedSets prepared_sets;
Expand Down

0 comments on commit ef608ee

Please sign in to comment.