Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](exception) catch exception for streamload and validate column #40092

Merged
merged 8 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 30 additions & 28 deletions be/src/http/action/http_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,37 +237,39 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());

int64_t start_read_data_time = MonotonicNanos();
Status st = ctx->allocate_schema_buffer();
if (!st.ok()) {
ctx->status = st;
return;
}
while (evbuffer_get_length(evbuf) > 0) {
try {
auto bb = ByteBuffer::allocate(128 * 1024);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
}
if (!st.ok() && !ctx->status.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
} catch (const doris::Exception& e) {
if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
ctx->status = Status::MemoryLimitExceeded(
fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string()));
ByteBufferPtr bb;
st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
st = ctx->body_sink->append(bb);
// schema_buffer stores 1M of data for parsing column information
// need to determine whether to cache for the first time
if (ctx->is_read_schema) {
if (ctx->schema_buffer()->pos + remove_bytes < config::stream_tvf_buffer_size) {
ctx->schema_buffer()->put_bytes(bb->ptr, remove_bytes);
} else {
LOG(INFO) << "use a portion of data to request fe to obtain column information";
ctx->is_read_schema = false;
ctx->status = process_put(req, ctx);
}
ctx->status = Status::Error<false>(e.code(), e.to_string());
}
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
}
// after all the data has been read and it has not reached 1M, it will execute here
if (ctx->is_read_schema) {
Expand Down
35 changes: 16 additions & 19 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,25 +344,22 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {

int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
try {
auto bb = ByteBuffer::allocate(128 * 1024);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
} catch (const doris::Exception& e) {
if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
ctx->status = Status::MemoryLimitExceeded(
fmt::format("PreCatch error code:{}, {}, ", e.code(), e.to_string()));
}
ctx->status = Status::Error<false>(e.code(), e.to_string());
}
ByteBufferPtr bb;
Status st = ByteBuffer::allocate(128 * 1024, &bb);
if (!st.ok()) {
ctx->status = st;
return;
}
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st << ", " << ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
}
int64_t read_data_time = MonotonicNanos() - start_read_data_time;
int64_t last_receive_and_read_data_cost_nanos = ctx->receive_and_read_data_cost_nanos;
Expand Down
1 change: 1 addition & 0 deletions be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ Status FileFactory::create_pipe_reader(const TUniqueId& load_id, io::FileReaderS
return Status::InternalError("unknown stream load id: {}", UniqueId(load_id).to_string());
}
if (need_schema) {
RETURN_IF_ERROR(stream_load_ctx->allocate_schema_buffer());
// Here, a portion of the data is processed to parse column information
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
Expand Down
4 changes: 2 additions & 2 deletions be/src/io/fs/stream_load_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t proto_byte_size) {
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
ByteBufferPtr buf;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf, 128 * 1024));
RETURN_IF_ERROR(ByteBuffer::allocate(128 * 1024, &buf));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
Expand Down Expand Up @@ -148,7 +148,7 @@ Status StreamLoadPipe::append(const char* data, size_t size) {
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf, chunk_size));
RETURN_IF_ERROR(ByteBuffer::allocate(chunk_size, &_write_buf));
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
Expand Down
8 changes: 5 additions & 3 deletions be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,17 @@ class StreamLoadContext {

bool is_mow_table() const;

ByteBufferPtr schema_buffer() {
Status allocate_schema_buffer() {
if (_schema_buffer == nullptr) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
ExecEnv::GetInstance()->stream_load_pipe_tracker());
_schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);
return ByteBuffer::allocate(config::stream_tvf_buffer_size, &_schema_buffer);
}
return _schema_buffer;
return Status::OK();
}

// Returns the schema buffer pointer as-is; this accessor never allocates.
// Allocation is done by allocate_schema_buffer(), so callers are expected to
// have called it successfully first.
// NOTE(review): presumably this returns nullptr before that call — confirm
// against the initial value of _schema_buffer.
ByteBufferPtr schema_buffer() { return _schema_buffer; }

public:
static const int default_txn_id = -1;
// load type, eg: ROUTINE LOAD/MANUAL LOAD
Expand Down
9 changes: 2 additions & 7 deletions be/src/util/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,8 @@ struct ByteBuffer;
using ByteBufferPtr = std::shared_ptr<ByteBuffer>;

struct ByteBuffer : private Allocator<false> {
static ByteBufferPtr allocate(size_t size) {
ByteBufferPtr ptr(new ByteBuffer(size));
return ptr;
}

static Status create_and_allocate(ByteBufferPtr& ptr, size_t size) {
ptr = ByteBufferPtr(new ByteBuffer(size));
// Creates a new ByteBuffer of `size` bytes and stores it in `*ptr`.
//
// `ptr` is an out-parameter and is dereferenced unconditionally, so it must
// not be null. Returns Status::OK() once the buffer has been assigned.
static Status allocate(const size_t size, ByteBufferPtr* ptr) {
// The construction is wrapped in RETURN_IF_CATCH_EXCEPTION; presumably the
// macro turns an exception thrown while allocating into an error Status
// returned to the caller — confirm against the macro definition.
RETURN_IF_CATCH_EXCEPTION({ *ptr = ByteBufferPtr(new ByteBuffer(size)); });
return Status::OK();
}

Expand Down
11 changes: 5 additions & 6 deletions be/src/vec/sink/vtablet_block_convertor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,11 @@ DecimalType OlapTableBlockConvertor::_get_decimalv3_min_or_max(const TypeDescrip
return DecimalType(value);
}

Status OlapTableBlockConvertor::_validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable, vectorized::ColumnPtr column,
size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix,
const uint32_t row_count,
vectorized::IColumn::Permutation* rows) {
Status OlapTableBlockConvertor::_internal_validate_column(
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
yiguolei marked this conversation as resolved.
Show resolved Hide resolved
RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix, const uint32_t row_count,
vectorized::IColumn::Permutation* rows) {
DCHECK((rows == nullptr) || (rows->size() == row_count));
fmt::memory_buffer error_msg;
auto set_invalid_and_append_error_msg = [&](int row) {
Expand Down
13 changes: 12 additions & 1 deletion be/src/vec/sink/vtablet_block_convertor.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,18 @@ class OlapTableBlockConvertor {
Status _validate_column(RuntimeState* state, const TypeDescriptor& type, bool is_nullable,
vectorized::ColumnPtr column, size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix, const uint32_t row_count,
vectorized::IColumn::Permutation* rows = nullptr);
vectorized::IColumn::Permutation* rows = nullptr) {
RETURN_IF_CATCH_EXCEPTION({
return _internal_validate_column(state, type, is_nullable, column, slot_index,
stop_processing, error_prefix, row_count, rows);
});
}

// Does the actual column validation for _validate_column(), which forwards
// all of its arguments here from inside RETURN_IF_CATCH_EXCEPTION.
// `rows` is optional and defaults to nullptr; when it is provided, the
// implementation DCHECKs that rows->size() == row_count.
Status _internal_validate_column(RuntimeState* state, const TypeDescriptor& type,
bool is_nullable, vectorized::ColumnPtr column,
size_t slot_index, bool* stop_processing,
fmt::memory_buffer& error_prefix, const uint32_t row_count,
vectorized::IColumn::Permutation* rows = nullptr);

// make input data valid for OLAP table
// return number of invalid/filtered rows.
Expand Down
3 changes: 2 additions & 1 deletion be/test/util/byte_buffer2_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class ByteBufferTest : public testing::Test {
};

TEST_F(ByteBufferTest, normal) {
auto buf = ByteBuffer::allocate(4);
ByteBufferPtr buf;
Status st = ByteBuffer::allocate(4, &buf);
EXPECT_EQ(0, buf->pos);
EXPECT_EQ(4, buf->limit);
EXPECT_EQ(4, buf->capacity);
Expand Down
Loading