Skip to content

Commit

Permalink
[fix](S3FileWriter) Fix boundary issue when multipart upload (#43037)
Browse files Browse the repository at this point in the history
When the file data size is an exact multiple of config::s3_write_buffer_size,
the part counter (_cur_part_num) may exceed the actual number of parts that
need to be uploaded. This is because the counter is incremented by 1 in
advance within the S3FileWriter::appendv method.
  • Loading branch information
gavinchou authored Nov 5, 2024
1 parent 9e29566 commit be8b828
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,12 @@ Status S3FileWriter::_build_upload_buffer() {
Status S3FileWriter::_close_impl() {
VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native();

if (_cur_part_num == 1 && _pending_buf) {
if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}

if (_bytes_appended == 0) {
DCHECK(_cur_part_num == 1);
DCHECK_EQ(_cur_part_num, 1);
// No data written, but need to create an empty file
RETURN_IF_ERROR(_build_upload_buffer());
if (!_used_by_s3_committer) {
Expand All @@ -220,10 +220,15 @@ Status S3FileWriter::_close_impl() {
}
}

if (_pending_buf != nullptr) {
if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded
_countdown_event.add_count();
RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf)));
_pending_buf = nullptr;
} else if (_bytes_appended != 0) { // Non-empty file and has nothing to be uploaded
// NOTE: When the data size is a multiple of config::s3_write_buffer_size,
// _cur_part_num may exceed the actual number of parts that need to be uploaded.
// This is because it is incremented by 1 in advance within the S3FileWriter::appendv method.
_cur_part_num--;
}

RETURN_IF_ERROR(_complete());
Expand Down Expand Up @@ -327,26 +332,29 @@ Status S3FileWriter::_complete() {
_wait_until_finish("Complete");
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1",
std::make_pair(&_failed, &_completed_parts));
if (!_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side.
if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status {}, have failed {}, complete parts {}, cur part num {}, whole "
"parts {}, file path {}, file size {}, has left buffer {}",
_st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(),
_obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
if (resp.status.code != ErrorCode::OK) {
LOG_WARNING("Compltet multi part upload failed because {}, file path {}",
resp.status.msg, _obj_storage_path_opts.path.native());
return {resp.status.code, std::move(resp.status.msg)};
}
if (_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side.
s3_file_created_total << 1; // Assume that it will be created successfully
return Status::OK();
}

if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status={} failed={} #complete_parts={} #expected_parts={} "
"completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}",
_st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(),
_obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr);
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1.part_num < p2.part_num; });
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts);
auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts);
if (resp.status.code != ErrorCode::OK) {
LOG_WARNING("Compltet multi part upload failed because {}, file path {}", resp.status.msg,
_obj_storage_path_opts.path.native());
return {resp.status.code, std::move(resp.status.msg)};
}
s3_file_created_total << 1;
return Status::OK();
Expand Down

0 comments on commit be8b828

Please sign in to comment.