Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ void RuntimeFilterTimerQueue::start() {
if (it.use_count() == 1) {
// `use_count == 1` means this runtime filter has been released
} else if (it->should_be_check_timeout()) {
if (it->_parent->is_blocked_by()) {
if (it->force_wait_timeout() || it->_parent->is_blocked_by()) {
// This means runtime filter is not ready, so we call timeout or continue to poll this timer.
int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
if (ms_since_registration > it->wait_time_ms()) {
Expand Down
9 changes: 7 additions & 2 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ struct RuntimeFilterTimerQueue;
class RuntimeFilterTimer {
public:
RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
std::shared_ptr<Dependency> parent)
std::shared_ptr<Dependency> parent, bool force_wait_timeout = false)
: _parent(std::move(parent)),
_registration_time(registration_time),
_wait_time_ms(wait_time_ms) {}
_wait_time_ms(wait_time_ms),
_force_wait_timeout(force_wait_timeout) {}

// Called by runtime filter producer.
void call_ready();
Expand All @@ -224,13 +225,17 @@ class RuntimeFilterTimer {

bool should_be_check_timeout();

bool force_wait_timeout() { return _force_wait_timeout; }

private:
friend struct RuntimeFilterTimerQueue;
std::shared_ptr<Dependency> _parent = nullptr;
std::vector<std::shared_ptr<Dependency>> _local_runtime_filter_dependencies;
std::mutex _lock;
int64_t _registration_time;
const int32_t _wait_time_ms;
// true only for group_commit_scan_operator
bool _force_wait_timeout;
};

struct RuntimeFilterTimerQueue {
Expand Down
8 changes: 2 additions & 6 deletions be/src/pipeline/exec/group_commit_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* b
SCOPED_TIMER(local_state.exec_time_counter());
bool find_node = false;
RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos,
local_state._get_block_dependency,
local_state._timer_dependency));
local_state._get_block_dependency));
return Status::OK();
}

Expand All @@ -45,16 +44,13 @@ Status GroupCommitLocalState::init(RuntimeState* state, LocalStateInfo& info) {
auto& p = _parent->cast<GroupCommitOperatorX>();
_get_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"GroupCommitGetBlockDependency", true);
_timer_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"GroupCommitTimerDependency", true);
auto st = state->exec_env()->group_commit_mgr()->get_load_block_queue(
p._table_id, state->fragment_instance_id(), load_block_queue, _get_block_dependency);
if (st.ok()) {
DCHECK(load_block_queue != nullptr);
_timer_dependency->block();
_runtime_filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
MonotonicMillis(), load_block_queue->get_group_commit_interval_ms(),
_timer_dependency);
_get_block_dependency, true);
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> timers;
timers.push_back(_runtime_filter_timer);
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(std::move(timers));
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/group_commit_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@ class GroupCommitLocalState final : public ScanLocalState<GroupCommitLocalState>
Status init(RuntimeState* state, LocalStateInfo& info) override;
std::shared_ptr<LoadBlockQueue> load_block_queue = nullptr;
std::vector<Dependency*> dependencies() const override {
return {_scan_dependency.get(), _get_block_dependency.get(), _timer_dependency.get()};
return {_scan_dependency.get(), _get_block_dependency.get()};
}

private:
friend class GroupCommitOperatorX;
Status _process_conjuncts(RuntimeState* state) override;

std::shared_ptr<Dependency> _get_block_dependency = nullptr;
std::shared_ptr<Dependency> _timer_dependency = nullptr;
std::shared_ptr<pipeline::RuntimeFilterTimer> _runtime_filter_timer = nullptr;
};

Expand Down
103 changes: 45 additions & 58 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@

namespace doris {

std::string LoadBlockQueue::_get_load_ids() {
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
return ss.str();
}

Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block, bool write_wal,
UniqueId& load_id) {
Expand All @@ -55,23 +65,12 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
_data_bytes += block->bytes();
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
std::stringstream ss;
ss << "[";
for (const auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
<< "block queue size is " << _block_queue.size() << ", block rows is "
<< block->rows() << ", block bytes is " << block->bytes()
<< ", before add block, all block queues bytes is "
<< before_block_queues_bytes
<< ", after add block, all block queues bytes is "
<< _all_block_queues_bytes->load() << ", txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state
<< ", the block is " << block->dump_data() << ", the block column size is "
<< block->columns_bytes();
<< "Cur block rows=" << block->rows() << ", bytes=" << block->bytes()
<< ". all block queues bytes from " << before_block_queues_bytes << " to "
<< _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
<< ". txn_id=" << txn_id << ", label=" << label
<< ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
}
if (write_wal || config::group_commit_wait_replay_wal_finish) {
auto st = _v_wal_writer->write_wal(block.get());
Expand All @@ -85,6 +84,9 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
config::group_commit_queue_mem_limit) {
DCHECK(_load_ids_to_write_dep.find(load_id) != _load_ids_to_write_dep.end());
_load_ids_to_write_dep[load_id]->block();
VLOG_DEBUG << "block add_block for load_id=" << load_id
<< ", memory=" << _all_block_queues_bytes->load(std::memory_order_relaxed)
<< ". inner load_id=" << load_instance_id << ", label=" << label;
}
}
if (!_need_commit) {
Expand All @@ -104,14 +106,14 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
}
for (auto read_dep : _read_deps) {
read_dep->set_ready();
VLOG_DEBUG << "set ready for inner load_id=" << load_instance_id;
}
return Status::OK();
}

Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block* block,
bool* find_block, bool* eos,
std::shared_ptr<pipeline::Dependency> get_block_dep,
std::shared_ptr<pipeline::Dependency> timer_dependency) {
std::shared_ptr<pipeline::Dependency> get_block_dep) {
*find_block = false;
*eos = false;
std::unique_lock l(mutex);
Expand All @@ -126,15 +128,6 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
if (!_need_commit && duration >= _group_commit_interval_ms) {
_need_commit = true;
}
auto get_load_ids = [&]() {
std::stringstream ss;
ss << "[";
for (auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
return ss.str();
};
if (_block_queue.empty()) {
if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
auto last_print_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
Expand All @@ -144,12 +137,13 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
_last_print_time = std::chrono::steady_clock::now();
LOG(INFO) << "find one group_commit need to commit, txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", duration=" << duration << ", load_ids=" << get_load_ids();
<< ", duration=" << duration << ", load_ids=" << _get_load_ids();
}
}
VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ", but queue is empty";
if (!_need_commit) {
get_block_dep->block();
VLOG_DEBUG << "block get_block for query_id=" << load_instance_id;
VLOG_DEBUG << "block get_block for inner load_id=" << load_instance_id;
}
} else {
const BlockData block_data = _block_queue.front();
Expand All @@ -159,15 +153,11 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
<< "block queue size is " << _block_queue.size() << ", block rows is "
<< block->rows() << ", block bytes is " << block->bytes()
<< ", before remove block, all block queues bytes is "
<< before_block_queues_bytes
<< ", after remove block, all block queues bytes is "
<< _all_block_queues_bytes->load() << ", txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", load_ids=" << get_load_ids() << ", the block is " << block->dump_data()
<< ", the block column size is " << block->columns_bytes();
<< "Cur block rows=" << block->rows() << ", bytes=" << block->bytes()
<< ". all block queues bytes from " << before_block_queues_bytes << " to "
<< _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
<< ". txn_id=" << txn_id << ", label=" << label
<< ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
}
if (_block_queue.empty() && _need_commit && _load_ids_to_write_dep.empty()) {
*eos = true;
Expand All @@ -179,6 +169,8 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
for (auto& id : _load_ids_to_write_dep) {
id.second->set_ready();
}
VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
<< ". inner load_id=" << load_instance_id << ", label=" << label;
}
return Status::OK();
}
Expand All @@ -191,6 +183,7 @@ Status LoadBlockQueue::remove_load_id(const UniqueId& load_id) {
for (auto read_dep : _read_deps) {
read_dep->set_ready();
}
VLOG_DEBUG << "set ready for load_id=" << load_id << ", inner load_id=" << load_instance_id;
return Status::OK();
}
return Status::NotFound<false>("load_id=" + load_id.to_string() +
Expand Down Expand Up @@ -225,35 +218,25 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
<< ", status=" << st.to_string();
status =
Status::Cancelled("cancel group_commit, label=" + label + ", status=" + st.to_string());
int before_block_queues_bytes = _all_block_queues_bytes->load();
while (!_block_queue.empty()) {
const BlockData& block_data = _block_queue.front().block;
int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
std::stringstream ss;
ss << "[";
for (const auto& id : _load_ids_to_write_dep) {
ss << id.first.to_string() << ", ";
}
ss << "]";
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). "
<< "block queue size is " << _block_queue.size() << ", block rows is "
<< block_data.block->rows() << ", block bytes is " << block_data.block->bytes()
<< ", before remove block, all block queues bytes is "
<< before_block_queues_bytes
<< ", after remove block, all block queues bytes is "
<< _all_block_queues_bytes->load() << ", txn_id=" << txn_id
<< ", label=" << label << ", instance_id=" << load_instance_id
<< ", load_ids=" << ss.str() << ", the block is "
<< block_data.block->dump_data() << ", the block column size is "
<< block_data.block->columns_bytes();
_block_queue.pop_front();
}
VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). "
<< "all block queues bytes from " << before_block_queues_bytes << " to "
<< _all_block_queues_bytes->load() << ", queue size=" << _block_queue.size()
<< ", txn_id=" << txn_id << ", label=" << label
<< ", instance_id=" << load_instance_id << ", load_ids=" << _get_load_ids();
for (auto& id : _load_ids_to_write_dep) {
id.second->set_always_ready();
}
for (auto read_dep : _read_deps) {
read_dep->set_ready();
}
VLOG_DEBUG << "set ready for load_ids=" << _get_load_ids()
<< ", inner load_id=" << load_instance_id;
}

Status GroupCommitTable::get_first_block_load_queue(
Expand Down Expand Up @@ -287,7 +270,7 @@ Status GroupCommitTable::get_first_block_load_queue(
}
}
return Status::InternalError<false>("can not get a block queue for table_id: " +
std::to_string(_table_id));
std::to_string(_table_id) + _create_plan_failed_reason);
};

if (try_to_get_matched_queue().ok()) {
Expand All @@ -310,7 +293,11 @@ Status GroupCommitTable::get_first_block_load_queue(
}};
auto st = _create_group_commit_load(be_exe_version, mem_tracker);
if (!st.ok()) {
LOG(WARNING) << "create group commit load error, st=" << st.to_string();
LOG(WARNING) << "create group commit load error: " << st.to_string();
_create_plan_failed_reason = ". create group commit load error: " +
st.to_string().substr(0, 300);
} else {
_create_plan_failed_reason = "";
}
}));
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ class LoadBlockQueue {
Status add_block(RuntimeState* runtime_state, std::shared_ptr<vectorized::Block> block,
bool write_wal, UniqueId& load_id);
Status get_block(RuntimeState* runtime_state, vectorized::Block* block, bool* find_block,
bool* eos, std::shared_ptr<pipeline::Dependency> get_block_dep,
std::shared_ptr<pipeline::Dependency> timer_dependency);
bool* eos, std::shared_ptr<pipeline::Dependency> get_block_dep);
bool contain_load_id(const UniqueId& load_id);
Status add_load_id(const UniqueId& load_id,
const std::shared_ptr<pipeline::Dependency> put_block_dep);
Expand Down Expand Up @@ -124,6 +123,7 @@ class LoadBlockQueue {

private:
void _cancel_without_lock(const Status& st);
std::string _get_load_ids();

// the set of load ids of all blocks in this queue
std::map<UniqueId, std::shared_ptr<pipeline::Dependency>> _load_ids_to_write_dep;
Expand Down Expand Up @@ -196,6 +196,7 @@ class GroupCommitTable {
std::tuple<std::shared_ptr<pipeline::Dependency>,
std::shared_ptr<pipeline::Dependency>, int64_t, int64_t>>
_create_plan_deps;
std::string _create_plan_failed_reason;
};

class GroupCommitMgr {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.nereids.NereidsPlanner;
Expand All @@ -50,6 +51,7 @@
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TOlapTableLocationParam;
import org.apache.doris.thrift.TPartitionType;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
Expand Down Expand Up @@ -92,6 +94,9 @@ public void beginTransaction() {
return;
}
try {
if (DebugPointUtil.isEnable("OlapInsertExecutor.beginTransaction.failed")) {
throw new BeginTransactionException("current running txns on db is larger than limit");
}
this.txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
database.getId(), ImmutableList.of(table.getId()), labelName,
new TxnCoordinator(TxnSourceType.FE, 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ suite("test_group_commit_error", "nonConcurrent") {

GetDebugPoint().clearDebugPointsForAllBEs()
GetDebugPoint().clearDebugPointsForAllFEs()
try {
GetDebugPoint().enableDebugPointForAllFEs("OlapInsertExecutor.beginTransaction.failed")
sql """ set group_commit = async_mode """
sql """ insert into ${tableName} values (1, 1) """
assertTrue(false)
} catch (Exception e) {
logger.info("failed: " + e.getMessage())
assertTrue(e.getMessage().contains("begin transaction failed"))
} finally {
GetDebugPoint().clearDebugPointsForAllFEs()
}

try {
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr.exec_plan_fragment.failed")
sql """ set group_commit = async_mode """
Expand Down
Loading