Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](S3FileWriter) Fix boundary issue when multipart upload #43037

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 31 additions & 23 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,12 @@ Status S3FileWriter::_build_upload_buffer() {
Status S3FileWriter::_close_impl() {
VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();

if (_cur_part_num == 1 && _pending_buf) {
if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}

if (_bytes_appended == 0) {
DCHECK(_cur_part_num == 1);
DCHECK_EQ(_cur_part_num, 1);
// No data written, but need to create an empty file
RETURN_IF_ERROR(_build_upload_buffer());
if (!_used_by_s3_committer) {
Expand All @@ -220,10 +220,15 @@ Status S3FileWriter::_close_impl() {
}
}

if (_pending_buf != nullptr) {
if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded
_countdown_event.add_count();
RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
_pending_buf = nullptr;
} else if (_bytes_appended != 0) { // Non-empty file and has nothing to be uploaded
// NOTE: When the data size is a multiple of config::s3_write_buffer_size,
// _cur_part_num may exceed the actual number of parts that need to be uploaded.
// This is because it is incremented by 1 in advance within the S3FileWriter::appendv method.
_cur_part_num--;
}

RETURN_IF_ERROR(_complete());
Expand Down Expand Up @@ -327,26 +332,29 @@ Status S3FileWriter::_complete() {
_wait_until_finish("Complete");
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
std::make_pair(&_failed, &_completed_parts));
if (!_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side.
if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status {}, have failed {}, complete parts {}, cur part num {}, whole "
"parts {}, file path {}, file size {}, has left buffer {}",
_st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(),
_obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
if (resp.status.code != ErrorCode::OK) {
LOG_WARNING("Compltet multi part upload failed because {}, file path {}",
resp.status.msg, _obj_storage_path_opts.path.native());
return {resp.status.code, std::move(resp.status.msg)};
}
if (_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side.
s3_file_created_total << 1; // Assume that it will be created successfully
return Status::OK();
}

if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status={} failed={} #complete_parts={} #expected_parts={} "
"completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
_st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(),
_obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
if (resp.status.code != ErrorCode::OK) {
LOG_WARNING("Compltet multi part upload failed because {}, file path {}", resp.status.msg,
_obj_storage_path_opts.path.native());
return {resp.status.code, std::move(resp.status.msg)};
}
s3_file_created_total << 1;
return Status::OK();
Expand Down
Loading