Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions be/src/runtime/buffered_block_mgr2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#include "exec/exec_node.h"
#include "runtime/buffered_block_mgr2.h"

#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/mem_tracker.h"
Expand Down Expand Up @@ -219,7 +219,8 @@ BufferedBlockMgr2::BufferedBlockMgr2(RuntimeState* state, TmpFileMgr* tmp_file_m
_non_local_outstanding_writes(0),
_io_mgr(state->exec_env()->disk_io_mgr()),
_is_cancelled(false),
_writes_issued(0) {
_writes_issued(0),
_state(state){
}

Status BufferedBlockMgr2::create(
Expand Down Expand Up @@ -313,6 +314,11 @@ bool BufferedBlockMgr2::try_acquire_tmp_reservation(Client* client, int num_buff
}

bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {
// Later, we use this interface to manage the consumption of memory of hashtable instead of ReservationTracker.
// So it is possible to allocate 0, which has no additional impact on the behavior of BufferedBlockMgr.
// The process of memory allocation still by BufferPool, Because bufferpool has done a lot of optimization in memory allocation
// which is better than using the new operator directly.
if (size == 0) return true;
// Workaround IMPALA-1619. Return immediately if the allocation size will cause
// an arithmetic overflow.
if (UNLIKELY(size >= (1LL << 31))) {
Expand All @@ -321,7 +327,6 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {
return false;
}
int buffers_needed = BitUtil::ceil(size, max_block_size());
DCHECK_GT(buffers_needed, 0) << "Trying to consume 0 memory";
unique_lock<mutex> lock(_lock);

if (size < max_block_size() && _mem_tracker->try_consume(size)) {
Expand All @@ -331,7 +336,7 @@ bool BufferedBlockMgr2::consume_memory(Client* client, int64_t size) {
return true;
}

if (std::max<int64_t>(0, remaining_unreserved_buffers()) +
if (available_buffers(client) +
client->_num_tmp_reserved_buffers < buffers_needed) {
return false;
}
Expand Down Expand Up @@ -436,7 +441,15 @@ Status BufferedBlockMgr2::mem_limit_too_low_error(Client* client, int node_id) {
<< PrettyPrinter::print(
client->_num_reserved_buffers * max_block_size(), TUnit::BYTES)
<< ".";
return Status::MemoryLimitExceeded(error_msg.str());
return add_exec_msg(error_msg.str());
}

Status BufferedBlockMgr2::add_exec_msg(const std::string& msg) const {
stringstream str;
str << msg << " ";
str << "Backend: " << BackendOptions::get_localhost() << ", ";
str << "fragment: " << print_id(_state->fragment_instance_id()) << " ";
return Status::MemoryLimitExceeded(str.str());
}

Status BufferedBlockMgr2::get_new_block(
Expand Down Expand Up @@ -914,11 +927,12 @@ void BufferedBlockMgr2::delete_block(Block* block) {
if (block->is_max_size()) {
--_total_pinned_buffers;
}
block->_is_pinned = false;
block->_client->unpin_buffer(block->_buffer_desc);
if (block->_client->_num_pinned_buffers < block->_client->_num_reserved_buffers) {
// Only block is io size we need change _unfullfilled_reserved_buffers
if (block->is_max_size() && block->_client->_num_pinned_buffers < block->_client->_num_reserved_buffers) {
++_unfullfilled_reserved_buffers;
}
block->_is_pinned = false;
} else if (_unpinned_blocks.contains(block)) {
// Remove block from unpinned list.
_unpinned_blocks.remove(block);
Expand Down Expand Up @@ -1034,7 +1048,7 @@ Status BufferedBlockMgr2::find_buffer_for_block(Block* block, bool* in_mem) {
<< endl << debug_internal() << endl << client->debug_string();
VLOG_QUERY << ss.str();
}
return Status::MemoryLimitExceeded("Query did not have enough memory to get the minimum required "
return add_exec_msg("Query did not have enough memory to get the minimum required "
"buffers in the block manager.");
}

Expand Down Expand Up @@ -1089,8 +1103,8 @@ Status BufferedBlockMgr2::find_buffer(
// There are no free buffers. If spills are disabled or there no unpinned blocks we
// can write, return. We can't get a buffer.
if (!_enable_spill) {
return Status::InternalError("Spilling has been disabled for plans,"
"current memory usage has reached the bottleneck."
return add_exec_msg("Spilling has been disabled for plans,"
"current memory usage has reached the bottleneck. "
"You can avoid the behavior via increasing the mem limit "
"by session variable exec_mem_limior or enable spilling.");
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/buffered_block_mgr2.h
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,9 @@ class BufferedBlockMgr2 {
bool validate() const;
std::string debug_internal() const;

// Add BE hostname and fragmentid for debug tuning
Status add_exec_msg(const std::string& msg) const;

// Size of the largest/default block in bytes.
const int64_t _max_block_size;

Expand Down Expand Up @@ -638,6 +641,9 @@ class BufferedBlockMgr2 {
typedef boost::unordered_map<TUniqueId, boost::weak_ptr<BufferedBlockMgr2> > BlockMgrsMap;
static BlockMgrsMap _s_query_to_block_mgrs;

// Unowned.
RuntimeState* _state;

}; // class BufferedBlockMgr2

} // end namespace doris
Expand Down