Skip to content

Commit

Permalink
feat:Support read local file async
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Jan 17, 2025
1 parent c2a6586 commit 166e854
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 16 deletions.
20 changes: 18 additions & 2 deletions velox/common/file/File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ uint64_t InMemoryWriteFile::size() const {
return file_->size();
}

LocalReadFile::LocalReadFile(std::string_view path) : path_(path) {
LocalReadFile::LocalReadFile(std::string_view path, folly::Executor* executor)
: executor_(executor), path_(path) {
fd_ = open(path_.c_str(), O_RDONLY);
if (fd_ < 0) {
if (errno == ENOENT) {
Expand All @@ -153,7 +154,8 @@ LocalReadFile::LocalReadFile(std::string_view path) : path_(path) {
size_ = ret;
}

LocalReadFile::LocalReadFile(int32_t fd) : fd_(fd) {}
LocalReadFile::LocalReadFile(int32_t fd, folly::Executor* executor)
: executor_(executor), fd_(fd) {}

LocalReadFile::~LocalReadFile() {
const int ret = close(fd_);
Expand Down Expand Up @@ -238,6 +240,20 @@ uint64_t LocalReadFile::preadv(
return totalBytesRead;
}

folly::SemiFuture<uint64_t> LocalReadFile::preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
auto [promise, future] = folly::makePromiseContract<uint64_t>();
executor_->add([this,
_promise = std::move(promise),
_offset = offset,
_buffers = buffers]() mutable {
auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers);
_promise.setTry(std::move(delegateFuture).getTry());
});
return std::move(future);
}

uint64_t LocalReadFile::size() const {
return size_;
}
Expand Down
14 changes: 12 additions & 2 deletions velox/common/file/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <string>
#include <string_view>

#include <folly/Executor.h>
#include <folly/Range.h>
#include <folly/futures/Future.h>

Expand Down Expand Up @@ -266,11 +267,11 @@ class InMemoryWriteFile final : public WriteFile {
/// files match against any filepath starting with '/'.
class LocalReadFile final : public ReadFile {
public:
explicit LocalReadFile(std::string_view path);
LocalReadFile(std::string_view path, folly::Executor* executor = nullptr);

/// TODO: deprecate this after creating local file all through velox fs
/// interface.
explicit LocalReadFile(int32_t fd);
LocalReadFile(int32_t fd, folly::Executor* executor = nullptr);

~LocalReadFile();

Expand All @@ -283,6 +284,14 @@ class LocalReadFile final : public ReadFile {
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const final;

folly::SemiFuture<uint64_t> preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const override;

bool hasPreadvAsync() const override {
return executor_ != nullptr;
}

uint64_t memoryUsage() const final;

bool shouldCoalesce() const final {
Expand All @@ -303,6 +312,7 @@ class LocalReadFile final : public ReadFile {
private:
void preadInternal(uint64_t offset, uint64_t length, char* pos) const;

folly::Executor* const executor_;
std::string path_;
int32_t fd_;
long size_;
Expand Down
37 changes: 27 additions & 10 deletions velox/common/file/FileSystems.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/common/file/FileSystems.h"
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <folly/synchronization/CallOnce.h>
#include "velox/common/base/Exceptions.h"
#include "velox/common/file/File.h"
Expand Down Expand Up @@ -79,8 +80,19 @@ folly::once_flag localFSInstantiationFlag;
// Implement Local FileSystem.
class LocalFileSystem : public FileSystem {
public:
explicit LocalFileSystem(std::shared_ptr<const config::ConfigBase> config)
: FileSystem(config) {}
LocalFileSystem(
std::shared_ptr<const config::ConfigBase> config,
const FileSystemOptions& options)
: FileSystem(config),
executor_(
options.readAheadEnabled
? std::make_unique<folly::CPUThreadPoolExecutor>(
std::max(
1,
static_cast<int32_t>(
std::thread::hardware_concurrency() / 2)),
std::make_shared<folly::NamedThreadFactory>("ReadAhead"))
: nullptr) {}

~LocalFileSystem() override {}

Expand All @@ -98,7 +110,7 @@ class LocalFileSystem : public FileSystem {
std::unique_ptr<ReadFile> openFileForRead(
std::string_view path,
const FileOptions& /*unused*/) override {
return std::make_unique<LocalReadFile>(extractPath(path));
return std::make_unique<LocalReadFile>(extractPath(path), executor_.get());
}

std::unique_ptr<WriteFile> openFileForWrite(
Expand Down Expand Up @@ -216,23 +228,28 @@ class LocalFileSystem : public FileSystem {

static std::function<std::shared_ptr<
FileSystem>(std::shared_ptr<const config::ConfigBase>, std::string_view)>
fileSystemGenerator() {
return [](std::shared_ptr<const config::ConfigBase> properties,
std::string_view filePath) {
fileSystemGenerator(const FileSystemOptions& options) {
return [&options](
std::shared_ptr<const config::ConfigBase> properties,
std::string_view filePath) {
// One instance of Local FileSystem is sufficient.
// Initialize on first access and reuse after that.
static std::shared_ptr<FileSystem> lfs;
folly::call_once(localFSInstantiationFlag, [&properties]() {
lfs = std::make_shared<LocalFileSystem>(properties);
folly::call_once(localFSInstantiationFlag, [&properties, &options]() {
lfs = std::make_shared<LocalFileSystem>(properties, options);
});
return lfs;
};
}

private:
const std::unique_ptr<folly::CPUThreadPoolExecutor> executor_;
};
} // namespace

void registerLocalFileSystem() {
void registerLocalFileSystem(const FileSystemOptions& options) {
registerFileSystem(
LocalFileSystem::schemeMatcher(), LocalFileSystem::fileSystemGenerator());
LocalFileSystem::schemeMatcher(),
LocalFileSystem::fileSystemGenerator(options));
}
} // namespace facebook::velox::filesystems
11 changes: 10 additions & 1 deletion velox/common/file/FileSystems.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ struct DirectoryOptions : FileOptions {
"make-directory-config"};
};

struct FileSystemOptions {
/// Now only local file system respects this option, Spark spills to local
/// file while native Presto spills to remote storage which supports read
/// async. Use an executor to submit the read async task. We can extend to
/// other file systems in need.
bool readAheadEnabled{false};
};

/// An abstract FileSystem
class FileSystem {
public:
Expand Down Expand Up @@ -163,6 +171,7 @@ void registerFileSystem(
std::string_view)> fileSystemGenerator);

/// Register the local filesystem.
void registerLocalFileSystem();
void registerLocalFileSystem(
const FileSystemOptions& options = FileSystemOptions());

} // namespace facebook::velox::filesystems
42 changes: 41 additions & 1 deletion velox/common/file/tests/FileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include <fcntl.h>
#include <folly/executors/CPUThreadPoolExecutor.h>

#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/File.h"
Expand Down Expand Up @@ -66,7 +67,10 @@ void writeDataWithOffset(WriteFile* writeFile) {
ASSERT_EQ(writeFile->size(), 15 + kOneMB);
}

void readData(ReadFile* readFile, bool checkFileSize = true) {
void readData(
ReadFile* readFile,
bool checkFileSize = true,
bool testReadAsync = false) {
if (checkFileSize) {
ASSERT_EQ(readFile->size(), 15 + kOneMB);
}
Expand Down Expand Up @@ -105,6 +109,30 @@ void readData(ReadFile* readFile, bool checkFileSize = true) {
ASSERT_EQ(std::string_view(head, sizeof(head)), "aaaaabbbbbcc");
ASSERT_EQ(std::string_view(middle, sizeof(middle)), "cccc");
ASSERT_EQ(std::string_view(tail, sizeof(tail)), "ccddddd");
if (testReadAsync) {
std::vector<folly::Range<char*>> buffers1 = {
folly::Range<char*>(head, sizeof(head)),
folly::Range<char*>(nullptr, (char*)(uint64_t)500000)};
auto future1 = readFile->preadvAsync(0, buffers1);
const auto offset1 = sizeof(head) + 500000;
std::vector<folly::Range<char*>> buffers2 = {
folly::Range<char*>(middle, sizeof(middle)),
folly::Range<char*>(
nullptr,
(char*)(uint64_t)(15 + kOneMB - offset1 - sizeof(middle) -
sizeof(tail)))};
auto future2 = readFile->preadvAsync(offset1, buffers2);
std::vector<folly::Range<char*>> buffers3 = {
folly::Range<char*>(tail, sizeof(tail))};
const auto offset2 = 15 + kOneMB - sizeof(tail);
auto future3 = readFile->preadvAsync(offset2, buffers3);
ASSERT_EQ(offset1, future1.wait().value());
ASSERT_EQ(offset2 - offset1, future2.wait().value());
ASSERT_EQ(sizeof(tail), future3.wait().value());
ASSERT_EQ(std::string_view(head, sizeof(head)), "aaaaabbbbbcc");
ASSERT_EQ(std::string_view(middle, sizeof(middle)), "cccc");
ASSERT_EQ(std::string_view(tail, sizeof(tail)), "ccddddd");
}
}

// We could templated this test, but that's kinda overkill for how simple it is.
Expand Down Expand Up @@ -157,6 +185,12 @@ class LocalFileTest : public ::testing::TestWithParam<bool> {
}

const bool useFaultyFs_;
const std::unique_ptr<folly::CPUThreadPoolExecutor> executor_ =
std::make_unique<folly::CPUThreadPoolExecutor>(
std::max(
1,
static_cast<int32_t>(std::thread::hardware_concurrency() / 2)),
std::make_shared<folly::NamedThreadFactory>("FileReadAheadTest"));
};

TEST_P(LocalFileTest, writeAndRead) {
Expand Down Expand Up @@ -185,6 +219,12 @@ TEST_P(LocalFileTest, writeAndRead) {
writeFile->close();
ASSERT_EQ(writeFile->size(), 15 + kOneMB);
}
// Test read async.
if (!useFaultyFs_) {
auto readFile =
std::make_shared<LocalReadFile>(filename, executor_.get());
readData(readFile.get(), true, true);
}
auto readFile = fs->openFileForRead(filename);
readData(readFile.get());
}
Expand Down

0 comments on commit 166e854

Please sign in to comment.