Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: small refine of MergedTask. #8512

Merged
merged 3 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ class RFWaitTask : public Task

static void submitReadyRfsAndSegmentTaskPool(
const RuntimeFilteList & ready_rf_list,
const DM::SegmentReadTaskPoolPtr & task_pool)
const DM::SegmentReadTaskPoolPtr & task_pool,
const LoggerPtr & log)
{
for (const RuntimeFilterPtr & rf : ready_rf_list)
{
auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead());
task_pool->appendRSOperator(rs_operator);
}
DM::SegmentReadTaskScheduler::instance().add(task_pool);
DM::SegmentReadTaskScheduler::instance().add(task_pool, log);
}

private:
Expand All @@ -82,7 +83,7 @@ class RFWaitTask : public Task
filterAndMoveReadyRfs(waiting_rf_list, ready_rf_list);
if (waiting_rf_list.empty() || stopwatch.elapsed() >= max_wait_time_ns)
{
submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool);
submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool, log);
return ExecTaskStatus::FINISHED;
}
return ExecTaskStatus::WAITING;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Operators/UnorderedSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void UnorderedSourceOp::operatePrefixImpl()
std::call_once(task_pool->addToSchedulerFlag(), [&]() {
if (waiting_rf_list.empty())
{
DM::SegmentReadTaskScheduler::instance().add(task_pool);
DM::SegmentReadTaskScheduler::instance().add(task_pool, log);
}
else
{
Expand All @@ -74,7 +74,7 @@ void UnorderedSourceOp::operatePrefixImpl()

if (max_wait_time_ms <= 0 || waiting_rf_list.empty())
{
RFWaitTask::submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool);
RFWaitTask::submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool, log);
}
else
{
Expand Down
47 changes: 19 additions & 28 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void MergedTask::initOnce()
auto & [pool, task, stream] = units[cur_idx];
if (!pool->valid())
{
setStreamFinished(cur_idx);
setUnitFinish(cur_idx);
continue;
}
if (pool->isRUExhausted())
Expand All @@ -65,7 +65,7 @@ int MergedTask::readOneBlock()
int read_block_count = 0;
for (cur_idx = 0; cur_idx < static_cast<int>(units.size()); cur_idx++)
{
if (isStreamFinished(cur_idx))
if (units[cur_idx].isFinished())
{
continue;
}
Expand All @@ -74,7 +74,7 @@ int MergedTask::readOneBlock()

if (!pool->valid())
{
setStreamFinished(cur_idx);
setUnitFinish(cur_idx);
continue;
}

Expand All @@ -94,37 +94,33 @@ int MergedTask::readOneBlock()
}
else
{
setStreamFinished(cur_idx);
setUnitFinish(cur_idx);
}
}
return read_block_count;
}

// Propagate an exception to every read-request pool still attached to this
// merged task, so that all waiting readers observe the failure.
// (Reconstructed: the scraped diff had the old for-loop body and the new
// std::for_each body concatenated, which is not valid C++.)
void MergedTask::setException(const DB::Exception & e)
{
    std::for_each(units.begin(), units.end(), [&e](auto & u) {
        // A unit whose pool is nullptr has already finished; nothing to notify.
        if (u.pool != nullptr)
            u.pool->setException(e);
    });
}

// Remove and return the first merged task that contains `pool_id`;
// returns nullptr when no such task is queued.
// (Reconstructed: the scraped diff interleaved the old linear-scan loop with
// the new std::find_if implementation — duplicate `target` declarations, a
// stray `break`, and two competing return paths made the span invalid C++.)
MergedTaskPtr MergedTaskPool::pop(uint64_t pool_id)
{
    std::lock_guard lock(mtx);
    auto itr = std::find_if(merged_task_pool.begin(), merged_task_pool.end(), [pool_id](const auto & merged_task) {
        return merged_task->containPool(pool_id);
    });
    if (itr != merged_task_pool.end())
    {
        // Copy the shared_ptr out before erasing the list node that owns it.
        auto target = *itr;
        merged_task_pool.erase(itr);
        return target;
    }
    return nullptr; // Not Found.
}

void MergedTaskPool::push(const MergedTaskPtr & t)
Expand All @@ -136,13 +132,8 @@ void MergedTaskPool::push(const MergedTaskPtr & t)
// Whether any queued merged task contains the pool identified by `pool_id`.
// (Reconstructed: the scraped diff left the old loop's `return false` directly
// above the new std::any_of version, making the latter unreachable dead code.)
bool MergedTaskPool::has(UInt64 pool_id)
{
    std::lock_guard lock(mtx);
    return std::any_of(merged_task_pool.begin(), merged_task_pool.end(), [pool_id](const auto & merged_task) {
        return merged_task->containPool(pool_id);
    });
}
} // namespace DB::DM
100 changes: 39 additions & 61 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@ struct MergedUnit
, task(task_)
{}

// Destructor: ensure the unit is marked finished so `MemoryTracker`
// statistics are updated even if the unit is destroyed before its
// stream was read to completion.
~MergedUnit()
{
    // Calling `setFinish()` for updating memory statistics of `MemoryTracker`.
    [[maybe_unused]] auto res = setFinish();
}

// A unit counts as finished once all three members have been released.
[[nodiscard]] bool isFinished() const { return pool == nullptr && task == nullptr && stream == nullptr; }

// Release `task`, `stream` and `pool`. Returns true if this call performed
// the release, false if the unit was already finished.
[[nodiscard]] bool setFinish()
{
    if (!isFinished())
    {
        // For updating memory statistics of `MemoryTracker`.
        // NOTE(review): the setter scopes the releases below to the pool's
        // tracker; `pool` itself is reset last so the tracker stays valid.
        MemoryTrackerSetter setter(true, pool->mem_tracker.get());
        task = nullptr;
        stream = nullptr;
        pool = nullptr;
        return true;
    }
    return false;
}

SegmentReadTaskPoolPtr pool; // The information of a read request.
SegmentReadTaskPtr task; // The information of a segment that want to read.
BlockInputStreamPtr stream; // BlockInputStream of a segment, will be created by read threads.
Expand All @@ -44,7 +67,6 @@ class MergedTask
, inited(false)
, cur_idx(-1)
, finished_count(0)
, log(Logger::get())
{
passive_merged_segments.fetch_add(units.size() - 1, std::memory_order_relaxed);
GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task).Increment();
Expand All @@ -54,8 +76,6 @@ class MergedTask
passive_merged_segments.fetch_sub(units.size() - 1, std::memory_order_relaxed);
GET_METRIC(tiflash_storage_read_thread_gauge, type_merged_task).Decrement();
GET_METRIC(tiflash_storage_read_thread_seconds, type_merged_task).Observe(sw.elapsedSeconds());
// `setAllStreamFinished` must be called to explicitly releasing all streams for updating memory statistics of `MemoryTracker`.
setAllStreamsFinished();
}

int readBlock();
Expand All @@ -64,36 +84,27 @@ class MergedTask

const GlobalSegmentID & getSegmentId() const { return seg_id; }

size_t getPoolCount() const { return units.size(); }

std::vector<uint64_t> getPoolIds() const
bool containPool(uint64_t pool_id) const
{
std::vector<uint64_t> ids;
ids.reserve(units.size());
for (const auto & unit : units)
{
if (unit.pool != nullptr)
{
ids.push_back(unit.pool->pool_id);
}
}
return ids;
return std::any_of(units.begin(), units.end(), [pool_id](const auto & u) {
return u.pool != nullptr && u.pool->pool_id == pool_id;
});
}

// Set an exception on all attached pools; defined in MergedTask.cpp.
void setException(const DB::Exception & e);

// Human-readable identity of this merged task for logging:
// the segment id plus the ids of all pools still attached.
// (Reconstructed: the scraped diff embedded the deleted containPool() loop
// body inside this method and duplicated the setException declaration.)
String toString() const
{
    std::vector<UInt64> ids;
    ids.reserve(units.size());
    std::for_each(units.begin(), units.end(), [&ids](const auto & u) {
        // Finished units have a null pool and are omitted from the listing.
        if (u.pool != nullptr)
            ids.push_back(u.pool->pool_id);
    });
    return fmt::format("seg_id:{} pool_id:{}", seg_id, ids);
}

const LoggerPtr getCurrentLogger() const
LoggerPtr getCurrentLogger() const
{
// `std::cmp_*` is safety to compare negative signed integers and unsigned integers.
if (likely(
Expand All @@ -115,41 +126,13 @@ class MergedTask
private:
void initOnce();
int readOneBlock();
void setUnitFinish(int i) { finished_count += units[i].setFinish(); }

bool isStreamFinished(size_t i) const
{
return units[i].pool == nullptr && units[i].task == nullptr && units[i].stream == nullptr;
}

void setStreamFinished(size_t i)
{
if (!isStreamFinished(i))
{
// `MergedUnit.stream` must be released explicitly for updating memory statistics of `MemoryTracker`.
auto & [pool, task, stream] = units[i];
{
MemoryTrackerSetter setter(true, pool->mem_tracker.get());
task = nullptr;
stream = nullptr;
}
pool = nullptr;
finished_count++;
}
}

void setAllStreamsFinished()
{
for (size_t i = 0; i < units.size(); ++i)
{
setStreamFinished(i);
}
}
GlobalSegmentID seg_id;
std::vector<MergedUnit> units;
bool inited;
int cur_idx;
size_t finished_count;
LoggerPtr log;
Stopwatch sw;
inline static std::atomic<int64_t> passive_merged_segments{0};
};
Expand All @@ -162,17 +145,12 @@ using MergedTaskPtr = std::shared_ptr<MergedTask>;
class MergedTaskPool
{
public:
MergedTaskPool()
: log(Logger::get())
{}

MergedTaskPtr pop(uint64_t pool_id);
void push(const MergedTaskPtr & t);
bool has(UInt64 pool_id);

private:
std::mutex mtx;
std::list<MergedTaskPtr> merged_task_pool GUARDED_BY(mtx);
LoggerPtr log;
};
} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ SegmentReadTaskScheduler::~SegmentReadTaskScheduler()
sched_thread.join();
}

void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool, const LoggerPtr & req_log)
{
Stopwatch sw_add;
// `add_lock` is only used in this function to make all threads calling `add` to execute serially.
std::lock_guard add_lock(add_mtx);
// `lock` is used to protect data.
std::lock_guard lock(mtx);
Stopwatch sw_do_add;
read_pools.add(pool);
Expand All @@ -43,12 +45,11 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
{
merging_segments[seg_id].push_back(pool->pool_id);
}
auto block_slots = pool->getFreeBlockSlots();
LOG_DEBUG(
log,
req_log,
"Added, pool_id={} block_slots={} segment_count={} pool_count={} cost={:.3f}us do_add_cost={:.3f}us", //
pool->pool_id,
block_slots,
pool->getFreeBlockSlots(),
tasks.size(),
read_pools.size(),
sw_add.elapsed() / 1000.0,
Expand Down Expand Up @@ -235,9 +236,8 @@ bool SegmentReadTaskScheduler::schedule()
{
LOG_DEBUG(
log,
"scheduleMergedTask segment_id={} pool_ids={} cost={}ms pool_count={}",
merged_task->getSegmentId(),
merged_task->getPoolIds(),
"scheduleMergedTask merged_task=<{}> cost={}ms pool_count={}",
merged_task->toString(),
elapsed_ms,
pool_count);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include <Storages/DeltaMerge/ReadThread/MergedTask.h>
#include <Storages/DeltaMerge/SegmentReadTaskPool.h>

#include <memory>
namespace DB::DM
{
using SegmentReadTaskPoolList = CircularScanList<SegmentReadTaskPool>;
Expand All @@ -43,7 +42,7 @@ class SegmentReadTaskScheduler
DISALLOW_COPY_AND_MOVE(SegmentReadTaskScheduler);

// Add SegmentReadTaskPool to `read_pools` and index segments into merging_segments.
void add(const SegmentReadTaskPoolPtr & pool) LOCKS_EXCLUDED(add_mtx, mtx);
void add(const SegmentReadTaskPoolPtr & pool, const LoggerPtr & req_log) LOCKS_EXCLUDED(add_mtx, mtx);

void pushMergedTask(const MergedTaskPtr & p) { merged_task_pool.push(p); }

Expand Down
7 changes: 1 addition & 6 deletions dbms/src/Storages/DeltaMerge/ReadThread/SegmentReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,7 @@ class SegmentReader
}
if (read_count <= 0)
{
LOG_DEBUG(
log,
"All finished, pool_ids={} segment_id={} read_count={}",
merged_task->getPoolIds(),
merged_task->getSegmentId(),
read_count);
LOG_DEBUG(log, "All finished, merged_task=<{}> read_count={}", merged_task->toString(), read_count);
}
// If `merged_task` is pushed back to `MergedTaskPool`, it can be accessed by another read thread if it is scheduled.
// So do not push back to `MergedTaskPool` when exception happened since current read thread can still access to this `merged_task` object and set exception message to it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class UnorderedInputStream : public IProfilingBlockInputStream
}
std::call_once(task_pool->addToSchedulerFlag(), [&]() {
prepareRuntimeFilter();
SegmentReadTaskScheduler::instance().add(task_pool);
SegmentReadTaskScheduler::instance().add(task_pool, log);
});
task_pool_added = true;
}
Expand Down