Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support read local file async #11869

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

jinchengchenghh
Copy link
Contributor

@jinchengchenghh jinchengchenghh commented Dec 16, 2024

Add struct FileSystemOptions to function registerLocalFileSystem, default readAhead false, Use an executor to submit the read async task, the number thread of pool is half of system concurrency.
The performance is same with before when device is SSD, can save about 50% time in HDD.

@facebook-github-bot facebook-github-bot added the CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. label Dec 16, 2024
Copy link

netlify bot commented Dec 16, 2024

Deploy Preview for meta-velox canceled.

Name Link
🔨 Latest commit 6d8a602
🔍 Latest deploy log https://app.netlify.com/sites/meta-velox/deploys/678c57ea44914600080b4aa2

@jinchengchenghh
Copy link
Contributor Author

Linux has aio interface, https://www.man7.org/linux/man-pages/man7/aio.7.html, but is not widely used, not sure which is better. I will also implement mmap to find which one is better. Maybe aio is better when the device is slow.

@jinchengchenghh
Copy link
Contributor Author

@Yuhta Yuhta requested a review from xiaoxmeng December 17, 2024 23:26
@Yuhta
Copy link
Contributor

Yuhta commented Dec 17, 2024

Usually cache is SSD so it's hard to tell which one is faster

@jinchengchenghh
Copy link
Contributor Author

Usually cache is SSD so it's hard to tell which one is faster

In that case, we may need mmap, we could add the option to let user decides which to use.

Some one may use HDD. I test the async read in my local SSD environment, it shows time is similar with before.

Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add options in file system options to enable async read? And only local file system needs to respect that option by creating a executor to support that? Also we shall use cpu executor here instead of io executor

Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinchengchenghh left some comments. Thanks!

LocalReadFile::LocalReadFile(std::string_view path) : path_(path) {
LocalReadFile::LocalReadFile(std::string_view path)
: executor_(std::make_unique<folly::IOThreadPoolExecutor>(
1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

half of hardware concurrency?

@@ -299,6 +308,7 @@ class LocalReadFile final : public ReadFile {
private:
void preadInternal(uint64_t offset, uint64_t length, char* pos) const;

const std::unique_ptr<folly::IOThreadPoolExecutor> executor_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Local file system owns the thread pool instead of each individual open file

auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers);
_promise->setValue(delegateFuture.wait().value());
});
return future;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The local file system shutdown or dtor needs to wait for the executor to shutdown first? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ThreadPoolExecutor dtor will let join keep alive https://github.com/facebook/folly/blob/main/folly/executors/ThreadPoolExecutor.cpp#L63, so local file system should wait for the executor to shutdown first.

@jinchengchenghh
Copy link
Contributor Author

Let's add options in file system options to enable async read? And only local file system needs to respect that option by creating a executor to support that? Also we shall use cpu executor here instead of io executor

If we add the option, we will also need to set it by QueryConfig in FileInputStream. Do we add this config? All the file system should respect this config unless the config indicates to it only applies to local file system. Spark only spills to local file, does not support to spill to remote storage. But we can support other file system by creating an executor too. no use case for it now.

@@ -129,7 +129,11 @@ uint64_t InMemoryWriteFile::size() const {
return file_->size();
}

LocalReadFile::LocalReadFile(std::string_view path) : path_(path) {
LocalReadFile::LocalReadFile(std::string_view path)
: executor_(std::make_unique<folly::IOThreadPoolExecutor>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no need to run this in the context of an event base thread.

Use folly::CPUThreadPoolExecutor instead?

folly::SemiFuture<uint64_t> LocalReadFile::preadvAsync(
uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
auto promise = std::make_unique<folly::Promise<uint64_t>>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit no need to heap allocate the promise

auto [p, f] = folly::makePromiseContract<uint64_t>();

then move the promise into the lambda

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the compilation failed by setTry because p is const.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can add mutable to the lambda to the executor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your guidance, fixed as you say, now it works well.

_offset = offset,
_buffers = buffers]() {
auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers);
_promise->setValue(delegateFuture.wait().value());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit maybe promise.setTry(delegateFuture.getTry())

to avoid throwing inside the executor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compile failed by

auto promise = std::make_unique<folly::Promise<uint64_t>>();
  auto future = promise->getSemiFuture();
  executor_->add([this,
                  _promise = std::move(promise),
                  _offset = offset,
                  _buffers = buffers]() {
    auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers);
    _promise->setTry(delegateFuture.getTry());
  });
[build] /mnt/DP_disk1/code/velox/velox/common/file/File.cpp: In lambda function:
[build] /mnt/DP_disk1/code/velox/velox/common/file/File.cpp:254:43: error: passing ‘folly::SemiFuture<long unsigned int>’ as ‘this’ argument discards qualifiers [-fpermissive]
[build]   254 |     _promise->setTry(delegateFuture.getTry());
[build]       |                      ~~~~~~~~~~~~~~~~~~~~~^~
[build] In file included from /usr/local/include/folly/futures/Future.h:2648,
[build]                  from /mnt/DP_disk1/code/velox/./velox/common/file/File.h:39,
[build]                  from /mnt/DP_disk1/code/velox/velox/common/file/File.cpp:17:
[build] /usr/local/include/folly/futures/Future-inl.h:2393:8: note:   in call to ‘folly::Try<T> folly::SemiFuture<T>::getTry() && [with T = long unsigned int]’
[build]  2393 | Try<T> SemiFuture<T>::getTry() && {
[build]       |        ^~~~~~~~~~~~~
[build] ninja: build stopped: subcommand failed.

Copy link
Contributor

@yuandagits yuandagits Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious, does _promise->setTry(std::move(delegateFuture).getTry()) work?

: FileSystem(config) {}
: FileSystem(config),
executor_(
config->get<bool>(core::QueryConfig::kSpillReadAheadEnabled, true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be system option instead of per query config

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it only appeals to function registerLocalFileSystem, do you mean adding a struct FileSystemOption?

void registerLocalFileSystem() {
  registerFileSystem(
      LocalFileSystem::schemeMatcher(), LocalFileSystem::fileSystemGenerator());
}

@xiaoxmeng
Copy link
Contributor

Usually cache is SSD so it's hard to tell which one is faster

yeah, for ssd, no need for async reada and it is only supposed to be used on hdd.

@jinchengchenghh jinchengchenghh force-pushed the readahead branch 2 times, most recently from a3b480e to c245084 Compare December 24, 2024 09:06
@jinchengchenghh
Copy link
Contributor Author

Could you help take a look? Thanks! @majetideepak

@majetideepak
Copy link
Collaborator

@jinchengchenghh I also think this should be an option.

If we add the option, we will also need to set it by QueryConfig in FileInputStream.

Can you explain this further? I thought #11783 changed to use Velox FS. Where should the QueryConfig be set?

All the file system should respect this config unless the config indicates to it only applies to local file system.

Let's name the config to be local file-system specific. All other file-system config names are specific anyway.

@jinchengchenghh
Copy link
Contributor Author

@jinchengchenghh I also think this should be an option.

If we add the option, we will also need to set it by QueryConfig in FileInputStream.

Can you explain this further? I thought #11783 changed to use Velox FS. Where should the QueryConfig be set?

All the file system should respect this config unless the config indicates to it only applies to local file system.

Let's name the config to be local file-system specific. All other file-system config names are specific anyway.

Thanks for your reply, discussed with @xiaoxmeng offline, I have changed it to the current version. Add FileSystemOptions to function registerLocalFileSystem, how about that?

Copy link
Collaborator

@majetideepak majetideepak left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add FileSystemOptions to function registerLocalFileSystem, how about that?

I feel the current API is less flexible and error-prone. If we add it to options, then the user can get both sync and async versions of LocalReadFile.
I propose the following

  1. Add localfile.enable-async-read as an option.
  2. Inside LocalReadFile::openFileForRead create the executor and LocalReadFile instance based on the option.
  3. Modify LocalReadFile::preadv to be sync or async based on the executor.

This will prevent the local file system from adding a new public API preadvAsync. This can be private.
We can create a new LocalAsyncReadFile class if needed.

uint64_t offset,
const std::vector<folly::Range<char*>>& buffers) const {
auto [promise, future] = folly::makePromiseContract<uint64_t>();
executor_->add([this,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw an error if executor_ is nullptr

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In current design, developer should call hasPreadvAsync first, I return executor null status, and then call preadvAsync, it should not be null here.

Copy link
Collaborator

@majetideepak majetideepak Dec 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should still assert for not nullptr here for safety. User might not have checked hasPreadvAsync. We should not allow a nullptr dereference in any scenario.

return std::make_unique<LocalReadFile>(extractPath(path));
folly::CPUThreadPoolExecutor* executor =
executor_ == nullptr ? nullptr : executor_.get();
return std::make_unique<LocalReadFile>(extractPath(path), executor);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executor_.get() should be sufficient here.

@jinchengchenghh
Copy link
Contributor Author

Current interface ReadFile has three public APIs, preadv, preadvAsync and hasPreadvAsync
https://github.com/facebookincubator/velox/blob/main/velox/common/file/File.h#L102,
LocalReadFile just implements it. Would you prefer to move the latter two APIs to private? This may deviate with current design. CC @xiaoxmeng

@majetideepak
Copy link
Collaborator

Current interface ReadFile has three public APIs, preadv, preadvAsync and hasPreadvAsync

@jinchengchenghh thanks for the clarification. I missed this. Then why do we need an option? Can we implement preadvAsync and hasPreadvAsync as you did without any options?

@xiaoxmeng
Copy link
Contributor

Current interface ReadFile has three public APIs, preadv, preadvAsync and hasPreadvAsync

@jinchengchenghh thanks for the clarification. I missed this. Then why do we need an option? Can we implement preadvAsync and hasPreadvAsync as you did without any options?

@majetideepak The async read support on local file system relies on a cpu executor running at the background for async io (note POSIX has async read support using signal mechanism which is not super reliable or ease to use long years back. Not sure how good it is now). This requires creating a cpu executor inside the local fs instance. The local fs is only used with ssd in Meta internal use cases which don't really need async read support performance wise. But it should be quite useful when used with HDD to pipeline io with computation. So it might be better to have an option for local fs to enable it or not by creating a dedicated cpu executor for that. We could extend local fs registration API to take this option on server startup. For Prestissimo, it could be a system config. thanks!

@majetideepak
Copy link
Collaborator

So it might be better to have an option for local fs to enable it or not by creating a dedicated cpu executor for that.

@xiaoxmeng To clarify, the usage would be preadv for SSD and preadvAsync for HDD.
Is there any downside if we always create a CPU executor for a LocalReadFile instance?

@Yuhta
Copy link
Contributor

Yuhta commented Dec 31, 2024

The newest wisdom for async file IO is io_uring. If we ever want to try mmap to avoid that copy from kernel buffer we should try io_uring instead. Either way it feels overkill for AP systems though.

@kecookier
Copy link
Contributor

@ccat3z Can you post your test results with the HDD here?

@ccat3z
Copy link
Contributor

ccat3z commented Jan 16, 2025

@ccat3z Can you post your test results with the HDD here?

I tested some internal jobs on HDD. According to the read spill metrics (spillReadWallNanos and spillReadBytes), async file reading can reduce the file reading time by 52%. However, the time to read spills in these jobs is very small, there's no noticeable speedup in the total time taken.

impl operator Total spillReadWallNanos (Seconds) Total spillReadBytes (GB) Time taken pre GB
before hash agg 174965.1473 113825.6283  
after hash agg 77624.91614 69806.41933 72.34%
before hash build 52.222391 13.31936284  
after hash build 50.677976 13.31939661 97.04%
before hash probe 401.133286 1157.013263  
after hash probe 199.017824 1157.072997 49.61%
before sort 57569.94362 75449.8015  
after sort 16694.44938 75554.49983 28.96%
before total 232988.4466 190445.7624  
after total 94569.06132 146531.3116 47.25%

@jinchengchenghh
Copy link
Contributor Author

Much thanks for your work! @ccat3z This can really helps us measure the performance.

@xiaoxmeng
Copy link
Contributor

So it might be better to have an option for local fs to enable it or not by creating a dedicated cpu executor for that.

@xiaoxmeng To clarify, the usage would be preadv for SSD and preadvAsync for HDD. Is there any downside if we always create a CPU executor for a LocalReadFile instance?

@majetideepak for local ssd, we don't want to create a cpu executor and do async read. This will create a bunch of threads without io performance benefit but might cause unnecessary thread context switch overhead.

Copy link
Contributor

@xiaoxmeng xiaoxmeng left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jinchengchenghh LGTM. Thanks!

@@ -79,6 +79,14 @@ struct DirectoryOptions : FileOptions {
"make-directory-config"};
};

struct FileSystemOptions {
/// Now only local file system respects this option, Spark spills to local
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  /// As for now, only local file system respects this option. It implements async read by using a background cpu executor. Some filesystem might has native async read-ahead support.

1,
static_cast<int32_t>(
std::thread::hardware_concurrency() / 2)),
std::make_shared<folly::NamedThreadFactory>("ReadAhead"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ReadAhead/LocalReadahead/

@@ -303,6 +312,7 @@ class LocalReadFile final : public ReadFile {
private:
void preadInternal(uint64_t offset, uint64_t length, char* pos) const;

folly::Executor* const executor_;
std::string path_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const std::string path_;

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not const, this is two constructors, one input argument is path, one argument of another constructor is fd

@facebook-github-bot
Copy link
Contributor

@xiaoxmeng has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@majetideepak majetideepak added the ready-to-merge PR that have been reviewed and are ready for merging. PRs with this tag notify the Velox Meta oncall label Jan 17, 2025
@jinchengchenghh
Copy link
Contributor Author

#12113
#12100
#12071
The red CI is caused by these 3 issues, not related to this PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CLA Signed This label is managed by the Facebook bot. Authors need to sign the CLA before a PR can be reviewed. ready-to-merge PR that have been reviewed and are ready for merging. PRs with this tag notify the Velox Meta oncall
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants