diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp index 0ce2f50ffc4a94..86db457809d851 100644 --- a/be/src/exec/analytic_eval_node.cpp +++ b/be/src/exec/analytic_eval_node.cpp @@ -20,7 +20,6 @@ #include "exprs/agg_fn_evaluator.h" #include "exprs/anyval_util.h" -#include "runtime/buffered_tuple_stream.h" #include "runtime/descriptors.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" @@ -195,9 +194,19 @@ Status AnalyticEvalNode::open(RuntimeState* state) { RETURN_IF_CANCELLED(state); //RETURN_IF_ERROR(QueryMaintenance(state)); RETURN_IF_ERROR(child(0)->open(state)); - // RETURN_IF_ERROR(state->block_mgr()->RegisterClient(2, mem_tracker(), state, &client_)); - _input_stream.reset(new BufferedTupleStream(state, child(0)->row_desc(), state->block_mgr())); - RETURN_IF_ERROR(_input_stream->init(runtime_profile())); + RETURN_IF_ERROR(state->block_mgr2()->register_client(2, mem_tracker(), state, &client_)); + _input_stream.reset(new BufferedTupleStream2(state, child(0)->row_desc(), state->block_mgr2(), client_, false, true)); + RETURN_IF_ERROR(_input_stream->init(id(), runtime_profile(), true)); + + bool got_read_buffer; + RETURN_IF_ERROR(_input_stream->prepare_for_read(true, &got_read_buffer)); + if (!got_read_buffer) { + std::string msg("Failed to acquire initial read buffer for analytic function " + "evaluation. Reducing query concurrency or increasing the memory limit may " + "help this query to complete successfully."); + return mem_tracker()->MemLimitExceeded(state, msg, -1); + } + DCHECK_EQ(_evaluators.size(), _fn_ctxs.size()); for (int i = 0; i < _evaluators.size(); ++i) { @@ -673,19 +682,20 @@ Status AnalyticEvalNode::process_child_batch(RuntimeState* state) { try_add_result_tuple_for_curr_row(stream_idx, row); + Status status = Status::OK(); // Buffer the entire input row to be returned later with the analytic eval results. - if (UNLIKELY(!_input_stream->add_row(row))) { + if (UNLIKELY(!_input_stream->add_row(row, &status))) { // AddRow returns false if an error occurs (available via status()) or there is // not enough memory (status() is OK). If there isn't enough memory, we unpin // the stream and continue writing/reading in unpinned mode. // TODO: Consider re-pinning later if the output stream is fully consumed. - RETURN_IF_ERROR(_input_stream->status()); - // RETURN_IF_ERROR(_input_stream->UnpinStream()); + RETURN_IF_ERROR(status); + RETURN_IF_ERROR(_input_stream->unpin_stream()); VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx; - if (!_input_stream->add_row(row)) { + if (!_input_stream->add_row(row, &status)) { // Rows should be added in unpinned mode unless an error occurs. - RETURN_IF_ERROR(_input_stream->status()); + RETURN_IF_ERROR(status); DCHECK(false); } } diff --git a/be/src/exec/analytic_eval_node.h b/be/src/exec/analytic_eval_node.h index 882b3d391cc97e..52516f6587073d 100644 --- a/be/src/exec/analytic_eval_node.h +++ b/be/src/exec/analytic_eval_node.h @@ -21,8 +21,9 @@ #include "exec/exec_node.h" #include "exprs/expr.h" //#include "exprs/expr_context.h" -#include "runtime/buffered_block_mgr.h" -#include "runtime/buffered_tuple_stream.h" +#include "runtime/buffered_block_mgr2.h" +#include "runtime/buffered_tuple_stream2.inline.h" +#include "runtime/buffered_tuple_stream2.h" #include "runtime/tuple.h" #include "thrift/protocol/TDebugProtocol.h" @@ -307,7 +308,7 @@ class AnalyticEvalNode : public ExecNode { boost::scoped_ptr _curr_child_batch; // Block manager client used by _input_stream. Not owned. - // BufferedBlockMgr::Client* client_; + BufferedBlockMgr2::Client* client_; // Buffers input rows added in process_child_batch() until enough rows are able to // be returned by get_next_output_batch(), in which case row batches are returned from @@ -317,7 +318,7 @@ class AnalyticEvalNode : public ExecNode { // buffered data exceeds the available memory in the underlying BufferedBlockMgr, // _input_stream is unpinned (i.e., possibly spilled to disk if necessary). // TODO: Consider re-pinning unpinned streams when possible. - boost::scoped_ptr _input_stream; + boost::scoped_ptr _input_stream; // Pool used for O(1) allocations that live until close. boost::scoped_ptr _mem_pool; diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index b56af60555db18..a75d70194a28d5 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -110,7 +110,7 @@ Status ExchangeNode::close(RuntimeState* state) { if (_stream_recvr != NULL) { _stream_recvr->close(); } - _stream_recvr.reset(); + // _stream_recvr.reset(); return ExecNode::close(state); } diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 642668ef59e606..1df18a34de9c15 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -25,11 +25,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/runtime") set(RUNTIME_FILES broker_mgr.cpp - buffered_block_mgr.cpp - buffered_tuple_stream.cpp - buffered_tuple_stream_ir.cpp buffer_control_block.cpp - merge_sorter.cpp client_cache.cpp data_stream_mgr.cpp data_stream_sender.cpp diff --git a/be/src/runtime/buffered_block_mgr.cpp b/be/src/runtime/buffered_block_mgr.cpp deleted file mode 100644 index 1615792e68a63d..00000000000000 --- a/be/src/runtime/buffered_block_mgr.cpp +++ /dev/null @@ -1,94 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "exec/exec_node.h" -#include "runtime/buffered_block_mgr.h" -#include "runtime/runtime_state.h" -#include "runtime/mem_pool.h" -#include "util/runtime_profile.h" -#include "runtime/mem_tracker.h" - -namespace doris { - -//BufferedBlockMgr::BlockMgrsMap BufferedBlockMgr::query_to_block_mgrs_; -//SpinLock BufferedBlockMgr::static_block_mgrs_lock_; - -// BufferedBlockMgr::Block methods. -BufferedBlockMgr::Block::Block() - : _buffer_desc(NULL), - // _block_mgr(block_mgr), - _valid_data_len(0), - _num_rows(0) -{ -} - -Status BufferedBlockMgr::create(RuntimeState* state, - int64_t block_size, boost::shared_ptr* block_mgr) { - block_mgr->reset(new BufferedBlockMgr(state, block_size)); - (*block_mgr)->init(state); - return Status::OK(); -} - -void BufferedBlockMgr::init(RuntimeState* state) { - _state = state; - _tuple_pool.reset(new MemPool(state->instance_mem_tracker())); -} - -void BufferedBlockMgr::Block::init() { - _valid_data_len = 0; - _num_rows = 0; -} - -BufferedBlockMgr::BufferedBlockMgr(RuntimeState* state, int64_t block_size) - : _max_block_size(block_size) -{ -} - -Status BufferedBlockMgr::get_new_block(Block** block, int64_t len) { - DCHECK_LE(len, _max_block_size) << "Cannot request blocks bigger than max_len"; - - *block = NULL; - Block* new_block = _obj_pool.add(new Block()); - if (UNLIKELY(new_block == NULL)) { - return Status::InternalError("Allocate memory failed."); - } - - uint8_t* buffer = _tuple_pool->allocate(len); - if (UNLIKELY(buffer == NULL)) { - return Status::InternalError("Allocate memory failed."); - } - - //new_block->set_buffer_desc(_obj_pool.Add(new BufferDescriptor(buffer, len)); - new_block->_buffer_desc = _obj_pool.add(new BufferDescriptor(buffer, len)); - new_block->_buffer_desc->block = new_block; - *block = new_block; - - RETURN_IF_LIMIT_EXCEEDED(_state, "Buffered block mgr, while getting new block"); - - return Status::OK(); -} - -//Status BufferedBlockMgr::Block::delete() { - //// TODO: delete block or not,we should delete the new BLOCK - //return Status::OK(); -//} - -Status BufferedBlockMgr::get_new_block(Block** block) { - return get_new_block(block, _max_block_size); -} - -} // namespace doris diff --git a/be/src/runtime/buffered_block_mgr.h b/be/src/runtime/buffered_block_mgr.h deleted file mode 100644 index e268cf61b7b3f5..00000000000000 --- a/be/src/runtime/buffered_block_mgr.h +++ /dev/null @@ -1,170 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef INF_DORIS_QE_SRC_BE_SRC_RUNTIME_BUFFERED_BLOCK_MGR_H -#define INF_DORIS_QE_SRC_BE_SRC_RUNTIME_BUFFERED_BLOCK_MGR_H - -#include -#include -#include -#include -#include "runtime/mem_pool.h" -#include "common/logging.h" -#include - -namespace doris { - -class RuntimeState; - -class BufferedBlockMgr { -private: - struct BufferDescriptor; - -public: - class Block { //}: public InternalQueue::Node { - public: - ~Block() {} - void add_row() { - ++_num_rows; - } - int num_rows() const { - return _num_rows; - } - - // Allocates the specified number of bytes from this block. - template T* allocate(int size) { - DCHECK_GE(bytes_remaining(), size); - uint8_t* current_location = _buffer_desc->buffer + _valid_data_len; - _valid_data_len += size; - return reinterpret_cast(current_location); - } - - // Return the number of remaining bytes that can be allocated in this block. - int bytes_remaining() const { - DCHECK(_buffer_desc != NULL); - return _buffer_desc->len - _valid_data_len; - } - - // Return size bytes from the most recent allocation. - void return_allocation(int size) { - DCHECK_GE(_valid_data_len, size); - _valid_data_len -= size; - } - - // Pointer to start of the block data in memory. Only guaranteed to be valid if the - // block is pinned. - uint8_t* buffer() const { - DCHECK(_buffer_desc != NULL); - return _buffer_desc->buffer; - } - - // Return the number of bytes allocated in this block. - int64_t valid_data_len() const { - return _valid_data_len; - } - - // Returns the length of the underlying buffer. Only callable if the block is - // pinned. - int64_t buffer_len() const { - return _buffer_desc->len; - } - - // Returns true if this block is the max block size. Only callable if the block - // is pinned. - bool is_max_size() const { - return _buffer_desc->len == _block_mgr->max_block_size(); - } - - Status delete_block() { - return Status::OK(); - } - - // Debug helper method to print the state of a block. - std::string debug_string() const { - return ""; - } - - private: - friend class BufferedBlockMgr; - - //Block(BufferedBlockMgr* block_mgr); - Block(); - - void init(); - - // Pointer to the buffer associated with the block. NULL if the block is not in - // memory and cannot be changed while the block is pinned or being written. - BufferDescriptor* _buffer_desc; - - // Parent block manager object. Responsible for maintaining the state of the block. - BufferedBlockMgr* _block_mgr; - - // Length of valid (i.e. allocated) data within the block. - int64_t _valid_data_len; - - // Number of rows in this block. - int _num_rows; - }; // class Block - - static Status create(RuntimeState* state, - int64_t block_size, boost::shared_ptr* block_mgr); - - ~BufferedBlockMgr() {}; - - Status get_new_block(Block** block, int64_t len); - - Status get_new_block(Block** block); - int64_t max_block_size() const { - return _max_block_size; - } - -private: - - // Descriptor for a single memory buffer in the pool. - struct BufferDescriptor { //}: public InternalQueue::Node { - // Start of the buffer - uint8_t* buffer; - - // Length of the buffer - int64_t len; - - // Block that this buffer is assigned to. May be NULL. - Block* block; - - // Iterator into all_io_buffers_ for this buffer. - std::list::iterator all_buffers_it; - - BufferDescriptor(uint8_t* buf, int64_t len) - : buffer(buf), len(len), block(NULL) { - } - }; //block - -private: - BufferedBlockMgr(RuntimeState* state, int64_t block_size); - void init(RuntimeState* state); - // Size of the largest/default block in bytes. - const int64_t _max_block_size; - ObjectPool _obj_pool; - //MemPool* _tuple_pool; - boost::scoped_ptr _tuple_pool; - RuntimeState* _state; - -}; // class BufferedBlockMgr - -} // namespace doris. - -#endif diff --git a/be/src/runtime/buffered_block_mgr2.cc b/be/src/runtime/buffered_block_mgr2.cc index f5772c79733933..01db890cc31b20 100644 --- a/be/src/runtime/buffered_block_mgr2.cc +++ b/be/src/runtime/buffered_block_mgr2.cc @@ -210,8 +210,7 @@ BufferedBlockMgr2::BufferedBlockMgr2(RuntimeState* state, TmpFileMgr* tmp_file_m _max_block_size(block_size), // Keep two writes in flight per scratch disk so the disks can stay busy. _block_write_threshold(tmp_file_mgr->num_active_tmp_devices() * 2), - // _disable_spill(state->query_ctx().disable_spilling), - _disable_spill(false), + _enable_spill(state->enable_spill()), _query_id(state->query_id()), _tmp_file_mgr(tmp_file_mgr), _initialized(false), @@ -736,7 +735,7 @@ Status BufferedBlockMgr2::unpin_block(Block* block) { } Status BufferedBlockMgr2::write_unpinned_blocks() { - if (_disable_spill) { + if (!_enable_spill) { return Status::OK(); } @@ -1089,11 +1088,11 @@ Status BufferedBlockMgr2::find_buffer( if (_free_io_buffers.empty()) { // There are no free buffers. If spills are disabled or there no unpinned blocks we // can write, return. We can't get a buffer. - if (_disable_spill) { - return Status::InternalError("Spilling has been disabled for plans that do not have stats and " - "are not hinted to prevent potentially bad plans from using too many cluster " - "resources. Compute stats on these tables, hint the plan or disable this " - "behavior via query options to enable spilling."); + if (!_enable_spill) { + return Status::InternalError("Spilling has been disabled for plans," + "current memory usage has reached the bottleneck." + "You can avoid the behavior via increasing the mem limit " + "by session variable exec_mem_limior or enable spilling."); } // Third, this block needs to use a buffer that was unpinned from another block. @@ -1214,7 +1213,7 @@ bool BufferedBlockMgr2::validate() const { // Check if we're writing blocks when the number of free buffers falls below // threshold. We don't write blocks after cancellation. - if (!_is_cancelled && !_unpinned_blocks.empty() && !_disable_spill && + if (!_is_cancelled && !_unpinned_blocks.empty() && _enable_spill && (_free_io_buffers.size() + _non_local_outstanding_writes < _block_write_threshold)) { // TODO: this isn't correct when write_unpinned_blocks() fails during the call to diff --git a/be/src/runtime/buffered_block_mgr2.h b/be/src/runtime/buffered_block_mgr2.h index dba452b0db477f..982fa07e7366f5 100644 --- a/be/src/runtime/buffered_block_mgr2.h +++ b/be/src/runtime/buffered_block_mgr2.h @@ -515,9 +515,9 @@ class BufferedBlockMgr2 { // Equal to the number of disks. const int _block_write_threshold; - // If true, spilling is disabled. The client calls will fail if there is not enough + // If false, spilling is disabled. The client calls will fail if there is not enough // memory. - const bool _disable_spill; + const bool _enable_spill; const TUniqueId _query_id; diff --git a/be/src/runtime/buffered_tuple_stream.cpp b/be/src/runtime/buffered_tuple_stream.cpp deleted file mode 100644 index 8f4dd1924aeec4..00000000000000 --- a/be/src/runtime/buffered_tuple_stream.cpp +++ /dev/null @@ -1,492 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/buffered_tuple_stream.h" - -#include -//#include - -#include "runtime/descriptors.h" -#include "runtime/row_batch.h" -#include "runtime/tuple_row.h" -#include "util/bit_util.h" -#include "util/debug_util.h" -#include "util/pretty_printer.h" -#include "common/status.h" - -namespace doris { -//using namespace strings; - -// The first NUM_SMALL_BLOCKS of the tuple stream are made of blocks less than the -// io size. These blocks never spill. -static const int64_t INITIAL_BLOCK_SIZES[] = -{ 64 * 1024, 512 * 1024 }; -static const int NUM_SMALL_BLOCKS = sizeof(INITIAL_BLOCK_SIZES) / sizeof(int64_t); - -std::string BufferedTupleStream::RowIdx::debug_string() const { - std::stringstream ss; - ss << "RowIdx block=" << block() << " offset=" << offset() << " idx=" << idx(); - return ss.str(); -} - -BufferedTupleStream::BufferedTupleStream(RuntimeState* state, - const RowDescriptor& row_desc, BufferedBlockMgr* block_mgr) - : _use_small_buffers(false), - _delete_on_read(false), - _read_write(true), - _state(state), - _desc(row_desc), - _nullable_tuple(row_desc.is_any_tuple_nullable()), - _block_mgr(block_mgr), - // block_mgr_client_(client), - _total_byte_size(0), - _read_ptr(NULL), - _read_tuple_idx(0), - _read_bytes(0), - _rows_returned(0), - _read_block_idx(-1), - _write_block(NULL), - _num_small_blocks(0), - _closed(false), - _num_rows(0), - // pinned_(true), - _pin_timer(NULL), - _unpin_timer(NULL), - _get_new_block_timer(NULL) { - _null_indicators_read_block = _null_indicators_write_block = -1; - _read_block = _blocks.end(); - _fixed_tuple_row_size = 0; - - for (int i = 0; i < _desc.tuple_descriptors().size(); ++i) { - const TupleDescriptor* tuple_desc = _desc.tuple_descriptors()[i]; - const int tuple_byte_size = tuple_desc->byte_size(); - _fixed_tuple_row_size += tuple_byte_size; - - if (tuple_desc->string_slots().empty()) { - continue; - } - - _string_slots.push_back(make_pair(i, tuple_desc->string_slots())); - } -} - -std::string BufferedTupleStream::debug_string() const { - std::stringstream ss; - ss << "BufferedTupleStream num_rows=" << _num_rows << " rows_returned=" - << _rows_returned << " delete_on_read=" << (_delete_on_read ? "true" : "false") - << " closed=" << (_closed ? "true" : "false") - << " write_block=" << _write_block << " _read_block="; - - if (_read_block == _blocks.end()) { - ss << ""; - } else { - ss << *_read_block; - } - - ss << " blocks=[\n"; - - for (std::list::const_iterator it = _blocks.begin(); - it != _blocks.end(); ++it) { - ss << "{" << (*it)->debug_string() << "}"; - - if (*it != _blocks.back()) { - ss << ",\n"; - } - } - - ss << "]"; - return ss.str(); -} - -Status BufferedTupleStream::init(RuntimeProfile* profile) { - if (profile != NULL) { - _get_new_block_timer = ADD_TIMER(profile, "GetNewBlockTime"); - } - - if (_block_mgr->max_block_size() < INITIAL_BLOCK_SIZES[0]) { - _use_small_buffers = false; - } - - bool got_block = false; - RETURN_IF_ERROR(new_block_for_write(_fixed_tuple_row_size, &got_block)); - - if (!got_block) { - return Status::InternalError("Allocate memory failed."); - } - - DCHECK(_write_block != NULL); - - if (_read_write) { - RETURN_IF_ERROR(prepare_for_read()); - }; - - return Status::OK(); -} - -void BufferedTupleStream::close() { - for (std::list::iterator it = _blocks.begin(); - it != _blocks.end(); ++it) { - (*it)->delete_block(); - } - - _blocks.clear(); - _closed = true; -} - -Status BufferedTupleStream::new_block_for_write(int min_size, bool* got_block) { - DCHECK(!_closed); - - if (min_size > _block_mgr->max_block_size()) { - std::stringstream err_msg; - err_msg << "Cannot process row that is bigger than the IO size " - << "(row_size=" << PrettyPrinter::print(min_size, TUnit::BYTES) - << ". To run this query, increase the io size"; - return Status::InternalError(err_msg.str()); - } - - int64_t block_len = _block_mgr->max_block_size(); - BufferedBlockMgr::Block* new_block = NULL; - { - SCOPED_TIMER(_get_new_block_timer); - RETURN_IF_ERROR(_block_mgr->get_new_block(&new_block, block_len)); - } - - // Compute and allocate the block header with the null indicators - _null_indicators_write_block = compute_num_null_indicator_bytes(block_len); - new_block->allocate(_null_indicators_write_block); - _write_tuple_idx = 0; - - _blocks.push_back(new_block); - _block_start_idx.push_back(new_block->buffer()); - _write_block = new_block; - DCHECK_EQ(_write_block->num_rows(), 0); - _total_byte_size += block_len; - - *got_block = (new_block != NULL); - return Status::OK(); -} - -Status BufferedTupleStream::next_block_for_read() { - DCHECK(!_closed); - DCHECK(_read_block != _blocks.end()); - - ++_read_block; - ++_read_block_idx; - - _read_ptr = NULL; - _read_tuple_idx = 0; - _read_bytes = 0; - - if (_read_block != _blocks.end()) { - _null_indicators_read_block = - compute_num_null_indicator_bytes((*_read_block)->buffer_len()); - _read_ptr = (*_read_block)->buffer() + _null_indicators_read_block; - } - - return Status::OK(); -} - -Status BufferedTupleStream::prepare_for_read(bool* got_buffer) { - DCHECK(!_closed); - - if (_blocks.empty()) { - return Status::OK(); - } - - _read_block = _blocks.begin(); - DCHECK(_read_block != _blocks.end()); - _null_indicators_read_block = - compute_num_null_indicator_bytes((*_read_block)->buffer_len()); - _read_ptr = (*_read_block)->buffer() + _null_indicators_read_block; - _read_tuple_idx = 0; - _read_bytes = 0; - _rows_returned = 0; - _read_block_idx = 0; - - if (got_buffer != NULL) { - *got_buffer = true; - } - - return Status::OK(); -} - -int BufferedTupleStream::compute_num_null_indicator_bytes(int block_size) const { - if (_nullable_tuple) { - // We assume that all rows will use their max size, so we may be underutilizing the - // space, i.e. we may have some unused space in case of rows with NULL tuples. - const uint32_t tuples_per_row = _desc.tuple_descriptors().size(); - const uint32_t min_row_size_in_bits = 8 * _fixed_tuple_row_size + tuples_per_row; - const uint32_t block_size_in_bits = 8 * block_size; - const uint32_t max_num_rows = block_size_in_bits / min_row_size_in_bits; - return - BitUtil::round_up_numi64(max_num_rows * tuples_per_row) * 8; - } else { - // If there are no nullable tuples then no need to waste space for null indicators. - return 0; - } -} - -Status BufferedTupleStream::get_next(RowBatch* batch, bool* eos, - std::vector* indices) { - if (_nullable_tuple) { - return get_next_internal(batch, eos, indices); - } else { - return get_next_internal(batch, eos, indices); - } -} - -template -Status BufferedTupleStream::get_next_internal(RowBatch* batch, bool* eos, - std::vector* indices) { - DCHECK(!_closed); - DCHECK(batch->row_desc().equals(_desc)); - *eos = (_rows_returned == _num_rows); - - if (*eos) { - return Status::OK(); - } - - DCHECK_GE(_null_indicators_read_block, 0); - - const uint64_t tuples_per_row = _desc.tuple_descriptors().size(); - DCHECK_LE(_read_tuple_idx / tuples_per_row, (*_read_block)->num_rows()); - DCHECK_EQ(_read_tuple_idx % tuples_per_row, 0); - int rows_returned_curr_block = _read_tuple_idx / tuples_per_row; - - int64_t data_len = (*_read_block)->valid_data_len() - _null_indicators_read_block; - - if (UNLIKELY(rows_returned_curr_block == (*_read_block)->num_rows())) { - // Get the next block in the stream. We need to do this at the beginning of - // the GetNext() call to ensure the buffer management semantics. NextBlockForRead() - // will recycle the memory for the rows returned from the *previous* call to - // GetNext(). - RETURN_IF_ERROR(next_block_for_read()); - DCHECK(_read_block != _blocks.end()) << debug_string(); - DCHECK_GE(_null_indicators_read_block, 0); - data_len = (*_read_block)->valid_data_len() - _null_indicators_read_block; - rows_returned_curr_block = 0; - } - - DCHECK(_read_block != _blocks.end()); - //DCHECK((*_read_block)->is_pinned()) << DebugString(); - DCHECK(_read_ptr != NULL); - - int64_t rows_left = _num_rows - _rows_returned; - int rows_to_fill = std::min( - static_cast(batch->capacity() - batch->num_rows()), rows_left); - DCHECK_GE(rows_to_fill, 1); - batch->add_rows(rows_to_fill); - uint8_t* tuple_row_mem = reinterpret_cast(batch->get_row(batch->num_rows())); - - - // Produce tuple rows from the current block and the corresponding position on the - // null tuple indicator. - std::vector local_indices; - - if (indices == NULL) { - // A hack so that we do not need to check whether 'indices' is not null in the - // tight loop. - indices = &local_indices; - } else { - DCHECK(!_delete_on_read); - DCHECK_EQ(batch->num_rows(), 0); - indices->clear(); - } - - indices->reserve(rows_to_fill); - - int i = 0; - uint8_t* null_word = NULL; - uint32_t null_pos = 0; - // Start reading from position _read_tuple_idx in the block. - uint64_t last_read_ptr = 0; - uint64_t last_read_row = _read_tuple_idx / tuples_per_row; - - while (i < rows_to_fill) { - // Check if current block is done. - if (UNLIKELY(rows_returned_curr_block + i == (*_read_block)->num_rows())) { - break; - } - - // Copy the row into the output batch. - TupleRow* row = reinterpret_cast(tuple_row_mem); - last_read_ptr = reinterpret_cast(_read_ptr); - indices->push_back(RowIdx()); - DCHECK_EQ(indices->size(), i + 1); - (*indices)[i].set(_read_block_idx, _read_bytes + _null_indicators_read_block, - last_read_row); - - if (HasNullableTuple) { - for (int j = 0; j < tuples_per_row; ++j) { - // Stitch together the tuples from the block and the NULL ones. - null_word = (*_read_block)->buffer() + (_read_tuple_idx >> 3); - null_pos = _read_tuple_idx & 7; - ++_read_tuple_idx; - const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0); - // Copy tuple and advance _read_ptr. If it it is a NULL tuple, it calls SetTuple - // with Tuple* being 0x0. To do that we multiply the current _read_ptr with - // false (0x0). - row->set_tuple(j, reinterpret_cast( - reinterpret_cast(_read_ptr) * is_not_null)); - _read_ptr += _desc.tuple_descriptors()[j]->byte_size() * is_not_null; - } - - const uint64_t row_read_bytes = - reinterpret_cast(_read_ptr) - last_read_ptr; - DCHECK_GE(_fixed_tuple_row_size, row_read_bytes); - _read_bytes += row_read_bytes; - last_read_ptr = reinterpret_cast(_read_ptr); - } else { - // When we know that there are no nullable tuples we can safely copy them without - // checking for nullability. - for (int j = 0; j < tuples_per_row; ++j) { - row->set_tuple(j, reinterpret_cast(_read_ptr)); - _read_ptr += _desc.tuple_descriptors()[j]->byte_size(); - } - - _read_bytes += _fixed_tuple_row_size; - _read_tuple_idx += tuples_per_row; - } - - tuple_row_mem += sizeof(Tuple*) * tuples_per_row; - - // Update string slot ptrs. - for (int j = 0; j < _string_slots.size(); ++j) { - Tuple* tuple = row->get_tuple(_string_slots[j].first); - - if (HasNullableTuple && tuple == NULL) { - continue; - } - - DCHECK(tuple != nullptr); - - for (int k = 0; k < _string_slots[j].second.size(); ++k) { - const SlotDescriptor* slot_desc = _string_slots[j].second[k]; - - if (tuple->is_null(slot_desc->null_indicator_offset())) { - continue; - } - - StringValue* sv = tuple->get_string_slot(slot_desc->tuple_offset()); - DCHECK_LE(sv->len, data_len - _read_bytes); - sv->ptr = reinterpret_cast(_read_ptr); - _read_ptr += sv->len; - _read_bytes += sv->len; - } - } - - ++last_read_row; - ++i; - } - - batch->commit_rows(i); - _rows_returned += i; - *eos = (_rows_returned == _num_rows); - - if (rows_returned_curr_block + i == (*_read_block)->num_rows()) { - // No more data in this block. Mark this batch as needing to return so - // the caller can pass the rows up the operator tree. - batch->mark_need_to_return(); - } - - DCHECK_EQ(indices->size(), i); - return Status::OK(); -} - -// TODO: Move this somewhere in general. We don't want this function inlined -// for the buffered tuple stream case though. -// TODO: In case of null-able tuples we ignore the space we could have saved from the -// null tuples of this row. -int BufferedTupleStream::compute_row_size(TupleRow* row) const { - int size = _fixed_tuple_row_size; - - for (int i = 0; i < _string_slots.size(); ++i) { - Tuple* tuple = row->get_tuple(_string_slots[i].first); - - if (_nullable_tuple && tuple == NULL) { - continue; - } - - DCHECK(tuple != nullptr); - - for (int j = 0; j < _string_slots[i].second.size(); ++j) { - const SlotDescriptor* slot_desc = _string_slots[i].second[j]; - - if (tuple->is_null(slot_desc->null_indicator_offset())) { - continue; - } - - StringValue* sv = tuple->get_string_slot(slot_desc->tuple_offset()); - size += sv->len; - } - } - - return size; -} - -inline uint8_t* BufferedTupleStream::allocate_row(int size) { - DCHECK(!_closed); - - if (UNLIKELY(_write_block == NULL || _write_block->bytes_remaining() < size)) { - bool got_block = false; - _status = new_block_for_write(size, &got_block); - - if (!_status.ok() || !got_block) { - return NULL; - } - } - - DCHECK(_write_block != NULL); - // DCHECK(_write_block->is_pinned()); - DCHECK_GE(_write_block->bytes_remaining(), size); - ++_num_rows; - _write_block->add_row(); - return _write_block->allocate(size); -} - -inline void BufferedTupleStream::get_tuple_row(const RowIdx& idx, TupleRow* row) const { - DCHECK(!_closed); - //DCHECK(is_pinned()); - DCHECK(!_delete_on_read); - DCHECK_EQ(_blocks.size(), _block_start_idx.size()); - DCHECK_LT(idx.block(), _blocks.size()); - - uint8_t* data = _block_start_idx[idx.block()] + idx.offset(); - - if (_nullable_tuple) { - // Stitch together the tuples from the block and the NULL ones. - const int tuples_per_row = _desc.tuple_descriptors().size(); - uint32_t tuple_idx = idx.idx() * tuples_per_row; - - for (int i = 0; i < tuples_per_row; ++i) { - const uint8_t* null_word = _block_start_idx[idx.block()] + (tuple_idx >> 3); - const uint32_t null_pos = tuple_idx & 7; - const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0); - row->set_tuple(i, reinterpret_cast( - reinterpret_cast(data) * is_not_null)); - data += _desc.tuple_descriptors()[i]->byte_size() * is_not_null; - ++tuple_idx; - } - } else { - for (int i = 0; i < _desc.tuple_descriptors().size(); ++i) { - row->set_tuple(i, reinterpret_cast(data)); - data += _desc.tuple_descriptors()[i]->byte_size(); - } - } -} - -} diff --git a/be/src/runtime/buffered_tuple_stream.h b/be/src/runtime/buffered_tuple_stream.h deleted file mode 100644 index 4a6082031c2d9e..00000000000000 --- a/be/src/runtime/buffered_tuple_stream.h +++ /dev/null @@ -1,431 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef INF_DORIS_QE_SRC_BE_RUNTIME_BUFFERED_TUPLE_STREAM_H -#define INF_DORIS_QE_SRC_BE_RUNTIME_BUFFERED_TUPLE_STREAM_H - -#include "common/status.h" -#include "runtime/buffered_block_mgr.h" -#include -#include "util/runtime_profile.h" - -namespace doris { - -class BufferedBlockMgr; -class RuntimeProfile; -class RuntimeState; -class RowBatch; -class RowDescriptor; -class SlotDescriptor; -class TupleRow; - -// Class that provides an abstraction for a stream of tuple rows. Rows can be -// added to the stream and returned. Rows are returned in the order they are added. -// -// The underlying memory management is done by the BufferedBlockMgr. -// -// The tuple stream consists of a number of small (less than io sized blocks) before -// an arbitrary number of io sized blocks. The smaller blocks do not spill and are -// there to lower the minimum buffering requirements. For example, an operator that -// needs to maintain 64 streams (1 buffer per partition) would need, by default, -// 64 * 8MB = 512MB of buffering. A query with 5 of these operators would require -// 2.56 GB just to run any query, regardless of how much of that is used. This is -// problematic for small queries. Instead we will start with a fixed number of small -// buffers and only start using IO sized buffers when those fill up. The small buffers -// never spill. -// The stream will *not* automatically switch from using small buffers to io sized -// buffers. -// -// The BufferedTupleStream is *not* thread safe from the caller's point of view. It is -// expected that all the APIs are called from a single thread. Internally, the -// object is thread safe wrt to the underlying block mgr. -// -// Buffer management: -// The stream is either pinned or unpinned, set via PinStream() and UnpinStream(). -// Blocks are optionally deleted as they are read, set with the delete_on_read c'tor -// parameter. -// -// Block layout: -// At the header of each block, starting at position 0, there is a bitstring with null -// indicators for all the tuples in each row in the block. Then there are the tuple rows. -// We further optimize the codepaths when we know that no tuple is nullable, indicated -// by 'nullable_tuple_'. -// -// Tuple row layout: -// Tuples are stored back to back. Each tuple starts with the fixed length portion, -// directly followed by the var len portion. (Fixed len and var len are interleaved). -// If any tuple in the row is nullable, then there is a bitstring of null tuple indicators -// at the header of the block. The order of bits in the null indicators bitstring -// corresponds to the order of tuples in the block. The NULL tuples are not stored in the -// body of the block, only as set bits in the null indicators bitsting. -// -// The behavior of reads and writes is as follows: -// Read: -// 1. Delete on read (delete_on_read_): Blocks are deleted as we go through the stream. -// The data returned by the tuple stream is valid until the next read call so the -// caller does not need to copy if it is streaming. -// 2. Unpinned: Blocks remain in blocks_ and are unpinned after reading. -// 3. Pinned: Blocks remain in blocks_ and are left pinned after reading. If the next -// block in the stream cannot be pinned, the read call will fail and the caller needs -// to free memory from the underlying block mgr. -// Write: -// 1. Unpinned: Unpin blocks as they fill up. This means only a single (i.e. the -// current) block needs to be in memory regardless of the input size (if read_write is -// true, then two blocks need to be in memory). -// 2. Pinned: Blocks are left pinned. If we run out of blocks, the write will fail and -// the caller needs to free memory from the underlying block mgr. -// -// TODO: we need to be able to do read ahead in the BufferedBlockMgr. It currently -// only has PinAllBlocks() which is blocking. We need a non-blocking version of this or -// some way to indicate a block will need to be pinned soon. -// TODO: see if this can be merged with Sorter::Run. The key difference is that this -// does not need to return rows in the order they were added, which allows it to be -// simpler. -// TODO: we could compact the small buffers when we need to spill but they use very -// little memory so ths might not be very useful. -// TODO: improvements: -// - Think about how to layout for the var len data more, possibly filling in them -// from the end of the same block. Don't interleave fixed and var len data. -// - It would be good to allocate the null indicators at the end of each block and grow -// this array as new rows are inserted in the block. If we do so, then there will be -// fewer gaps in case of many rows with NULL tuples. -// - We will want to multithread this. Add a AddBlock() call so the synchronization -// happens at the block level. This is a natural extension. -// - Instead of allocating all blocks from the block_mgr, allocate some blocks that -// are much smaller (e.g. 16K and doubling up to the block size). This way, very -// small streams (a common case) will use very little memory. This small blocks -// are always in memory since spilling them frees up negligible memory. -// - Return row batches in GetNext() instead of filling one in -// - Should we 32-bit align the start of the tuple rows? Now it is byte-aligned. -class BufferedTupleStream { -public: - ~BufferedTupleStream() {} - // Ordinal index into the stream to retrieve a row in O(1) time. This index can - // only be used if the stream is pinned. - // To read a row from a stream we need three pieces of information that we squeeze in - // 64 bits: - // - The index of the block. The block id is stored in 16 bits. We can have up to - // 64K blocks per tuple stream. With 8MB blocks that is 512GB per stream. - // - The offset of the start of the row (data) within the block. Since blocks are 8MB - // we use 24 bits for the offsets. (In theory we could use 23 bits.) - // - The idx of the row in the block. We need this for retrieving the null indicators. - // We use 24 bits for this index as well. - struct RowIdx { - static const uint64_t BLOCK_MASK = 0xFFFF; - static const uint64_t BLOCK_SHIFT = 0; - static const uint64_t OFFSET_MASK = 0xFFFFFF0000; - static const uint64_t OFFSET_SHIFT = 16; - static const uint64_t IDX_MASK = 0xFFFFFF0000000000; - static const uint64_t IDX_SHIFT = 40; - - uint64_t block() const { - return (data & BLOCK_MASK); - }; - - uint64_t offset() const { - return (data & OFFSET_MASK) >> OFFSET_SHIFT; - }; - - uint64_t idx() const { - return (data & IDX_MASK) >> IDX_SHIFT; - } - - uint64_t set(uint64_t block, uint64_t offset, uint64_t idx) { - DCHECK_LE(block, BLOCK_MASK) - << "Cannot have more than 2^16 = 64K blocks in a tuple stream."; - DCHECK_LE(offset, OFFSET_MASK >> OFFSET_SHIFT) - << "Cannot have blocks larger than 2^24 = 16MB"; - DCHECK_LE(idx, IDX_MASK >> IDX_SHIFT) - << "Cannot have more than 2^24 = 16M rows in a block."; - data = block | (offset << OFFSET_SHIFT) | (idx << IDX_SHIFT); - return data; - } - - std::string debug_string() const; - - uint64_t data; - }; - - // row_desc: description of rows stored in the stream. This is the desc for rows - // that are added and the rows being returned. - // block_mgr: Underlying block mgr that owns the data blocks. - // delete_on_read: Blocks are deleted after they are read. - // use_initial_small_buffers: If true, the initial N buffers allocated for the - // tuple stream use smaller than io sized buffers. - // read_write: Stream allows interchanging read and write operations. Requires at - // least two blocks may be pinned. - BufferedTupleStream(RuntimeState* state, const RowDescriptor& row_desc, - BufferedBlockMgr* block_mgr); - - // Initializes the tuple stream object. Must be called once before any of the - // other APIs. - // If pinned, the tuple stream starts of pinned, otherwise it is unpinned. - // If profile is non-NULL, counters are created. - Status init(RuntimeProfile* profile); - Status init() { - return init(NULL); - } - - // Adds a single row to the stream. Returns false if an error occurred. - // BufferedTupleStream will do a deep copy of the memory in the row. - // *dst is the ptr to the memory (in the underlying block) that this row - // was copied to. - bool add_row(TupleRow* row, uint8_t** dst); - bool add_row(TupleRow* row) { - return add_row(row, NULL); - } - - // Allocates space to store a row of size 'size'. Returns NULL if there is - // not enough memory. The returned memory is guaranteed to fit on one block. - uint8_t* allocate_row(int size); - - // Populates 'row' with the row at 'idx'. The stream must be pinned. The row must have - // been allocated with the stream's row desc. - void get_tuple_row(const RowIdx& idx, TupleRow* row) const; - - // Prepares the stream for reading. If _read_write, this does not need to be called in - // order to begin reading, otherwise this must be called after the last add_row() and - // before get_next(). - // If got_buffer is NULL, this function will fail (with a bad status) if no buffer - // is available. If got_buffer is non-null, this function will not fail on OOM and - // *got_buffer is true if a buffer was pinned. - Status prepare_for_read(bool* got_buffer); - Status prepare_for_read() { - return prepare_for_read(NULL); - } - - // Pins all blocks in this stream and switches to pinned mode. - // If there is not enough memory, *pinned is set to false and the stream is unmodified. - // If already_reserved is true, the caller has already made a reservation on - // block_mgr_client_ to pin the stream. - //Status PinStream(bool already_reserved, bool* pinned); - - // Unpins stream. If all is true, all blocks are unpinned, otherwise all blocks - // except the _write_block and _read_block are unpinned. - // Status UnpinStream(bool all = false); - - // Get the next batch of output rows. Memory is still owned by the BufferedTupleStream - // and must be copied out by the caller. - // If 'indices' is non-NULL, that is also populated for each returned row with the - // index for that row. - Status get_next(RowBatch* batch, bool* eos, std::vector* indices); - Status get_next(RowBatch* batch, bool* eos) { - return get_next(batch, eos, NULL); - } - - // Returns all the rows in the stream in batch. This pins the entire stream - // in the process. - // *got_rows is false if the stream could not be pinned. - Status get_rows(boost::scoped_ptr* batch, bool* got_rows); - - // Must be called once at the end to cleanup all resources. Idempotent. - void close(); - - // Returns the status of the stream. We don't want to return a more costly Status - // object on add_row() which is way that API returns a bool. - Status status() const { - return _status; - } - - // Number of rows in the stream. - int64_t num_rows() const { - return _num_rows; - } - - // Number of rows returned via get_next(). - int64_t rows_returned() const { - return _rows_returned; - } - - // Returns the byte size necessary to store the entire stream in memory. - int64_t byte_size() const { - return _total_byte_size; - } - - // Returns the byte size of the stream that is currently pinned in memory. - // If ignore_current is true, the _write_block memory is not included. - int64_t bytes_in_mem(bool ignore_current) const; - - // bool is_pinned() const { return pinned_; } - //int blocks_pinned() const { return num_pinned_; } - //int blocks_unpinned() const { return _blocks.size() - num_pinned_ - _num_small_blocks; } - bool has_read_block() const { - return _read_block != _blocks.end(); - } - bool has_write_block() const { - return _write_block != NULL; - } - bool using_small_buffers() const { - return _use_small_buffers; - } - - std::string debug_string() const; - -private: - // If true, this stream is still using small buffers. - bool _use_small_buffers; - - // If true, blocks are deleted after they are read. - const bool _delete_on_read; - - // If true, read and write operations may be interleaved. Otherwise all calls - // to add_row() must occur before calling prepare_for_read() and subsequent calls to - // get_next(). - const bool _read_write; - - // Runtime state instance used to check for cancellation. Not owned. - RuntimeState* const _state; - - // Description of rows stored in the stream. - const RowDescriptor& _desc; - - // Whether any tuple in the rows is nullable. - const bool _nullable_tuple; - - // Sum of the fixed length portion of all the tuples in _desc. - int _fixed_tuple_row_size; - - // Max size (in bytes) of null indicators bitstring in the current read and write - // blocks. If 0, it means that there is no need to store null indicators for this - // RowDesc. We calculate this value based on the block's size and the - // _fixed_tuple_row_size. When not 0, this value is also an upper bound for the number - // of (rows * tuples_per_row) in this block. - uint32_t _null_indicators_read_block; - uint32_t _null_indicators_write_block; - - // Vector of all the strings slots grouped by tuple_idx. - std::vector > > _string_slots; - - // Block manager and client used to allocate, pin and release blocks. Not owned. - BufferedBlockMgr* _block_mgr; - // BufferedBlockMgr::Client* block_mgr_client_; - - // List of blocks in the stream. - std::list _blocks; - - // Total size of _blocks, including small blocks. - int64_t _total_byte_size; - - // Iterator pointing to the current block for read. If _read_write, this is always a - // valid block, otherwise equal to list.end() until prepare_for_read() is called. - std::list::iterator _read_block; - - // For each block in the stream, the buffer of the start of the block. This is only - // valid when the stream is pinned, giving random access to data in the stream. - // This is not maintained for _delete_on_read. - std::vector _block_start_idx; - - // Current ptr offset in _read_block's buffer. - uint8_t* _read_ptr; - - // Current idx of the tuple read from the _read_block buffer. - uint32_t _read_tuple_idx; - - // Current idx of the tuple written at the _write_block buffer. - uint32_t _write_tuple_idx; - - // Bytes read in _read_block. - int64_t _read_bytes; - - // Number of rows returned to the caller from get_next(). - int64_t _rows_returned; - - // The block index of the current read block. - int _read_block_idx; - - // The current block for writing. NULL if there is no available block to write to. - BufferedBlockMgr::Block* _write_block; - - // Number of pinned blocks in _blocks, stored to avoid iterating over the list - // to compute bytes_in_mem and bytes_unpinned. - // This does not include small blocks. - // int num_pinned_; - - // The total number of small blocks in _blocks; - int _num_small_blocks; - - bool _closed; // Used for debugging. - Status _status; - - // Number of rows stored in the stream. - int64_t _num_rows; - - // If true, this stream has been explicitly pinned by the caller. This changes the - // memory management of the stream. The blocks are not unpinned until the caller calls - // UnpinAllBlocks(). If false, only the _write_block and/or _read_block are pinned - // (both are if _read_write is true). - // bool pinned_; - - // Counters added by this object to the parent runtime profile. - RuntimeProfile::Counter* _pin_timer; - RuntimeProfile::Counter* _unpin_timer; - RuntimeProfile::Counter* _get_new_block_timer; - - // Copies 'row' into _write_block. Returns false if there is not enough space in - // '_write_block'. - // *dst is the ptr to the memory (in the underlying write block) where this row - // was copied to. - template - bool deep_copy_internal(TupleRow* row, uint8_t** dst); - - // Wrapper of the templated deep_copy_internal() function. - bool deep_copy(TupleRow* row, uint8_t** dst); - - // Gets a new block from the _block_mgr, updating _write_block and _write_tuple_idx, and - // setting *got_block. If there are no blocks available, *got_block is set to false - // and _write_block is unchanged. - // min_size is the minimum number of bytes required for this block. - Status new_block_for_write(int min_size, bool* got_block); - - // Reads the next block from the _block_mgr. This blocks if necessary. - // Updates _read_block, _read_ptr, _read_tuple_idx and _read_bytes. - Status next_block_for_read(); - - // Returns the byte size of this row when encoded in a block. - int compute_row_size(TupleRow* row) const; - - // Unpins block if it is an io sized block and updates tracking stats. - // Status UnpinBlock(BufferedBlockMgr::Block* block); - - // Templated get_next implementation. - template - Status get_next_internal(RowBatch* batch, bool* eos, std::vector* indices); - - // Computes the number of bytes needed for null indicators for a block of 'block_size' - int compute_num_null_indicator_bytes(int block_size) const; -}; - -inline bool BufferedTupleStream::add_row(TupleRow* row, uint8_t** dst) { - DCHECK(!_closed); - - if (LIKELY(deep_copy(row, dst))) { - return true; - } - - bool got_block = false; - _status = new_block_for_write(compute_row_size(row), &got_block); - - if (!_status.ok() || !got_block) { - return false; - } - - return deep_copy(row, dst); -} - -} - -#endif diff --git a/be/src/runtime/buffered_tuple_stream_ir.cpp b/be/src/runtime/buffered_tuple_stream_ir.cpp deleted file mode 100644 index 0fa77ee0e68d5c..00000000000000 --- a/be/src/runtime/buffered_tuple_stream_ir.cpp +++ /dev/null @@ -1,141 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/buffered_tuple_stream.h" - -#include "runtime/descriptors.h" -#include "runtime/tuple_row.h" - -namespace doris { -bool BufferedTupleStream::deep_copy(TupleRow* row, uint8_t** dst) { - if (_nullable_tuple) { - return deep_copy_internal(row, dst); - } else { - return deep_copy_internal(row, dst); - } -} - -// TODO: this really needs codegen -template -bool BufferedTupleStream::deep_copy_internal(TupleRow* row, uint8_t** dst) { - if (UNLIKELY(_write_block == NULL)) { - return false; - } - - DCHECK_GE(_null_indicators_write_block, 0); - - const uint64_t tuples_per_row = _desc.tuple_descriptors().size(); - - if (UNLIKELY((_write_block->bytes_remaining() < _fixed_tuple_row_size) || - (HasNullableTuple && - (_write_tuple_idx + tuples_per_row > _null_indicators_write_block * 8)))) { - return false; - } - - // Allocate the maximum possible buffer for the fixed portion of the tuple. - uint8_t* tuple_buf = _write_block->allocate(_fixed_tuple_row_size); - - if (dst != NULL) { - *dst = tuple_buf; - } - - // Total bytes allocated in _write_block for this row. Saved so we can roll back - // if this row doesn't fit. - int bytes_allocated = _fixed_tuple_row_size; - - // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple - // indicator. - if (HasNullableTuple) { - DCHECK_GT(_null_indicators_write_block, 0); - uint8_t* null_word = NULL; - uint32_t null_pos = 0; - // Calculate how much space it should return. - int to_return = 0; - - for (int i = 0; i < tuples_per_row; ++i) { - null_word = _write_block->buffer() + (_write_tuple_idx >> 3); // / 8 - null_pos = _write_tuple_idx & 7; - ++_write_tuple_idx; - const int tuple_size = _desc.tuple_descriptors()[i]->byte_size(); - Tuple* t = row->get_tuple(i); - const uint8_t mask = 1 << (7 - null_pos); - - if (t != NULL) { - *null_word &= ~mask; - memcpy(tuple_buf, t, tuple_size); - tuple_buf += tuple_size; - } else { - *null_word |= mask; - to_return += tuple_size; - } - } - - DCHECK_LE(_write_tuple_idx - 1, _null_indicators_write_block * 8); - _write_block->return_allocation(to_return); - bytes_allocated -= to_return; - } else { - // If we know that there are no nullable tuples no need to set the nullability flags. - DCHECK_EQ(_null_indicators_write_block, 0); - - for (int i = 0; i < tuples_per_row; ++i) { - const int tuple_size = _desc.tuple_descriptors()[i]->byte_size(); - Tuple* t = row->get_tuple(i); - // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots) - // is delivered, the check below should become DCHECK_NOTNULL(t). - DCHECK(t != NULL || tuple_size == 0); - memcpy(tuple_buf, t, tuple_size); - tuple_buf += tuple_size; - } - } - - // Copy string slots. Note: we do not need to convert the string ptrs to offsets - // on the write path, only on the read. The tuple data is immediately followed - // by the string data so only the len information is necessary. - for (int i = 0; i < _string_slots.size(); ++i) { - Tuple* tuple = row->get_tuple(_string_slots[i].first); - - if (HasNullableTuple && tuple == NULL) { - continue; - } - - for (int j = 0; j < _string_slots[i].second.size(); ++j) { - const SlotDescriptor* slot_desc = _string_slots[i].second[j]; - - if (tuple->is_null(slot_desc->null_indicator_offset())) { - continue; - } - - StringValue* sv = tuple->get_string_slot(slot_desc->tuple_offset()); - - if (LIKELY(sv->len > 0)) { - if (UNLIKELY(_write_block->bytes_remaining() < sv->len)) { - _write_block->return_allocation(bytes_allocated); - return false; - } - - uint8_t* buf = _write_block->allocate(sv->len); - bytes_allocated += sv->len; - memcpy(buf, sv->ptr, sv->len); - } - } - } - - _write_block->add_row(); - ++_num_rows; - return true; -} -} diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc index 7aec53f9f25796..8a4b9d3d7291ee 100644 --- a/be/src/runtime/data_stream_recvr.cc +++ b/be/src/runtime/data_stream_recvr.cc @@ -329,7 +329,7 @@ Status DataStreamRecvr::create_merger(const TupleRowComparator& less_than) { input_batch_suppliers.reserve(_sender_queues.size()); // Create the merger that will a single stream of sorted rows. - _merger.reset(new SortedRunMerger(less_than, &_row_desc, _profile, false)); + _merger.reset(new SortedRunMerger(less_than, &_row_desc, _profile.get(), false)); for (int i = 0; i < _sender_queues.size(); ++i) { input_batch_suppliers.push_back( @@ -360,9 +360,14 @@ DataStreamRecvr::DataStreamRecvr( _row_desc(row_desc), _is_merging(is_merging), _num_buffered_bytes(0), - _profile(profile), _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) { - _mem_tracker.reset(new MemTracker(-1, "DataStreamRecvr", parent_tracker)); + _profile.reset(new RuntimeProfile(nullptr, "DataStreamRecvr")); + profile->add_child(_profile.get(), true, nullptr); + + // TODO: Now the parent tracker may cause problem when we need spill to disk, so we + // replace parent_tracker with nullptr, fix future + _mem_tracker.reset(new MemTracker(_profile.get(), -1, "DataStreamRecvr", nullptr)); + // _mem_tracker.reset(new MemTracker(_profile.get(), -1, "DataStreamRecvr", parent_tracker)); // Create one queue per sender if is_merging is true. int num_queues = is_merging ? num_senders : 1; @@ -423,7 +428,7 @@ void DataStreamRecvr::close() { _mgr = NULL; _merger.reset(); _mem_tracker->close(); - _mem_tracker->unregister_from_parent(); +// _mem_tracker->unregister_from_parent(); _mem_tracker.reset(); } diff --git a/be/src/runtime/data_stream_recvr.h b/be/src/runtime/data_stream_recvr.h index 722533516978ae..cc6639a6dfe7e8 100644 --- a/be/src/runtime/data_stream_recvr.h +++ b/be/src/runtime/data_stream_recvr.h @@ -170,7 +170,7 @@ class DataStreamRecvr { ObjectPool _sender_queue_pool; // Runtime profile storing the counters below. - RuntimeProfile* _profile; + std::unique_ptr _profile; // Number of bytes received RuntimeProfile::Counter* _bytes_received_counter; diff --git a/be/src/runtime/merge_sorter.cpp b/be/src/runtime/merge_sorter.cpp deleted file mode 100644 index 9bcdac33f67fda..00000000000000 --- a/be/src/runtime/merge_sorter.cpp +++ /dev/null @@ -1,669 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/merge_sorter.h" -#include "runtime/buffered_block_mgr.h" -#include "runtime/row_batch.h" -#include "runtime/runtime_state.h" -#include "util/runtime_profile.h" -#include -#include -#include "runtime/mem_tracker.h" - -namespace doris { -// A run is a sequence of blocks containing tuples that are or will eventually be in -// sorted order. -// A run may maintain two sequences of blocks - one containing the tuples themselves, -// (i.e. fixed-len slots and ptrs to var-len data), and the other for the var-length -// column data pointed to by those tuples. -// Tuples in a run may be sorted in place (in-memory) and merged using a merger. -class MergeSorter::Run { -public: - // materialize_slots is true for runs constructed from input rows. The input rows are - // materialized into single sort tuples using the expressions in - // _sort_tuple_slot_expr_ctxs. For intermediate merges, the tuples are already - // materialized so materialize_slots is false. - Run(MergeSorter* parent, TupleDescriptor* sort_tuple_desc, bool materialize_slots); - - ~Run() {} - // Initialize the run for input rows by allocating the minimum number of required - // blocks - one block for fixed-len data added to _fixed_len_blocks, one for the - // initially unsorted var-len data added to _var_len_blocks, and one to copy sorted - // var-len data into (var_len_copy_block_). - Status init(); - - // Add a batch of input rows to the current run. Returns the number - // of rows actually added in num_processed. If the run is full (no more blocks can - // be allocated), num_processed may be less than the number of rows in the batch. - // If _materialize_slots is true, materializes the input rows using the expressions - // in _sorter->_sort_tuple_slot_expr_ctxs, else just copies the input rows. - template - Status add_batch(RowBatch* batch, int start_index, int* num_processed); - - // Interface for merger - get the next batch of rows from this run. The callee (Run) - // still owns the returned batch. Calls get_next(RowBatch*, bool*). - Status get_next_batch(RowBatch** sorted_batch); - -private: - friend class MergeSorter; - friend class TupleSorter; - - template - Status get_next(RowBatch* output_batch, bool* eos); - - // Check if the current run can be extended by a block. Add the newly allocated block - // to block_sequence, or set added to false if the run could not be extended. - // If the run is sorted (produced by an intermediate merge), unpin the last block in - // block_sequence before allocating and adding a new block - the run can always be - // extended in this case. If the run is unsorted, check max_blocks_in_unsorted_run_ - // to see if a block can be added to the run. Also updates the sort bytes counter. - Status try_add_block(std::vector* block_sequence, bool* added); - - // Prepare to read a sorted run. Pins the first block(s) in the run if the run was - // previously unpinned. - Status prepare_read(); - - // Copy the StringValue data in var_values to dest in order and update the StringValue - // ptrs to point to the copied data. - void copy_var_len_data(char* dest, const std::vector& var_values); - - // Parent sorter object. - const MergeSorter* _sorter; - - // Materialized sort tuple. Input rows are materialized into 1 tuple (with descriptor - // _sort_tuple_desc) before sorting. - const TupleDescriptor* _sort_tuple_desc; - - // Sizes of sort tuple and block. - const int _sort_tuple_size; - const int _block_size; - - const bool _has_var_len_slots; - - // True if the sort tuple must be materialized from the input batch in add_batch(). - // _materialize_slots is true for runs being constructed from input batches, and - // is false for runs being constructed from intermediate merges. - const bool _materialize_slots; - - // True if the run is sorted. Set to true after an in-memory sort, and initialized to - // true for runs resulting from merges. - bool _is_sorted; - - // Sequence of blocks in this run containing the fixed-length portion of the - // sort tuples comprising this run. The data pointed to by the var-len slots are in - // _var_len_blocks. If _is_sorted is true, the tuples in _fixed_len_blocks will be in - // sorted order. - // _fixed_len_blocks[i] is NULL iff it has been deleted. - std::vector _fixed_len_blocks; - - // Sequence of blocks in this run containing the var-length data corresponding to the - // var-length column data from _fixed_len_blocks. These are reconstructed to be in - // sorted order in UnpinAllBlocks(). - // _var_len_blocks[i] is NULL iff it has been deleted. - std::vector _var_len_blocks; - - // Number of tuples so far in this run. - int64_t _num_tuples; - - // Number of tuples returned via get_next(), maintained for debug purposes. - int64_t _num_tuples_returned; - - - // Members used when a run is read in get_next() - // The index into the fixed_ and _var_len_blocks vectors of the current blocks being - // processed in get_next(). - int _fixed_len_blocks_index; - - // Offset into the current fixed length data block being processed. - int _fixed_len_block_offset; -}; // class MergeSorter::Run - -// Sorts a sequence of tuples from a run in place using a provided tuple comparator. -// Quick sort is used for sequences of tuples larger that 16 elements, and -// insertion sort is used for smaller sequences. -// The TupleSorter is initialized with a RuntimeState instance to check for -// cancellation during an in-memory sort. -class MergeSorter::TupleSorter { -public: - TupleSorter(const TupleRowComparator& less_than_comp, int64_t block_size, - int tuple_size, RuntimeState* state); - - ~TupleSorter() { - delete[] _temp_tuple_buffer; - delete[] _swap_buffer; - } - - // Performs a quicksort for tuples in 'run' followed by an insertion sort to - // finish smaller blocks. - // Returns early if stste_->is_cancelled() is true. No status - // is returned - the caller must check for cancellation. - - void sort(Run* run) { - _run = run; - sort_helper(TupleIterator(this, 0), TupleIterator(this, _run->_num_tuples)); - run->_is_sorted = true; - } - -private: - static const int INSERTION_THRESHOLD = 16; - // static const int INSERTION_THRESHOLD = FLAGS_insertion_threadhold; - - // Helper class used to iterate over tuples in a run during quick sort and insertion - // sort. - class TupleIterator { - public: - TupleIterator(TupleSorter* parent, int64_t index) - : _parent(parent), - _index(index) { - DCHECK_GE(index, 0); - DCHECK_LE(index, _parent->_run->_num_tuples); - // If the run is empty, only _index is initialized. - if (_parent->_run->_num_tuples == 0) { - return; - } - // If the iterator is initialized to past the end, set up _buffer_start and - // _block_index as if it pointing to the last tuple. Add _tuple_size bytes to - // _current_tuple, so everything is correct when prev() is invoked. - int past_end_bytes = 0; - if (UNLIKELY(index >= _parent->_run->_num_tuples)) { - past_end_bytes = parent->_tuple_size; - _index = _parent->_run->_num_tuples; - index = _index - 1; - } - _block_index = index / parent->_block_capacity; - _buffer_start = parent->_run->_fixed_len_blocks[_block_index]->buffer(); - int block_offset = (index % parent->_block_capacity) * parent->_tuple_size; - _current_tuple = _buffer_start + block_offset + past_end_bytes; - } - ~TupleIterator() {} - // Sets _current_tuple to point to the next tuple in the run. Increments - // block_index and resets buffer if the next tuple is in the next block. - void next() { - _current_tuple += _parent->_tuple_size; - ++_index; - if (UNLIKELY(_current_tuple > _buffer_start + _parent->_last_tuple_block_offset && - _index < _parent->_run->_num_tuples)) { - // Don't increment block index, etc. past the end. - ++_block_index; - DCHECK_LT(_block_index, _parent->_run->_fixed_len_blocks.size()); - _buffer_start = _parent->_run->_fixed_len_blocks[_block_index]->buffer(); - _current_tuple = _buffer_start; - } - } - - // Sets current_tuple to point to the previous tuple in the run. Decrements - // block_index and resets buffer if the new tuple is in the previous block. - void prev() { - _current_tuple -= _parent->_tuple_size; - --_index; - if (UNLIKELY(_current_tuple < _buffer_start && _index >= 0)) { - --_block_index; - DCHECK_GE(_block_index, 0); - _buffer_start = _parent->_run->_fixed_len_blocks[_block_index]->buffer(); - _current_tuple = _buffer_start + _parent->_last_tuple_block_offset; - } - } - - private: - friend class TupleSorter; - - // Pointer to the tuple sorter. - TupleSorter* _parent; - - // Index of the current tuple in the run. - int64_t _index; - - // Pointer to the current tuple. - uint8_t* _current_tuple = nullptr; - - // Start of the buffer containing current tuple. - uint8_t* _buffer_start = nullptr; - - // Index into _run._fixed_len_blocks of the block containing the current tuple. - int _block_index; - }; - - // Size of the tuples in memory. - const int _tuple_size; - - // Number of tuples per block in a run. - const int _block_capacity; - - // Offset in bytes of the last tuple in a block, calculated from block and tuple sizes. - const int _last_tuple_block_offset; - - // Tuple comparator that returns true if lhs < rhs. - const TupleRowComparator _less_than_comp; - - // Runtime state instance to check for cancellation. Not owned. - RuntimeState* const _state; - - // The run to be sorted. - Run* _run; - - // Temporarily allocated space to copy and swap tuples (Both are used in partition()). - // temp_tuple_ points to _temp_tuple_buffer. Owned by this TupleSorter instance. - TupleRow* _temp_tuple_row; - uint8_t* _temp_tuple_buffer; - uint8_t* _swap_buffer; - - // Perform an insertion sort for rows in the range [first, last) in a run. - void insertion_sort(const TupleIterator& first, const TupleIterator& last); - - // Partitions the sequence of tuples in the range [first, last) in a run into two - // groups around the pivot tuple - i.e. tuples in first group are <= the pivot, and - // tuples in the second group are >= pivot. Tuples are swapped in place to create the - // groups and the index to the first element in the second group is returned. - // Checks _state->is_cancelled() and returns early with an invalid result if true. - TupleIterator partition(TupleIterator first, TupleIterator last, Tuple* pivot); - - // Performs a quicksort of rows in the range [first, last). - // followed by insertion sort for smaller groups of elements. - // Checks _state->is_cancelled() and returns early if true. - void sort_helper(TupleIterator first, TupleIterator last); - - // Swaps tuples pointed to by left and right using the swap buffer. - void swap(uint8_t* left, uint8_t* right); -}; // class TupleSorter - -// MergeSorter::Run methods -MergeSorter::Run::Run(MergeSorter* parent, TupleDescriptor* sort_tuple_desc, - bool materialize_slots) - : _sorter(parent), - _sort_tuple_desc(sort_tuple_desc), - _sort_tuple_size(sort_tuple_desc->byte_size()), - _block_size(parent->_block_mgr->max_block_size()), - _has_var_len_slots(sort_tuple_desc->string_slots().size() > 0), - _materialize_slots(materialize_slots), - _is_sorted(!materialize_slots), - _num_tuples(0) { -} - -Status MergeSorter::Run::init() { - BufferedBlockMgr::Block* block = NULL; - RETURN_IF_ERROR( - _sorter->_block_mgr->get_new_block(&block)); - DCHECK(block != NULL); - _fixed_len_blocks.push_back(block); - - if (_has_var_len_slots) { - RETURN_IF_ERROR( - _sorter->_block_mgr->get_new_block(&block)); - DCHECK(block != NULL); - _var_len_blocks.push_back(block); - } - - if (!_is_sorted) { - _sorter->_initial_runs_counter->update(1); - } - return Status::OK(); -} - -template -Status MergeSorter::Run::add_batch(RowBatch* batch, int start_index, int* num_processed) { - *num_processed = 0; - BufferedBlockMgr::Block* cur_fixed_len_block = _fixed_len_blocks.back(); - - DCHECK_EQ(_materialize_slots, !_is_sorted); - DCHECK_EQ(_materialize_slots, true); - - // Input rows are copied/materialized into tuples allocated in _fixed_len_blocks. - // The variable length column data are copied into blocks stored in _var_len_blocks. - // Input row processing is split into two loops. - // The inner loop processes as many input rows as will fit in cur_fixed_len_block. - // The outer loop allocates a new block for fixed-len data if the input batch is - // not exhausted. - - // cur_input_index is the index into the input 'batch' of the current - // input row being processed. - int cur_input_index = start_index; - std::vector var_values; - var_values.reserve(_sort_tuple_desc->string_slots().size()); - while (cur_input_index < batch->num_rows()) { - // tuples_remaining is the number of tuples to copy/materialize into - // cur_fixed_len_block. - int tuples_remaining = cur_fixed_len_block->bytes_remaining() / _sort_tuple_size; - tuples_remaining = std::min(batch->num_rows() - cur_input_index, tuples_remaining); - - for (int i = 0; i < tuples_remaining; ++i) { - int total_var_len = 0; - TupleRow* input_row = batch->get_row(cur_input_index); - Tuple* new_tuple = cur_fixed_len_block->allocate(_sort_tuple_size); - if (_materialize_slots) { - new_tuple->materialize_exprs(input_row, *_sort_tuple_desc, - _sorter->_sort_tuple_slot_expr_ctxs, NULL, &var_values, &total_var_len); - if (total_var_len > _sorter->_block_mgr->max_block_size()) { - std::stringstream ss; - ss << "Variable length data in a single tuple larger than block size "; - ss << total_var_len; - ss << " > " << _sorter->_block_mgr->max_block_size(); - return Status::InternalError(ss.str()); - } - } - - if (has_var_len_data) { - BufferedBlockMgr::Block* cur_var_len_block = _var_len_blocks.back(); - if (cur_var_len_block->bytes_remaining() < total_var_len) { - bool added; - RETURN_IF_ERROR(try_add_block(&_var_len_blocks, &added)); - if (added) { - cur_var_len_block = _var_len_blocks.back(); - } else { - // There wasn't enough space in the last var-len block for this tuple, and - // the run could not be extended. Return the fixed-len allocation and exit. - // dhc: we can't get here, because we can get the new block. If we can't get new block, - // we will exit in tryAddBlock(MemTracker exceed). - cur_fixed_len_block->return_allocation(_sort_tuple_size); - return Status::OK(); - } - } - - char* var_data_ptr = cur_var_len_block->allocate(total_var_len); - if (_materialize_slots) { - copy_var_len_data(var_data_ptr, var_values); - } - } - - ++_num_tuples; - ++*num_processed; - ++cur_input_index; - } - - // If there are still rows left to process, get a new block for the fixed-length - // tuples. If the run is already too long, return. - if (cur_input_index < batch->num_rows()) { - bool added; - RETURN_IF_ERROR(try_add_block(&_fixed_len_blocks, &added)); - if (added) { - cur_fixed_len_block = _fixed_len_blocks.back(); - } else { - return Status::OK(); - } - } - } - - return Status::OK(); -} - -Status MergeSorter::Run::prepare_read() { - _fixed_len_blocks_index = 0; - _fixed_len_block_offset = 0; - //var_len_blocks_index_ = 0; - _num_tuples_returned = 0; - - return Status::OK(); -} - -template -Status MergeSorter::Run::get_next(RowBatch* output_batch, bool* eos) { - if (_fixed_len_blocks_index == _fixed_len_blocks.size()) { - *eos = true; - DCHECK_EQ(_num_tuples_returned, _num_tuples); - return Status::OK(); - } else { - *eos = false; - } - - BufferedBlockMgr::Block* fixed_len_block = _fixed_len_blocks[_fixed_len_blocks_index]; - - // get_next fills rows into the output batch until a block boundary is reached. - while (!output_batch->is_full() && - _fixed_len_block_offset < fixed_len_block->valid_data_len()) { - Tuple* input_tuple = reinterpret_cast( - fixed_len_block->buffer() + _fixed_len_block_offset); - - int output_row_idx = output_batch->add_row(); - output_batch->get_row(output_row_idx)->set_tuple(0, input_tuple); - output_batch->commit_last_row(); - _fixed_len_block_offset += _sort_tuple_size; - ++_num_tuples_returned; - } - - if (_fixed_len_block_offset >= fixed_len_block->valid_data_len()) { - ++_fixed_len_blocks_index; - _fixed_len_block_offset = 0; - } - - return Status::OK(); -} - -Status MergeSorter::Run::try_add_block(std::vector* block_sequence, - bool* added) { - DCHECK(!block_sequence->empty()); - - BufferedBlockMgr::Block* last_block = block_sequence->back(); - _sorter->_sorted_data_size->update(last_block->valid_data_len()); - - BufferedBlockMgr::Block* new_block; - RETURN_IF_ERROR(_sorter->_block_mgr->get_new_block(&new_block)); - if (new_block != NULL) { - *added = true; - block_sequence->push_back(new_block); - } else { - *added = false; - } - return Status::OK(); -} - -void MergeSorter::Run::copy_var_len_data(char* dest, const std::vector& var_values) { - BOOST_FOREACH(StringValue* var_val, var_values) { - memcpy(dest, var_val->ptr, var_val->len); - var_val->ptr = dest; - dest += var_val->len; - } -} - - -// MergeSorter::TupleSorter methods. -MergeSorter::TupleSorter::TupleSorter(const TupleRowComparator& comp, int64_t block_size, - int tuple_size, RuntimeState* state) - : _tuple_size(tuple_size), - _block_capacity(block_size / tuple_size), - _last_tuple_block_offset(tuple_size * ((block_size / tuple_size) - 1)), - _less_than_comp(comp), - _state(state) { - _temp_tuple_buffer = new uint8_t[tuple_size]; - _temp_tuple_row = reinterpret_cast(&_temp_tuple_buffer); - _swap_buffer = new uint8_t[tuple_size]; -} - - -// Sort the sequence of tuples from [first, last). -// Begin with a sorted sequence of size 1 [first, first+1). -// During each pass of the outermost loop, add the next tuple (at position 'i') to -// the sorted sequence by comparing it to each element of the sorted sequence -// (reverse order) to find its correct place in the sorted sequence, copying tuples -// along the way. -void MergeSorter::TupleSorter::insertion_sort(const TupleIterator& first, - const TupleIterator& last) { - TupleIterator insert_iter = first; - insert_iter.next(); - for (; insert_iter._index < last._index; insert_iter.next()) { - // insert_iter points to the tuple after the currently sorted sequence that must - // be inserted into the sorted sequence. Copy to _temp_tuple_row since it may be - // overwritten by the one at position 'insert_iter - 1' - memcpy(_temp_tuple_buffer, insert_iter._current_tuple, _tuple_size); - - // 'iter' points to the tuple that _temp_tuple_row will be compared to. - // 'copy_to' is the where iter should be copied to if it is >= _temp_tuple_row. - // copy_to always to the next row after 'iter' - TupleIterator iter = insert_iter; - iter.prev(); - uint8_t* copy_to = insert_iter._current_tuple; - while (_less_than_comp(_temp_tuple_row, - reinterpret_cast(&iter._current_tuple))) { - memcpy(copy_to, iter._current_tuple, _tuple_size); - copy_to = iter._current_tuple; - // Break if 'iter' has reached the first row, meaning that _temp_tuple_row - // will be inserted in position 'first' - if (iter._index <= first._index) break; - iter.prev(); - } - - memcpy(copy_to, _temp_tuple_buffer, _tuple_size); - } -} - -MergeSorter::TupleSorter::TupleIterator MergeSorter::TupleSorter::partition(TupleIterator first, - TupleIterator last, Tuple* pivot) { - - // Copy pivot into temp_tuple since it points to a tuple within [first, last). - memcpy(_temp_tuple_buffer, pivot, _tuple_size); - - last.prev(); - while (true) { - // Search for the first and last out-of-place elements, and swap them. - while (_less_than_comp(reinterpret_cast(&first._current_tuple), - _temp_tuple_row)) { - first.next(); - } - while (_less_than_comp(_temp_tuple_row, - reinterpret_cast(&last._current_tuple))) { - last.prev(); - } - - if (first._index >= last._index) break; - // swap first and last tuples. - swap(first._current_tuple, last._current_tuple); - - first.next(); - last.prev(); - } - - return first; - -} - -void MergeSorter::TupleSorter::sort_helper(TupleIterator first, TupleIterator last) { - if (UNLIKELY(_state->is_cancelled())) return; - // Use insertion sort for smaller sequences. - while (last._index - first._index > INSERTION_THRESHOLD) { - TupleIterator iter(this, first._index + (last._index - first._index)/2); - // Parititon() splits the tuples in [first, last) into two groups (<= pivot - // and >= pivot) in-place. 'cut' is the index of the first tuple in the second group. - TupleIterator cut = partition(first, last, - reinterpret_cast(iter._current_tuple)); - sort_helper(cut, last); - last = cut; - if (UNLIKELY(_state->is_cancelled())) return; - } - insertion_sort(first, last); -} - -inline void MergeSorter::TupleSorter::swap(uint8_t* left, uint8_t* right) { - memcpy(_swap_buffer, left, _tuple_size); - memcpy(left, right, _tuple_size); - memcpy(right, _swap_buffer, _tuple_size); -} - -// MergeSorter methods -MergeSorter::MergeSorter(const TupleRowComparator& compare_less_than, - const std::vector& slot_materialize_expr_ctxs, - RowDescriptor* output_row_desc, - RuntimeProfile* profile, RuntimeState* state) - : _state(state), - _compare_less_than(compare_less_than), - _block_mgr(state->block_mgr()), - _output_row_desc(output_row_desc), - _sort_tuple_slot_expr_ctxs(slot_materialize_expr_ctxs), - _profile(profile) { - TupleDescriptor* sort_tuple_desc = output_row_desc->tuple_descriptors()[0]; - _has_var_len_slots = sort_tuple_desc->string_slots().size() > 0; - _in_mem_tuple_sorter.reset(new TupleSorter(compare_less_than, - _block_mgr->max_block_size(), sort_tuple_desc->byte_size(), state)); - - _initial_runs_counter = ADD_COUNTER(_profile, "InitialRunsCreated", TUnit::UNIT); - _num_merges_counter = ADD_COUNTER(_profile, "TotalMergesPerformed", TUnit::UNIT); - _in_mem_sort_timer = ADD_TIMER(_profile, "InMemorySortTime"); - _sorted_data_size = ADD_COUNTER(_profile, "SortDataSize", TUnit::BYTES); - - _unsorted_run = _obj_pool.add(new Run(this, sort_tuple_desc, true)); - _unsorted_run->init(); -} - -MergeSorter::~MergeSorter() { -} - -Status MergeSorter::add_batch(RowBatch* batch) { - int num_processed = 0; - int cur_batch_index = 0; - - while (cur_batch_index < batch->num_rows()) { - if (_has_var_len_slots) { - _unsorted_run->add_batch(batch, cur_batch_index, &num_processed); - } else { - _unsorted_run->add_batch(batch, cur_batch_index, &num_processed); - } - - cur_batch_index += num_processed; - if (cur_batch_index < batch->num_rows()) { - return Status::InternalError("run is full"); - } - } - return Status::OK(); -} - -Status MergeSorter::input_done() { -// Sort the tuples accumulated so far in the current run. - RETURN_IF_ERROR(sort_run()); - - DCHECK(_sorted_runs.size() == 1); - -// The entire input fit in one run. Read sorted rows in get_next() directly -// from the sorted run. - _sorted_runs.back()->prepare_read(); - - return Status::OK(); -} - -Status MergeSorter::get_next(RowBatch* output_batch, bool* eos) { - DCHECK(_sorted_runs.size() == 1); - - // In this case, only TupleRows are copied into output_batch. Sorted tuples are left - // in the pinned blocks in the single sorted run. - RETURN_IF_ERROR(_sorted_runs.back()->get_next(output_batch, eos)); - - return Status::OK(); -} - -Status MergeSorter::sort_run() { - BufferedBlockMgr::Block* last_block = _unsorted_run->_fixed_len_blocks.back(); - if (last_block->valid_data_len() > 0) { - _sorted_data_size->update(last_block->valid_data_len()); - } else { - // need to delete block? - _unsorted_run->_fixed_len_blocks.pop_back(); - } - if (_has_var_len_slots) { - last_block = _unsorted_run->_var_len_blocks.back(); - if (last_block->valid_data_len() > 0) { - _sorted_data_size->update(last_block->valid_data_len()); - } else { - // need to delete block? - _unsorted_run->_var_len_blocks.pop_back(); - } - } - { - SCOPED_TIMER(_in_mem_sort_timer); - _in_mem_tuple_sorter->sort(_unsorted_run); - RETURN_IF_CANCELLED(_state); - } - _sorted_runs.push_back(_unsorted_run); - _unsorted_run = NULL; - return Status::OK(); -} -} // namespace doris diff --git a/be/src/runtime/merge_sorter.h b/be/src/runtime/merge_sorter.h deleted file mode 100644 index 445c064ec4f83e..00000000000000 --- a/be/src/runtime/merge_sorter.h +++ /dev/null @@ -1,166 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_QUERY_RUNTIME_SORTER_H -#define DORIS_BE_SRC_QUERY_RUNTIME_SORTER_H - -#include "runtime/buffered_block_mgr.h" -#include "util/tuple_row_compare.h" -#include "common/object_pool.h" -#include "util/runtime_profile.h" - -namespace doris { -class RuntimeProfile; -class RowBatch; -struct BufferDescriptor; - -// Sorter contains the external sort implementation. Its purpose is to sort arbitrarily -// large input data sets with a fixed memory budget by spilling data to disk if -// necessary. BufferedBlockMgr is used to allocate and manage blocks of data to be -// sorted. -// -// The client API for Sorter is as follows: -// AddBatch() is used to add input rows to be sorted. Multiple tuples in an input row are -// materialized into a row with a single tuple (the sort tuple) using the materialization -// exprs in sort_tuple_slot_expr_ctxs_. The sort tuples are sorted according to the sort -// parameters and output by the sorter. -// AddBatch() can be called multiple times. -// -// InputDone() is called to indicate the end of input. If multiple sorted runs were -// created, it triggers intermediate merge steps (if necessary) and creates the final -// merger that returns results via GetNext(). -// -// GetNext() is used to retrieve sorted rows. It can be called multiple times. -// AddBatch(), InputDone() and GetNext() must be called in that order. -// -// Batches of input rows are collected into a sequence of pinned BufferedBlockMgr blocks -// called a run. The maximum size of a run is determined by the maximum available buffers -// in the block manager. After the run is full, it is sorted in memory, unpinned and the -// next run is collected. The variable-length column data (e.g. string slots) in the -// materialized sort tuples are stored in separate sequence of blocks from the tuples -// themselves. -// When the blocks containing tuples in a run are unpinned, the var-len slot pointers are -// converted to offsets from the start of the first var-len data block. When a block is -// read back, these offsets are converted back to pointers. -// The in-memory sorter sorts the fixed-length tuples in-place. The output rows have the -// same schema as the materialized sort tuples. -// -// After the input is consumed, the sorter is left with one or more sorted runs. The -// client calls GetNext(output_batch) to retrieve batches of sorted rows. If there are -// multiple runs, the runs are merged using SortedRunMerger to produce a stream of sorted -// tuples. At least one block per run (two if there are var-length slots) must be pinned -// in memory during a merge, so multiple merges may be necessary if the number of runs is -// too large. During a merge, rows from multiple sorted input runs are compared and copied -// into a single larger run. One input batch is created to hold tuple rows for each -// input run, and one batch is created to hold deep copied rows (i.e. ptrs + data) from -// the output of the merge. -// -// If there is a single sorted run (i.e. no merge required), only tuple rows are -// copied into the output batch supplied by GetNext, and the data itself is left in -// pinned blocks held by the sorter. -// -// During a merge, one row batch is created for each input run, and one batch is created -// for the output of the merge (if is not the final merge). It is assumed that the memory -// for these batches have already been accounted for in the memory budget for the sort. -// That is, the memory for these batches does not come out of the block buffer manager. -// -// TODO: Not necessary to actually copy var-len data - instead take ownership of the -// var-length data in the input batch. Copying can be deferred until a run is unpinned. -// TODO: When the first run is constructed, create a sequence of pointers to materialized -// tuples. If the input fits in memory, the pointers can be sorted instead of sorting the -// tuples in place. -class MergeSorter { -public: - // sort_tuple_slot_exprs are the slot exprs used to materialize the tuple to be sorted. - // compare_less_than is a comparator for the sort tuples (returns true if lhs < rhs). - // merge_batch_size_ is the size of the batches created to provide rows to the merger - // and retrieve rows from an intermediate merger. - MergeSorter(const TupleRowComparator& compare_less_than, - const std::vector& sort_tuple_slot_expr_ctxs, - RowDescriptor* output_row_desc, - RuntimeProfile* profile, RuntimeState* state); - - ~MergeSorter(); - - // Adds a batch of input rows to the current unsorted run. - Status add_batch(RowBatch* batch); - - // Called to indicate there is no more input. Triggers the creation of merger(s) if - // necessary. - Status input_done(); - - // Get the next batch of sorted output rows from the sorter. - Status get_next(RowBatch* batch, bool* eos); - -private: - class Run; - class TupleSorter; - - // Runtime state instance used to check for cancellation. Not owned. - RuntimeState* const _state; - - // Sorts _unsorted_run and appends it to the list of sorted runs. Deletes any empty - // blocks at the end of the run. Updates the sort bytes counter if necessary. - Status sort_run(); - - - // In memory sorter and less-than comparator. - TupleRowComparator _compare_less_than; - boost::scoped_ptr _in_mem_tuple_sorter; - - // Block manager object used to allocate, pin and release runs. Not owned by Sorter. - BufferedBlockMgr* _block_mgr; - - // Handle to block mgr to make allocations from. - //BufferedBlockMgr::Client* block_mgr_client_; - - // True if the tuples to be sorted have var-length slots. - bool _has_var_len_slots; - - // The current unsorted run that is being collected. Is sorted and added to - // _sorted_runs after it is full (i.e. number of blocks allocated == max available - // buffers) or after the input is complete. Owned and placed in _obj_pool. - // When it is added to _sorted_runs, it is set to NULL. - Run* _unsorted_run; - - // List of sorted runs that have been produced but not merged. _unsorted_run is added - // to this list after an in-memory sort. Sorted runs produced by intermediate merges - // are also added to this list. Runs are added to the object pool. - std::list _sorted_runs; - - // Descriptor for the sort tuple. Input rows are materialized into 1 tuple before - // sorting. Not owned by the Sorter. - RowDescriptor* _output_row_desc; - - // Expressions used to materialize the sort tuple. Contains one expr per slot in the - // tuple. - std::vector _sort_tuple_slot_expr_ctxs; - - // Pool of owned Run objects. - ObjectPool _obj_pool; - - // Runtime profile and counters for this sorter instance. - RuntimeProfile* _profile; - RuntimeProfile::Counter* _initial_runs_counter; - RuntimeProfile::Counter* _num_merges_counter; - RuntimeProfile::Counter* _in_mem_sort_timer; - RuntimeProfile::Counter* _sorted_data_size; -}; - -} // namespace doris - -#endif diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 3aa41d5d105f2a..de0f3effc8f97d 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -27,7 +27,6 @@ #include "exec/exec_node.h" #include "exprs/expr.h" #include "exprs/timezone_db.h" -#include "runtime/buffered_block_mgr.h" #include "runtime/buffered_block_mgr2.h" #include "runtime/bufferpool/reservation_util.h" #include "runtime/descriptors.h" @@ -126,7 +125,6 @@ RuntimeState::RuntimeState(const TQueryGlobals& query_globals) } RuntimeState::~RuntimeState() { - _block_mgr.reset(); _block_mgr2.reset(); // close error log file if (_error_log_file != nullptr && _error_log_file->is_open()) { @@ -297,11 +295,8 @@ Status RuntimeState::init_buffer_poolstate() { } Status RuntimeState::create_block_mgr() { - DCHECK(_block_mgr.get() == NULL); DCHECK(_block_mgr2.get() == NULL); - RETURN_IF_ERROR(BufferedBlockMgr::create(this, config::sorter_block_size, &_block_mgr)); - int64_t block_mgr_limit = _query_mem_tracker->limit(); if (block_mgr_limit < 0) { block_mgr_limit = std::numeric_limits::max(); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index c3c487328c0026..cc88e5b9da599d 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -201,10 +201,6 @@ class RuntimeState { // on first use. Status create_codegen(); - BufferedBlockMgr* block_mgr() { - DCHECK(_block_mgr.get() != NULL); - return _block_mgr.get(); - } BufferedBlockMgr2* block_mgr2() { DCHECK(_block_mgr2.get() != NULL); return _block_mgr2.get(); @@ -467,6 +463,10 @@ class RuntimeState { return _query_options.disable_stream_preaggregations; } + bool enable_spill() const { + return _query_options.enable_spilling; + } + // the following getters are only valid after Prepare() InitialReservations* initial_reservations() const { return _initial_reservations; @@ -495,10 +495,6 @@ class RuntimeState { // Allow TestEnv to set block_mgr manually for testing. friend class TestEnv; - // Use a custom block manager for the query for testing purposes. - void set_block_mgr(const boost::shared_ptr& block_mgr) { - _block_mgr = block_mgr; - } // Use a custom block manager for the query for testing purposes. void set_block_mgr2(const boost::shared_ptr& block_mgr) { _block_mgr2 = block_mgr; @@ -580,7 +576,6 @@ class RuntimeState { // BufferedBlockMgr object used to allocate and manage blocks of input data in memory // with a fixed memory budget. // The block mgr is shared by all fragments for this query. - boost::shared_ptr _block_mgr; boost::shared_ptr _block_mgr2; // This is the node id of the root node for this plan fragment. This is used as the diff --git a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java index 70292740f50a7c..dda8da1864d270 100644 --- a/fe/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -507,6 +507,7 @@ public TQueryOptions toThrift() { if (maxPushdownConditionsPerColumn > -1) { tResult.setMax_pushdown_conditions_per_column(maxPushdownConditionsPerColumn); } + tResult.setEnable_spilling(enableSpilling); return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 8845c81c01dde7..0f5c231ad2d55e 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -136,6 +136,8 @@ struct TQueryOptions { // see BE config `max_pushdown_conditions_per_column` for details // if set, this will overwrite the BE config. 30: optional i32 max_pushdown_conditions_per_column + // whether enable spilling to disk + 31: optional bool enable_spilling = false; }