Skip to content

Commit

Permalink
feat: Use Velox fs for ssd cache checkpoint file (#11783)
Browse files Browse the repository at this point in the history
Summary:

Switch the ssd cache checkpoint file to use Velox filesystem for file r/w operations, so that more advanced testing can be built by leveraging features like fault injections.

Differential Revision: D66892136
  • Loading branch information
zacw7 authored and facebook-github-bot committed Dec 10, 2024
1 parent b9cce6d commit 01a2da0
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 154 deletions.
228 changes: 114 additions & 114 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/common/base/SuccinctPrinter.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/process/TraceContext.h"

#include <fcntl.h>
Expand All @@ -44,31 +45,6 @@ namespace facebook::velox::cache {

namespace {

// TODO: Remove this function once we migrate all files to velox fs.
//
// Disable 'copy on write' on the given file. Will throw if failed for any
// reason, including file system not supporting cow feature.
void disableCow(int32_t fd) {
#ifdef linux
int attr{0};
auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
res,
folly::errnoStr(errno));
attr |= FS_NOCOW_FL;
res = ioctl(fd, FS_IOC_SETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}",
res,
folly::errnoStr(errno));
#endif // linux
}

void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector<iovec>& iovecs) {
if (entry.tinyData() != nullptr) {
iovecs.push_back({entry.tinyData(), static_cast<size_t>(entry.size())});
Expand Down Expand Up @@ -668,7 +644,10 @@ void SsdFile::deleteCheckpoint(bool keepLog) {
evictLogWriteFile_->flush();
} else {
evictLogWriteFile_->close();
fs_->remove(getEvictLogFilePath());
const auto evictLogFilePath = getEvictLogFilePath();
if (fs_->exists(evictLogFilePath)) {
fs_->remove(evictLogFilePath);
}
evictLogWriteFile_.reset();
}
} catch (const std::exception& e) {
Expand All @@ -677,14 +656,19 @@ void SsdFile::deleteCheckpoint(bool keepLog) {
}
}

const auto checkpointPath = getCheckpointFilePath();
const auto checkpointRc = ::unlink(checkpointPath.c_str());
if (checkpointRc != 0) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Error in deleting checkpoint: " << checkpointRc;
}
if (checkpointRc != 0) {
++stats_.deleteCheckpointErrors;
if (checkpointWriteFile_ != nullptr) {
try {
checkpointWriteFile_->close();
const auto checkpointFilePath = getCheckpointFilePath();
if (fs_->exists(checkpointFilePath)) {
fs_->remove(checkpointFilePath);
}
checkpointWriteFile_.reset();
} catch (const std::exception& e) {
++stats_.deleteCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Error in deleting checkpoint: " << e.what();
}
}
}

Expand Down Expand Up @@ -724,13 +708,6 @@ void SsdFile::checkpoint(bool force) {
checkpointDeleted_ = false;
bytesAfterCheckpoint_ = 0;
try {
const auto checkRc = [&](int32_t rc, const std::string& errMsg) {
if (rc < 0) {
VELOX_FAIL("{} with rc {} :{}", errMsg, rc, folly::errnoStr(errno));
}
return rc;
};

// We schedule the potentially long fsync of the cache file on another
// thread of the cache write executor, if available. If there is none, we do
// the sync on this thread at the end.
Expand All @@ -739,11 +716,14 @@ void SsdFile::checkpoint(bool force) {
return std::make_unique<int>(0);
});

std::ofstream state;
const auto checkpointPath = getCheckpointFilePath();
VELOX_SSD_CACHE_LOG(WARNING)
<< "Exeuctor is null: " << (executor_ == nullptr);
// fileSync->prepare();

executor_->add([source = fileSync]() { source->prepare(); });

try {
state.exceptions(std::ofstream::failbit);
state.open(checkpointPath, std::ios_base::out | std::ios_base::trunc);
checkpointWriteFile_->truncate(0);
// The checkpoint state file contains:
// int32_t The 4 bytes of checkpoint version,
// int32_t maxRegions,
Expand All @@ -753,71 +733,64 @@ void SsdFile::checkpoint(bool force) {
// kMapMarker,
// {fileId, offset, SSdRun} triples,
// kEndMarker.
state.write(checkpointVersion().data(), sizeof(int32_t));
state.write(asChar(&maxRegions_), sizeof(maxRegions_));
state.write(asChar(&numRegions_), sizeof(numRegions_));
const auto version = checkpointVersion();
auto buffer = folly::IOBuf::copyBuffer(version.data(), version.size());
buffer->appendToChain(
folly::IOBuf::copyBuffer(&maxRegions_, sizeof(maxRegions_)));
buffer->appendToChain(
folly::IOBuf::copyBuffer(&numRegions_, sizeof(numRegions_)));

// Copy the region scores before writing out for tsan.
const auto scoresCopy = tracker_.copyScores();
state.write(asChar(scoresCopy.data()), maxRegions_ * sizeof(uint64_t));
buffer->appendToChain(folly::IOBuf::copyBuffer(
scoresCopy.data(), maxRegions_ * sizeof(double)));
std::unordered_set<uint64_t> fileNums;
for (const auto& entry : entries_) {
const auto fileNum = entry.first.fileNum.id();
if (fileNums.insert(fileNum).second) {
state.write(asChar(&fileNum), sizeof(fileNum));
buffer->appendToChain(
folly::IOBuf::copyBuffer(&fileNum, sizeof(fileNum)));
const auto name = fileIds().string(fileNum);
const int32_t length = name.size();
state.write(asChar(&length), sizeof(length));
state.write(name.data(), length);
buffer->appendToChain(
folly::IOBuf::copyBuffer(&length, sizeof(length)));
buffer->appendToChain(
folly::IOBuf::copyBuffer(name.data(), name.size()));
}
}

const auto mapMarker = kCheckpointMapMarker;
state.write(asChar(&mapMarker), sizeof(mapMarker));
buffer->appendToChain(folly::IOBuf::copyBuffer(
&kCheckpointMapMarker, sizeof(kCheckpointMapMarker)));
for (auto& pair : entries_) {
const auto id = pair.first.fileNum.id();
state.write(asChar(&id), sizeof(id));
state.write(asChar(&pair.first.offset), sizeof(pair.first.offset));
buffer->appendToChain(folly::IOBuf::copyBuffer(&id, sizeof(id)));
buffer->appendToChain(folly::IOBuf::copyBuffer(
&pair.first.offset, sizeof(pair.first.offset)));
const auto offsetAndSize = pair.second.fileBits();
state.write(asChar(&offsetAndSize), sizeof(offsetAndSize));
buffer->appendToChain(
folly::IOBuf::copyBuffer(&offsetAndSize, sizeof(offsetAndSize)));
if (checksumEnabled_) {
const auto checksum = pair.second.checksum();
state.write(asChar(&checksum), sizeof(checksum));
buffer->appendToChain(
folly::IOBuf::copyBuffer(&checksum, sizeof(checksum)));
}
}
} catch (const std::exception& e) {
fileSync->close();
std::rethrow_exception(std::current_exception());
}

// NOTE: we need to ensure cache file data sync update completes before
// updating checkpoint file.
fileSync->move();

const auto endMarker = kCheckpointEndMarker;
state.write(asChar(&endMarker), sizeof(endMarker));
// NOTE: we need to ensure cache file data sync update completes before
// updating checkpoint file.
fileSync->move();

if (state.bad()) {
++stats_.writeCheckpointErrors;
checkRc(-1, "Write of checkpoint file");
} else {
buffer->appendToChain(folly::IOBuf::copyBuffer(
&kCheckpointEndMarker, sizeof(kCheckpointEndMarker)));
checkpointWriteFile_->append(std::move(buffer));
checkpointWriteFile_->flush();
++stats_.checkpointsWritten;
} catch (const std::exception& e) {
++stats_.writeCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR) << "Error in writing cehckpoint: " << e.what();
fileSync->close();
std::rethrow_exception(std::current_exception());
}
state.close();

// Sync checkpoint data file. ofstream does not have a sync method, so open
// as fd and sync that.
const auto checkpointFd = checkRc(
::open(checkpointPath.c_str(), O_WRONLY),
"Open of checkpoint file for sync");
// TODO: add this as file open option after we migrate to use velox
// filesystem for ssd file access.
if (disableFileCow_) {
disableCow(checkpointFd);
}
VELOX_CHECK_GE(checkpointFd, 0);
checkRc(::fsync(checkpointFd), "Sync of checkpoint file");
::close(checkpointFd);

// NOTE: we shall truncate eviction log after checkpoint file sync
// completes so that we never recover from an old checkpoint file without
Expand All @@ -843,22 +816,35 @@ void SsdFile::initializeCheckpoint() {
}

bool hasCheckpoint = true;
std::ifstream state(getCheckpointFilePath());
if (!state.is_open()) {
std::unique_ptr<common::FileInputStream> checkpointInputStream;
filesystems::FileOptions writeFileOptions;
writeFileOptions.shouldThrowOnFileAlreadyExists = false;

const auto checkpointPath = getCheckpointFilePath();
try {
checkpointWriteFile_ =
fs_->openFileForWrite(checkpointPath, writeFileOptions);

auto checkpointReadFile = fs_->openFileForRead(checkpointPath);
checkpointInputStream = std::make_unique<common::FileInputStream>(
std::move(checkpointReadFile),
1 << 20,
memory::memoryManager()->cachePool());
} catch (std::exception& e) {
hasCheckpoint = false;
++stats_.openCheckpointErrors;
VELOX_SSD_CACHE_LOG(WARNING) << fmt::format(
"Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}",
"Error openning checkpoint file {}: Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}",
e.what(),
shardId_,
checksumEnabled_ ? "enabled" : "disabled",
checksumReadVerificationEnabled_ ? "enabled" : "disabled",
getCheckpointFilePath());
checkpointPath);
}

const auto logPath = getEvictLogFilePath();
filesystems::FileOptions evictLogFileOptions;
evictLogFileOptions.shouldThrowOnFileAlreadyExists = false;
try {
evictLogWriteFile_ = fs_->openFileForWrite(logPath, evictLogFileOptions);
evictLogWriteFile_ = fs_->openFileForWrite(logPath, writeFileOptions);
} catch (std::exception& e) {
++stats_.openLogErrors;
// Failure to open the log at startup is a process terminating error.
Expand All @@ -867,8 +853,7 @@ void SsdFile::initializeCheckpoint() {

try {
if (hasCheckpoint) {
state.exceptions(std::ifstream::failbit);
readCheckpoint(state);
readCheckpoint(std::move(checkpointInputStream));
}
} catch (const std::exception& e) {
++stats_.readCheckpointErrors;
Expand Down Expand Up @@ -936,23 +921,39 @@ void SsdFile::disableFileCow() {
if (evictLogWriteFile_ != nullptr) {
evictLogWriteFile_->setAttributes(attributes);
}
if (checkpointWriteFile_ != nullptr) {
checkpointWriteFile_->setAttributes(attributes);
}
#endif // linux
}

namespace {
template <typename T>
T readNumber(std::ifstream& stream) {
T readNumber(common::FileInputStream* stream) {
T data;
stream.read(asChar(&data), sizeof(T));
stream->readBytes(reinterpret_cast<uint8_t*>(&data), sizeof(T));
return data;
}

std::string readString(common::FileInputStream* stream, int32_t length) {
char data[length];
stream->readBytes(reinterpret_cast<uint8_t*>(&data), length);
return std::string(data, length);
}

template <typename T>
std::vector<T> readVector(common::FileInputStream* stream, int32_t size) {
std::vector<T> dataVector(size);
stream->readBytes(
reinterpret_cast<uint8_t*>(dataVector.data()), size * sizeof(T));
return dataVector;
}
} // namespace

void SsdFile::readCheckpoint(std::ifstream& state) {
char versionMagic[4];
state.read(versionMagic, sizeof(versionMagic));
void SsdFile::readCheckpoint(std::unique_ptr<common::FileInputStream> stream) {
const auto versionMagic = readString(stream.get(), 4);
const auto checkpoinHasChecksum =
isChecksumEnabledOnCheckpointVersion(std::string(versionMagic, 4));
isChecksumEnabledOnCheckpointVersion(versionMagic);
if (checksumEnabled_ && !checkpoinHasChecksum) {
VELOX_SSD_CACHE_LOG(WARNING) << fmt::format(
"Starting shard {} without checkpoint: checksum is enabled but the checkpoint was made without checksum, so skip the checkpoint recovery, checkpoint file {}",
Expand All @@ -961,23 +962,22 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
return;
}

const auto maxRegions = readNumber<int32_t>(state);
const auto maxRegions = readNumber<int32_t>(stream.get());
VELOX_CHECK_EQ(
maxRegions,
maxRegions_,
"Trying to start from checkpoint with a different capacity");
numRegions_ = readNumber<int32_t>(state);
std::vector<double> scores(maxRegions);
state.read(asChar(scores.data()), maxRegions_ * sizeof(double));
numRegions_ = readNumber<int32_t>(stream.get());

const auto scores = readVector<double>(stream.get(), maxRegions_);
std::unordered_map<uint64_t, StringIdLease> idMap;
for (;;) {
const auto id = readNumber<uint64_t>(state);
const auto id = readNumber<uint64_t>(stream.get());
if (id == kCheckpointMapMarker) {
break;
}
std::string name;
name.resize(readNumber<int32_t>(state));
state.read(name.data(), name.size());
const auto length = readNumber<int32_t>(stream.get());
const auto name = readString(stream.get(), length);
idMap[id] = StringIdLease(fileIds(), id, name);
}

Expand All @@ -998,15 +998,15 @@ void SsdFile::readCheckpoint(std::ifstream& state) {

std::vector<uint32_t> regionCacheSizes(numRegions_, 0);
for (;;) {
const auto fileNum = readNumber<uint64_t>(state);
const auto fileNum = readNumber<uint64_t>(stream.get());
if (fileNum == kCheckpointEndMarker) {
break;
}
const auto offset = readNumber<uint64_t>(state);
const auto fileBits = readNumber<uint64_t>(state);
const auto offset = readNumber<uint64_t>(stream.get());
const auto fileBits = readNumber<uint64_t>(stream.get());
uint32_t checksum = 0;
if (checkpoinHasChecksum) {
checksum = readNumber<uint32_t>(state);
checksum = readNumber<uint32_t>(stream.get());
}
const auto run = SsdRun(fileBits, checksum);
const auto region = regionIndex(run.offset());
Expand Down
Loading

0 comments on commit 01a2da0

Please sign in to comment.