diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc index 18f5b6383d07bf..d86a39b8c71925 100644 --- a/be/src/runtime/buffered_block_mgr2.cc +++ b/be/src/runtime/buffered_block_mgr2.cc @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "exec/exec_node.h" #include "runtime/buffered_block_mgr2.h" - #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/mem_tracker.h" @@ -219,7 +219,8 @@ BufferedBlockMgr2::BufferedBlockMgr2(RuntimeState* state, TmpFileMgr* tmp_file_m _non_local_outstanding_writes(0), _io_mgr(state->exec_env()->disk_io_mgr()), _is_cancelled(false), - _writes_issued(0) { + _writes_issued(0), + _state(state){ } Status BufferedBlockMgr2::create( @@ -313,6 +314,11 @@ bool BufferedBlockMgr2::try_acquire_tmp_reservation(Client* client, int num_buff } bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) { + // Later, we use this interface to manage the consumption of memory of hashtable instead of ReservationTracker. + // So it is possible to allocate 0, which has no additional impact on the behavior of BufferedBlockMgr. + // The process of memory allocation still by BufferPool, Because bufferpool has done a lot of optimization in memory allocation + // which is better than using the new operator directly. + if (size == 0) return true; // Workaround IMPALA-1619. Return immediately if the allocation size will cause // an arithmetic overflow. if (UNLIKELY(size >= (1LL << 31))) { @@ -321,7 +327,6 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) { return false; } int buffers_needed = BitUtil::ceil(size, max_block_size()); - DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory"; unique_lock lock(_lock); if (size < max_block_size() && _mem_tracker->try_consume(size)) { @@ -331,7 +336,7 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) { return true; } - if (std::max(0, remaining_unreserved_buffers()) + + if (available_buffers(client) + client->_num_tmp_reserved_buffers < buffers_needed) { return false; } @@ -436,7 +441,15 @@ Status BufferedBlockMgr2::mem_limit_too_low_error(Client* client, int node_id) { << PrettyPrinter::print( client->_num_reserved_buffers * max_block_size(), TUnit::BYTES) << "."; - return Status::MemoryLimitExceeded(error_msg.str()); + return add_exec_msg(error_msg.str()); +} + +Status BufferedBlockMgr2::add_exec_msg(const std::string& msg) const { + stringstream str; + str << msg << " "; + str << "Backend: " << BackendOptions::get_localhost() << ", "; + str << "fragment: " << print_id(_state->fragment_instance_id()) << " "; + return Status::MemoryLimitExceeded(str.str()); } Status BufferedBlockMgr2::get_new_block( @@ -914,11 +927,12 @@ void BufferedBlockMgr2::delete_block(Block* block) { if (block->is_max_size()) { --_total_pinned_buffers; } - block->_is_pinned = false; block->_client->unpin_buffer(block->_buffer_desc); - if (block->_client->_num_pinned_buffers < block->_client->_num_reserved_buffers) { + // Only block is io size we need change _unfullfilled_reserved_buffers + if (block->is_max_size() && block->_client->_num_pinned_buffers < block->_client->_num_reserved_buffers) { ++_unfullfilled_reserved_buffers; } + block->_is_pinned = false; } else if (_unpinned_blocks.contains(block)) { // Remove block from unpinned list. _unpinned_blocks.remove(block); @@ -1034,7 +1048,7 @@ Status BufferedBlockMgr2::find_buffer_for_block(Block* block, bool* in_mem) { << endl << debug_internal() << endl << client->debug_string(); VLOG_QUERY << ss.str(); } - return Status::MemoryLimitExceeded("Query did not have enough memory to get the minimum required " + return add_exec_msg("Query did not have enough memory to get the minimum required " "buffers in the block manager."); } @@ -1089,8 +1103,8 @@ Status BufferedBlockMgr2::find_buffer( // There are no free buffers. If spills are disabled or there no unpinned blocks we // can write, return. We can't get a buffer. if (!_enable_spill) { - return Status::InternalError("Spilling has been disabled for plans," - "current memory usage has reached the bottleneck." + return add_exec_msg("Spilling has been disabled for plans," + "current memory usage has reached the bottleneck. " "You can avoid the behavior via increasing the mem limit " "by session variable exec_mem_limior or enable spilling."); } diff --git a/be/src/runtime/buffered_block_mgr2.h b/be/src/runtime/buffered_block_mgr2.h index 982fa07e7366f5..b2d355b1f63538 100644 --- a/be/src/runtime/buffered_block_mgr2.h +++ b/be/src/runtime/buffered_block_mgr2.h @@ -508,6 +508,9 @@ class BufferedBlockMgr2 { bool validate() const; std::string debug_internal() const; + // Add BE hostname and fragmentid for debug tuning + Status add_exec_msg(const std::string& msg) const; + // Size of the largest/default block in bytes. const int64_t _max_block_size; @@ -638,6 +641,9 @@ class BufferedBlockMgr2 { typedef boost::unordered_map > BlockMgrsMap; static BlockMgrsMap _s_query_to_block_mgrs; + // Unowned. + RuntimeState* _state; + }; // class BufferedBlockMgr2 } // end namespace doris