Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 55 additions & 6 deletions be/src/pipeline/exec/multi_cast_data_stream_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@
namespace doris::pipeline {

MultiCastDataStreamerSourceOperatorBuilder::MultiCastDataStreamerSourceOperatorBuilder(
int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer)
int32_t id, const int consumer_id, std::shared_ptr<MultiCastDataStreamer>& data_streamer,
const TDataStreamSink& sink)
: OperatorBuilderBase(id, "MultiCastDataStreamerSourceOperator"),
_consumer_id(consumer_id),
_multi_cast_data_streamer(data_streamer) {};
_multi_cast_data_streamer(data_streamer),
_t_data_stream_sink(sink) {}

OperatorPtr MultiCastDataStreamerSourceOperatorBuilder::build_operator() {
return std::make_shared<MultiCastDataStreamerSourceOperator>(this, _consumer_id,
_multi_cast_data_streamer);
return std::make_shared<MultiCastDataStreamerSourceOperator>(
this, _consumer_id, _multi_cast_data_streamer, _t_data_stream_sink);
}

const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() {
Expand All @@ -43,10 +45,44 @@ const RowDescriptor& MultiCastDataStreamerSourceOperatorBuilder::row_desc() {

MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator(
OperatorBuilderBase* operator_builder, const int consumer_id,
std::shared_ptr<MultiCastDataStreamer>& data_streamer)
std::shared_ptr<MultiCastDataStreamer>& data_streamer, const TDataStreamSink& sink)
: OperatorBase(operator_builder),
vectorized::RuntimeFilterConsumer(sink.dest_node_id, sink.runtime_filters,
data_streamer->row_desc(), _conjuncts),
_consumer_id(consumer_id),
_multi_cast_data_streamer(data_streamer) {};
_multi_cast_data_streamer(data_streamer),
_t_data_stream_sink(sink) {}

Status MultiCastDataStreamerSourceOperator::init(const TDataSink& tsink) {
RETURN_IF_ERROR(OperatorBase::init(tsink));
if (_t_data_stream_sink.__isset.output_exprs) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs,
_output_expr_contexts));
}

if (_t_data_stream_sink.__isset.conjuncts) {
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts, _conjuncts));
}

return Status::OK();
}

Status MultiCastDataStreamerSourceOperator::prepare(doris::RuntimeState* state) {
RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state));
_register_runtime_filter();
RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state, row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_expr_contexts, state, row_desc()));
return Status::OK();
}

Status MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) {
return _acquire_runtime_filter(state);
}

bool MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() {
return vectorized::RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout();
}

bool MultiCastDataStreamerSourceOperator::can_read() {
return _multi_cast_data_streamer->can_read(_consumer_id);
Expand All @@ -56,6 +92,19 @@ Status MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
SourceState& source_state) {
bool eos = false;
_multi_cast_data_streamer->pull(_consumer_id, block, &eos);
if (!_output_expr_contexts.empty()) {
vectorized::Block output_block;
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
_output_expr_contexts, *block, &output_block));
materialize_block_inplace(output_block);
block->swap(output_block);
}

if (!_conjuncts.empty()) {
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_conjuncts, block, block->columns()));
}

if (eos) {
source_state = SourceState::FINISHED;
}
Expand Down
23 changes: 18 additions & 5 deletions be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "common/status.h"
#include "operator.h"
#include "vec/exec/runtime_filter_consumer.h"

namespace doris {
class ExecNode;
Expand All @@ -37,7 +38,8 @@ class MultiCastDataStreamer;
class MultiCastDataStreamerSourceOperatorBuilder final : public OperatorBuilderBase {
public:
MultiCastDataStreamerSourceOperatorBuilder(int32_t id, const int consumer_id,
std::shared_ptr<MultiCastDataStreamer>&);
std::shared_ptr<MultiCastDataStreamer>&,
const TDataStreamSink&);

bool is_source() const override { return true; }

Expand All @@ -48,20 +50,27 @@ class MultiCastDataStreamerSourceOperatorBuilder final : public OperatorBuilderB
private:
const int _consumer_id;
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
TDataStreamSink _t_data_stream_sink;
};

class MultiCastDataStreamerSourceOperator final : public OperatorBase {
class MultiCastDataStreamerSourceOperator final : public OperatorBase,
public vectorized::RuntimeFilterConsumer {
public:
MultiCastDataStreamerSourceOperator(OperatorBuilderBase* operator_builder,
const int consumer_id,
std::shared_ptr<MultiCastDataStreamer>& data_streamer);
std::shared_ptr<MultiCastDataStreamer>& data_streamer,
const TDataStreamSink& sink);

Status get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state) override;

Status prepare(RuntimeState* state) override { return Status::OK(); };
Status init(const TDataSink& tsink) override;

Status open(RuntimeState* state) override { return Status::OK(); };
Status prepare(RuntimeState* state) override;

Status open(RuntimeState* state) override;

bool runtime_filters_are_ready_or_timeout() override;

Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override {
return Status::OK();
Expand All @@ -76,6 +85,10 @@ class MultiCastDataStreamerSourceOperator final : public OperatorBase {
private:
const int _consumer_id;
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
TDataStreamSink _t_data_stream_sink;

vectorized::VExprContextSPtrs _output_expr_contexts;
vectorized::VExprContextSPtrs _conjuncts;
};

} // namespace pipeline
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,8 @@ Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thr
// 2. create and set the source operator of multi_cast_data_stream_source for new pipeline
OperatorBuilderPtr source_op =
std::make_shared<MultiCastDataStreamerSourceOperatorBuilder>(
next_operator_builder_id(), i, multi_cast_data_streamer);
next_operator_builder_id(), i, multi_cast_data_streamer,
thrift_sink.multi_cast_stream_sink.sinks[i]);
new_pipeline->add_operator(source_op);

// 3. create and set sink operator of data stream sender for new pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,26 @@
// specific language governing permissions and limitations
// under the License.

#include "vec/exec/runtime_filter_consumer_node.h"
#include "vec/exec/runtime_filter_consumer.h"

namespace doris::vectorized {

RuntimeFilterConsumerNode::RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs), _runtime_filter_descs(tnode.runtime_filters) {}
RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id,
const std::vector<TRuntimeFilterDesc>& runtime_filters,
const RowDescriptor& row_descriptor,
VExprContextSPtrs& conjuncts)
: _filter_id(filter_id),
_runtime_filter_descs(runtime_filters),
_row_descriptor_ref(row_descriptor),
_conjuncts_ref(conjuncts) {}

Status RuntimeFilterConsumerNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
Status RuntimeFilterConsumer::init(RuntimeState* state) {
_state = state;
RETURN_IF_ERROR(_register_runtime_filter());
return Status::OK();
}

Status RuntimeFilterConsumerNode::_register_runtime_filter() {
Status RuntimeFilterConsumer::_register_runtime_filter() {
int filter_size = _runtime_filter_descs.size();
_runtime_filter_ctxs.reserve(filter_size);
_runtime_filter_ready_flag.reserve(filter_size);
Expand All @@ -43,22 +47,22 @@ Status RuntimeFilterConsumerNode::_register_runtime_filter() {
// 1. All BE and FE has been upgraded (e.g. opt_remote_rf)
// 2. This filter is bloom filter (only bloom filter should be used for merging)
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->register_consumer_filter(
filter_desc, _state->query_options(), id(), false));
filter_desc, _state->query_options(), _filter_id, false));
RETURN_IF_ERROR(_state->get_query_ctx()->runtime_filter_mgr()->get_consume_filter(
filter_desc.filter_id, id(), &runtime_filter));
filter_desc.filter_id, _filter_id, &runtime_filter));
} else {
RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_consumer_filter(
filter_desc, _state->query_options(), id(), false));
filter_desc, _state->query_options(), _filter_id, false));
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(
filter_desc.filter_id, id(), &runtime_filter));
filter_desc.filter_id, _filter_id, &runtime_filter));
}
_runtime_filter_ctxs.emplace_back(runtime_filter);
_runtime_filter_ready_flag.emplace_back(false);
}
return Status::OK();
}

bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
bool RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
if (!_blocked_by_rf) {
return true;
}
Expand All @@ -72,7 +76,7 @@ bool RuntimeFilterConsumerNode::runtime_filters_are_ready_or_timeout() {
return true;
}

Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
Status RuntimeFilterConsumer::_acquire_runtime_filter(bool wait) {
SCOPED_TIMER(_acquire_runtime_filter_timer);
VExprSPtrs vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
Expand Down Expand Up @@ -101,23 +105,23 @@ Status RuntimeFilterConsumerNode::_acquire_runtime_filter(bool wait) {
return Status::OK();
}

Status RuntimeFilterConsumerNode::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
Status RuntimeFilterConsumer::_append_rf_into_conjuncts(const VExprSPtrs& vexprs) {
if (vexprs.empty()) {
return Status::OK();
}

for (auto& expr : vexprs) {
VExprContextSPtr conjunct = VExprContext::create_shared(expr);
RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor));
RETURN_IF_ERROR(conjunct->prepare(_state, _row_descriptor_ref));
RETURN_IF_ERROR(conjunct->open(_state));
_rf_vexpr_set.insert(expr);
_conjuncts.emplace_back(conjunct);
_conjuncts_ref.emplace_back(conjunct);
}

return Status::OK();
}

Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
Status RuntimeFilterConsumer::try_append_late_arrival_runtime_filter(int* arrived_rf_num) {
if (_is_all_rf_applied) {
*arrived_rf_num = _runtime_filter_descs.size();
return Status::OK();
Expand All @@ -140,12 +144,12 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar
continue;
} else if (_runtime_filter_ctxs[i].runtime_filter->is_ready()) {
RETURN_IF_ERROR(_runtime_filter_ctxs[i].runtime_filter->get_prepared_exprs(
&exprs, _row_descriptor, _state));
&exprs, _row_descriptor_ref, _state));
++current_arrived_rf_num;
_runtime_filter_ctxs[i].apply_mark = true;
}
}
// 2. Append unapplied runtime filters to vconjunct_ctx_ptr
// 2. Append unapplied runtime filters to _conjuncts
if (!exprs.empty()) {
RETURN_IF_ERROR(_append_rf_into_conjuncts(exprs));
}
Expand All @@ -157,7 +161,7 @@ Status RuntimeFilterConsumerNode::try_append_late_arrival_runtime_filter(int* ar
return Status::OK();
}

void RuntimeFilterConsumerNode::_prepare_rf_timer(RuntimeProfile* profile) {
void RuntimeFilterConsumer::_prepare_rf_timer(RuntimeProfile* profile) {
_acquire_runtime_filter_timer = ADD_TIMER(profile, "AcquireRuntimeFilterTime");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@

namespace doris::vectorized {

class RuntimeFilterConsumerNode : public ExecNode {
class RuntimeFilterConsumer {
public:
RuntimeFilterConsumerNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
~RuntimeFilterConsumerNode() override = default;
RuntimeFilterConsumer(const int32_t filter_id,
const std::vector<TRuntimeFilterDesc>& runtime_filters,
const RowDescriptor& row_descriptor, VExprContextSPtrs& conjuncts);
~RuntimeFilterConsumer() = default;

Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override;
Status init(RuntimeState* state);

// Try append late arrived runtime filters.
// Try to append late arrived runtime filters.
// Return num of filters which are applied already.
Status try_append_late_arrival_runtime_filter(int* arrived_rf_num);

Expand All @@ -54,15 +56,23 @@ class RuntimeFilterConsumerNode : public ExecNode {
IRuntimeFilter* runtime_filter;
};

RuntimeState* _state;

std::vector<RuntimeFilterContext> _runtime_filter_ctxs;

std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
// Set to true if the runtime filter is ready.
std::vector<bool> _runtime_filter_ready_flag;
doris::Mutex _rf_locks;
phmap::flat_hash_set<VExprSPtr> _rf_vexpr_set;

private:
RuntimeState* _state;

int32_t _filter_id;

std::vector<TRuntimeFilterDesc> _runtime_filter_descs;

const RowDescriptor& _row_descriptor_ref;

VExprContextSPtrs& _conjuncts_ref;

// True means all runtime filters are applied to scanners
bool _is_all_rf_applied = true;
bool _blocked_by_rf = false;
Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/exec/scan/vscan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) {
}

Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(RuntimeFilterConsumerNode::init(tnode, state));
RETURN_IF_ERROR(ExecNode::init(tnode, state));
RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
_state = state;
_is_pipeline_scan = state->enable_pipeline_exec();

const TQueryOptions& query_options = state->query_options();
Expand Down
10 changes: 7 additions & 3 deletions be/src/vec/exec/scan/vscan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
#include "runtime/runtime_state.h"
#include "util/lock.h"
#include "util/runtime_profile.h"
#include "vec/exec/runtime_filter_consumer_node.h"
#include "vec/exec/runtime_filter_consumer.h"
#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vscanner.h"
#include "vec/runtime/shared_scanner_controller.h"
Expand Down Expand Up @@ -88,10 +88,12 @@ struct FilterPredicates {
std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters;
};

class VScanNode : public RuntimeFilterConsumerNode {
class VScanNode : public ExecNode, public RuntimeFilterConsumer {
public:
VScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: RuntimeFilterConsumerNode(pool, tnode, descs) {
: ExecNode(pool, tnode, descs),
RuntimeFilterConsumer(id(), tnode.runtime_filters, ExecNode::_row_descriptor,
ExecNode::_conjuncts) {
if (!tnode.__isset.conjuncts || tnode.conjuncts.empty()) {
// Which means the request could be fullfilled in a single segment iterator request.
if (tnode.limit > 0 && tnode.limit < 1024) {
Expand Down Expand Up @@ -304,6 +306,8 @@ class VScanNode : public RuntimeFilterConsumerNode {
VExprContextSPtrs _stale_expr_ctxs;
VExprContextSPtrs _common_expr_ctxs_push_down;

RuntimeState* _state;

// If sort info is set, push limit to each scanner;
int64_t _limit_per_scanner = -1;

Expand Down
Loading