Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,13 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int
return status;
}

Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency,
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
std::future<Status>* fut) {
// std::function copies its target, so keep the promise in heap memory to avoid calling its copy ctor
auto prom = std::make_shared<std::promise<Status>>();
*fut = prom->get_future();
std::function<void()>* fn =
new std::function<void()>([&tasks, concurrency, p = std::move(prom)]() mutable {
std::function<void()>* fn = new std::function<void()>(
[tasks = std::move(tasks), concurrency, p = std::move(prom)]() mutable {
p->set_value(bthread_fork_join(tasks, concurrency));
});

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int

// An async wrapper of the `bthread_fork_join` declared above, using promise-future.
// Returns OK if `fut` is successfully created, otherwise returns an error.
Status bthread_fork_join(const std::vector<std::function<Status()>>& tasks, int concurrency,
Status bthread_fork_join(std::vector<std::function<Status()>>&& tasks, int concurrency,
std::future<Status>* fut);

class CloudMetaMgr {
Expand Down
3 changes: 0 additions & 3 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1066,9 +1066,6 @@ DEFINE_mInt32(segcompaction_num_threads, "5");
// enable java udf and jdbc scannode
DEFINE_Bool(enable_java_support, "true");

// enable prefetch tablets before opening
DEFINE_mBool(enable_prefetch_tablet, "true");

// Set config randomly to check more issues in github workflow
DEFINE_Bool(enable_fuzzy_mode, "false");

Expand Down
3 changes: 0 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1108,9 +1108,6 @@ DECLARE_mInt32(segcompaction_num_threads);
// enable java udf and jdbc scannode
DECLARE_Bool(enable_java_support);

// enable prefetch tablets before opening
DECLARE_mBool(enable_prefetch_tablet);

// Set config randomly to check more issues in github workflow
DECLARE_Bool(enable_fuzzy_mode);

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/multi_cast_data_stream_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<Mul
Status close(RuntimeState* state) override;
friend class MultiCastDataStreamerSourceOperatorX;

std::vector<Dependency*> filter_dependencies() override {
std::vector<Dependency*> execution_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
Expand Down
85 changes: 56 additions & 29 deletions be/src/pipeline/exec/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@

namespace doris::pipeline {

// Initializes this local state: runs the base-class `init` first and, if that
// succeeds, calls `_sync_cloud_tablets`. The status of the first failing step
// is returned; otherwise the status of `_sync_cloud_tablets` is returned.
Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));
    return _sync_cloud_tablets(state);
}

Status OlapScanLocalState::_init_profile() {
RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::_init_profile());
// Rows read from storage.
Expand Down Expand Up @@ -359,7 +365,6 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
state()->query_options().resource_limit.__isset.cpu_limit;

RETURN_IF_ERROR(hold_tablets());
if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
p._push_down_agg_type == TPushAggOp::NONE &&
(_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
Expand Down Expand Up @@ -453,30 +458,34 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}

Status OlapScanLocalState::hold_tablets() {
if (!_tablets.empty()) {
return Status::OK();
}

MonotonicStopWatch timer;
timer.start();
_tablets.resize(_scan_ranges.size());
_read_sources.resize(_scan_ranges.size());

if (config::is_cloud_mode()) {
std::vector<SyncRowsetStats> sync_statistics(_scan_ranges.size());
std::vector<std::function<Status()>> tasks {};
tasks.reserve(_scan_ranges.size());
int64_t duration_ns {0};
{
SCOPED_RAW_TIMER(&duration_ns);
Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
if (config::is_cloud_mode() && !_sync_tablet) {
_pending_tablets_num = _scan_ranges.size();
if (_pending_tablets_num > 0) {
_sync_cloud_tablets_watcher.start();
_cloud_tablet_dependency = Dependency::create_shared(
_parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP");
_tablets.resize(_scan_ranges.size());
std::vector<std::function<Status()>> tasks;
_sync_statistics.resize(_scan_ranges.size());
for (size_t i = 0; i < _scan_ranges.size(); i++) {
auto* sync_stats = &sync_statistics[i];
auto* sync_stats = &_sync_statistics[i];
int64_t version = 0;
std::from_chars(_scan_ranges[i]->version.data(),
_scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(),
version);
tasks.emplace_back([this, sync_stats, version, i]() {
auto task_ctx = state->get_task_execution_context();
tasks.emplace_back([this, sync_stats, version, i, task_ctx]() {
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
return Status::OK();
}
Defer defer([&] {
if (_pending_tablets_num.fetch_sub(1) == 1) {
_cloud_tablet_dependency->set_ready();
_sync_cloud_tablets_watcher.stop();
}
});
auto tablet =
DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats));
_tablets[i] = {std::move(tablet), version};
Expand All @@ -488,17 +497,38 @@ Status OlapScanLocalState::hold_tablets() {
return Status::OK();
});
}
RETURN_IF_ERROR(
cloud::bthread_fork_join(tasks, config::init_scanner_sync_rowsets_parallelism));
RETURN_IF_ERROR(cloud::bthread_fork_join(std::move(tasks),
config::init_scanner_sync_rowsets_parallelism,
&_cloud_tablet_future));
}
_sync_tablet = true;
}
return Status::OK();
}

Status OlapScanLocalState::prepare(RuntimeState* state) {
if (_prepared) {
return Status::OK();
}
MonotonicStopWatch timer;
timer.start();
_read_sources.resize(_scan_ranges.size());

if (config::is_cloud_mode()) {
if (!_cloud_tablet_dependency ||
_cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) {
// Remote tablet still in-flight.
return Status::OK();
}
COUNTER_UPDATE(_sync_rowset_timer, duration_ns);
DCHECK(_cloud_tablet_future.valid() && _cloud_tablet_future.get().ok());
COUNTER_UPDATE(_sync_rowset_timer, _sync_cloud_tablets_watcher.elapsed_time());
auto total_rowsets = std::accumulate(
_tablets.cbegin(), _tablets.cend(), 0LL,
[](long long acc, const auto& tabletWithVersion) {
return acc + tabletWithVersion.tablet->tablet_meta()->all_rs_metas().size();
});
COUNTER_UPDATE(_sync_rowset_tablets_rowsets_total_num, total_rowsets);
for (const auto& sync_stats : sync_statistics) {
for (const auto& sync_stats : _sync_statistics) {
COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_hit, sync_stats.tablet_meta_cache_hit);
COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_miss, sync_stats.tablet_meta_cache_miss);
COUNTER_UPDATE(_sync_rowset_get_remote_tablet_meta_rpc_timer,
Expand All @@ -518,6 +548,7 @@ Status OlapScanLocalState::hold_tablets() {
sync_stats.get_remote_delete_bitmap_rpc_ns);
}
} else {
_tablets.resize(_scan_ranges.size());
for (size_t i = 0; i < _scan_ranges.size(); i++) {
int64_t version = 0;
std::from_chars(_scan_ranges[i]->version.data(),
Expand Down Expand Up @@ -553,6 +584,7 @@ Status OlapScanLocalState::hold_tablets() {
cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(),
_scan_ranges.size());
}
_prepared = true;
return Status::OK();
}

Expand Down Expand Up @@ -765,9 +797,4 @@ OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, i
}
}

Status OlapScanOperatorX::hold_tablets(RuntimeState* state) {
auto& local_state = ScanOperatorX<OlapScanLocalState>::get_local_state(state);
return local_state.hold_tablets();
}

} // namespace doris::pipeline
26 changes: 21 additions & 5 deletions be/src/pipeline/exec/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <string>

#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "olap/tablet_reader.h"
#include "operator.h"
Expand All @@ -39,22 +40,31 @@ class OlapScanOperatorX;
class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
public:
using Parent = OlapScanOperatorX;
using Base = ScanLocalState<OlapScanLocalState>;
ENABLE_FACTORY_CREATOR(OlapScanLocalState);
OlapScanLocalState(RuntimeState* state, OperatorXBase* parent)
: ScanLocalState(state, parent) {}

OlapScanLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {}
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status prepare(RuntimeState* state) override;
TOlapScanNode& olap_scan_node() const;

std::string name_suffix() const override {
return fmt::format(" (id={}. nereids_id={}. table name = {})",
std::to_string(_parent->node_id()),
std::to_string(_parent->nereids_id()), olap_scan_node().table_name);
}
Status hold_tablets();
std::vector<Dependency*> execution_dependencies() override {
if (!_cloud_tablet_dependency) {
return Base::execution_dependencies();
}
std::vector<Dependency*> res = Base::execution_dependencies();
res.push_back(_cloud_tablet_dependency.get());
return res;
}

private:
friend class vectorized::NewOlapScanner;

Status _sync_cloud_tablets(RuntimeState* state);
void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) override;
Status _init_profile() override;
Expand Down Expand Up @@ -91,6 +101,13 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
Status _build_key_ranges_and_filters();

std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
std::vector<SyncRowsetStats> _sync_statistics;
MonotonicStopWatch _sync_cloud_tablets_watcher;
std::shared_ptr<Dependency> _cloud_tablet_dependency;
std::atomic<size_t> _pending_tablets_num = 0;
bool _prepared = false;
std::future<Status> _cloud_tablet_future;
std::atomic_bool _sync_tablet = false;
std::vector<std::unique_ptr<doris::OlapScanRange>> _cond_ranges;
OlapScanKeys _scan_keys;
std::vector<TCondition> _olap_filters;
Expand Down Expand Up @@ -237,7 +254,6 @@ class OlapScanOperatorX final : public ScanOperatorX<OlapScanLocalState> {
OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs, int parallel_tasks,
const TQueryCacheParam& cache_param);
Status hold_tablets(RuntimeState* state) override;

private:
friend class OlapScanLocalState;
Expand Down
11 changes: 9 additions & 2 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ class PipelineXLocalStateBase {
// Do initialization. This step should be executed only once and in bthread, so we can do some
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
// Make sure all resources are ready before execution. For example, remote tablets should be
// loaded to local storage.
// This is called by the execution pthread, unlike `Operator::prepare`, which is called by a
// bthread.
virtual Status prepare(RuntimeState* state) = 0;
// Do initialization. This step can be executed multiple times, so we should make sure it is
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
Expand Down Expand Up @@ -180,7 +185,7 @@ class PipelineXLocalStateBase {
// override in Scan
virtual Dependency* finishdependency() { return nullptr; }
// override in Scan MultiCastSink
virtual std::vector<Dependency*> filter_dependencies() { return {}; }
virtual std::vector<Dependency*> execution_dependencies() { return {}; }

std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }

Expand Down Expand Up @@ -227,6 +232,7 @@ class PipelineXLocalState : public PipelineXLocalStateBase {
~PipelineXLocalState() override = default;

Status init(RuntimeState* state, LocalStateInfo& info) override;
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override;

virtual std::string name_suffix() const;
Expand Down Expand Up @@ -311,6 +317,7 @@ class PipelineXSinkLocalStateBase {
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0;

virtual Status prepare(RuntimeState* state) = 0;
// Do initialization. This step can be executed multiple times, so we should make sure it is
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
Expand Down Expand Up @@ -388,6 +395,7 @@ class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase {

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) override { return Status::OK(); }

Status close(RuntimeState* state, Status exec_status) override;
Expand Down Expand Up @@ -648,7 +656,6 @@ class OperatorXBase : public OperatorBase {
[[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; }

// Tablets should be held before the open phase.
[[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); }
Status open(RuntimeState* state) override;

[[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block,
Expand Down
11 changes: 4 additions & 7 deletions be/src/pipeline/exec/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public RuntimeFilterCon
virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0;
virtual void set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>& scan_ranges) = 0;

virtual TPushAggOp::type get_push_down_agg_type() = 0;

virtual int64_t get_push_down_count() = 0;
Expand Down Expand Up @@ -163,15 +162,13 @@ class ScanLocalState : public ScanLocalStateBase {

int64_t get_push_down_count() override;

std::vector<Dependency*> filter_dependencies() override {
std::vector<Dependency*> execution_dependencies() override {
if (_filter_dependencies.empty()) {
return {};
}
std::vector<Dependency*> res;
res.resize(_filter_dependencies.size());
for (size_t i = 0; i < _filter_dependencies.size(); i++) {
res[i] = _filter_dependencies[i].get();
}
std::vector<Dependency*> res(_filter_dependencies.size());
std::transform(_filter_dependencies.begin(), _filter_dependencies.end(), res.begin(),
[](DependencySPtr dep) { return dep.get(); });
return res;
}

Expand Down
Loading
Loading