-
Notifications
You must be signed in to change notification settings - Fork 7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix header for union stream in distributed #2226
Fix header for union stream in distributed #2226
Conversation
aaccf3a
to
081f0fe
Compare
@@ -21,8 +21,8 @@ class FilterBlockInputStream : public IProfilingBlockInputStream | |||
|
|||
public: | |||
/// filter_column_ - the number of the column with filter conditions. | |||
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, ssize_t filter_column_); | |||
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_); | |||
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, ssize_t filter_column_, bool remove_filter = false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing comment.
: expression(expression_), filter_column(filter_column_) | ||
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, | ||
ssize_t filter_column_, bool remove_filter) | ||
: remove_filter(remove_filter), expression(expression_), filter_column(filter_column_) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When first ctor is used, header is not set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've removed this ctor.
namespace DB | ||
{ | ||
|
||
class PrewhereFilterBlockInputStream : public FilterBlockInputStream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Total absence of comments.
prewhere_info->prewhere_column_name, prewhere_info->remove_prewhere_column) | ||
, prewhere_info(prewhere_info) | ||
{ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about header?
protected: | ||
Block readImpl() override | ||
{ | ||
remove_filter = prewhere_info->remove_prewhere_column; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it have to be evaluated for each block?
@@ -1016,10 +1016,25 @@ void ExpressionActionsChain::finalize() | |||
for (int i = static_cast<int>(steps.size()) - 1; i >= 0; --i) | |||
{ | |||
Names required_output = steps[i].required_output; | |||
std::unordered_map<String, size_t> required_output_indexes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indices
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is still Ok.
@@ -103,7 +103,8 @@ class ExpressionAnalyzer : private boost::noncopyable | |||
/// Before aggregation: | |||
bool appendArrayJoin(ExpressionActionsChain & chain, bool only_types); | |||
bool appendJoin(ExpressionActionsChain & chain, bool only_types); | |||
bool appendWhere(ExpressionActionsChain & chain, bool only_types); | |||
bool appendPrewhere(ExpressionActionsChain & chain, bool only_types, std::shared_ptr<bool> & remove_filter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cannot understand without comments.
Why we need std::shared_ptr<bool>
. Looks weird.
@@ -230,7 +230,9 @@ struct ExpressionActionsChain | |||
struct Step | |||
{ | |||
ExpressionActionsPtr actions; | |||
NameSet additional_input; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this? Don't understand.
Names required_output; | ||
std::vector<std::shared_ptr<bool>> can_remove_required_output; /// Has the same size with required_output, is filled in finalize() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the output is required, why it's possible that we "can remove" it? No explanation.
Non obvious.
@@ -676,12 +696,25 @@ QueryProcessingStage::Enum InterpreterSelectQuery::executeFetchColumns(Pipeline | |||
optimize_prewhere(*merge_tree); | |||
} | |||
|
|||
if (!dry_run && query_analyzer->appendPrewhere(chain, false, remove_prewhere_filter)) | |||
{ | |||
query_info.prewhere_info = prewhere_info = std::make_shared<PrewhereInfo>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This style is discouraged.
block.erase(prewhere_info->prewhere_column_name); | ||
|
||
if (!block) | ||
block.insert({nullptr, std::make_shared<DataTypeNothing>(), "_nothing"}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it Ok to have a block without column?
@@ -99,15 +111,14 @@ try | |||
} | |||
is_first_task = false; | |||
|
|||
Names pre_column_names, column_names = ordered_names; | |||
bool remove_prewhere_column = false; | |||
Names pre_column_names, column_names = required_columns; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bad style.
@@ -48,9 +48,10 @@ class MergeTreeBlockInputStream : public MergeTreeBaseBlockInputStream | |||
bool getNewTask() override; | |||
|
|||
private: | |||
Block header; | |||
mutable Block header; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a red flag.
May lead to race conditions in getHeader
method.
@@ -71,6 +72,8 @@ class MergeTreeBlockInputStream : public MergeTreeBaseBlockInputStream | |||
bool is_first_task = true; | |||
|
|||
Logger * log = &Logger::get("MergeTreeBlockInputStream"); | |||
|
|||
const Names & getOrderedNames(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing comment.
@@ -497,7 +497,7 @@ void MergeTreeReader::fillMissingColumns(Block & res, bool & should_reorder, boo | |||
} | |||
} | |||
|
|||
void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names) | |||
void MergeTreeReader::reorderColumns(Block & res, const Names & ordered_names, const String * filter_name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::optional?
@@ -43,7 +43,7 @@ class MergeTreeReader : private boost::noncopyable | |||
/// If at least one column was added, reorders all columns in the block according to ordered_names. | |||
void fillMissingColumns(Block & res, bool & should_reorder, bool & should_evaluate_missing_defaults); | |||
/// Sort columns to ensure consistent order among all blocks. | |||
void reorderColumns(Block & res, const Names & ordered_names); | |||
void reorderColumns(Block & res, const Names & ordered_names, const String * filter_name); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment above wasn't updated.
164fb72
to
a6375d6
Compare
ece7662
to
8061046
Compare
8061046
to
7a62bb9
Compare
@@ -174,7 +177,7 @@ class IStorage : public std::enable_shared_from_this<IStorage>, private boost::n | |||
const Names & /*column_names*/, | |||
const SelectQueryInfo & /*query_info*/, | |||
const Context & /*context*/, | |||
QueryProcessingStage::Enum & /*processed_stage*/, | |||
QueryProcessingStage::Enum /*processed_stage*/, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that this parameter is unnecessary now?
BlockInputStreams StorageMerge::read( | ||
const Names & column_names, | ||
const SelectQueryInfo & query_info, | ||
const Context & context, | ||
QueryProcessingStage::Enum & processed_stage, | ||
QueryProcessingStage::Enum processed_stage, | ||
const size_t max_block_size, | ||
const unsigned num_streams) | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we check processed_stage here?
What will happen if new tables with a different stage will be added between calls getQueryProcessingStage()
and read()
?
/// Example: select A prewhere B > 0. B can be removed at prewhere step. | ||
/// 2. Store side columns which were calculated during prewhere actions execution if they are used. | ||
/// Example: select F(A) prewhere F(A) > 0. F(A) can be saved from prewhere step. | ||
/// 3. Check if we can remove filer column at prewhere step. If we can, action will store single REMOVE_COLUMN. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
filer -> filter
is there any progress on this fix? we hit the same problem multiple times and has to disable optimize_move_to_prewhere to workaround the problem. |
83b2541
to
60f0ed9
Compare
/// True if column from required_output is needed only for current step and not used in next actions | ||
/// (and can be removed from block). Example: filter column for where actions. | ||
/// If not empty, has the same size with required_output; is filled in finalize(). | ||
std::vector<bool *> not_need_in_future_steps; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks quite dangerous and also hurts code readability - when we modify a flag in this vector, where is it in the code? You have to switch to another file to determine.
Alternative - have a collection (can be a vector<bool>
or even a NameSet that will tell us which columns are not needed by future steps). Then the calling code can set any needed bool value itself.
if (block && remove_filter) | ||
block.erase(static_cast<size_t>(filter_column)); | ||
|
||
return block; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to add std::move here otherwise a copy will be performed (the compiler won't do copy elision for a function parameter).
if (i + 1 < static_cast<int>(steps.size())) | ||
{ | ||
const NameSet & additional_input = steps[i + 1].additional_input; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it ok that we are considering only the next step when setting the not_need_in_future_steps
variable (the name points to all future steps instead of just the next one)?
Also it's better to stick to a single name (can_remove_required_output
or not_need_in_future_steps
)
create table test.tab (date Date, time DateTime, data String) ENGINE = MergeTree(date, (time, data), 8192); | ||
insert into test.tab values ('2018-01-21','2018-01-21 15:12:13','test'); | ||
select time FROM remote('127.0.0.{1,2}', test.tab) WHERE date = '2018-01-21' limit 2; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to drop the test table at the end
Seems that #2083 is still not fixed in this PR. |
92a923a
to
6e5e573
Compare
else if (const StorageReplicatedMergeTree * merge_tree = dynamic_cast<const StorageReplicatedMergeTree *>(storage.get())) | ||
optimize_prewhere(*merge_tree); | ||
} | ||
|
||
AnalysisResult expressions; | ||
|
||
if (dry_run) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This dry_run condition does not cover all cases. There exists some cases (i.e. case 00229 in /dbms/tests/queries/stateless/) that makes Expression::execute function throw an exception.
In this case, apply_functions in prewhere actions will not be executed if InterpreterSelectQuery::getHeader() is invoked. Then the PROJECT will not find corresponding columns, which leads to an exception.
I think it can be solved by swapping prewhere_expression with where_expression if dry_run is true.
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en