diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index f3f4a7dac01..d65c7153196 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -570,7 +570,7 @@ if (UNIX) add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py --verbose=2 --linelength=90 - --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11,-runtime/references + --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/c++11,-runtime/references,-build/include_order ${FILTERED_LINT_FILES}) endif (UNIX) diff --git a/cpp/src/arrow/io/CMakeLists.txt b/cpp/src/arrow/io/CMakeLists.txt index 87e227ef80d..d2e3491b75f 100644 --- a/cpp/src/arrow/io/CMakeLists.txt +++ b/cpp/src/arrow/io/CMakeLists.txt @@ -38,6 +38,7 @@ set(ARROW_IO_TEST_LINK_LIBS ${ARROW_IO_PRIVATE_LINK_LIBS}) set(ARROW_IO_SRCS + file.cc memory.cc ) @@ -103,12 +104,17 @@ if (APPLE) INSTALL_NAME_DIR "@rpath") endif() +ADD_ARROW_TEST(io-file-test) +ARROW_TEST_LINK_LIBRARIES(io-file-test + ${ARROW_IO_TEST_LINK_LIBS}) + ADD_ARROW_TEST(io-memory-test) ARROW_TEST_LINK_LIBRARIES(io-memory-test ${ARROW_IO_TEST_LINK_LIBS}) # Headers: top level install(FILES + file.h hdfs.h interfaces.h memory.h diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc new file mode 100644 index 00000000000..87bae7f3928 --- /dev/null +++ b/cpp/src/arrow/io/file.cc @@ -0,0 +1,485 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Ensure 64-bit off_t for platforms where it matters +#ifdef _FILE_OFFSET_BITS +#undef _FILE_OFFSET_BITS +#endif + +#define _FILE_OFFSET_BITS 64 + +#include "arrow/io/file.h" + +#if _WIN32 || _WIN64 +#if _WIN64 +#define ENVIRONMENT64 +#else +#define ENVIRONMENT32 +#endif +#endif + +// sys/mman.h not present in Visual Studio or Cygwin +#ifdef _WIN32 +#ifndef NOMINMAX +#define NOMINMAX +#endif +#include "arrow/io/mman.h" +#undef Realloc +#undef Free +#include +#else +#include +#endif + +#include +#include +#include + +#ifndef _MSC_VER // POSIX-like platforms + +#include + +// Not available on some platforms +#ifndef errno_t +#define errno_t int +#endif + +#endif // _MSC_VER + +// defines that +#if defined(__MINGW32__) +#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR +#elif defined(_MSC_VER) // Visual Studio + +#else // gcc / clang on POSIX platforms +#define ARROW_WRITE_SHMODE S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH +#endif + +// ---------------------------------------------------------------------- +// C++ standard library + +#include +#include +#include +#include +#include +#include + +#if defined(_MSC_VER) +#include +#include +#endif + +// ---------------------------------------------------------------------- +// file compatibility stuff + +#if defined(__MINGW32__) // MinGW +// nothing +#elif defined(_MSC_VER) // Visual Studio +#include +#else // POSIX / Linux +// nothing +#endif + +#include + +// POSIX systems do not have this +#ifndef O_BINARY +#define O_BINARY 0 +#endif + +// ---------------------------------------------------------------------- 
+// Other Arrow includes + +#include "arrow/io/interfaces.h" + +#include "arrow/util/buffer.h" +#include "arrow/util/memory-pool.h" +#include "arrow/util/status.h" + +namespace arrow { +namespace io { + +// ---------------------------------------------------------------------- +// Cross-platform file compatibility layer + +static inline Status CheckOpenResult( + int ret, int errno_actual, const char* filename, size_t filename_length) { + if (ret == -1) { + // TODO: errno codes to strings + std::stringstream ss; + ss << "Failed to open file: "; +#if defined(_MSC_VER) + // using wchar_t + + // this requires c++11 + std::wstring_convert, wchar_t> converter; + std::wstring wide_string( + reinterpret_cast(filename), filename_length / sizeof(wchar_t)); + std::string byte_string = converter.to_bytes(wide_string); + ss << byte_string; +#else + ss << filename; +#endif + return Status::IOError(ss.str()); + } + return Status::OK(); +} + +#define CHECK_LSEEK(retval) \ + if ((retval) == -1) return Status::IOError("lseek failed"); + +static inline int64_t lseek64_compat(int fd, int64_t pos, int whence) { +#if defined(_MSC_VER) + return _lseeki64(fd, pos, whence); +#else + return lseek(fd, pos, whence); +#endif +} + +static inline Status FileOpenReadable(const std::string& filename, int* fd) { + int ret; + errno_t errno_actual = 0; +#if defined(_MSC_VER) + // https://msdn.microsoft.com/en-us/library/w64k0ytk.aspx + + // See GH #209.
Here we are assuming that the filename has been encoded in + // utf-16le so that unicode filenames can be supported + const int nwchars = static_cast(filename.size()) / sizeof(wchar_t); + std::vector wpath(nwchars + 1); + memcpy(wpath.data(), filename.data(), filename.size()); + memcpy(wpath.data() + nwchars, L"\0", sizeof(wchar_t)); + + errno_actual = _wsopen_s(fd, wpath.data(), _O_RDONLY | _O_BINARY, _SH_DENYNO, _S_IREAD); + ret = *fd; +#else + ret = *fd = open(filename.c_str(), O_RDONLY | O_BINARY); + errno_actual = errno; +#endif + + return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size()); +} + +static inline Status FileOpenWriteable(const std::string& filename, int* fd) { + int ret; + errno_t errno_actual = 0; + +#if defined(_MSC_VER) + // https://msdn.microsoft.com/en-us/library/w64k0ytk.aspx + // Same story with wchar_t as above + const int nwchars = static_cast(filename.size()) / sizeof(wchar_t); + std::vector wpath(nwchars + 1); + memcpy(wpath.data(), filename.data(), filename.size()); + memcpy(wpath.data() + nwchars, L"\0", sizeof(wchar_t)); + + errno_actual = _wsopen_s( + fd, wpath.data(), _O_WRONLY | _O_CREAT | _O_BINARY, _SH_DENYNO, _S_IWRITE); + ret = *fd; + +#else + ret = *fd = open(filename.c_str(), O_WRONLY | O_CREAT | O_BINARY, ARROW_WRITE_SHMODE); +#endif + return CheckOpenResult(ret, errno_actual, filename.c_str(), filename.size()); +} + +static inline Status FileTell(int fd, int64_t* pos) { + int64_t current_pos; + +#if defined(_MSC_VER) + current_pos = _telli64(fd); + if (current_pos == -1) { return Status::IOError("_telli64 failed"); } +#else + current_pos = lseek64_compat(fd, 0, SEEK_CUR); + CHECK_LSEEK(current_pos); +#endif + + *pos = current_pos; + return Status::OK(); +} + +static inline Status FileSeek(int fd, int64_t pos) { + int64_t ret = lseek64_compat(fd, pos, SEEK_SET); + CHECK_LSEEK(ret); + return Status::OK(); +} + +static inline Status FileRead( + int fd, uint8_t* buffer, int64_t nbytes, int64_t* bytes_read) 
{ +#if defined(_MSC_VER) + if (nbytes > INT32_MAX) { return Status::IOError("Unable to read > 2GB blocks yet"); } + *bytes_read = _read(fd, buffer, static_cast(nbytes)); +#else + *bytes_read = read(fd, buffer, nbytes); +#endif + + if (*bytes_read == -1) { + // TODO(wesm): errno to string + return Status::IOError("Error reading bytes from file"); + } + + return Status::OK(); +} + +static inline Status FileWrite(int fd, const uint8_t* buffer, int64_t nbytes) { + int ret; +#if defined(_MSC_VER) + if (nbytes > INT32_MAX) { + return Status::IOError("Unable to write > 2GB blocks to file yet"); + } + ret = _write(fd, buffer, static_cast(nbytes)); +#else + ret = write(fd, buffer, nbytes); +#endif + + if (ret == -1) { + // TODO(wesm): errno to string + return Status::IOError("Error writing bytes to file"); + } + return Status::OK(); +} + +static inline Status FileGetSize(int fd, int64_t* size) { + int64_t ret; + + // Save current position + int64_t current_position = lseek64_compat(fd, 0, SEEK_CUR); + CHECK_LSEEK(current_position); + + // move to end of the file + ret = lseek64_compat(fd, 0, SEEK_END); + CHECK_LSEEK(ret); + + // Get file length + ret = lseek64_compat(fd, 0, SEEK_CUR); + CHECK_LSEEK(ret); + + *size = ret; + + // Restore file position + ret = lseek64_compat(fd, current_position, SEEK_SET); + CHECK_LSEEK(ret); + + return Status::OK(); +} + +static inline Status FileClose(int fd) { + int ret; + +#if defined(_MSC_VER) + ret = _close(fd); +#else + ret = close(fd); +#endif + + if (ret == -1) { return Status::IOError("error closing file"); } + return Status::OK(); +} + +class OSFile { + public: + OSFile() : fd_(-1), is_open_(false), size_(-1) {} + + ~OSFile() {} + + Status OpenWritable(const std::string& path) { + RETURN_NOT_OK(FileOpenWriteable(path, &fd_)); + path_ = path; + is_open_ = true; + return Status::OK(); + } + + Status OpenReadable(const std::string& path) { + RETURN_NOT_OK(FileOpenReadable(path, &fd_)); + RETURN_NOT_OK(FileGetSize(fd_, &size_)); + + // 
The position should be 0 after GetSize + // RETURN_NOT_OK(Seek(0)); + + path_ = path; + is_open_ = true; + return Status::OK(); + } + + Status Close() { + if (is_open_) { + RETURN_NOT_OK(FileClose(fd_)); + is_open_ = false; + } + return Status::OK(); + } + + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + return FileRead(fd_, out, nbytes, bytes_read); + } + + Status Seek(int64_t pos) { + if (pos > size_) { pos = size_; } + return FileSeek(fd_, pos); + } + + Status Tell(int64_t* pos) const { return FileTell(fd_, pos); } + + Status Write(const uint8_t* data, int64_t length) { + if (length < 0) { return Status::IOError("Length must be non-negative"); } + return FileWrite(fd_, data, length); + } + + int fd() const { return fd_; } + + bool is_open() const { return is_open_; } + const std::string& path() const { return path_; } + + int64_t size() const { return size_; } + + private: + std::string path_; + + // File descriptor + int fd_; + + bool is_open_; + int64_t size_; +}; + +// ---------------------------------------------------------------------- +// ReadableFile implementation + +class ReadableFile::ReadableFileImpl : public OSFile { + public: + explicit ReadableFileImpl(MemoryPool* pool) : OSFile(), pool_(pool) {} + + Status Open(const std::string& path) { return OpenReadable(path); } + + // Read up to nbytes from the current position into a new buffer built with pool_. + // On a short read the buffer is shrunk so its size equals the bytes actually read. + Status ReadBuffer(int64_t nbytes, std::shared_ptr* out) { + auto buffer = std::make_shared(pool_); + RETURN_NOT_OK(buffer->Resize(nbytes)); + + int64_t bytes_read = 0; + RETURN_NOT_OK(Read(nbytes, &bytes_read, buffer->mutable_data())); + + // Short read (e.g. near EOF): trim so callers never see unread trailing bytes. + // NOTE(review): assumes shrinking via Resize does not reallocate -- confirm. + if (bytes_read < nbytes) { RETURN_NOT_OK(buffer->Resize(bytes_read)); } + + *out = buffer; + return Status::OK(); + } + + private: + MemoryPool* pool_; +}; + +ReadableFile::ReadableFile(MemoryPool* pool) { + impl_.reset(new ReadableFileImpl(pool)); +} + +ReadableFile::~ReadableFile() { + impl_->Close(); +} + +Status ReadableFile::Open(const std::string& path, std::shared_ptr* file) { + *file =
std::shared_ptr(new ReadableFile(default_memory_pool())); + return (*file)->impl_->Open(path); +} + +Status ReadableFile::Open(const std::string& path, MemoryPool* memory_pool, + std::shared_ptr* file) { + *file = std::shared_ptr(new ReadableFile(memory_pool)); + return (*file)->impl_->Open(path); +} + +Status ReadableFile::Close() { + return impl_->Close(); +} + +Status ReadableFile::Tell(int64_t* pos) { + return impl_->Tell(pos); +} + +Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + return impl_->Read(nbytes, bytes_read, out); +} + +Status ReadableFile::ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + RETURN_NOT_OK(Seek(position)); + return impl_->Read(nbytes, bytes_read, out); +} + +Status ReadableFile::ReadAt( + int64_t position, int64_t nbytes, std::shared_ptr* out) { + RETURN_NOT_OK(Seek(position)); + return impl_->ReadBuffer(nbytes, out); +} + +Status ReadableFile::GetSize(int64_t* size) { + *size = impl_->size(); + return Status::OK(); +} + +Status ReadableFile::Seek(int64_t pos) { + return impl_->Seek(pos); +} + +bool ReadableFile::supports_zero_copy() const { + return false; +} + +int ReadableFile::file_descriptor() const { + return impl_->fd(); +} + +// ---------------------------------------------------------------------- +// FileOutputStream + +class FileOutputStream::FileOutputStreamImpl : public OSFile { + public: + Status Open(const std::string& path) { return OpenWritable(path); } +}; + +FileOutputStream::FileOutputStream() { + impl_.reset(new FileOutputStreamImpl()); +} + +FileOutputStream::~FileOutputStream() { + impl_->Close(); +} + +Status FileOutputStream::Open( + const std::string& path, std::shared_ptr* file) { + // private ctor + *file = std::shared_ptr(new FileOutputStream()); + return (*file)->impl_->Open(path); +} + +Status FileOutputStream::Close() { + return impl_->Close(); +} + +Status FileOutputStream::Tell(int64_t* pos) { + return impl_->Tell(pos); +} + +Status 
FileOutputStream::Write(const uint8_t* data, int64_t length) { + return impl_->Write(data, length); +} + +int FileOutputStream::file_descriptor() const { + return impl_->fd(); +} + +} // namespace io +} // namespace arrow diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h new file mode 100644 index 00000000000..5e714ea9667 --- /dev/null +++ b/cpp/src/arrow/io/file.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// IO interface implementations for OS files + +#ifndef ARROW_IO_FILE_H +#define ARROW_IO_FILE_H + +#include +#include +#include + +#include "arrow/io/interfaces.h" +#include "arrow/util/macros.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Buffer; +class MemoryPool; +class Status; + +namespace io { + +// Output stream that writes to an operating system file; the destructor closes it +class ARROW_EXPORT FileOutputStream : public OutputStream { + public: + ~FileOutputStream(); + + // Open the file at path for writing, creating it if it does not exist + static Status Open(const std::string& path, std::shared_ptr* file); + + // OutputStream interface + Status Close() override; + Status Tell(int64_t* position) override; + Status Write(const uint8_t* data, int64_t nbytes) override; + + // File descriptor of the underlying OS file + int file_descriptor() const; + + private: + // Construct through Open + FileOutputStream(); + + class ARROW_NO_EXPORT FileOutputStreamImpl; + std::unique_ptr impl_; +}; + +// Operating system file +class ARROW_EXPORT ReadableFile : public ReadableFileInterface { + public: + ~ReadableFile(); + + // Open file, allocate memory (if needed) from default memory pool + static Status Open(const std::string& path, std::shared_ptr* file); + + // Open file with one's own memory pool for memory allocations + static Status Open(const std::string& path, MemoryPool* memory_pool, + std::shared_ptr* file); + + Status Close() override; + Status Tell(int64_t* position) override; + + // Seek to position, then read up to nbytes (this moves the file position) + Status ReadAt( + int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + + Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; + // Size of the file as measured when it was opened + Status GetSize(int64_t* size) override; + // Positions beyond the size measured at open are clamped to that size + Status Seek(int64_t position) override; + + // Always false for this implementation + bool supports_zero_copy() const override; + + // File descriptor of the underlying OS file + int file_descriptor() const; + + private: + // Construct through Open + explicit ReadableFile(MemoryPool* pool); + + class ARROW_NO_EXPORT ReadableFileImpl; + std::unique_ptr impl_; +}; + +} // namespace io +} // namespace arrow + +#endif // ARROW_IO_FILE_H diff --git a/cpp/src/arrow/io/io-file-test.cc
b/cpp/src/arrow/io/io-file-test.cc new file mode 100644 index 00000000000..cde769ffb61 --- /dev/null +++ b/cpp/src/arrow/io/io-file-test.cc @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" + +#include "arrow/io/file.h" +#include "arrow/io/test-common.h" +#include "arrow/util/memory-pool.h" + +namespace arrow { +namespace io { + +static bool FileExists(const std::string& path) { + return std::ifstream(path.c_str()).good(); +} + +static bool FileIsClosed(int fd) { + if (-1 != fcntl(fd, F_GETFD)) { return false; } + return errno == EBADF; +} + +class FileTestFixture : public ::testing::Test { + public: + void SetUp() { + path_ = "arrow-test-io-file-output-stream.txt"; + EnsureFileDeleted(); + } + + void TearDown() { EnsureFileDeleted(); } + + void EnsureFileDeleted() { + if (FileExists(path_)) { std::remove(path_.c_str()); } + } + + protected: + std::string path_; +}; + +// ---------------------------------------------------------------------- +// File output tests + +class TestFileOutputStream : public FileTestFixture { + public: + void OpenFile() { ASSERT_OK(FileOutputStream::Open(path_, 
&file_)); } + + protected: + std::shared_ptr file_; +}; + +TEST_F(TestFileOutputStream, DestructorClosesFile) { + int fd; + { + std::shared_ptr file; + ASSERT_OK(FileOutputStream::Open(path_, &file)); + fd = file->file_descriptor(); + } + ASSERT_TRUE(FileIsClosed(fd)); +} + +TEST_F(TestFileOutputStream, Close) { + OpenFile(); + + const char* data = "testdata"; + ASSERT_OK(file_->Write(reinterpret_cast(data), strlen(data))); + + int fd = file_->file_descriptor(); + file_->Close(); + + ASSERT_TRUE(FileIsClosed(fd)); + + // Idempotent + file_->Close(); + + std::shared_ptr rd_file; + ASSERT_OK(ReadableFile::Open(path_, &rd_file)); + + int64_t size = 0; + ASSERT_OK(rd_file->GetSize(&size)); + ASSERT_EQ(strlen(data), size); +} + +TEST_F(TestFileOutputStream, InvalidWrites) { + OpenFile(); + + const char* data = ""; + + ASSERT_RAISES(IOError, file_->Write(reinterpret_cast(data), -1)); +} + +TEST_F(TestFileOutputStream, Tell) { + OpenFile(); + + int64_t position; + + ASSERT_OK(file_->Tell(&position)); + ASSERT_EQ(0, position); + + const char* data = "testdata"; + ASSERT_OK(file_->Write(reinterpret_cast(data), 8)); + ASSERT_OK(file_->Tell(&position)); + ASSERT_EQ(8, position); +} + +// ---------------------------------------------------------------------- +// File input tests + +class TestReadableFile : public FileTestFixture { + public: + void OpenFile() { ASSERT_OK(ReadableFile::Open(path_, &file_)); } + + void MakeTestFile() { + std::string data = "testdata"; + std::ofstream stream; + stream.open(path_.c_str()); + stream << data; + } + + protected: + std::shared_ptr file_; +}; + +TEST_F(TestReadableFile, DestructorClosesFile) { + MakeTestFile(); + + int fd; + { + std::shared_ptr file; + ASSERT_OK(ReadableFile::Open(path_, &file)); + fd = file->file_descriptor(); + } + ASSERT_TRUE(FileIsClosed(fd)); +} + +TEST_F(TestReadableFile, Close) { + MakeTestFile(); + OpenFile(); + + int fd = file_->file_descriptor(); + file_->Close(); + + ASSERT_TRUE(FileIsClosed(fd)); + + // 
Idempotent + file_->Close(); +} + +TEST_F(TestReadableFile, SeekTellSize) { + MakeTestFile(); + OpenFile(); + + int64_t position; + ASSERT_OK(file_->Tell(&position)); + ASSERT_EQ(0, position); + + ASSERT_OK(file_->Seek(4)); + ASSERT_OK(file_->Tell(&position)); + ASSERT_EQ(4, position); + + ASSERT_OK(file_->Seek(100)); + ASSERT_OK(file_->Tell(&position)); + + // now at EOF + ASSERT_EQ(8, position); + + int64_t size; + ASSERT_OK(file_->GetSize(&size)); + ASSERT_EQ(8, size); + + // does not support zero copy + ASSERT_FALSE(file_->supports_zero_copy()); +} + +TEST_F(TestReadableFile, Read) { + uint8_t buffer[50]; + + MakeTestFile(); + OpenFile(); + + int64_t bytes_read; + ASSERT_OK(file_->Read(4, &bytes_read, buffer)); + ASSERT_EQ(4, bytes_read); + ASSERT_EQ(0, std::memcmp(buffer, "test", 4)); + + ASSERT_OK(file_->Read(10, &bytes_read, buffer)); + ASSERT_EQ(4, bytes_read); + ASSERT_EQ(0, std::memcmp(buffer, "data", 4)); +} + +TEST_F(TestReadableFile, ReadAt) { + uint8_t buffer[50]; + const char* test_data = "testdata"; + + MakeTestFile(); + OpenFile(); + + int64_t bytes_read; + int64_t position; + + ASSERT_OK(file_->ReadAt(0, 4, &bytes_read, buffer)); + ASSERT_EQ(4, bytes_read); + ASSERT_EQ(0, std::memcmp(buffer, "test", 4)); + + // position advanced + ASSERT_OK(file_->Tell(&position)); + ASSERT_EQ(4, position); + + ASSERT_OK(file_->ReadAt(4, 10, &bytes_read, buffer)); + ASSERT_EQ(4, bytes_read); + ASSERT_EQ(0, std::memcmp(buffer, "data", 4)); + + // position advanced to EOF + ASSERT_OK(file_->Tell(&position)); + ASSERT_EQ(8, position); + + // Check buffer API + std::shared_ptr buffer2; + + ASSERT_OK(file_->ReadAt(0, 4, &buffer2)); + ASSERT_EQ(4, buffer2->size()); + + Buffer expected(reinterpret_cast(test_data), 4); + ASSERT_TRUE(buffer2->Equals(expected)); + + // position advanced + ASSERT_OK(file_->Tell(&position)); + ASSERT_EQ(4, position); +} + +TEST_F(TestReadableFile, NonExistentFile) { + ASSERT_RAISES(IOError, ReadableFile::Open("0xDEADBEEF.txt", &file_)); +} + 
+// MemoryPool backed by std::malloc/std::free that counts calls to Allocate +class MyMemoryPool : public MemoryPool { + public: + MyMemoryPool() : num_allocations_(0) {} + + Status Allocate(int64_t size, uint8_t** out) override { + *out = reinterpret_cast(std::malloc(size)); + ++num_allocations_; + return Status::OK(); + } + + void Free(uint8_t* buffer, int64_t size) override { std::free(buffer); } + + // Allocated bytes are not tracked by this pool; always reports -1 + int64_t bytes_allocated() const override { return -1; } + + // Number of Allocate calls made so far + int64_t num_allocations() const { return num_allocations_; } + + private: + int64_t num_allocations_; +}; + +// Buffers returned by ReadAt should be allocated from the pool passed to Open +TEST_F(TestReadableFile, CustomMemoryPool) { + MakeTestFile(); + + MyMemoryPool pool; + ASSERT_OK(ReadableFile::Open(path_, &pool, &file_)); + + std::shared_ptr buffer; + ASSERT_OK(file_->ReadAt(0, 4, &buffer)); + ASSERT_OK(file_->ReadAt(4, 8, &buffer)); + + ASSERT_EQ(2, pool.num_allocations()); +} + +} // namespace io +} // namespace arrow diff --git a/cpp/src/arrow/io/libhdfs_shim.cc b/cpp/src/arrow/io/libhdfs_shim.cc index 0b805abf94c..f256c31b4f4 100644 --- a/cpp/src/arrow/io/libhdfs_shim.cc +++ b/cpp/src/arrow/io/libhdfs_shim.cc @@ -33,8 +33,8 @@ #ifndef _WIN32 #include #else -#include #include +#include // TODO(wesm): address when/if we add windows support // #include diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 51601a0a626..6989d732ca7 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -94,7 +94,7 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { Status WriteInternal(const uint8_t* data, int64_t nbytes); // Hide the internal details of this class for now - class MemoryMappedFileImpl; + class ARROW_NO_EXPORT MemoryMappedFileImpl; std::unique_ptr impl_; }; diff --git a/cpp/src/arrow/io/mman.h b/cpp/src/arrow/io/mman.h new file mode 100644 index 00000000000..00d1f93601d --- /dev/null +++ b/cpp/src/arrow/io/mman.h @@ -0,0 +1,189 @@ +// Copyright https://code.google.com/p/mman-win32/ +// +// Licensed under the MIT License; +// You may obtain a copy of the License at +// +//
https://opensource.org/licenses/MIT + +#ifndef _MMAN_WIN32_H +#define _MMAN_WIN32_H + +// Allow use of features specific to Windows XP or later. +#ifndef _WIN32_WINNT +// Change this to the appropriate value to target other versions of Windows. +#define _WIN32_WINNT 0x0501 + +#endif + +#include +#include +#include +#include + +#define PROT_NONE 0 +#define PROT_READ 1 +#define PROT_WRITE 2 +#define PROT_EXEC 4 + +#define MAP_FILE 0 +#define MAP_SHARED 1 +#define MAP_PRIVATE 2 +#define MAP_TYPE 0xf +#define MAP_FIXED 0x10 +#define MAP_ANONYMOUS 0x20 +#define MAP_ANON MAP_ANONYMOUS + +#define MAP_FAILED ((void*)-1) + +/* Flags for msync. */ +#define MS_ASYNC 1 +#define MS_SYNC 2 +#define MS_INVALIDATE 4 + +#ifndef FILE_MAP_EXECUTE +#define FILE_MAP_EXECUTE 0x0020 +#endif + +static int __map_mman_error(const DWORD err, const int deferr) { + if (err == 0) return 0; + // TODO: implement + return err; +} + +static DWORD __map_mmap_prot_page(const int prot) { + DWORD protect = 0; + + if (prot == PROT_NONE) return protect; + + if ((prot & PROT_EXEC) != 0) { + protect = ((prot & PROT_WRITE) != 0) ? PAGE_EXECUTE_READWRITE : PAGE_EXECUTE_READ; + } else { + protect = ((prot & PROT_WRITE) != 0) ? PAGE_READWRITE : PAGE_READONLY; + } + + return protect; +} + +static DWORD __map_mmap_prot_file(const int prot) { + DWORD desiredAccess = 0; + + if (prot == PROT_NONE) return desiredAccess; + + if ((prot & PROT_READ) != 0) desiredAccess |= FILE_MAP_READ; + if ((prot & PROT_WRITE) != 0) desiredAccess |= FILE_MAP_WRITE; + if ((prot & PROT_EXEC) != 0) desiredAccess |= FILE_MAP_EXECUTE; + + return desiredAccess; +} + +void* mmap(void* addr, size_t len, int prot, int flags, int fildes, off_t off) { + HANDLE fm, h; + + void* map = MAP_FAILED; + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4293) +#endif + + const DWORD dwFileOffsetLow = + (sizeof(off_t) <= sizeof(DWORD)) ? 
(DWORD)off : (DWORD)(off & 0xFFFFFFFFL); + const DWORD dwFileOffsetHigh = + (sizeof(off_t) <= sizeof(DWORD)) ? (DWORD)0 : (DWORD)((off >> 32) & 0xFFFFFFFFL); + const DWORD protect = __map_mmap_prot_page(prot); + const DWORD desiredAccess = __map_mmap_prot_file(prot); + + const off_t maxSize = off + (off_t)len; + + const DWORD dwMaxSizeLow = + (sizeof(off_t) <= sizeof(DWORD)) ? (DWORD)maxSize : (DWORD)(maxSize & 0xFFFFFFFFL); + const DWORD dwMaxSizeHigh = (sizeof(off_t) <= sizeof(DWORD)) + ? (DWORD)0 + : (DWORD)((maxSize >> 32) & 0xFFFFFFFFL); + +#ifdef _MSC_VER +#pragma warning(pop) +#endif + + errno = 0; + + if (len == 0 + /* Unsupported flag combinations */ + || (flags & MAP_FIXED) != 0 + /* Unsupported protection combinations */ + || prot == PROT_EXEC) { + errno = EINVAL; + return MAP_FAILED; + } + + h = ((flags & MAP_ANONYMOUS) == 0) ? (HANDLE)_get_osfhandle(fildes) + : INVALID_HANDLE_VALUE; + + if ((flags & MAP_ANONYMOUS) == 0 && h == INVALID_HANDLE_VALUE) { + errno = EBADF; + return MAP_FAILED; + } + + fm = CreateFileMapping(h, NULL, protect, dwMaxSizeHigh, dwMaxSizeLow, NULL); + + if (fm == NULL) { + errno = __map_mman_error(GetLastError(), EPERM); + return MAP_FAILED; + } + + map = MapViewOfFile(fm, desiredAccess, dwFileOffsetHigh, dwFileOffsetLow, len); + + CloseHandle(fm); + + if (map == NULL) { + errno = __map_mman_error(GetLastError(), EPERM); + return MAP_FAILED; + } + + return map; +} + +int munmap(void* addr, size_t len) { + if (UnmapViewOfFile(addr)) return 0; + + errno = __map_mman_error(GetLastError(), EPERM); + + return -1; +} + +int mprotect(void* addr, size_t len, int prot) { + DWORD newProtect = __map_mmap_prot_page(prot); + DWORD oldProtect = 0; + + if (VirtualProtect(addr, len, newProtect, &oldProtect)) return 0; + + errno = __map_mman_error(GetLastError(), EPERM); + + return -1; +} + +int msync(void* addr, size_t len, int flags) { + if (FlushViewOfFile(addr, len)) return 0; + + errno = __map_mman_error(GetLastError(), EPERM); + + return -1;
+} + +int mlock(const void* addr, size_t len) { + if (VirtualLock((LPVOID)addr, len)) return 0; + + errno = __map_mman_error(GetLastError(), EPERM); + + return -1; +} + +int munlock(const void* addr, size_t len) { + if (VirtualUnlock((LPVOID)addr, len)) return 0; + + errno = __map_mman_error(GetLastError(), EPERM); + + return -1; +} + +#endif diff --git a/cpp/src/arrow/ipc/adapter.cc b/cpp/src/arrow/ipc/adapter.cc index 89b7fb987c6..99974a4a4c7 100644 --- a/cpp/src/arrow/ipc/adapter.cc +++ b/cpp/src/arrow/ipc/adapter.cc @@ -23,12 +23,12 @@ #include #include "arrow/array.h" +#include "arrow/io/interfaces.h" +#include "arrow/io/memory.h" #include "arrow/ipc/Message_generated.h" #include "arrow/ipc/metadata-internal.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" -#include "arrow/io/interfaces.h" -#include "arrow/io/memory.h" #include "arrow/schema.h" #include "arrow/table.h" #include "arrow/type.h" diff --git a/cpp/src/arrow/ipc/file.cc b/cpp/src/arrow/ipc/file.cc index 2bf10dde266..c68244d5025 100644 --- a/cpp/src/arrow/ipc/file.cc +++ b/cpp/src/arrow/ipc/file.cc @@ -22,10 +22,10 @@ #include #include +#include "arrow/io/interfaces.h" #include "arrow/ipc/adapter.h" #include "arrow/ipc/metadata.h" #include "arrow/ipc/util.h" -#include "arrow/io/interfaces.h" #include "arrow/util/buffer.h" #include "arrow/util/logging.h" #include "arrow/util/status.h" diff --git a/cpp/src/arrow/types/primitive-test.cc b/cpp/src/arrow/types/primitive-test.cc index 87eb0fe3a8b..ffebb9269bd 100644 --- a/cpp/src/arrow/types/primitive-test.cc +++ b/cpp/src/arrow/types/primitive-test.cc @@ -238,7 +238,8 @@ void TestPrimitiveBuilder::Check( } typedef ::testing::Types Primitives; + PInt32, PInt64, PFloat, PDouble> + Primitives; TYPED_TEST_CASE(TestPrimitiveBuilder, Primitives); diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h index d320d6adb7c..b22f07dd634 100644 --- a/cpp/src/arrow/util/logging.h +++ b/cpp/src/arrow/util/logging.h @@ -117,10 +117,10 @@ 
class CerrLog { // return so we create a new class to give it a hint. class FatalLog : public CerrLog { public: - FatalLog(int /* severity */) // NOLINT - : CerrLog(ARROW_FATAL) {} + explicit FatalLog(int /* severity */) // NOLINT + : CerrLog(ARROW_FATAL){} // NOLINT - [[noreturn]] ~FatalLog() { + [[noreturn]] ~FatalLog() { if (has_logged_) { std::cerr << std::endl; } std::exit(1); } diff --git a/cpp/src/arrow/util/memory-pool.cc b/cpp/src/arrow/util/memory-pool.cc index fed149bc359..9f83afe4cb2 100644 --- a/cpp/src/arrow/util/memory-pool.cc +++ b/cpp/src/arrow/util/memory-pool.cc @@ -17,13 +17,13 @@ #include "arrow/util/memory-pool.h" -#include #include #include #include +#include -#include "arrow/util/status.h" #include "arrow/util/logging.h" +#include "arrow/util/status.h" namespace arrow { diff --git a/cpp/src/arrow/util/status-test.cc b/cpp/src/arrow/util/status-test.cc index 45e0ff361ac..e0ff20fea12 100644 --- a/cpp/src/arrow/util/status-test.cc +++ b/cpp/src/arrow/util/status-test.cc @@ -17,8 +17,8 @@ #include "gtest/gtest.h" -#include "arrow/util/status.h" #include "arrow/test-util.h" +#include "arrow/util/status.h" namespace arrow {