Skip to content

Commit

Permalink
Use KvikIO for host_read/write
Browse files Browse the repository at this point in the history
  • Loading branch information
kingcrimsontianyu committed Jan 18, 2025
1 parent 0bf6347 commit 1d2ed1e
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 100 deletions.
14 changes: 5 additions & 9 deletions cpp/src/io/utilities/data_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@

#include <rmm/cuda_stream_view.hpp>

#include <fstream>

namespace cudf {
namespace io {

Expand All @@ -38,9 +36,6 @@ class file_sink : public data_sink {
explicit file_sink(std::string const& filepath)
{
detail::force_init_cuda_context();
_output_stream.open(filepath, std::ios::out | std::ios::binary | std::ios::trunc);
if (!_output_stream.is_open()) { detail::throw_on_file_open_failure(filepath, true); }

cufile_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath, "w");
CUDF_LOG_INFO("Writing a file using kvikIO, with compatibility mode %s.",
Expand All @@ -52,12 +47,14 @@ class file_sink : public data_sink {

void host_write(void const* data, size_t size) override
{
_output_stream.seekp(_bytes_written);
_output_stream.write(static_cast<char const*>(data), size);
_kvikio_file.pwrite(data, size, _bytes_written).get();
_bytes_written += size;
}

void flush() override { _output_stream.flush(); }
void flush() override
{
// NOOP. _kvikio_file write is unbuffered and does not need flush.
}

size_t bytes_written() override { return _bytes_written; }

Expand Down Expand Up @@ -90,7 +87,6 @@ class file_sink : public data_sink {
}

private:
std::ofstream _output_stream;
size_t _bytes_written = 0;
kvikio::FileHandle _kvikio_file;
};
Expand Down
40 changes: 16 additions & 24 deletions cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,36 +49,29 @@ namespace {
*/
class file_source : public datasource {
public:
explicit file_source(char const* filepath) : _file(filepath, O_RDONLY)
explicit file_source(char const* filepath)
{
detail::force_init_cuda_context();
cufile_integration::set_up_kvikio();
_kvikio_file = kvikio::FileHandle(filepath);
_kvikio_file = kvikio::FileHandle(filepath, "r");
CUDF_LOG_INFO("Reading a file using kvikIO, with compatibility mode %s.",
_kvikio_file.is_compat_mode_preferred() ? "on" : "off");
}

std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
ssize_t const read_size = std::min(size, _file.size() - offset);

auto const read_size = std::min(size, _kvikio_file.nbytes() - offset);
std::vector<uint8_t> v(read_size);
CUDF_EXPECTS(read(_file.desc(), v.data(), read_size) == read_size, "read failed");
CUDF_EXPECTS(_kvikio_file.pread(v.data(), read_size, offset).get() == read_size, "read failed");
return buffer::create(std::move(v));
}

size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
lseek(_file.desc(), offset, SEEK_SET);

// Clamp length to available data
auto const read_size = std::min(size, _file.size() - offset);

CUDF_EXPECTS(read(_file.desc(), dst, read_size) == static_cast<ssize_t>(read_size),
"read failed");
auto const read_size = std::min(size, _kvikio_file.nbytes() - offset);
CUDF_EXPECTS(_kvikio_file.pread(dst, read_size, offset).get() == read_size, "read failed");
return read_size;
}

Expand All @@ -98,7 +91,7 @@ class file_source : public datasource {
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
auto const read_size = std::min(size, _kvikio_file.nbytes() - offset);
return _kvikio_file.pread(dst, read_size, offset);
}

Expand All @@ -121,12 +114,9 @@ class file_source : public datasource {
return datasource::buffer::create(std::move(out_data));
}

[[nodiscard]] size_t size() const override { return _file.size(); }
[[nodiscard]] size_t size() const override { return _kvikio_file.nbytes(); }

protected:
detail::file_wrapper _file;

private:
kvikio::FileHandle _kvikio_file;
};

Expand All @@ -141,9 +131,9 @@ class memory_mapped_source : public file_source {
explicit memory_mapped_source(char const* filepath, size_t offset, size_t max_size_estimate)
: file_source(filepath)
{
if (_file.size() != 0) {
if (_kvikio_file.nbytes() != 0) {
// Memory mapping is not exclusive, so we can include the whole region we expect to read
map(_file.desc(), offset, max_size_estimate);
map(_kvikio_file.fd(), offset, max_size_estimate);
}
}

Expand All @@ -155,7 +145,7 @@ class memory_mapped_source : public file_source {
std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
{
// Clamp length to available data
auto const read_size = std::min(size, +_file.size() - offset);
auto const read_size = std::min(size, +_kvikio_file.nbytes() - offset);

// If the requested range is outside of the mapped region, read from the file
if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
Expand All @@ -179,7 +169,7 @@ class memory_mapped_source : public file_source {
size_t host_read(size_t offset, size_t size, uint8_t* dst) override
{
// Clamp length to available data
auto const read_size = std::min(size, +_file.size() - offset);
auto const read_size = std::min(size, +_kvikio_file.nbytes() - offset);

// If the requested range is outside of the mapped region, read from the file
if (offset < _map_offset or offset + read_size > (_map_offset + _map_size)) {
Expand All @@ -194,12 +184,14 @@ class memory_mapped_source : public file_source {
private:
void map(int fd, size_t offset, size_t size)
{
CUDF_EXPECTS(offset < _file.size(), "Offset is past end of file", std::overflow_error);
CUDF_EXPECTS(offset < _kvikio_file.nbytes(), "Offset is past end of file", std::overflow_error);

// Offset for `mmap()` must be page aligned
_map_offset = offset & ~(sysconf(_SC_PAGESIZE) - 1);

if (size == 0 || (offset + size) > _file.size()) { size = _file.size() - offset; }
if (size == 0 || (offset + size) > _kvikio_file.nbytes()) {
size = _kvikio_file.nbytes() - offset;
}

// Size for `mmap()` needs to include the page padding
_map_size = size + (offset - _map_offset);
Expand Down
43 changes: 0 additions & 43 deletions cpp/src/io/utilities/file_io_utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,6 @@
namespace cudf {
namespace io {
namespace detail {
namespace {

[[nodiscard]] int open_file_checked(std::string const& filepath, int flags, mode_t mode)
{
auto const fd = open(filepath.c_str(), flags, mode);
if (fd == -1) { throw_on_file_open_failure(filepath, flags & O_CREAT); }

return fd;
}

[[nodiscard]] size_t get_file_size(int file_descriptor)
{
struct stat st {};
CUDF_EXPECTS(fstat(file_descriptor, &st) != -1, "Cannot query file size");
return static_cast<size_t>(st.st_size);
}

} // namespace

void force_init_cuda_context()
{
Expand All @@ -61,31 +43,6 @@ void force_init_cuda_context()
cudaFree(nullptr);
}

[[noreturn]] void throw_on_file_open_failure(std::string const& filepath, bool is_create)
{
// save errno because it may be overwritten by subsequent calls
auto const err = errno;

if (auto const path = std::filesystem::path(filepath); is_create) {
CUDF_EXPECTS(std::filesystem::exists(path.parent_path()),
"Cannot create output file; directory does not exist");

} else {
CUDF_EXPECTS(std::filesystem::exists(path), "Cannot open file; it does not exist");
}

std::array<char, 1024> error_msg_buffer{};
auto const error_msg = strerror_r(err, error_msg_buffer.data(), 1024);
CUDF_FAIL("Cannot open file; failed with errno: " + std::string{error_msg});
}

file_wrapper::file_wrapper(std::string const& filepath, int flags, mode_t mode)
: fd(open_file_checked(filepath.c_str(), flags, mode)), _size{get_file_size(fd)}
{
}

file_wrapper::~file_wrapper() { close(fd); }

} // namespace detail
} // namespace io
} // namespace cudf
24 changes: 0 additions & 24 deletions cpp/src/io/utilities/file_io_utilities.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,9 @@ namespace cudf {
namespace io {
namespace detail {

[[noreturn]] void throw_on_file_open_failure(std::string const& filepath, bool is_create);

// Call before any cuFile API calls to ensure the CUDA context is initialized.
void force_init_cuda_context();

/**
* @brief Class that provides RAII for file handling.
*/
class file_wrapper {
int fd = -1;
size_t _size = 0;

public:
explicit file_wrapper(std::string const& filepath, int flags, mode_t mode = 0);
~file_wrapper();
[[nodiscard]] auto size() const { return _size; }
[[nodiscard]] auto desc() const { return fd; }
};

/**
* @brief Byte range to be read/written in a single operation.
*/
CUDF_EXPORT struct file_io_slice {
size_t offset;
size_t size;
};

} // namespace detail
} // namespace io
} // namespace cudf

0 comments on commit 1d2ed1e

Please sign in to comment.