Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pick exception handle logic to 2.1 #40547

Closed
wants to merge 7 commits into from
23 changes: 17 additions & 6 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,26 +234,37 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);

SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());

int64_t start_read_data_time = MonotonicNanos();
Status st = ctx->allocate_schema_buffer();
if (!st.ok()) {
ctx->status = st;
return;
}
while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(128 * 1024);
ByteBufferPtr bb;
st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
st = ctx->body_sink->append(bb);
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
}

if (!st.ok() && !ctx->status.ok()) {
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
Expand Down
11 changes: 9 additions & 2 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,13 +339,20 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);

SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());

int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(128 * 1024);
ByteBufferPtr bb;
Status st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
Expand Down
9 changes: 5 additions & 4 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,14 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
if (!stream_load_ctx) {
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
if (need_schema == true) {
if (need_schema) {
RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer());
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
stream_load_ctx->schema_buffer->pos /* total_length */);
stream_load_ctx->schema_buffer->flip();
RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer));
stream_load_ctx->schema_buffer()->pos /* total_length */);
stream_load_ctx->schema_buffer()->flip();
RETURN_IF_ERROR(pipe->append(stream_load_ctx->schema_buffer()));
RETURN_IF_ERROR(pipe->finish());
*file_reader = std::move(pipe);
} else {
Expand Down
7 changes: 5 additions & 2 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
}

Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size + 1));
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
ByteBufferPtr buf;
RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
Expand Down Expand Up @@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
// need to allocate a new chunk, min chunk is 64k
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
_write_buf = ByteBuffer::allocate(chunk_size);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
Expand Down
7 changes: 6 additions & 1 deletion be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ bool MemTable::need_agg() const {
return false;
}

Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
if (_keys_type == KeysType::DUP_KEYS && _tablet_schema->num_key_columns() == 0) {
Expand All @@ -529,4 +529,9 @@ Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
return Status::OK();
}

Status MemTable::to_block(std::unique_ptr<vectorized::Block>* res) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_to_block(res));
return Status::OK();
}

} // namespace doris
3 changes: 3 additions & 0 deletions be/src/olap/memtable.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ class MemTable {
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
RowInBlock* row_in_skiplist);

// Wrapped by to_block() to apply the exception-handling logic
Status _to_block(std::unique_ptr<vectorized::Block>* res);

private:
int64_t _tablet_id;
bool _enable_unique_key_mow = false;
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_creator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Status SegmentFlusher::flush_single_block(const vectorized::Block* block, int32_
return Status::OK();
}

Status SegmentFlusher::_parse_variant_columns(vectorized::Block& block) {
Status SegmentFlusher::_internal_parse_variant_columns(vectorized::Block& block) {
size_t num_rows = block.rows();
if (num_rows == 0) {
return Status::OK();
Expand Down
6 changes: 5 additions & 1 deletion be/src/olap/rowset/segment_creator.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,11 @@ class SegmentFlusher {
bool need_buffering();

private:
Status _parse_variant_columns(vectorized::Block& block);
// This method catches any exception thrown when memory allocation fails
Status _parse_variant_columns(vectorized::Block& block) {
RETURN_IF_CATCH_EXCEPTION({ return _internal_parse_variant_columns(block); });
}
Status _internal_parse_variant_columns(vectorized::Block& block);
Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer,
const vectorized::Block* block, size_t row_offset, size_t row_num);
Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
Expand Down
14 changes: 8 additions & 6 deletions be/src/olap/rowset/segment_v2/binary_dict_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,19 +142,22 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
}
}

OwnedSlice BinaryDictPageBuilder::finish() {
Status BinaryDictPageBuilder::finish(OwnedSlice* slice) {
if (VLOG_DEBUG_IS_ON && _encoding_type == DICT_ENCODING) {
VLOG_DEBUG << "dict page size:" << _dict_builder->size();
}

DCHECK(!_finished);
_finished = true;

OwnedSlice data_slice = _data_page_builder->finish();
OwnedSlice data_slice;
RETURN_IF_ERROR(_data_page_builder->finish(&data_slice));
// TODO(gaodayue) separate page header and content to avoid this copy
_buffer.append(data_slice.slice().data, data_slice.slice().size);
RETURN_IF_CATCH_EXCEPTION(
{ _buffer.append(data_slice.slice().data, data_slice.slice().size); });
encode_fixed32_le(&_buffer[0], _encoding_type);
return _buffer.build();
*slice = _buffer.build();
return Status::OK();
}

Status BinaryDictPageBuilder::reset() {
Expand Down Expand Up @@ -183,8 +186,7 @@ uint64_t BinaryDictPageBuilder::size() const {
}

Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
*dictionary_page = _dict_builder->finish();
return Status::OK();
return _dict_builder->finish(dictionary_page);
}

Status BinaryDictPageBuilder::get_first_value(void* value) const {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/binary_dict_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class BinaryDictPageBuilder : public PageBuilderHelper<BinaryDictPageBuilder> {

Status add(const uint8_t* vals, size_t* count) override;

OwnedSlice finish() override;
Status finish(OwnedSlice* slice) override;

Status reset() override;

Expand Down
25 changes: 14 additions & 11 deletions be/src/olap/rowset/segment_v2/binary_plain_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,22 @@ class BinaryPlainPageBuilder : public PageBuilderHelper<BinaryPlainPageBuilder<T
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
// Set up trailer
for (uint32_t _offset : _offsets) {
put_fixed32_le(&_buffer, _offset);
}
put_fixed32_le(&_buffer, _offsets.size());
if (_offsets.size() > 0) {
_copy_value_at(0, &_first_value);
_copy_value_at(_offsets.size() - 1, &_last_value);
}
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
// Set up trailer
for (uint32_t _offset : _offsets) {
put_fixed32_le(&_buffer, _offset);
}
put_fixed32_le(&_buffer, _offsets.size());
if (_offsets.size() > 0) {
_copy_value_at(0, &_first_value);
_copy_value_at(_offsets.size() - 1, &_last_value);
}
*slice = _buffer.build();
});
return Status::OK();
}

Status reset() override {
Expand Down
23 changes: 13 additions & 10 deletions be/src/olap/rowset/segment_v2/binary_prefix_page.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,21 @@ Status BinaryPrefixPageBuilder::add(const uint8_t* vals, size_t* add_count) {
return Status::OK();
}

OwnedSlice BinaryPrefixPageBuilder::finish() {
Status BinaryPrefixPageBuilder::finish(OwnedSlice* slice) {
DCHECK(!_finished);
_finished = true;
put_fixed32_le(&_buffer, (uint32_t)_count);
uint8_t restart_point_internal = RESTART_POINT_INTERVAL;
_buffer.append(&restart_point_internal, 1);
auto restart_point_size = _restart_points_offset.size();
for (uint32_t i = 0; i < restart_point_size; ++i) {
put_fixed32_le(&_buffer, _restart_points_offset[i]);
}
put_fixed32_le(&_buffer, restart_point_size);
return _buffer.build();
RETURN_IF_CATCH_EXCEPTION({
put_fixed32_le(&_buffer, (uint32_t)_count);
uint8_t restart_point_internal = RESTART_POINT_INTERVAL;
_buffer.append(&restart_point_internal, 1);
auto restart_point_size = _restart_points_offset.size();
for (uint32_t i = 0; i < restart_point_size; ++i) {
put_fixed32_le(&_buffer, _restart_points_offset[i]);
}
put_fixed32_le(&_buffer, restart_point_size);
*slice = _buffer.build();
});
return Status::OK();
}

const uint8_t* BinaryPrefixPageDecoder::_decode_value_lengths(const uint8_t* ptr, uint32_t* shared,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/binary_prefix_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BinaryPrefixPageBuilder : public PageBuilderHelper<BinaryPrefixPageBuilder

Status add(const uint8_t* vals, size_t* add_count) override;

OwnedSlice finish() override;
Status finish(OwnedSlice* slice) override;

Status reset() override {
_restart_points_offset.clear();
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/bitshuffle_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,13 @@ class BitshufflePageBuilder : public PageBuilderHelper<BitshufflePageBuilder<Typ
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
if (_count > 0) {
_first_value = cell(0);
_last_value = cell(_count - 1);
}
return _finish(SIZE_OF_TYPE);
RETURN_IF_CATCH_EXCEPTION({ *slice = _finish(SIZE_OF_TYPE); });
return Status::OK();
}

Status reset() override {
Expand Down
10 changes: 6 additions & 4 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ class NullBitmapBuilder {
// Returns whether the building nullmap contains nullptr
bool has_null() const { return _has_null; }

OwnedSlice finish() {
Status finish(OwnedSlice* slice) {
_rle_encoder.Flush();
return _bitmap_buf.build();
RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); });
return Status::OK();
}

void reset() {
Expand Down Expand Up @@ -723,14 +724,15 @@ Status ScalarColumnWriter::finish_current_page() {

// build data page body : encoded values + [nullmap]
std::vector<Slice> body;
OwnedSlice encoded_values = _page_builder->finish();
OwnedSlice encoded_values;
RETURN_IF_ERROR(_page_builder->finish(&encoded_values));
RETURN_IF_ERROR(_page_builder->reset());
body.push_back(encoded_values.slice());

OwnedSlice nullmap;
if (_null_bitmap_builder != nullptr) {
if (is_nullable() && _null_bitmap_builder->has_null()) {
nullmap = _null_bitmap_builder->finish();
RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap));
body.push_back(nullmap.slice());
}
_null_bitmap_builder->reset();
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/rowset/segment_v2/frame_of_reference_page.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,12 @@ class FrameOfReferencePageBuilder : public PageBuilderHelper<FrameOfReferencePag
return Status::OK();
}

OwnedSlice finish() override {
Status finish(OwnedSlice* slice) override {
DCHECK(!_finished);
_finished = true;
_encoder->flush();
return _buf.build();
RETURN_IF_CATCH_EXCEPTION({ *slice = _buf.build(); });
return Status::OK();
}

Status reset() override {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/indexed_column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ Status IndexedColumnWriter::_finish_current_data_page(size_t& num_val) {
ordinal_t first_ordinal = _num_values - num_values_in_page;

// IndexedColumn doesn't have NULLs, thus data page body only contains encoded values
OwnedSlice page_body = _data_page_builder->finish();
OwnedSlice page_body;
RETURN_IF_ERROR(_data_page_builder->finish(&page_body));
RETURN_IF_ERROR(_data_page_builder->reset());

PageFooterPB footer;
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/page_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class PageBuilder {

// Finish building the current page, return the encoded data.
// This api should be followed by reset() before reusing the builder
virtual OwnedSlice finish() = 0;
// It returns an error status when memory allocation fails during finish
virtual Status finish(OwnedSlice* owned_slice) = 0;

// Get the dictionary page for dictionary encoding mode column.
virtual Status get_dictionary_page(OwnedSlice* dictionary_page) {
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ Status PageIO::write_page(io::FileWriter* writer, const std::vector<Slice>& body
return Status::OK();
}

Status PageIO::read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer) {
Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'read_and_decompress_page_' exceeds recommended size/complexity thresholds [readability-function-size]

Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
               ^
Additional context

be/src/olap/rowset/segment_v2/page_io.cpp:113: 115 lines including whitespace and comments (threshold 80)

Status PageIO::read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
               ^

Slice* body, PageFooterPB* footer) {
opts.sanity_check();
opts.stats->total_pages_num++;

Expand Down
11 changes: 10 additions & 1 deletion be/src/olap/rowset/segment_v2/page_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,17 @@ class PageIO {
// `handle' holds the memory of page data,
// `body' points to page body,
// `footer' stores the page footer.
// This method is exception safe; it returns an error status when memory allocation fails.
static Status read_and_decompress_page(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
Slice* body, PageFooterPB* footer) {
RETURN_IF_CATCH_EXCEPTION(
{ return read_and_decompress_page_(opts, handle, body, footer); });
}

private:
// An internal method that does not handle exceptions itself.
static Status read_and_decompress_page_(const PageReadOptions& opts, PageHandle* handle,
Slice* body, PageFooterPB* footer);
};

} // namespace segment_v2
Expand Down
Loading
Loading