Skip to content

Commit

Permalink
[improve](move-memtable) increase load_stream_flush_token_max_tasks (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Dec 26, 2023
1 parent 1964a77 commit 4a60d01
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,7 @@ DEFINE_Int32(load_stream_messages_in_batch, "128");
// brpc streaming StreamWait seconds on EAGAIN
DEFINE_Int32(load_stream_eagain_wait_seconds, "60");
// max tasks per flush token in load stream
DEFINE_Int32(load_stream_flush_token_max_tasks, "5");
DEFINE_Int32(load_stream_flush_token_max_tasks, "15");

// max send batch parallelism for OlapTableSink
// The value set by the user for send_batch_parallelism is not allowed to exceed max_send_batch_parallelism_per_job,
Expand Down
12 changes: 11 additions & 1 deletion be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@

namespace doris {

bvar::LatencyRecorder g_load_stream_flush_wait_ms("load_stream_flush_wait_ms");
bvar::Adder<int> g_load_stream_flush_running_threads("load_stream_flush_wait_threads");

TabletStream::TabletStream(PUniqueId load_id, int64_t id, int64_t txn_id,
LoadStreamMgr* load_stream_mgr, RuntimeProfile* profile)
: _id(id),
Expand Down Expand Up @@ -130,6 +133,7 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
butil::IOBuf buf = data->movable();
auto flush_func = [this, new_segid, eos, buf, header]() {
signal::set_signal_task_id(_load_id);
g_load_stream_flush_running_threads << -1;
auto st = _load_stream_writer->append_data(new_segid, header.offset(), buf);
if (eos && st.ok()) {
st = _load_stream_writer->close_segment(new_segid);
Expand All @@ -140,9 +144,15 @@ Status TabletStream::append_data(const PStreamHeader& header, butil::IOBuf* data
}
};
auto& flush_token = _flush_tokens[new_segid % _flush_tokens.size()];
MonotonicStopWatch timer;
timer.start();
while (flush_token->num_tasks() >= config::load_stream_flush_token_max_tasks) {
bthread_usleep(10 * 1000); // 10ms
bthread_usleep(2 * 1000); // 2ms
}
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_load_stream_flush_wait_ms << time_ms;
g_load_stream_flush_running_threads << 1;
return flush_token->submit_func(flush_func);
}

Expand Down

0 comments on commit 4a60d01

Please sign in to comment.