diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 24b72a4b6c902c..e40b9e171eb08f 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -204,12 +204,12 @@ Status S3FileWriter::_build_upload_buffer() { Status S3FileWriter::_close_impl() { VLOG_DEBUG << "S3FileWriter::close, path: " << _obj_storage_path_opts.path.native(); - if (_cur_part_num == 1 && _pending_buf) { + if (_cur_part_num == 1 && _pending_buf) { // data size is less than config::s3_write_buffer_size RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size()); } if (_bytes_appended == 0) { - DCHECK(_cur_part_num == 1); + DCHECK_EQ(_cur_part_num, 1); // No data written, but need to create an empty file RETURN_IF_ERROR(_build_upload_buffer()); if (!_used_by_s3_committer) { @@ -220,10 +220,15 @@ Status S3FileWriter::_close_impl() { } } - if (_pending_buf != nullptr) { + if (_pending_buf != nullptr) { // there is remaining data in buffer need to be uploaded _countdown_event.add_count(); RETURN_IF_ERROR(FileBuffer::submit(std::move(_pending_buf))); _pending_buf = nullptr; + } else if (_bytes_appended != 0) { // Non-empty file and has nothing to be uploaded + // NOTE: When the data size is a multiple of config::s3_write_buffer_size, + // _cur_part_num may exceed the actual number of parts that need to be uploaded. + // This is because it is incremented by 1 in advance within the S3FileWriter::appendv method. + _cur_part_num--; } RETURN_IF_ERROR(_complete()); @@ -327,26 +332,29 @@ Status S3FileWriter::_complete() { _wait_until_finish("Complete"); TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:1", std::make_pair(&_failed, &_completed_parts)); - if (!_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side. - if (_failed || _completed_parts.size() != _cur_part_num) { - _st = Status::InternalError( - "error status {}, have failed {}, complete parts {}, cur part num {}, whole " - "parts {}, file path {}, file size {}, has left buffer {}", - _st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(), - _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr); - LOG(WARNING) << _st; - return _st; - } - // make sure _completed_parts are ascending order - std::sort(_completed_parts.begin(), _completed_parts.end(), - [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; }); - TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts); - auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts); - if (resp.status.code != ErrorCode::OK) { - LOG_WARNING("Compltet multi part upload failed because {}, file path {}", - resp.status.msg, _obj_storage_path_opts.path.native()); - return {resp.status.code, std::move(resp.status.msg)}; - } + if (_used_by_s3_committer) { // S3 committer will complete multipart upload file on FE side. + s3_file_created_total << 1; // Assume that it will be created successfully + return Status::OK(); + } + + if (_failed || _completed_parts.size() != _cur_part_num) { + _st = Status::InternalError( + "error status={} failed={} #complete_parts={} #expected_parts={} " + "completed_parts_list={} file_path={} file_size={} has left buffer not uploaded={}", + _st, _failed, _completed_parts.size(), _cur_part_num, _dump_completed_part(), + _obj_storage_path_opts.path.native(), _bytes_appended, _pending_buf != nullptr); + LOG(WARNING) << _st; + return _st; + } + // make sure _completed_parts are ascending order + std::sort(_completed_parts.begin(), _completed_parts.end(), + [](auto& p1, auto& p2) { return p1.part_num < p2.part_num; }); + TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2", &_completed_parts); + auto resp = client->complete_multipart_upload(_obj_storage_path_opts, _completed_parts); + if (resp.status.code != ErrorCode::OK) { + LOG_WARNING("Compltet multi part upload failed because {}, file path {}", resp.status.msg, + _obj_storage_path_opts.path.native()); + return {resp.status.code, std::move(resp.status.msg)}; } s3_file_created_total << 1; return Status::OK();