Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ CONF_Int32(be_port, "9060");
CONF_Int32(brpc_port, "8060");

// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
CONF_Int32(brpc_num_threads, "-1")
CONF_Int32(brpc_num_threads, "-1");

// Declare a selection strategy for servers that have many IPs.
// Note that at most one IP should match this list.
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ OLAPStatus DeltaWriter::init() {
_reset_mem_table();

// create flush handler
RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token));
RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token, writer_context.rowset_type));

_is_init = true;
return OLAP_SUCCESS;
Expand Down
45 changes: 38 additions & 7 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,21 @@ OLAPStatus MemTable::flush() {
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
Table::Iterator it(_skip_list);
for (it.SeekToFirst(); it.Valid(); it.Next()) {
char* row = (char*)it.key();
ContiguousRow dst_row(_schema, row);
agg_finalize_row(&dst_row, _table_mem_pool.get());
RETURN_NOT_OK(_rowset_writer->add_row(dst_row));
OLAPStatus st = _rowset_writer->flush_single_memtable(this, &_flush_size);
if (st == OLAP_ERR_FUNC_NOT_IMPLEMENTED) {
// For alpha rowset, we do not implement "flush_single_memtable".
// Flush the memtable like the old way.
Table::Iterator it(_skip_list);
for (it.SeekToFirst(); it.Valid(); it.Next()) {
char* row = (char*)it.key();
ContiguousRow dst_row(_schema, row);
agg_finalize_row(&dst_row, _table_mem_pool.get());
RETURN_NOT_OK(_rowset_writer->add_row(dst_row));
}
RETURN_NOT_OK(_rowset_writer->flush());
} else {
RETURN_NOT_OK(st);
}
RETURN_NOT_OK(_rowset_writer->flush());
}
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
Expand All @@ -139,4 +146,28 @@ OLAPStatus MemTable::close() {
return flush();
}

// Build an iterator over "memtable". Keeps a pointer to the memtable (used later
// for its schema and memory pool) and creates the underlying skip-list iterator
// on the memtable's "_skip_list". The iterator is not positioned here; call
// seek_to_first() before reading.
MemTable::Iterator::Iterator(MemTable* memtable):
_mem_table(memtable),
_it(memtable->_skip_list) {
}

// Forward to SeekToFirst() of the underlying skip-list iterator.
void MemTable::Iterator::seek_to_first() {
_it.SeekToFirst();
}

// Return whatever Valid() of the underlying skip-list iterator reports.
bool MemTable::Iterator::valid() {
return _it.Valid();
}

// Forward to Next() of the underlying skip-list iterator.
void MemTable::Iterator::next() {
_it.Next();
}

// Return the row the iterator currently points at.
// The skip-list key is wrapped in a ContiguousRow using the memtable's schema,
// and agg_finalize_row() is run on it with the memtable's memory pool before
// the row is handed back to the caller.
ContiguousRow MemTable::Iterator::get_current_row() {
    ContiguousRow finalized_row(_mem_table->_schema, (char*) _it.key());
    agg_finalize_row(&finalized_row, _mem_table->_table_mem_pool.get());
    return finalized_row;
}

} // namespace doris
26 changes: 26 additions & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ class MemTable {
int64_t tablet_id() const { return _tablet_id; }
size_t memory_usage() const { return _mem_tracker->consumption(); }
void insert(const Tuple* tuple);
/// Flush
OLAPStatus flush();
OLAPStatus close();

int64_t flush_size() const { return _flush_size; }

private:
class RowCursorComparator {
public:
Expand All @@ -58,9 +61,29 @@ class MemTable {
private:
const Schema* _schema;
};

typedef SkipList<char*, RowCursorComparator> Table;
typedef Table::key_type TableKey;

public:
/// Iterator over the rows of a memtable, so that the data in the memtable
/// can be visited from outside the class (e.g. by a rowset writer).
/// Typical use: seek_to_first(), then loop while valid(), reading
/// get_current_row() and calling next().
class Iterator {
public:
Iterator(MemTable* mem_table);
~Iterator() {}

// Wrappers around the underlying skip-list iterator.
void seek_to_first();
bool valid();
void next();
// Returns the current row as a ContiguousRow.
ContiguousRow get_current_row();

private:
// The memtable being iterated.
MemTable* _mem_table;
// Iterator over the memtable's skip list.
Table::Iterator _it;
};

private:
void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);

Expand Down Expand Up @@ -89,6 +112,9 @@ class MemTable {

RowsetWriter* _rowset_writer;

// the data size flushed on disk of this memtable
int64_t _flush_size = 0;

}; // class MemTable

inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {
Expand Down
21 changes: 18 additions & 3 deletions be/src/olap/memtable_flush_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ namespace doris {

std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000
<< ", flush count=" << stat.flush_count << ")";
<< ", flush count=" << stat.flush_count
<< ", flush bytes: " << stat.flush_size_bytes
<< ", flush disk bytes: " << stat.flush_disk_size_bytes << ")";
return os;
}

Expand Down Expand Up @@ -65,9 +67,14 @@ void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) {
return;
}

VLOG_CRITICAL << "flush memtable cost: " << timer.elapsed_time()
<< ", count: " << _stats.flush_count
<< ", mem size: " << memtable->memory_usage()
<< ", disk size: " << memtable->flush_size();
_stats.flush_time_ns += timer.elapsed_time();
_stats.flush_count++;
_stats.flush_size_bytes += memtable->memory_usage();
_stats.flush_disk_size_bytes += memtable->flush_size();
}

void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
Expand All @@ -81,8 +88,16 @@ void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
}

// NOTE: we use SERIAL mode here to ensure all mem-tables from one tablet are flushed in order.
OLAPStatus MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* flush_token) {
flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
OLAPStatus MemTableFlushExecutor::create_flush_token(
        std::unique_ptr<FlushToken>* flush_token,
        RowsetTypePB rowset_type) {
    // Beta rowsets can be flushed in CONCURRENT mode, because each memtable uses
    // its own segment writer. Every other rowset type (i.e. alpha) does not
    // support concurrent flush and keeps the SERIAL mode.
    const auto execution_mode = (rowset_type == BETA_ROWSET)
            ? ThreadPool::ExecutionMode::CONCURRENT
            : ThreadPool::ExecutionMode::SERIAL;
    flush_token->reset(new FlushToken(_flush_pool->new_token(execution_mode)));
    return OLAP_SUCCESS;
}

Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/memtable_flush_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <memory>
#include <vector>

#include "gen_cpp/olap_file.pb.h"
#include "olap/olap_define.h"
#include "util/threadpool.h"

Expand All @@ -37,6 +38,7 @@ struct FlushStatistic {
int64_t flush_time_ns = 0;
int64_t flush_count = 0;
int64_t flush_size_bytes = 0;
int64_t flush_disk_size_bytes = 0;
};

std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
Expand Down Expand Up @@ -95,7 +97,9 @@ class MemTableFlushExecutor {
// because it needs path hash of each data dir.
void init(const std::vector<DataDir*>& data_dirs);

OLAPStatus create_flush_token(std::unique_ptr<FlushToken>* flush_token);
OLAPStatus create_flush_token(
std::unique_ptr<FlushToken>* flush_token,
RowsetTypePB rowset_type);

private:
std::unique_ptr<ThreadPool> _flush_pool;
Expand Down
69 changes: 54 additions & 15 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "env/env.h"
#include "gutil/strings/substitute.h"
#include "olap/fs/fs_util.h"
#include "olap/memtable.h"
#include "olap/olap_define.h"
#include "olap/row.h" // ContiguousRow
#include "olap/row_cursor.h" // RowCursor
Expand Down Expand Up @@ -90,7 +91,7 @@ OLAPStatus BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_conte
template <typename RowType>
OLAPStatus BetaRowsetWriter::_add_row(const RowType& row) {
if (PREDICT_FALSE(_segment_writer == nullptr)) {
RETURN_NOT_OK(_create_segment_writer());
RETURN_NOT_OK(_create_segment_writer(&_segment_writer));
}
// TODO update rowset zonemap
auto s = _segment_writer->append_row(row);
Expand All @@ -100,7 +101,7 @@ OLAPStatus BetaRowsetWriter::_add_row(const RowType& row) {
}
if (PREDICT_FALSE(_segment_writer->estimate_segment_size() >= MAX_SEGMENT_SIZE ||
_segment_writer->num_rows_written() >= _context.max_rows_per_segment)) {
RETURN_NOT_OK(_flush_segment_writer());
RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
}
++_num_rows_written;
return OLAP_SUCCESS;
Expand Down Expand Up @@ -131,11 +132,47 @@ OLAPStatus BetaRowsetWriter::add_rowset_for_linked_schema_change(

OLAPStatus BetaRowsetWriter::flush() {
if (_segment_writer != nullptr) {
RETURN_NOT_OK(_flush_segment_writer());
RETURN_NOT_OK(_flush_segment_writer(&_segment_writer));
}
return OLAP_SUCCESS;
}

// Write every row of "memtable" into one or more segments owned by this rowset
// and report in "flush_size" how much the rowset's size counters
// (_total_data_size + _total_index_size) grew during the call.
//
// A local segment writer is used instead of the shared "_segment_writer", so
// that all memtables can be flushed in parallel.
//
// Returns OLAP_SUCCESS on success, OLAP_ERR_WRITER_DATA_WRITE_ERROR if a row
// cannot be appended, or the error returned while creating or flushing a
// segment writer.
//
// NOTE(review): "flush_size" is computed from counters shared by all flushes of
// this rowset writer; when several memtables are flushed concurrently the delta
// may also include bytes written by the other flushes -- confirm whether callers
// need an exact per-memtable value.
OLAPStatus BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
    int64_t current_flush_size = _total_data_size + _total_index_size;
    std::unique_ptr<segment_v2::SegmentWriter> writer;

    MemTable::Iterator it(memtable);
    for (it.seek_to_first(); it.valid(); it.next()) {
        // Create the writer lazily:
        //  - an empty memtable never creates one, because we do not allow
        //    flushing an empty segment writer;
        //  - _flush_segment_writer() resets the writer once a segment is full,
        //    so a fresh writer is needed before the next row is appended.
        if (PREDICT_FALSE(writer == nullptr)) {
            RETURN_NOT_OK(_create_segment_writer(&writer));
        }

        ContiguousRow dst_row = it.get_current_row();
        auto s = writer->append_row(dst_row);
        if (PREDICT_FALSE(!s.ok())) {
            LOG(WARNING) << "failed to append row: " << s.to_string();
            return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
        }

        // Roll over to a new segment when the current one is large enough.
        if (PREDICT_FALSE(writer->estimate_segment_size() >= MAX_SEGMENT_SIZE ||
                          writer->num_rows_written() >= _context.max_rows_per_segment)) {
            RETURN_NOT_OK(_flush_segment_writer(&writer));
        }
        ++_num_rows_written;
    }

    // Flush the last, partially filled segment (if any).
    if (writer != nullptr) {
        RETURN_NOT_OK(_flush_segment_writer(&writer));
    }

    *flush_size = (_total_data_size + _total_index_size) - current_flush_size;
    return OLAP_SUCCESS;
}

RowsetSharedPtr BetaRowsetWriter::build() {
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& wblock : _wblocks) {
Expand Down Expand Up @@ -172,9 +209,9 @@ RowsetSharedPtr BetaRowsetWriter::build() {
return rowset;
}

OLAPStatus BetaRowsetWriter::_create_segment_writer() {
OLAPStatus BetaRowsetWriter::_create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
auto path = BetaRowset::segment_file_path(_context.rowset_path_prefix, _context.rowset_id,
_num_segment);
_num_segment++);
// TODO(lingbin): should use a more general way to get BlockManager object
// and tablets with the same type should share one BlockManager object;
fs::BlockManager* block_mgr = fs::fs_util::block_manager();
Expand All @@ -189,31 +226,33 @@ OLAPStatus BetaRowsetWriter::_create_segment_writer() {

DCHECK(wblock != nullptr);
segment_v2::SegmentWriterOptions writer_options;
_segment_writer.reset(new segment_v2::SegmentWriter(wblock.get(), _num_segment,
_context.tablet_schema, writer_options));
_wblocks.push_back(std::move(wblock));
// TODO set write_mbytes_per_sec based on writer type (load/base compaction/cumulative compaction)
auto s = _segment_writer->init(config::push_write_mbytes_per_sec);
writer->reset(new segment_v2::SegmentWriter(wblock.get(), _num_segment,
_context.tablet_schema, writer_options));
{
std::lock_guard<SpinLock> l(_lock);
_wblocks.push_back(std::move(wblock));
}

auto s = (*writer)->init(config::push_write_mbytes_per_sec);
if (!s.ok()) {
LOG(WARNING) << "failed to init segment writer: " << s.to_string();
_segment_writer.reset(nullptr);
writer->reset(nullptr);
return OLAP_ERR_INIT_FAILED;
}
++_num_segment;
return OLAP_SUCCESS;
}

OLAPStatus BetaRowsetWriter::_flush_segment_writer() {
OLAPStatus BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
uint64_t segment_size;
uint64_t index_size;
Status s = _segment_writer->finalize(&segment_size, &index_size);
Status s = (*writer)->finalize(&segment_size, &index_size);
if (!s.ok()) {
LOG(WARNING) << "failed to finalize segment: " << s.to_string();
return OLAP_ERR_WRITER_DATA_WRITE_ERROR;
}
_total_data_size += segment_size;
_total_index_size += index_size;
_segment_writer.reset();
writer->reset();
return OLAP_SUCCESS;
}

Expand Down
20 changes: 14 additions & 6 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class BetaRowsetWriter : public RowsetWriter {

OLAPStatus flush() override;

// Return the file size flushed to disk in "flush_size"
OLAPStatus flush_single_memtable(MemTable* memtable, int64_t* flush_size) override;

RowsetSharedPtr build() override;

Version version() override { return _context.version; }
Expand All @@ -63,27 +66,32 @@ class BetaRowsetWriter : public RowsetWriter {
template <typename RowType>
OLAPStatus _add_row(const RowType& row);

OLAPStatus _create_segment_writer();
OLAPStatus _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);

OLAPStatus _flush_segment_writer();
OLAPStatus _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer);

private:
RowsetWriterContext _context;
std::shared_ptr<RowsetMeta> _rowset_meta;

int _num_segment;
AtomicInt<int32_t> _num_segment;
/// When flushing the memtable in the load process, we do not use this writer but an independent writer.
/// Because we want to flush memtables in parallel.
/// In other processes, such as merger or schema change, we will use this unified writer for data writing.
std::unique_ptr<segment_v2::SegmentWriter> _segment_writer;
mutable SpinLock _lock; // lock to protect _wblocks.
// TODO(lingbin): it is better to wrapper in a Batch?
std::vector<std::unique_ptr<fs::WritableBlock>> _wblocks;

// counters and statistics maintained during data write
int64_t _num_rows_written;
int64_t _total_data_size;
int64_t _total_index_size;
AtomicInt<int64_t> _num_rows_written;
AtomicInt<int64_t> _total_data_size;
AtomicInt<int64_t> _total_index_size;
// TODO rowset Zonemap

bool _is_pending = false;
bool _already_built = false;

};

} // namespace doris
Expand Down
5 changes: 5 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
namespace doris {

class ContiguousRow;
class MemTable;
class RowCursor;

class RowsetWriter {
Expand All @@ -52,6 +53,10 @@ class RowsetWriter {
// note that `add_row` could also trigger flush when certain conditions are met
virtual OLAPStatus flush() = 0;

// Flush the content of a single memtable through this writer.
// The default implementation does nothing and returns
// OLAP_ERR_FUNC_NOT_IMPLEMENTED; writers that support per-memtable flush
// override it and are expected to report the flushed size in "flush_size".
virtual OLAPStatus flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
return OLAP_ERR_FUNC_NOT_IMPLEMENTED;
}

// finish building and return pointer to the built rowset (guaranteed to be inited).
// return nullptr when failed
virtual RowsetSharedPtr build() = 0;
Expand Down