Skip to content

Commit

Permalink
handle memtable stats and metrics in FlushToken
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen committed Jul 6, 2023
1 parent 06995c8 commit ac78cf8
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 27 deletions.
19 changes: 18 additions & 1 deletion be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ Status FlushToken::wait() {
return s == OK ? Status::OK() : Status::Error(s);
}

Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
<< ", memsize: " << memtable->memory_usage()
<< ", rows: " << memtable->stat().raw_rows;
int64_t duration_ns;
SCOPED_RAW_TIMER(&duration_ns);
std::unique_ptr<vectorized::Block> block = memtable->to_block();
SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_rowset_writer->flush_memtable(
block.get(), segment_id, memtable->flush_mem_tracker(), memtable->stat(),
flush_size)));
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id()
<< ", flushsize: " << *flush_size;
return Status::OK();
}

void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t submit_task_time) {
uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time;
_stats.flush_wait_time_ns += flush_wait_time_ns;
Expand All @@ -102,7 +119,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t
size_t memory_usage = memtable->memory_usage();

int64_t flush_size;
Status s = _rowset_writer->flush_memtable(memtable, segment_id, &flush_size);
Status s = _do_flush_memtable(memtable, segment_id, &flush_size);

if (!s) {
LOG(WARNING) << "Flush memtable failed with res = " << s;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class FlushToken {

void _flush_memtable(MemTable* mem_table, int32_t segment_id, int64_t submit_task_time);

Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t& flush_size);
Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);

std::unique_ptr<ThreadPoolToken> _flush_token;

Expand Down
29 changes: 7 additions & 22 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,36 +518,21 @@ Status BetaRowsetWriter::flush() {
return Status::OK();
}

Status BetaRowsetWriter::flush_memtable(MemTable* memtable, int32_t segment_id,
int64_t* flush_size) {
VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id()
<< ", memsize: " << memtable->memory_usage()
<< ", rows: " << memtable->stat().raw_rows;
int64_t duration_ns;
SCOPED_RAW_TIMER(&duration_ns);
SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_do_flush_memtable(memtable, segment_id, flush_size)));
_memtable_stat += memtable->stat();
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id()
<< ", flushsize: " << *flush_size;
return Status::OK();
}

Status BetaRowsetWriter::_do_flush_memtable(MemTable* memtable, int32_t segment_id,
int64_t* flush_size) {
SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker());
std::unique_ptr<vectorized::Block> block = memtable->to_block();
Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id,
const std::shared_ptr<MemTracker>& flush_mem_tracker,
const MemTableStat& stat, int64_t* flush_size) {
SCOPED_CONSUME_MEM_TRACKER(flush_mem_tracker);

FlushContext ctx;
ctx.block = block.get();
ctx.block = block;
if (_context.tablet_schema->is_dynamic_schema()) {
// Unfold variant column
RETURN_IF_ERROR(_unfold_variant_column(*block, &ctx));
}
ctx.segment_id = std::optional<int32_t> {segment_id};
SCOPED_RAW_TIMER(&_memtable_stat.segment_writer_ns);
RETURN_IF_ERROR(flush_single_memtable(block.get(), flush_size, &ctx));
RETURN_IF_ERROR(flush_single_memtable(block, flush_size, &ctx));
_memtable_stat += stat;
return Status::OK();
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ class BetaRowsetWriter : public RowsetWriter {

Status flush() override;

Status flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) override;
Status flush_memtable(vectorized::Block* block, int32_t segment_id,
const std::shared_ptr<MemTracker>& flush_mem_tracker,
const MemTableStat& stat, int64_t* flush_size) override;

// Return the file size flushed to disk in "flush_size"
// This method is thread-safe.
Expand Down Expand Up @@ -136,7 +138,6 @@ class BetaRowsetWriter : public RowsetWriter {
const FlushContext* ctx = nullptr);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer,
int64_t* flush_size = nullptr);
Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size);
Status _generate_delete_bitmap(int32_t segment_id);
void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
Status _segcompaction_if_necessary();
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ class RowsetWriter {
}
virtual Status final_flush() { return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(); }

virtual Status flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) {
virtual Status flush_memtable(vectorized::Block* block, int32_t segment_id,
const std::shared_ptr<MemTracker>& flush_mem_tracker,
const MemTableStat& stat, int64_t* flush_size) {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
}

Expand Down

0 comments on commit ac78cf8

Please sign in to comment.