From 055234168caf93d8efcaa892be8f0f2f433fdc66 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 28 Dec 2020 18:37:21 +0800 Subject: [PATCH 1/5] [Refactor] Support parallel flushing memtable during load In the previous implementation, in a load job, multiple memtables of the same tablet were written to disk sequentially. In fact, multiple memtables can be written out of order in parallel; we only need to ensure that each memtable uses a different segment writer. --- be/src/common/config.h | 2 +- be/src/olap/delta_writer.cpp | 2 +- be/src/olap/memtable.cpp | 51 ++++++++++++++++++ be/src/olap/memtable.h | 22 ++++++++ be/src/olap/memtable_flush_executor.cpp | 18 +++++-- be/src/olap/memtable_flush_executor.h | 5 +- be/src/olap/rowset/beta_rowset_writer.cpp | 63 +++++++++++++++++------ be/src/olap/rowset/beta_rowset_writer.h | 19 ++++--- be/src/olap/rowset/rowset_writer.h | 5 ++ build.sh | 2 +- 10 files changed, 161 insertions(+), 28 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 9a28c6bca60551..2d5526ecdf29ae 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -34,7 +34,7 @@ CONF_Int32(be_port, "9060"); CONF_Int32(brpc_port, "8060"); // the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores -CONF_Int32(brpc_num_threads, "-1") +CONF_Int32(brpc_num_threads, "-1"); // Declare a selection strategy for those servers have many ips. // Note that there should at most one ip match this list. 
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 5339a47233dfbf..1ed44fbf9f1b7b 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -169,7 +169,7 @@ OLAPStatus DeltaWriter::init() { _reset_mem_table(); // create flush handler - RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token)); + RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token, writer_context.rowset_type)); _is_init = true; return OLAP_SUCCESS; diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 2ff4b588348937..aa663e5b9ef8b6 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -117,6 +117,7 @@ void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_ } } +#if 0 OLAPStatus MemTable::flush() { int64_t duration_ns = 0; { @@ -134,9 +135,59 @@ OLAPStatus MemTable::flush() { DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); return OLAP_SUCCESS; } +#endif + +OLAPStatus MemTable::flush() { + int64_t duration_ns = 0; + { + SCOPED_RAW_TIMER(&duration_ns); + OLAPStatus st = _rowset_writer->flush_single_memtable(this); + if (st == OLAP_ERR_FUNC_NOT_IMPLEMENTED) { + // For alpha rowset, we do not implement "flush_single_memtable". + // Flush the memtable like the old way. 
+ Table::Iterator it(_skip_list); + for (it.SeekToFirst(); it.Valid(); it.Next()) { + char* row = (char*)it.key(); + ContiguousRow dst_row(_schema, row); + agg_finalize_row(&dst_row, _table_mem_pool.get()); + RETURN_NOT_OK(_rowset_writer->add_row(dst_row)); + } + RETURN_NOT_OK(_rowset_writer->flush()); + } else { + RETURN_NOT_OK(st); + } + } + DorisMetrics::instance()->memtable_flush_total->increment(1); + DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); + return OLAP_SUCCESS; +} OLAPStatus MemTable::close() { return flush(); } +MemTable::Iterator::Iterator(MemTable* memtable): + _mem_table(memtable), + _it(memtable->_skip_list) { +} + +void MemTable::Iterator::seek_to_first() { + _it.SeekToFirst(); +} + +bool MemTable::Iterator::valid() { + return _it.Valid(); +} + +void MemTable::Iterator::next() { + _it.Next(); +} + +ContiguousRow MemTable::Iterator::get_current_row() { + char* row = (char*) _it.key(); + ContiguousRow dst_row(_mem_table->_schema, row); + agg_finalize_row(&dst_row, _mem_table->_table_mem_pool.get()); + return dst_row; +} + } // namespace doris diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 633ab335c0f4c0..2012ec5deb4333 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -46,6 +46,8 @@ class MemTable { int64_t tablet_id() const { return _tablet_id; } size_t memory_usage() const { return _mem_tracker->consumption(); } void insert(const Tuple* tuple); + // OLAPStatus flush(); + /// Flush OLAPStatus flush(); OLAPStatus close(); @@ -58,9 +60,29 @@ class MemTable { private: const Schema* _schema; }; + typedef SkipList Table; typedef Table::key_type TableKey; +public: + /// The iterator of memtable, so that the data in this memtable + /// can be visited outside. 
+ class Iterator { + public: + Iterator(MemTable* mem_table); + ~Iterator() {} + + void seek_to_first(); + bool valid(); + void next(); + ContiguousRow get_current_row(); + + private: + MemTable* _mem_table; + Table::Iterator _it; + }; + +private: void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool); void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index e3f6c907028468..fe55f84bc7a541 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -26,7 +26,8 @@ namespace doris { std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000 - << ", flush count=" << stat.flush_count << ")"; + << ", flush count=" << stat.flush_count + << ", flush bytes: " << stat.flush_size_bytes << ")"; return os; } @@ -65,6 +66,9 @@ void FlushToken::_flush_memtable(std::shared_ptr memtable) { return; } + VLOG(1) << "flush memtable cost: " << timer.elapsed_time() + << ", count: " << _stats.flush_count + << ", size: " << memtable->memory_usage(); _stats.flush_time_ns += timer.elapsed_time(); _stats.flush_count++; _stats.flush_size_bytes += memtable->memory_usage(); @@ -81,8 +85,16 @@ void MemTableFlushExecutor::init(const std::vector& data_dirs) { } // NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order. -OLAPStatus MemTableFlushExecutor::create_flush_token(std::unique_ptr* flush_token) { - flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL))); +OLAPStatus MemTableFlushExecutor::create_flush_token( + std::unique_ptr* flush_token, + RowsetTypePB rowset_type) { + if (rowset_type == BETA_ROWSET) { + // beta rowset can be flush in CONCURRENT, because each memtable using a new segment writer. 
+ flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT))); + } else { + // alpha rowset do not support flush in CONCURRENT. + flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL))); + } return OLAP_SUCCESS; } diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 9f8a8c925d3d9b..04cf3064b282ec 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -21,6 +21,7 @@ #include #include +#include "gen_cpp/olap_file.pb.h" #include "olap/olap_define.h" #include "util/threadpool.h" @@ -95,7 +96,9 @@ class MemTableFlushExecutor { // because it needs path hash of each data dir. void init(const std::vector& data_dirs); - OLAPStatus create_flush_token(std::unique_ptr* flush_token); + OLAPStatus create_flush_token( + std::unique_ptr* flush_token, + RowsetTypePB rowset_type); private: std::unique_ptr _flush_pool; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index e4ebd759e1b92f..be1e65af04186f 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -24,6 +24,7 @@ #include "env/env.h" #include "gutil/strings/substitute.h" #include "olap/fs/fs_util.h" +#include "olap/memtable.h" #include "olap/olap_define.h" #include "olap/row.h" // ContiguousRow #include "olap/row_cursor.h" // RowCursor @@ -90,7 +91,7 @@ OLAPStatus BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_conte template OLAPStatus BetaRowsetWriter::_add_row(const RowType& row) { if (PREDICT_FALSE(_segment_writer == nullptr)) { - RETURN_NOT_OK(_create_segment_writer()); + RETURN_NOT_OK(_create_segment_writer(&_segment_writer)); } // TODO update rowset zonemap auto s = _segment_writer->append_row(row); @@ -100,7 +101,7 @@ OLAPStatus BetaRowsetWriter::_add_row(const RowType& row) { } if (PREDICT_FALSE(_segment_writer->estimate_segment_size() >= 
MAX_SEGMENT_SIZE || _segment_writer->num_rows_written() >= _context.max_rows_per_segment)) { - RETURN_NOT_OK(_flush_segment_writer()); + RETURN_NOT_OK(_flush_segment_writer(&_segment_writer)); } ++_num_rows_written; return OLAP_SUCCESS; @@ -131,7 +132,37 @@ OLAPStatus BetaRowsetWriter::add_rowset_for_linked_schema_change( OLAPStatus BetaRowsetWriter::flush() { if (_segment_writer != nullptr) { - RETURN_NOT_OK(_flush_segment_writer()); + RETURN_NOT_OK(_flush_segment_writer(&_segment_writer)); + } + return OLAP_SUCCESS; +} + +OLAPStatus BetaRowsetWriter::flush_single_memtable(MemTable* memtable) { + // Create segment writer for each memtable, so that + // all memtables can be flushed in parallel. + std::unique_ptr writer; + + MemTable::Iterator it(memtable); + for (it.seek_to_first(); it.valid(); it.next()) { + if (writer == nullptr) { + RETURN_NOT_OK(_create_segment_writer(&writer)); + } + ContiguousRow dst_row = it.get_current_row(); + auto s = writer->append_row(dst_row); + if (PREDICT_FALSE(!s.ok())) { + LOG(WARNING) << "failed to append row: " << s.to_string(); + return OLAP_ERR_WRITER_DATA_WRITE_ERROR; + } + + if (PREDICT_FALSE(writer->estimate_segment_size() >= MAX_SEGMENT_SIZE || + writer->num_rows_written() >= _context.max_rows_per_segment)) { + RETURN_NOT_OK(_flush_segment_writer(&writer)); + } + ++_num_rows_written; + } + + if (writer != nullptr) { + RETURN_NOT_OK(_flush_segment_writer(&writer)); } return OLAP_SUCCESS; } @@ -172,9 +203,9 @@ RowsetSharedPtr BetaRowsetWriter::build() { return rowset; } -OLAPStatus BetaRowsetWriter::_create_segment_writer() { +OLAPStatus BetaRowsetWriter::_create_segment_writer(std::unique_ptr* writer) { auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix, _context.rowset_id, - _num_segment); + _num_segment++); // TODO(lingbin): should use a more general way to get BlockManager object // and tablets with the same type should share one BlockManager object; fs::BlockManager* block_mgr = 
fs::fs_util::block_manager(); @@ -189,31 +220,33 @@ OLAPStatus BetaRowsetWriter::_create_segment_writer() { DCHECK(wblock != nullptr); segment_v2::SegmentWriterOptions writer_options; - _segment_writer.reset(new segment_v2::SegmentWriter(wblock.get(), _num_segment, - _context.tablet_schema, writer_options)); - _wblocks.push_back(std::move(wblock)); - // TODO set write_mbytes_per_sec based on writer type (load/base compaction/cumulative compaction) - auto s = _segment_writer->init(config::push_write_mbytes_per_sec); + writer->reset(new segment_v2::SegmentWriter(wblock.get(), _num_segment, + _context.tablet_schema, writer_options)); + { + std::lock_guard l(_lock); + _wblocks.push_back(std::move(wblock)); + } + + auto s = (*writer)->init(config::push_write_mbytes_per_sec); if (!s.ok()) { LOG(WARNING) << "failed to init segment writer: " << s.to_string(); - _segment_writer.reset(nullptr); + writer->reset(nullptr); return OLAP_ERR_INIT_FAILED; } - ++_num_segment; return OLAP_SUCCESS; } -OLAPStatus BetaRowsetWriter::_flush_segment_writer() { +OLAPStatus BetaRowsetWriter::_flush_segment_writer(std::unique_ptr* writer) { uint64_t segment_size; uint64_t index_size; - Status s = _segment_writer->finalize(&segment_size, &index_size); + Status s = (*writer)->finalize(&segment_size, &index_size); if (!s.ok()) { LOG(WARNING) << "failed to finalize segment: " << s.to_string(); return OLAP_ERR_WRITER_DATA_WRITE_ERROR; } _total_data_size += segment_size; _total_index_size += index_size; - _segment_writer.reset(); + writer->reset(); return OLAP_SUCCESS; } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index c0c0467c5803dc..211e31bc1ec8d3 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -51,6 +51,8 @@ class BetaRowsetWriter : public RowsetWriter { OLAPStatus flush() override; + OLAPStatus flush_single_memtable(MemTable* memtable) override; + RowsetSharedPtr build() override; Version 
version() override { return _context.version; } @@ -63,27 +65,32 @@ class BetaRowsetWriter : public RowsetWriter { template OLAPStatus _add_row(const RowType& row); - OLAPStatus _create_segment_writer(); + OLAPStatus _create_segment_writer(std::unique_ptr* writer); - OLAPStatus _flush_segment_writer(); + OLAPStatus _flush_segment_writer(std::unique_ptr* writer); private: RowsetWriterContext _context; std::shared_ptr _rowset_meta; - int _num_segment; + AtomicInt _num_segment; + /// When flushing the memtable in the load process, we do not use this writer but an independent writer. + /// Because we want to flush memtables in parallel. + /// In other processes, such as merger or schema change, we will use this unified writer for data writing. std::unique_ptr _segment_writer; + mutable SpinLock _lock; // lock to protect _wblocks. // TODO(lingbin): it is better to wrapper in a Batch? std::vector> _wblocks; // counters and statistics maintained during data write - int64_t _num_rows_written; - int64_t _total_data_size; - int64_t _total_index_size; + AtomicInt _num_rows_written; + AtomicInt _total_data_size; + AtomicInt _total_index_size; // TODO rowset Zonemap bool _is_pending = false; bool _already_built = false; + }; } // namespace doris diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index ba698c5ac27dd8..8338bf1c1cabf4 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -27,6 +27,7 @@ namespace doris { class ContiguousRow; +class MemTable; class RowCursor; class RowsetWriter { @@ -52,6 +53,10 @@ class RowsetWriter { // note that `add_row` could also trigger flush when certain conditions are met virtual OLAPStatus flush() = 0; + virtual OLAPStatus flush_single_memtable(MemTable* memtable) { + return OLAP_ERR_FUNC_NOT_IMPLEMENTED; + } + // finish building and return pointer to the built rowset (guaranteed to be inited). 
// return nullptr when failed virtual RowsetSharedPtr build() = 0; diff --git a/build.sh b/build.sh index 6c0422b200d2d1..831001e6690970 100755 --- a/build.sh +++ b/build.sh @@ -172,7 +172,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then mkdir -p ${CMAKE_BUILD_DIR} cd ${CMAKE_BUILD_DIR} ${CMAKE_CMD} -G "${GENERATOR}" -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DMAKE_TEST=OFF -DWITH_MYSQL=${WITH_MYSQL} -DWITH_LZO=${WITH_LZO} ../ - ${BUILD_SYSTEM} -j${PARALLEL} + ${BUILD_SYSTEM} -j${PARALLEL} #VERBOSE=1 ${BUILD_SYSTEM} install cd ${DORIS_HOME} fi From d5c1f06868f73a0dfb2a48895d924a5d20132443 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 28 Dec 2020 20:40:27 +0800 Subject: [PATCH 2/5] 1 --- be/src/olap/memtable.cpp | 22 +--------------------- be/src/olap/memtable.h | 5 +++++ be/src/olap/memtable_flush_executor.cpp | 7 +++++-- be/src/olap/memtable_flush_executor.h | 1 + be/src/olap/rowset/beta_rowset_writer.cpp | 5 ++++- be/src/olap/rowset/beta_rowset_writer.h | 3 ++- be/src/olap/rowset/rowset_writer.h | 2 +- build.sh | 2 +- 8 files changed, 20 insertions(+), 27 deletions(-) diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index aa663e5b9ef8b6..f728af55faaa50 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -117,31 +117,11 @@ void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_ } } -#if 0 OLAPStatus MemTable::flush() { int64_t duration_ns = 0; { SCOPED_RAW_TIMER(&duration_ns); - Table::Iterator it(_skip_list); - for (it.SeekToFirst(); it.Valid(); it.Next()) { - char* row = (char*)it.key(); - ContiguousRow dst_row(_schema, row); - agg_finalize_row(&dst_row, _table_mem_pool.get()); - RETURN_NOT_OK(_rowset_writer->add_row(dst_row)); - } - RETURN_NOT_OK(_rowset_writer->flush()); - } - DorisMetrics::instance()->memtable_flush_total->increment(1); - DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); - return OLAP_SUCCESS; -} -#endif - -OLAPStatus MemTable::flush() { - int64_t 
duration_ns = 0; - { - SCOPED_RAW_TIMER(&duration_ns); - OLAPStatus st = _rowset_writer->flush_single_memtable(this); + OLAPStatus st = _rowset_writer->flush_single_memtable(this, &_flush_size); if (st == OLAP_ERR_FUNC_NOT_IMPLEMENTED) { // For alpha rowset, we do not implement "flush_single_memtable". // Flush the memtable like the old way. diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 2012ec5deb4333..75b74ea6999756 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -51,6 +51,8 @@ class MemTable { OLAPStatus flush(); OLAPStatus close(); + int64_t flush_size() const { return _flush_size; } + private: class RowCursorComparator { public: @@ -111,6 +113,9 @@ class MemTable { RowsetWriter* _rowset_writer; + // the data size flushed on disk of this memtable + int64_t _flush_size = 0; + }; // class MemTable inline std::ostream& operator<<(std::ostream& os, const MemTable& table) { diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index fe55f84bc7a541..54d645661cfefa 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -27,7 +27,8 @@ namespace doris { std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) { os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000 << ", flush count=" << stat.flush_count - << ", flush bytes: " << stat.flush_size_bytes << ")"; + << ", flush bytes: " << stat.flush_size_bytes + << ", flush disk bytes: " << stat.flush_disk_size_bytes << ")"; return os; } @@ -68,10 +69,12 @@ void FlushToken::_flush_memtable(std::shared_ptr memtable) { VLOG(1) << "flush memtable cost: " << timer.elapsed_time() << ", count: " << _stats.flush_count - << ", size: " << memtable->memory_usage(); + << ", mem size: " << memtable->memory_usage() + << ", disk size: " << memtable->flush_size(); _stats.flush_time_ns += timer.elapsed_time(); _stats.flush_count++; _stats.flush_size_bytes += memtable->memory_usage(); + 
_stats.flush_disk_size_bytes += memtable->flush_size(); } void MemTableFlushExecutor::init(const std::vector& data_dirs) { diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h index 04cf3064b282ec..4b6795b77d8051 100644 --- a/be/src/olap/memtable_flush_executor.h +++ b/be/src/olap/memtable_flush_executor.h @@ -38,6 +38,7 @@ struct FlushStatistic { int64_t flush_time_ns = 0; int64_t flush_count = 0; int64_t flush_size_bytes = 0; + int64_t flush_disk_size_bytes = 0; }; std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index be1e65af04186f..07866e6c067c9d 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -137,7 +137,8 @@ OLAPStatus BetaRowsetWriter::flush() { return OLAP_SUCCESS; } -OLAPStatus BetaRowsetWriter::flush_single_memtable(MemTable* memtable) { +OLAPStatus BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flush_size) { + int64_t current_flush_size = _total_data_size + _total_index_size; // Create segment writer for each memtable, so that // all memtables can be flushed in parallel. 
std::unique_ptr writer; @@ -164,6 +165,8 @@ OLAPStatus BetaRowsetWriter::flush_single_memtable(MemTable* memtable) { if (writer != nullptr) { RETURN_NOT_OK(_flush_segment_writer(&writer)); } + + *flush_size = (_total_data_size + _total_index_size) - current_flush_size; return OLAP_SUCCESS; } diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 211e31bc1ec8d3..c174a906a28a77 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -51,7 +51,8 @@ class BetaRowsetWriter : public RowsetWriter { OLAPStatus flush() override; - OLAPStatus flush_single_memtable(MemTable* memtable) override; + // Return the file size flushed to disk in "flush_size" + OLAPStatus flush_single_memtable(MemTable* memtable, int64_t* flush_size) override; RowsetSharedPtr build() override; diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 8338bf1c1cabf4..74a648558a9e67 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -53,7 +53,7 @@ class RowsetWriter { // note that `add_row` could also trigger flush when certain conditions are met virtual OLAPStatus flush() = 0; - virtual OLAPStatus flush_single_memtable(MemTable* memtable) { + virtual OLAPStatus flush_single_memtable(MemTable* memtable, int64_t* flush_size) { return OLAP_ERR_FUNC_NOT_IMPLEMENTED; } diff --git a/build.sh b/build.sh index 831001e6690970..6c0422b200d2d1 100755 --- a/build.sh +++ b/build.sh @@ -172,7 +172,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then mkdir -p ${CMAKE_BUILD_DIR} cd ${CMAKE_BUILD_DIR} ${CMAKE_CMD} -G "${GENERATOR}" -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} -DMAKE_TEST=OFF -DWITH_MYSQL=${WITH_MYSQL} -DWITH_LZO=${WITH_LZO} ../ - ${BUILD_SYSTEM} -j${PARALLEL} #VERBOSE=1 + ${BUILD_SYSTEM} -j${PARALLEL} ${BUILD_SYSTEM} install cd ${DORIS_HOME} fi From 3a7a3897d28717af680e055695243b003ec3387d Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 21 Jan 2021 
23:25:43 +0800 Subject: [PATCH 3/5] 2 --- be/src/olap/memtable.h | 1 - be/src/olap/memtable_flush_executor.cpp | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 75b74ea6999756..42ded4e37f5e62 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -46,7 +46,6 @@ class MemTable { int64_t tablet_id() const { return _tablet_id; } size_t memory_usage() const { return _mem_tracker->consumption(); } void insert(const Tuple* tuple); - // OLAPStatus flush(); /// Flush OLAPStatus flush(); OLAPStatus close(); diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp index 54d645661cfefa..fb78a5503a9801 100644 --- a/be/src/olap/memtable_flush_executor.cpp +++ b/be/src/olap/memtable_flush_executor.cpp @@ -67,7 +67,7 @@ void FlushToken::_flush_memtable(std::shared_ptr memtable) { return; } - VLOG(1) << "flush memtable cost: " << timer.elapsed_time() + VLOG_CRITICAL << "flush memtable cost: " << timer.elapsed_time() << ", count: " << _stats.flush_count << ", mem size: " << memtable->memory_usage() << ", disk size: " << memtable->flush_size(); From f477a048a1f156dd57e448547dd4e72d8f6625a2 Mon Sep 17 00:00:00 2001 From: morningman Date: Thu, 21 Jan 2021 23:36:10 +0800 Subject: [PATCH 4/5] fix bug --- be/src/olap/rowset/beta_rowset_writer.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 07866e6c067c9d..dca63d284a64bc 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -142,12 +142,10 @@ OLAPStatus BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* // Create segment writer for each memtable, so that // all memtables can be flushed in parallel. 
std::unique_ptr writer; + RETURN_NOT_OK(_create_segment_writer(&writer)); MemTable::Iterator it(memtable); for (it.seek_to_first(); it.valid(); it.next()) { - if (writer == nullptr) { - RETURN_NOT_OK(_create_segment_writer(&writer)); - } ContiguousRow dst_row = it.get_current_row(); auto s = writer->append_row(dst_row); if (PREDICT_FALSE(!s.ok())) { From 4d7dbd732913815e703ae7ef10feabae3f490f00 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 22 Jan 2021 09:43:47 +0800 Subject: [PATCH 5/5] fix bug --- be/src/olap/rowset/beta_rowset_writer.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index dca63d284a64bc..84207c28886a92 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -142,10 +142,15 @@ OLAPStatus BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* // Create segment writer for each memtable, so that // all memtables can be flushed in parallel. std::unique_ptr writer; - RETURN_NOT_OK(_create_segment_writer(&writer)); MemTable::Iterator it(memtable); - for (it.seek_to_first(); it.valid(); it.next()) { + it.seek_to_first(); + if (it.valid()) { + // Only create the writer if the memtable has data, + // because flushing an empty segment writer is not allowed. + RETURN_NOT_OK(_create_segment_writer(&writer)); + } + for ( ; it.valid(); it.next()) { ContiguousRow dst_row = it.get_current_row(); auto s = writer->append_row(dst_row); if (PREDICT_FALSE(!s.ok())) {