Skip to content

Commit

Permalink
[enhancement](exception) catch exception for streamload and validate …
Browse files Browse the repository at this point in the history
…column (#40092)
  • Loading branch information
yiguolei authored Aug 29, 2024
1 parent 5618668 commit 6fae472
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 67 deletions.
58 changes: 30 additions & 28 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,37 +237,39 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());

int64_t start_read_data_time = MonotonicNanos();
Status st = ctx->allocate_schema_buffer();
if (!st.ok()) {
ctx->status = st;
return;
}
while (evbuffer_get_length(evbuf) > 0) {
try {
auto bb = ByteBuffer::allocate(128 * 1024);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
}
if (!st.ok() && !ctx->status.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
} catch (const doris::Exception& e) {
if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
ctx->status = Status::MemoryLimitExceeded(
fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string()));
ByteBufferPtr bb;
st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
st = ctx->body_sink->append(bb);
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
ctx->status = Status::Error<false>(e.code(), e.to_string());
}
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
}
// after all the data has been read and it has not reached 1M, it will execute here
if (ctx->is_read_schema) {
Expand Down
35 changes: 16 additions & 19 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,25 +344,22 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {

int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
try {
auto bb = ByteBuffer::allocate(128 * 1024);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
} catch (const doris::Exception& e) {
if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
ctx->status = Status::MemoryLimitExceeded(
fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string()));
}
ctx->status = Status::Error<false>(e.code(), e.to_string());
}
ByteBufferPtr bb;
Status st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
}
int64_t read_data_time = MonotonicNanos() - start_read_data_time;
int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
Expand Down
1 change: 1 addition & 0 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
if (need_schema) {
RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer());
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
ByteBufferPtr buf;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf, 128 * 1024));
RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
Expand Down Expand Up @@ -148,7 +148,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf, chunk_size));
RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
Expand Down
8 changes: 5 additions & 3 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,17 @@ class StreamLoadContext {

bool is_mow_table() const;

ByteBufferPtr schema_buffer() {
Status allocate_schema_buffer() {
if (_schema_buffer == nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->stream_load_pipe_tracker());
_schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);
return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer);
}
return _schema_buffer;
return Status::OK();
}

ByteBufferPtr schema_buffer() { return _schema_buffer; }

public:
static const int default_txn_id = -1;
// load type, eg: ROUTINE LOAD/MANUAL LOAD
Expand Down
9 changes: 2 additions & 7 deletions be/src/util/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,8 @@ struct ByteBuffer;
using ByteBufferPtr = std::shared_ptr<ByteBuffer>;

struct ByteBuffer : private Allocator<false> {
static ByteBufferPtr allocate(size_t size) {
ByteBufferPtr ptr(new ByteBuffer(size));
return ptr;
}

static Status create_and_allocate(ByteBufferPtr& ptr, size_t size) {
ptr = ByteBufferPtr(new ByteBuffer(size));
static Status allocate(const size_t size, ByteBufferPtr* ptr) {
RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new ByteBuffer(size)); });
return Status::OK();
}

Expand Down
11 changes: 5 additions & 6 deletions be/src/vec/sink/vtablet_block_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip
return DecimalType(value);
}

Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable, vectorized::ColumnPtr column,
size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix,
const uint32_t row_count,
vectorized::IColumn::Permutation* rows) {
Status OlapTableBlockConvertor::_internal_validate_column(
RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix, const uint32_t row_count,
vectorized::IColumn::Permutation* rows) {
DCHECK((rows == nullptr) || (rows->size() == row_count));
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
Expand Down
13 changes: 12 additions & 1 deletion be/src/vec/sink/vtablet_block_convertor.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,18 @@ class OlapTableBlockConvertor {
Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix, const uint32_t row_count,
vectorized::IColumn::Permutation* rows = nullptr);
vectorized::IColumn::Permutation* rows = nullptr) {
RETURN_IF_CATCH_EXCEPTION({
return _internal_validate_column(state, type, is_nullable, column, slot_index,
stop_processing, error_prefix, row_count, rows);
});
}

Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable, vectorized::ColumnPtr column,
size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix, const uint32_t row_count,
vectorized::IColumn::Permutation* rows = nullptr);

// make input data valid for OLAP table
// return number of invalid/filtered rows.
Expand Down
3 changes: 2 additions & 1 deletion be/test/util/byte_buffer2_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class ByteBufferTest : public testing::Test {
};

TEST_F(ByteBufferTest, normal) {
auto buf = ByteBuffer::allocate(4);
ByteBufferPtr buf;
Status st = ByteBuffer::allocate(4, &buf);
EXPECT_EQ(0, buf->pos);
EXPECT_EQ(4, buf->limit);
EXPECT_EQ(4, buf->capacity);
Expand Down

0 comments on commit 6fae472

Please sign in to comment.