diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index e0d9fb53207f978..0573aaaca05040d 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -89,6 +89,23 @@ Status FlushToken::wait() { return s == OK ? Status::OK() : Status::Error(s); } +Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) { + VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id() + << ", memsize: " << memtable->memory_usage() + << ", rows: " << memtable->stat().raw_rows; + int64_t duration_ns; + SCOPED_RAW_TIMER(&duration_ns); + std::unique_ptr block = memtable->to_block(); + SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_rowset_writer->flush_memtable( + block.get(), segment_id, memtable->flush_mem_tracker(), memtable->stat(), + flush_size))); + DorisMetrics::instance()->memtable_flush_total->increment(1); + DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); + VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id() + << ", flushsize: " << *flush_size; + return Status::OK(); +} + void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t submit_task_time) { uint64_t flush_wait_time_ns = MonotonicNanos() - submit_task_time; _stats.flush_wait_time_ns += flush_wait_time_ns; @@ -102,7 +119,7 @@ void FlushToken::_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t size_t memory_usage = memtable->memory_usage(); int64_t flush_size; - Status s = _rowset_writer->flush_memtable(memtable, segment_id, &flush_size); + Status s = _do_flush_memtable(memtable, segment_id, &flush_size); if (!s) { LOG(WARNING) << "Flush memtable failed with res = " << s; diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index b60e35858df5134..5eb0fa00fa03803 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -78,7 +78,7 @@ class FlushToken { void _flush_memtable(MemTable* mem_table, int32_t segment_id, int64_t submit_task_time); - Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t& flush_size); + Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size); std::unique_ptr _flush_token; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 31d2f9c7075f6ed..f668dfc3ec7badf 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -518,36 +518,21 @@ Status BetaRowsetWriter::flush() { return Status::OK(); } -Status BetaRowsetWriter::flush_memtable(MemTable* memtable, int32_t segment_id, - int64_t* flush_size) { - VLOG_CRITICAL << "begin to flush memtable for tablet: " << memtable->tablet_id() - << ", memsize: " << memtable->memory_usage() - << ", rows: " << memtable->stat().raw_rows; - int64_t duration_ns; - SCOPED_RAW_TIMER(&duration_ns); - SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_do_flush_memtable(memtable, segment_id, flush_size))); - _memtable_stat += memtable->stat(); - DorisMetrics::instance()->memtable_flush_total->increment(1); - DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); - VLOG_CRITICAL << "after flush memtable for tablet: " << memtable->tablet_id() - << ", flushsize: " << *flush_size; - return Status::OK(); -} - -Status BetaRowsetWriter::_do_flush_memtable(MemTable* memtable, int32_t segment_id, - int64_t* flush_size) { - SCOPED_CONSUME_MEM_TRACKER(memtable->flush_mem_tracker()); - std::unique_ptr block = memtable->to_block(); +Status BetaRowsetWriter::flush_memtable(vectorized::Block* block, int32_t segment_id, + const std::shared_ptr& flush_mem_tracker, + const MemTableStat& stat, int64_t* flush_size) { + SCOPED_CONSUME_MEM_TRACKER(flush_mem_tracker); FlushContext ctx; - ctx.block = block.get(); + ctx.block = block; if (_context.tablet_schema->is_dynamic_schema()) { // Unfold variant column RETURN_IF_ERROR(_unfold_variant_column(*block, &ctx)); } ctx.segment_id = std::optional {segment_id}; SCOPED_RAW_TIMER(&_memtable_stat.segment_writer_ns); - RETURN_IF_ERROR(flush_single_memtable(block.get(), flush_size, &ctx)); + RETURN_IF_ERROR(flush_single_memtable(block, flush_size, &ctx)); + _memtable_stat += stat; return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index a4ff96272c3ece2..c279b53eba7d47f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -80,7 +80,9 @@ class BetaRowsetWriter : public RowsetWriter { Status flush() override; - Status flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) override; + Status flush_memtable(vectorized::Block* block, int32_t segment_id, + const std::shared_ptr& flush_mem_tracker, + const MemTableStat& stat, int64_t* flush_size) override; // Return the file size flushed to disk in "flush_size" // This method is thread-safe. @@ -136,7 +138,6 @@ class BetaRowsetWriter : public RowsetWriter { const FlushContext* ctx = nullptr); Status _flush_segment_writer(std::unique_ptr* writer, int64_t* flush_size = nullptr); - Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size); Status _generate_delete_bitmap(int32_t segment_id); void _build_rowset_meta(std::shared_ptr rowset_meta); Status _segcompaction_if_necessary(); diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 9bd368ed68d7a89..7e741dd8f3e1ff6 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -73,7 +73,9 @@ class RowsetWriter { } virtual Status final_flush() { return Status::Error(); } - virtual Status flush_memtable(MemTable* memtable, int32_t segment_id, int64_t* flush_size) { + virtual Status flush_memtable(vectorized::Block* block, int32_t segment_id, + const std::shared_ptr& flush_mem_tracker, + const MemTableStat& stat, int64_t* flush_size) { return Status::Error(); }