Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix pageutils will truncate the file if write failed #4925

Merged
merged 13 commits into from
May 19, 2022
21 changes: 15 additions & 6 deletions dbms/src/Storages/Page/PageUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ void ftruncateFile(T & file, off_t length)
DB::throwFromErrno(fmt::format("Cannot truncate file: {}. ", file->getFileName()), ErrorCodes::CANNOT_FTRUNCATE);
}


// TODO: split current api into V2 and V3.
// Too many args in this function.
// Also split read
template <typename T>
void writeFile(
T & file,
Expand All @@ -161,6 +163,7 @@ void writeFile(
size_t to_write,
const WriteLimiterPtr & write_limiter = nullptr,
const bool background = false,
const bool truncate_if_failed = true,
[[maybe_unused]] bool enable_failpoint = false)
{
if (write_limiter)
Expand Down Expand Up @@ -192,15 +195,21 @@ void writeFile(
{
ProfileEvents::increment(ProfileEvents::PSMWriteFailed);
auto saved_errno = errno;
// If error occurs, apply `ftruncate` try to truncate the broken bytes we have written.
// Note that the result of this ftruncate is ignored, there is nothing we can do to
// handle ftruncate error. The errno may change after ftruncate called.
int truncate_res = ::ftruncate(file->getFd(), offset);

int truncate_res = 0;
// If write failed in V3, Don't do truncate
if (truncate_if_failed)
{
// If error occurs, apply `ftruncate` try to truncate the broken bytes we have written.
// Note that the result of this ftruncate is ignored, there is nothing we can do to
// handle ftruncate error. The errno may change after ftruncate called.
truncate_res = ::ftruncate(file->getFd(), offset);
}
Comment on lines +199 to +207
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We introduce this ftruncate in #1934 for PageStorage V2 to handle error caused by incomplete write.
But for PageStorage V3, we perform random write in BlobFile, so we should not do ftruncate for V3

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR keeps the behavior not changed for V2, but disable the truncate after error happen for V3


DB::throwFromErrno(fmt::format("Cannot write to file {},[truncate_res = {}],[errno_after_truncate = {}],"
"[bytes_written={},to_write={},offset = {}]",
file->getFileName(),
truncate_res,
truncate_if_failed ? DB::toString(truncate_res) : "no need truncate",
strerror(errno),
bytes_written,
to_write,
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Storages/Page/V2/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ bool PageFile::LinkingMetaAdapter::linkToNewSequenceNext(WriteBatch::SequenceID

if (binary_version == PageFormat::V2)
{
const UInt64 num_fields = PageUtil::get<UInt64>(pos);
const auto num_fields = PageUtil::get<UInt64>(pos);
entry.field_offsets.reserve(num_fields);
for (size_t i = 0; i < num_fields; ++i)
{
Expand Down Expand Up @@ -649,7 +649,7 @@ void PageFile::MetaMergingReader::moveNext(PageFormat::Version * v)

if (binary_version == PageFormat::V2)
{
const UInt64 num_fields = PageUtil::get<UInt64>(pos);
const auto num_fields = PageUtil::get<UInt64>(pos);
entry.field_offsets.reserve(num_fields);
for (size_t i = 0; i < num_fields; ++i)
{
Expand Down Expand Up @@ -792,7 +792,15 @@ size_t PageFile::Writer::write(DB::WriteBatch & wb, PageEntriesEdit & edit, cons
SCOPE_EXIT({ page_file.free(data_buf.begin(), data_buf.size()); });

auto write_buf = [&](WritableFilePtr & file, UInt64 offset, ByteBuffer buf, bool enable_failpoint) {
PageUtil::writeFile(file, offset, buf.begin(), buf.size(), write_limiter, background, enable_failpoint);
PageUtil::writeFile(
file,
offset,
buf.begin(),
buf.size(),
write_limiter,
background,
/*truncate_if_failed=*/true,
/*enable_failpoint=*/enable_failpoint);
if (sync_on_write)
PageUtil::syncFile(file);
};
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Storages/Page/V2/tests/gtest_page_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace tests
{
static const std::string FileName = "page_util_test";

TEST(PageUtils_test, ReadWriteFile)
TEST(PageUtilsTest, ReadWriteFile)
{
::remove(FileName.c_str());

Expand All @@ -41,7 +41,7 @@ TEST(PageUtils_test, ReadWriteFile)
buff_write[i] = i % 0xFF;
}
WritableFilePtr file_for_write = std::make_shared<PosixWritableFile>(FileName, true, -1, 0666);
PageUtil::writeFile(file_for_write, 0, buff_write, buff_size, /*write_limiter*/ nullptr, /*background*/ false, /*enable_failpoint*/ true);
PageUtil::writeFile(file_for_write, 0, buff_write, buff_size, /*write_limiter*/ nullptr, /*background*/ false, /*truncate_if_failed*/ true, /*enable_failpoint*/ true);
PageUtil::syncFile(file_for_write);
file_for_write->close();

Expand All @@ -53,15 +53,15 @@ TEST(PageUtils_test, ReadWriteFile)
::remove(FileName.c_str());
}

TEST(PageUtils_test, FileNotExists)
TEST(PageUtilsTest, FileNotExists)
{
::remove(FileName.c_str());

int fd = PageUtil::openFile<true, false>(FileName);
ASSERT_EQ(fd, 0);
}

TEST(PageUtils_test, BigReadWriteFile)
TEST(PageUtilsTest, BigReadWriteFile)
{
::remove(FileName.c_str());

Expand All @@ -78,7 +78,7 @@ TEST(PageUtils_test, BigReadWriteFile)
buff_write[i] = i % 0xFF;
}

PageUtil::writeFile(file_for_write, 0, buff_write, buff_size, nullptr, /*background*/ false, /*enable_failpoint*/ false);
PageUtil::writeFile(file_for_write, 0, buff_write, buff_size, nullptr, /*background*/ false, /*truncate_if_failed*/ true, /*enable_failpoint*/ false);
PageUtil::syncFile(file_for_write);
file_for_write->close();

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/Page/V3/BlobFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ void BlobFile::write(char * buffer, size_t offset, size_t size, const WriteLimit
});

#ifndef NDEBUG
PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, background, true);
PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, background, /*truncate_if_failed=*/false, /*enable_failpoint=*/true);
#else
PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, background, false);
PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, background, /*truncate_if_failed=*/false, /*enable_failpoint=*/false);
#endif
PageUtil::syncFile(wrfile);

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ PageEntriesEdit BlobStore::write(DB::WriteBatch & wb, const WriteLimiterPtr & wr
catch (DB::Exception & e)
{
removePosFromStats(blob_id, offset_in_file, actually_allocated_size);
LOG_FMT_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] [actually_allocated_size={}] write failed.", blob_id, offset_in_file, all_page_data_size, actually_allocated_size);
LOG_FMT_ERROR(log, "[blob_id={}] [offset_in_file={}] [size={}] [actually_allocated_size={}] write failed [error={}]", blob_id, offset_in_file, all_page_data_size, actually_allocated_size, e.message());
throw e;
}

Expand Down
9 changes: 8 additions & 1 deletion dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,14 @@ size_t LogWriter::writtenBytes() const

void LogWriter::flush(const WriteLimiterPtr & write_limiter, const bool background)
{
PageUtil::writeFile(log_file, written_bytes, write_buffer.buffer().begin(), write_buffer.offset(), write_limiter, /*background*/ background, /*enable_failpoint*/ false);
PageUtil::writeFile(log_file,
written_bytes,
write_buffer.buffer().begin(),
write_buffer.offset(),
write_limiter,
/*background=*/background,
/*truncate_if_failed=*/false,
/*enable_failpoint=*/false);
log_file->fsync();
written_bytes += write_buffer.offset();

Expand Down