Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions be/src/exec/analytic_eval_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "exprs/agg_fn_evaluator.h"
#include "exprs/anyval_util.h"

#include "runtime/buffered_tuple_stream.h"
#include "runtime/descriptors.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
Expand Down Expand Up @@ -195,9 +194,19 @@ Status AnalyticEvalNode::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
//RETURN_IF_ERROR(QueryMaintenance(state));
RETURN_IF_ERROR(child(0)->open(state));
// RETURN_IF_ERROR(state->block_mgr()->RegisterClient(2, mem_tracker(), state, &client_));
_input_stream.reset(new BufferedTupleStream(state, child(0)->row_desc(), state->block_mgr()));
RETURN_IF_ERROR(_input_stream->init(runtime_profile()));
RETURN_IF_ERROR(state->block_mgr2()->register_client(2, mem_tracker(), state, &client_));
_input_stream.reset(new BufferedTupleStream2(state, child(0)->row_desc(), state->block_mgr2(), client_, false, true));
RETURN_IF_ERROR(_input_stream->init(id(), runtime_profile(), true));

bool got_read_buffer;
RETURN_IF_ERROR(_input_stream->prepare_for_read(true, &got_read_buffer));
if (!got_read_buffer) {
std::string msg("Failed to acquire initial read buffer for analytic function "
"evaluation. Reducing query concurrency or increasing the memory limit may "
"help this query to complete successfully.");
return mem_tracker()->MemLimitExceeded(state, msg, -1);
}

DCHECK_EQ(_evaluators.size(), _fn_ctxs.size());

for (int i = 0; i < _evaluators.size(); ++i) {
Expand Down Expand Up @@ -673,19 +682,20 @@ Status AnalyticEvalNode::process_child_batch(RuntimeState* state) {

try_add_result_tuple_for_curr_row(stream_idx, row);

Status status = Status::OK();
// Buffer the entire input row to be returned later with the analytic eval results.
if (UNLIKELY(!_input_stream->add_row(row))) {
if (UNLIKELY(!_input_stream->add_row(row, &status))) {
// AddRow returns false if an error occurs (available via status()) or there is
// not enough memory (status() is OK). If there isn't enough memory, we unpin
// the stream and continue writing/reading in unpinned mode.
// TODO: Consider re-pinning later if the output stream is fully consumed.
RETURN_IF_ERROR(_input_stream->status());
// RETURN_IF_ERROR(_input_stream->UnpinStream());
RETURN_IF_ERROR(status);
RETURN_IF_ERROR(_input_stream->unpin_stream());
VLOG_FILE << id() << " Unpin input stream while adding row idx=" << stream_idx;

if (!_input_stream->add_row(row)) {
if (!_input_stream->add_row(row, &status)) {
// Rows should be added in unpinned mode unless an error occurs.
RETURN_IF_ERROR(_input_stream->status());
RETURN_IF_ERROR(status);
DCHECK(false);
}
}
Expand Down
9 changes: 5 additions & 4 deletions be/src/exec/analytic_eval_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
#include "exec/exec_node.h"
#include "exprs/expr.h"
//#include "exprs/expr_context.h"
#include "runtime/buffered_block_mgr.h"
#include "runtime/buffered_tuple_stream.h"
#include "runtime/buffered_block_mgr2.h"
#include "runtime/buffered_tuple_stream2.inline.h"
#include "runtime/buffered_tuple_stream2.h"
#include "runtime/tuple.h"
#include "thrift/protocol/TDebugProtocol.h"

Expand Down Expand Up @@ -307,7 +308,7 @@ class AnalyticEvalNode : public ExecNode {
boost::scoped_ptr<RowBatch> _curr_child_batch;

// Block manager client used by _input_stream. Not owned.
// BufferedBlockMgr::Client* client_;
BufferedBlockMgr2::Client* client_;

// Buffers input rows added in process_child_batch() until enough rows are able to
// be returned by get_next_output_batch(), in which case row batches are returned from
Expand All @@ -317,7 +318,7 @@ class AnalyticEvalNode : public ExecNode {
// buffered data exceeds the available memory in the underlying BufferedBlockMgr,
// _input_stream is unpinned (i.e., possibly spilled to disk if necessary).
// TODO: Consider re-pinning unpinned streams when possible.
boost::scoped_ptr<BufferedTupleStream> _input_stream;
boost::scoped_ptr<BufferedTupleStream2> _input_stream;

// Pool used for O(1) allocations that live until close.
boost::scoped_ptr<MemPool> _mem_pool;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/exchange_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ Status ExchangeNode::close(RuntimeState* state) {
if (_stream_recvr != NULL) {
_stream_recvr->close();
}
_stream_recvr.reset();
// _stream_recvr.reset();
return ExecNode::close(state);
}

Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/runtime")

set(RUNTIME_FILES
broker_mgr.cpp
buffered_block_mgr.cpp
buffered_tuple_stream.cpp
buffered_tuple_stream_ir.cpp
buffer_control_block.cpp
merge_sorter.cpp
client_cache.cpp
data_stream_mgr.cpp
data_stream_sender.cpp
Expand Down
94 changes: 0 additions & 94 deletions be/src/runtime/buffered_block_mgr.cpp

This file was deleted.

170 changes: 0 additions & 170 deletions be/src/runtime/buffered_block_mgr.h

This file was deleted.

Loading