diff --git a/be/src/http/action/http_stream.cpp b/be/src/http/action/http_stream.cpp index 7dbae6df731d84..c6176c52815459 100644 --- a/be/src/http/action/http_stream.cpp +++ b/be/src/http/action/http_stream.cpp @@ -237,37 +237,39 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) { SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); int64_t start_read_data_time = MonotonicNanos(); + Status st = ctx->allocate_schema_buffer(); + if (!st.ok()) { + ctx->status = st; + return; + } while (evbuffer_get_length(evbuf) > 0) { - try { - auto bb = ByteBuffer::allocate(128 * 1024); - auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); - bb->pos = remove_bytes; - bb->flip(); - auto st = ctx->body_sink->append(bb); - // schema_buffer stores 1M of data for parsing column information - // need to determine whether to cache for the first time - if (ctx->is_read_schema) { - if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { - ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); - } else { - LOG(INFO) << "use a portion of data to request fe to obtain column information"; - ctx->is_read_schema = false; - ctx->status = process_put(req, ctx); - } - } - if (!st.ok() && !ctx->status.ok()) { - LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); - ctx->status = st; - return; - } - ctx->receive_bytes += remove_bytes; - } catch (const doris::Exception& e) { - if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { - ctx->status = Status::MemoryLimitExceeded( - fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string())); + ByteBufferPtr bb; + st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } + auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + st = ctx->body_sink->append(bb); + // schema_buffer stores 1M of data for parsing column information + // need to determine whether to cache for the first time + if (ctx->is_read_schema) { + if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) { + ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes); + } else { + LOG(INFO) << "use a portion of data to request fe to obtain column information"; + ctx->is_read_schema = false; + ctx->status = process_put(req, ctx); } - ctx->status = Status::Error(e.code(), e.to_string()); } + if (!st.ok()) { + LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); + ctx->status = st; + return; + } + ctx->receive_bytes += remove_bytes; } // after all the data has been read and it has not reached 1M, it will execute here if (ctx->is_read_schema) { diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index d0c5dff2075c6f..1a9420dea637db 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -344,25 +344,22 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) { int64_t start_read_data_time = MonotonicNanos(); while (evbuffer_get_length(evbuf) > 0) { - try { - auto bb = ByteBuffer::allocate(128 * 1024); - auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); - bb->pos = remove_bytes; - bb->flip(); - auto st = ctx->body_sink->append(bb); - if (!st.ok()) { - LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); - ctx->status = st; - return; - } - ctx->receive_bytes += remove_bytes; - } catch (const doris::Exception& e) { - if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) { - ctx->status = Status::MemoryLimitExceeded( - fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string())); - } - ctx->status = Status::Error(e.code(), e.to_string()); - } + ByteBufferPtr bb; + Status st = ByteBuffer::allocate(128 * 1024, &bb); + if (!st.ok()) { + ctx->status = st; + return; + } + auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); + bb->pos = remove_bytes; + bb->flip(); + st = ctx->body_sink->append(bb); + if (!st.ok()) { + LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief(); + ctx->status = st; + return; + } + ctx->receive_bytes += remove_bytes; } int64_t read_data_time = MonotonicNanos() - start_read_data_time; int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos; diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp index f4ce573c53533d..86907886f17933 100644 --- a/be/src/io/file_factory.cpp +++ b/be/src/io/file_factory.cpp @@ -206,6 +206,7 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string()); } if (need_schema) { + RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer()); // Here, a portion of the data is processed to parse column information auto pipe = std::make_shared( io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */, diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index ce91a2e839113c..0dc27e009d08ee 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -113,7 +113,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr* data, size_t Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) { SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); ByteBufferPtr buf; - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf, 128 * 1024)); + RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf)); buf->put_bytes(data, size); buf->flip(); return _append(buf, proto_byte_size); @@ -148,7 +148,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) { size_t chunk_size = std::max(_min_chunk_size, size - pos); chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size); SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker()); - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf, chunk_size)); + RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf)); _write_buf->put_bytes(data + pos, size - pos); return Status::OK(); } diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index 95e56e0b3faf4b..9d1601372f877d 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -121,15 +121,17 @@ class StreamLoadContext { bool is_mow_table() const; - ByteBufferPtr schema_buffer() { + Status allocate_schema_buffer() { if (_schema_buffer == nullptr) { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( ExecEnv::GetInstance()->stream_load_pipe_tracker()); - _schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size); + return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer); } - return _schema_buffer; + return Status::OK(); } + ByteBufferPtr schema_buffer() { return _schema_buffer; } + public: static const int default_txn_id = -1; // load type, eg: ROUTINE LOAD/MANUAL LOAD diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h index 1499f51c05316b..aafd4506087d76 100644 --- a/be/src/util/byte_buffer.h +++ b/be/src/util/byte_buffer.h @@ -34,13 +34,8 @@ struct ByteBuffer; using ByteBufferPtr = std::shared_ptr; struct ByteBuffer : private Allocator { - static ByteBufferPtr allocate(size_t size) { - ByteBufferPtr ptr(new ByteBuffer(size)); - return ptr; - } - - static Status create_and_allocate(ByteBufferPtr& ptr, size_t size) { - ptr = ByteBufferPtr(new ByteBuffer(size)); + static Status allocate(const size_t size, ByteBufferPtr* ptr) { + RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new ByteBuffer(size)); }); return Status::OK(); } diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp b/be/src/vec/sink/vtablet_block_convertor.cpp index feb6633511ea76..617668c035af45 100644 --- a/be/src/vec/sink/vtablet_block_convertor.cpp +++ b/be/src/vec/sink/vtablet_block_convertor.cpp @@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip return DecimalType(value); } -Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type, - bool is_nullable, vectorized::ColumnPtr column, - size_t slot_index, bool* stop_processing, - fmt::memory_buffer& error_prefix, - const uint32_t row_count, - vectorized::IColumn::Permutation* rows) { +Status OlapTableBlockConvertor::_internal_validate_column( + RuntimeState* state, const TypeDescriptor& type, bool is_nullable, + vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, + fmt::memory_buffer& error_prefix, const uint32_t row_count, + vectorized::IColumn::Permutation* rows) { DCHECK((rows == nullptr) || (rows->size() == row_count)); fmt::memory_buffer error_msg; auto set_invalid_and_append_error_msg = [&](int row) { diff --git a/be/src/vec/sink/vtablet_block_convertor.h b/be/src/vec/sink/vtablet_block_convertor.h index 0db340ce6c27d4..7f866c38032775 100644 --- a/be/src/vec/sink/vtablet_block_convertor.h +++ b/be/src/vec/sink/vtablet_block_convertor.h @@ -69,7 +69,18 @@ class OlapTableBlockConvertor { Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable, vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing, fmt::memory_buffer& error_prefix, const uint32_t row_count, - vectorized::IColumn::Permutation* rows = nullptr); + vectorized::IColumn::Permutation* rows = nullptr) { + RETURN_IF_CATCH_EXCEPTION({ + return _internal_validate_column(state, type, is_nullable, column, slot_index, + stop_processing, error_prefix, row_count, rows); + }); + } + + Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type, + bool is_nullable, vectorized::ColumnPtr column, + size_t slot_index, bool* stop_processing, + fmt::memory_buffer& error_prefix, const uint32_t row_count, + vectorized::IColumn::Permutation* rows = nullptr); // make input data valid for OLAP table // return number of invalid/filtered rows. diff --git a/be/test/util/byte_buffer2_test.cpp b/be/test/util/byte_buffer2_test.cpp index 04b62cd5fe8f0e..73c38c9e404340 100644 --- a/be/test/util/byte_buffer2_test.cpp +++ b/be/test/util/byte_buffer2_test.cpp @@ -32,7 +32,8 @@ class ByteBufferTest : public testing::Test { }; TEST_F(ByteBufferTest, normal) { - auto buf = ByteBuffer::allocate(4); + ByteBufferPtr buf; + Status st = ByteBuffer::allocate(4, &buf); EXPECT_EQ(0, buf->pos); EXPECT_EQ(4, buf->limit); EXPECT_EQ(4, buf->capacity);