From 1d2ed1ef3941e1add7be4bad2173da02f71ae557 Mon Sep 17 00:00:00 2001 From: Tianyu Liu Date: Sat, 18 Jan 2025 01:40:06 -0500 Subject: [PATCH] Use KvikIO for host_read/write --- cpp/src/io/utilities/data_sink.cpp | 14 +++---- cpp/src/io/utilities/datasource.cpp | 40 ++++++++------------ cpp/src/io/utilities/file_io_utilities.cpp | 43 ---------------------- cpp/src/io/utilities/file_io_utilities.hpp | 24 ------------ 4 files changed, 21 insertions(+), 100 deletions(-) diff --git a/cpp/src/io/utilities/data_sink.cpp b/cpp/src/io/utilities/data_sink.cpp index 23e6d6e2921..abbef4ce410 100644 --- a/cpp/src/io/utilities/data_sink.cpp +++ b/cpp/src/io/utilities/data_sink.cpp @@ -25,8 +25,6 @@ #include -#include - namespace cudf { namespace io { @@ -38,9 +36,6 @@ class file_sink : public data_sink { explicit file_sink(std::string const& filepath) { detail::force_init_cuda_context(); - _output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc); - if (!_output_stream.is_open()) { detail::throw_on_file_open_failure(filepath, true); } - cufile_integration::set_up_kvikio(); _kvikio_file = kvikio::FileHandle(filepath, "w"); CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.", @@ -52,12 +47,14 @@ class file_sink : public data_sink { void host_write(void const* data, size_t size) override { - _output_stream.seekp(_bytes_written); - _output_stream.write(static_cast(data), size); + _kvikio_file.pwrite(data, size, _bytes_written).get(); _bytes_written += size; } - void flush() override { _output_stream.flush(); } + void flush() override + { + // NOOP. _kvikio_file write is unbuffered and does not need flush. + } size_t bytes_written() override { return _bytes_written; } @@ -90,7 +87,6 @@ class file_sink : public data_sink { } private: - std::ofstream _output_stream; size_t _bytes_written = 0; kvikio::FileHandle _kvikio_file; }; diff --git a/cpp/src/io/utilities/datasource.cpp b/cpp/src/io/utilities/datasource.cpp index dc3d6d3a28c..ea98c7b953c 100644 --- a/cpp/src/io/utilities/datasource.cpp +++ b/cpp/src/io/utilities/datasource.cpp @@ -49,36 +49,29 @@ namespace { */ class file_source : public datasource { public: - explicit file_source(char const* filepath) : _file(filepath, O_RDONLY) + explicit file_source(char const* filepath) { detail::force_init_cuda_context(); cufile_integration::set_up_kvikio(); - _kvikio_file = kvikio::FileHandle(filepath); + _kvikio_file = kvikio::FileHandle(filepath, "r"); CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode %s.", _kvikio_file.is_compat_mode_preferred() ? "on" : "off"); } std::unique_ptr host_read(size_t offset, size_t size) override { - lseek(_file.desc(), offset, SEEK_SET); - // Clamp length to available data - ssize_t const read_size = std::min(size, _file.size() - offset); - + auto const read_size = std::min(size, _kvikio_file.nbytes() - offset); std::vector v(read_size); - CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed"); + CUDF_EXPECTS(_kvikio_file.pread(v.data(), read_size, offset).get() == read_size, "read failed"); return buffer::create(std::move(v)); } size_t host_read(size_t offset, size_t size, uint8_t* dst) override { - lseek(_file.desc(), offset, SEEK_SET); - // Clamp length to available data - auto const read_size = std::min(size, _file.size() - offset); - - CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast(read_size), - "read failed"); + auto const read_size = std::min(size, _kvikio_file.nbytes() - offset); + CUDF_EXPECTS(_kvikio_file.pread(dst, read_size, offset).get() == read_size, "read failed"); return read_size; } @@ -98,7 +91,7 @@ class file_source : public datasource { { CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file."); - auto const read_size = std::min(size, _file.size() - offset); + auto const read_size = std::min(size, _kvikio_file.nbytes() - offset); return _kvikio_file.pread(dst, read_size, offset); } @@ -121,12 +114,9 @@ class file_source : public datasource { return datasource::buffer::create(std::move(out_data)); } - [[nodiscard]] size_t size() const override { return _file.size(); } + [[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); } protected: - detail::file_wrapper _file; - - private: kvikio::FileHandle _kvikio_file; }; @@ -141,9 +131,9 @@ class memory_mapped_source : public file_source { explicit memory_mapped_source(char const* filepath, size_t offset, size_t max_size_estimate) : file_source(filepath) { - if (_file.size() != 0) { + if (_kvikio_file.nbytes() != 0) { // Memory mapping is not exclusive, so we can include the whole region we expect to read - map(_file.desc(), offset, max_size_estimate); + map(_kvikio_file.fd(), offset, max_size_estimate); } } @@ -155,7 +145,7 @@ class memory_mapped_source : public file_source { std::unique_ptr host_read(size_t offset, size_t size) override { // Clamp length to available data - auto const read_size = std::min(size, +_file.size() - offset); + auto const read_size = std::min(size, +_kvikio_file.nbytes() - offset); // If the requested range is outside of the mapped region, read from the file if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { @@ -179,7 +169,7 @@ class memory_mapped_source : public file_source { size_t host_read(size_t offset, size_t size, uint8_t* dst) override { // Clamp length to available data - auto const read_size = std::min(size, +_file.size() - offset); + auto const read_size = std::min(size, +_kvikio_file.nbytes() - offset); // If the requested range is outside of the mapped region, read from the file if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) { @@ -194,12 +184,14 @@ class memory_mapped_source : public file_source { private: void map(int fd, size_t offset, size_t size) { - CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file", std::overflow_error); + CUDF_EXPECTS(offset < _kvikio_file.nbytes(), "Offset is past end of file", std::overflow_error); // Offset for `mmap()` must be page aligned _map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1); - if (size == 0 || (offset + size) > _file.size()) { size = _file.size() - offset; } + if (size == 0 || (offset + size) > _kvikio_file.nbytes()) { + size = _kvikio_file.nbytes() - offset; + } // Size for `mmap()` needs to include the page padding _map_size = size + (offset - _map_offset); diff --git a/cpp/src/io/utilities/file_io_utilities.cpp b/cpp/src/io/utilities/file_io_utilities.cpp index d03ead3e5d6..6ef3d643d9b 100644 --- a/cpp/src/io/utilities/file_io_utilities.cpp +++ b/cpp/src/io/utilities/file_io_utilities.cpp @@ -34,24 +34,6 @@ namespace cudf { namespace io { namespace detail { -namespace { - -[[nodiscard]] int open_file_checked(std::string const& filepath, int flags, mode_t mode) -{ - auto const fd = open(filepath.c_str(), flags, mode); - if (fd == -1) { throw_on_file_open_failure(filepath, flags & O_CREAT); } - - return fd; -} - -[[nodiscard]] size_t get_file_size(int file_descriptor) -{ - struct stat st {}; - CUDF_EXPECTS(fstat(file_descriptor, &st) != -1, "Cannot query file size"); - return static_cast(st.st_size); -} - -} // namespace void force_init_cuda_context() { @@ -61,31 +43,6 @@ void force_init_cuda_context() cudaFree(nullptr); } -[[noreturn]] void throw_on_file_open_failure(std::string const& filepath, bool is_create) -{ - // save errno because it may be overwritten by subsequent calls - auto const err = errno; - - if (auto const path = std::filesystem::path(filepath); is_create) { - CUDF_EXPECTS(std::filesystem::exists(path.parent_path()), - "Cannot create output file; directory does not exist"); - - } else { - CUDF_EXPECTS(std::filesystem::exists(path), "Cannot open file; it does not exist"); - } - - std::array error_msg_buffer{}; - auto const error_msg = strerror_r(err, error_msg_buffer.data(), 1024); - CUDF_FAIL("Cannot open file; failed with errno: " + std::string{error_msg}); -} - -file_wrapper::file_wrapper(std::string const& filepath, int flags, mode_t mode) - : fd(open_file_checked(filepath.c_str(), flags, mode)), _size{get_file_size(fd)} -{ -} - -file_wrapper::~file_wrapper() { close(fd); } - } // namespace detail } // namespace io } // namespace cudf diff --git a/cpp/src/io/utilities/file_io_utilities.hpp b/cpp/src/io/utilities/file_io_utilities.hpp index 5f5b02dca4d..9be05997d3a 100644 --- a/cpp/src/io/utilities/file_io_utilities.hpp +++ b/cpp/src/io/utilities/file_io_utilities.hpp @@ -28,33 +28,9 @@ namespace cudf { namespace io { namespace detail { -[[noreturn]] void throw_on_file_open_failure(std::string const& filepath, bool is_create); - // Call before any cuFile API calls to ensure the CUDA context is initialized. void force_init_cuda_context(); -/** - * @brief Class that provides RAII for file handling. - */ -class file_wrapper { - int fd = -1; - size_t _size = 0; - - public: - explicit file_wrapper(std::string const& filepath, int flags, mode_t mode = 0); - ~file_wrapper(); - [[nodiscard]] auto size() const { return _size; } - [[nodiscard]] auto desc() const { return fd; } -}; - -/** - * @brief Byte range to be read/written in a single operation. - */ -CUDF_EXPORT struct file_io_slice { - size_t offset; - size_t size; -}; - } // namespace detail } // namespace io } // namespace cudf