NOTE: this patch was reconstructed from a copy in which line breaks,
indentation and <...> spans had been stripped. Whitespace on context lines is
a best-effort reconstruction; if a hunk does not apply cleanly, retry with
`git apply --ignore-space-change`. The `index` lines of files whose content
differs from the original patch were dropped because their blob ids would be
stale.

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8f0fc4cdea2..e9a690b03ad 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -722,6 +722,7 @@ set(SOURCES
         env/env_chroot.cc
         env/env_encryption.cc
         env/env_hdfs.cc
+        env/env_inspected.cc
         env/file_system.cc
         env/file_system_tracer.cc
         env/fs_remap.cc
diff --git a/TARGETS b/TARGETS
index cd1f5274a31..40f02d2a177 100644
--- a/TARGETS
+++ b/TARGETS
@@ -230,6 +230,7 @@ cpp_library(
         "env/env_chroot.cc",
         "env/env_encryption.cc",
         "env/env_hdfs.cc",
+        "env/env_inspected.cc",
         "env/env_posix.cc",
         "env/file_system.cc",
         "env/file_system_tracer.cc",
@@ -562,6 +563,7 @@ cpp_library(
         "env/env_chroot.cc",
         "env/env_encryption.cc",
         "env/env_hdfs.cc",
+        "env/env_inspected.cc",
         "env/env_posix.cc",
         "env/file_system.cc",
         "env/file_system_tracer.cc",
diff --git a/env/env_basic_test.cc b/env/env_basic_test.cc
--- a/env/env_basic_test.cc
+++ b/env/env_basic_test.cc
@@ -14,6 +14,7 @@
 #include "rocksdb/convenience.h"
 #include "rocksdb/env.h"
 #include "rocksdb/env_encryption.h"
+#include "rocksdb/env_inspected.h"
 #include "test_util/testharness.h"
 
 namespace ROCKSDB_NAMESPACE {
@@ -81,6 +82,45 @@ static Env* GetTestFS() {
   EXPECT_NE(fs_env, nullptr);
   return fs_env;
 }
+
+// Test inspector that grants at most `refill_bytes` bytes per request, or the
+// whole request when `refill_bytes` is 0.
+class DummyFileSystemInspector : public FileSystemInspector {
+ public:
+  explicit DummyFileSystemInspector(size_t refill_bytes = 0)
+      : refill_bytes_(refill_bytes) {}
+
+  Status Read(size_t len, size_t* allowed) override {
+    assert(allowed);
+    if (refill_bytes_ == 0) {
+      *allowed = len;
+    } else {
+      *allowed = std::min(refill_bytes_, len);
+    }
+    return Status::OK();
+  }
+
+  Status Write(size_t len, size_t* allowed) override {
+    assert(allowed);
+    if (refill_bytes_ == 0) {
+      *allowed = len;
+    } else {
+      *allowed = std::min(refill_bytes_, len);
+    }
+    return Status::OK();
+  }
+
+ private:
+  size_t refill_bytes_;
+};
+
+// Env whose file IO is inspected by a DummyFileSystemInspector that grants one
+// byte per request, so multi-byte reads and writes are split into chunks.
+static Env* GetInspectedEnv() {
+  static std::unique_ptr<Env> inspected_env(NewFileSystemInspectedEnv(
+      Env::Default(), std::make_shared<DummyFileSystemInspector>(1)));
+  return inspected_env.get();
+}
 #endif  // ROCKSDB_LITE
 
 }  // namespace
@@ -121,6 +161,9 @@ INSTANTIATE_TEST_CASE_P(EncryptedEnv, EnvMoreTestWithParam,
 INSTANTIATE_TEST_CASE_P(MemEnv, EnvBasicTestWithParam,
                         ::testing::Values(&GetMemoryEnv));
 
+INSTANTIATE_TEST_CASE_P(InspectedEnv, EnvBasicTestWithParam,
+                        ::testing::Values(&GetInspectedEnv));
+
 namespace {
 
 // Returns a vector of 0 or 1 Env*, depending whether an Env is registered for
diff --git a/env/env_inspected.cc b/env/env_inspected.cc
new file mode 100644
--- /dev/null
+++ b/env/env_inspected.cc
@@ -0,0 +1,324 @@
+// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
+
+#ifndef ROCKSDB_LITE
+#include "rocksdb/env_inspected.h"
+
+#include <cassert>
+#include <cstring>
+#include <utility>
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+
+// Reads up to `n` bytes into `scratch`, splitting the request into chunks
+// whose sizes are granted one at a time by `inspector->Read()`.
+//
+// `read_chunk(done, len, chunk, buf)` issues one underlying read of at most
+// `len` bytes into `buf`; `done` is the number of bytes read by earlier
+// chunks. The bytes it read are reported through `chunk`.
+//
+// On return `*result` covers everything accumulated in `scratch`, unless the
+// inspector itself failed, in which case `*result` is left untouched.
+template <typename ReadChunk>
+Status InspectedRead(FileSystemInspector* inspector, size_t n, Slice* result,
+                     char* scratch, ReadChunk read_chunk) {
+  assert(inspector);
+  Status s;
+  size_t done = 0;
+  size_t allowed = 0;
+  // NOTE: an inspector that keeps granting zero bytes with an OK status keeps
+  // this loop spinning until it grants some bytes or returns an error.
+  while (done < n) {
+    s = inspector->Read(n - done, &allowed);
+    if (!s.ok()) {
+      return s;
+    }
+    assert(allowed <= n - done);
+    if (allowed > 0) {
+      s = read_chunk(done, allowed, result, scratch + done);
+      if (!s.ok()) {
+        break;
+      }
+      const size_t actual_read = result->size();
+      if (result->data() != scratch + done) {
+        // Only possible when the underlying file ignores or misuses the user
+        // provided buffer. Debug builds reject this case; builds without
+        // assertions fall back to copying the data into place.
+        std::memmove(scratch + done, result->data(), actual_read);
+        assert(false);
+      }
+      done += actual_read;
+      if (actual_read < allowed) {
+        // Short read (presumably end of file): stop instead of retrying.
+        break;
+      }
+    }
+  }
+  *result = Slice(scratch, done);
+  return s;
+}
+
+// Writes `data` in chunks whose sizes are granted one at a time by
+// `inspector->Write()`.
+//
+// `write_chunk(done, chunk)` issues one underlying write of `chunk`; `done`
+// is the number of bytes of `data` handed to earlier chunks.
+template <typename WriteChunk>
+Status InspectedWrite(FileSystemInspector* inspector, const Slice& data,
+                      WriteChunk write_chunk) {
+  assert(inspector);
+  Status s;
+  const size_t size = data.size();
+  size_t done = 0;
+  size_t allowed = 0;
+  // NOTE: as in InspectedRead(), zero-byte grants keep this loop spinning.
+  while (done < size) {
+    s = inspector->Write(size - done, &allowed);
+    if (!s.ok()) {
+      return s;
+    }
+    assert(allowed <= size - done);
+    if (allowed > 0) {
+      s = write_chunk(done, Slice(data.data() + done, allowed));
+      if (!s.ok()) {
+        break;
+      }
+    }
+    done += allowed;
+  }
+  return s;
+}
+
+}  // namespace
+
+// SequentialFile whose reads are split into chunks approved by a
+// FileSystemInspector.
+class InspectedSequentialFile : public SequentialFileWrapper {
+ public:
+  InspectedSequentialFile(std::unique_ptr<SequentialFile>&& target,
+                          std::shared_ptr<FileSystemInspector> inspector)
+      : SequentialFileWrapper(target.get()),
+        owner_(std::move(target)),
+        inspector_(std::move(inspector)) {}
+
+  Status Read(size_t n, Slice* result, char* scratch) override {
+    return InspectedRead(
+        inspector_.get(), n, result, scratch,
+        [this](size_t /*done*/, size_t len, Slice* chunk, char* buf) {
+          return SequentialFileWrapper::Read(len, chunk, buf);
+        });
+  }
+
+  Status PositionedRead(uint64_t offset, size_t n, Slice* result,
+                        char* scratch) override {
+    return InspectedRead(
+        inspector_.get(), n, result, scratch,
+        [this, offset](size_t done, size_t len, Slice* chunk, char* buf) {
+          return SequentialFileWrapper::PositionedRead(offset + done, len,
+                                                       chunk, buf);
+        });
+  }
+
+ private:
+  std::unique_ptr<SequentialFile> owner_;
+  std::shared_ptr<FileSystemInspector> inspector_;
+};
+
+// RandomAccessFile whose reads are split into chunks approved by a
+// FileSystemInspector.
+class InspectedRandomAccessFile : public RandomAccessFileWrapper {
+ public:
+  InspectedRandomAccessFile(std::unique_ptr<RandomAccessFile>&& target,
+                            std::shared_ptr<FileSystemInspector> inspector)
+      : RandomAccessFileWrapper(target.get()),
+        owner_(std::move(target)),
+        inspector_(std::move(inspector)) {}
+
+  Status Read(uint64_t offset, size_t n, Slice* result,
+              char* scratch) const override {
+    return InspectedRead(
+        inspector_.get(), n, result, scratch,
+        [this, offset](size_t done, size_t len, Slice* chunk, char* buf) {
+          return RandomAccessFileWrapper::Read(offset + done, len, chunk, buf);
+        });
+  }
+
+  // TODO: support parallel MultiRead
+  Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
+    assert(reqs != nullptr);
+    for (size_t i = 0; i < num_reqs; ++i) {
+      ReadRequest& req = reqs[i];
+      req.status = Read(req.offset, req.len, &req.result, req.scratch);
+    }
+    return Status::OK();
+  }
+
+ private:
+  std::unique_ptr<RandomAccessFile> owner_;
+  std::shared_ptr<FileSystemInspector> inspector_;
+};
+
+// WritableFile whose writes are split into chunks approved by a
+// FileSystemInspector.
+class InspectedWritableFile : public WritableFileWrapper {
+ public:
+  InspectedWritableFile(std::unique_ptr<WritableFile>&& target,
+                        std::shared_ptr<FileSystemInspector> inspector)
+      : WritableFileWrapper(target.get()),
+        owner_(std::move(target)),
+        inspector_(std::move(inspector)) {}
+
+  Status Append(const Slice& data) override {
+    return InspectedWrite(inspector_.get(), data,
+                          [this](size_t /*done*/, const Slice& chunk) {
+                            return WritableFileWrapper::Append(chunk);
+                          });
+  }
+
+  // NOTE(review): `verification_info` is forwarded unchanged with every
+  // chunk; if it describes the whole of `data` (e.g. a checksum), it may not
+  // match a partial chunk -- confirm against the underlying file.
+  Status Append(const Slice& data,
+                const DataVerificationInfo& verification_info) override {
+    return InspectedWrite(
+        inspector_.get(), data,
+        [this, &verification_info](size_t /*done*/, const Slice& chunk) {
+          return WritableFileWrapper::Append(chunk, verification_info);
+        });
+  }
+
+  Status PositionedAppend(const Slice& data, uint64_t offset) override {
+    return InspectedWrite(
+        inspector_.get(), data,
+        [this, offset](size_t done, const Slice& chunk) {
+          return WritableFileWrapper::PositionedAppend(chunk, offset + done);
+        });
+  }
+
+  // NOTE(review): see the `verification_info` caveat on Append() above.
+  Status PositionedAppend(
+      const Slice& data, uint64_t offset,
+      const DataVerificationInfo& verification_info) override {
+    return InspectedWrite(
+        inspector_.get(), data,
+        [this, offset, &verification_info](size_t done, const Slice& chunk) {
+          return WritableFileWrapper::PositionedAppend(chunk, offset + done,
+                                                       verification_info);
+        });
+  }
+
+ private:
+  std::unique_ptr<WritableFile> owner_;
+  std::shared_ptr<FileSystemInspector> inspector_;
+};
+
+// RandomRWFile whose reads and writes are split into chunks approved by a
+// FileSystemInspector.
+class InspectedRandomRWFile : public RandomRWFileWrapper {
+ public:
+  InspectedRandomRWFile(std::unique_ptr<RandomRWFile>&& target,
+                        std::shared_ptr<FileSystemInspector> inspector)
+      : RandomRWFileWrapper(target.get()),
+        owner_(std::move(target)),
+        inspector_(std::move(inspector)) {}
+
+  Status Write(uint64_t offset, const Slice& data) override {
+    return InspectedWrite(
+        inspector_.get(), data,
+        [this, offset](size_t done, const Slice& chunk) {
+          return RandomRWFileWrapper::Write(offset + done, chunk);
+        });
+  }
+
+  Status Read(uint64_t offset, size_t n, Slice* result,
+              char* scratch) const override {
+    return InspectedRead(
+        inspector_.get(), n, result, scratch,
+        [this, offset](size_t done, size_t len, Slice* chunk, char* buf) {
+          return RandomRWFileWrapper::Read(offset + done, len, chunk, buf);
+        });
+  }
+
+ private:
+  std::unique_ptr<RandomRWFile> owner_;
+  std::shared_ptr<FileSystemInspector> inspector_;
+};
+
+FileSystemInspectedEnv::FileSystemInspectedEnv(
+    Env* base_env, const std::shared_ptr<FileSystemInspector>& inspector)
+    : EnvWrapper(base_env), inspector_(inspector) {}
+
+Status FileSystemInspectedEnv::NewSequentialFile(
+    const std::string& fname, std::unique_ptr<SequentialFile>* result,
+    const EnvOptions& options) {
+  auto s = EnvWrapper::NewSequentialFile(fname, result, options);
+  if (!s.ok()) {
+    return s;
+  }
+  result->reset(new InspectedSequentialFile(std::move(*result), inspector_));
+  return s;
+}
+
+Status FileSystemInspectedEnv::NewRandomAccessFile(
+    const std::string& fname, std::unique_ptr<RandomAccessFile>* result,
+    const EnvOptions& options) {
+  auto s = EnvWrapper::NewRandomAccessFile(fname, result, options);
+  if (!s.ok()) {
+    return s;
+  }
+  result->reset(new InspectedRandomAccessFile(std::move(*result), inspector_));
+  return s;
+}
+
+Status FileSystemInspectedEnv::NewWritableFile(
+    const std::string& fname, std::unique_ptr<WritableFile>* result,
+    const EnvOptions& options) {
+  auto s = EnvWrapper::NewWritableFile(fname, result, options);
+  if (!s.ok()) {
+    return s;
+  }
+  result->reset(new InspectedWritableFile(std::move(*result), inspector_));
+  return s;
+}
+
+Status FileSystemInspectedEnv::ReopenWritableFile(
+    const std::string& fname, std::unique_ptr<WritableFile>* result,
+    const EnvOptions& options) {
+  auto s = EnvWrapper::ReopenWritableFile(fname, result, options);
+  if (!s.ok()) {
+    return s;
+  }
+  result->reset(new InspectedWritableFile(std::move(*result), inspector_));
+  return s;
+}
+
+Status FileSystemInspectedEnv::ReuseWritableFile(
+    const std::string& fname, const std::string& old_fname,
+    std::unique_ptr<WritableFile>* result, const EnvOptions& options) {
+  auto s = EnvWrapper::ReuseWritableFile(fname, old_fname, result, options);
+  if (!s.ok()) {
+    return s;
+  }
+  result->reset(new InspectedWritableFile(std::move(*result), inspector_));
+  return s;
+}
+
+Status FileSystemInspectedEnv::NewRandomRWFile(
+    const std::string& fname, std::unique_ptr<RandomRWFile>* result,
+    const EnvOptions& options) {
+  auto s = EnvWrapper::NewRandomRWFile(fname, result, options);
+  if (!s.ok()) {
+    return s;
+  }
+  result->reset(new InspectedRandomRWFile(std::move(*result), inspector_));
+  return s;
+}
+
+Env* NewFileSystemInspectedEnv(Env* base_env,
+                               std::shared_ptr<FileSystemInspector> inspector) {
+  return new FileSystemInspectedEnv(base_env, inspector);
+}
+
+}  // namespace ROCKSDB_NAMESPACE
+#endif  // !ROCKSDB_LITE
diff --git a/include/rocksdb/env_inspected.h b/include/rocksdb/env_inspected.h
new file mode 100644
--- /dev/null
+++ b/include/rocksdb/env_inspected.h
@@ -0,0 +1,66 @@
+// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <memory>
+#include <string>
+
+#include "rocksdb/env.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Interface to inspect storage requests. FileSystemInspectedEnv will consult
+// FileSystemInspector before issuing actual disk IO.
+class FileSystemInspector {
+ public:
+  virtual ~FileSystemInspector() = default;
+
+  // Called before a read of up to `len` bytes. The implementation stores in
+  // `*allowed` how many of those bytes (at most `len`) may be read right now.
+  // A non-OK status makes the caller give up on the request.
+  virtual Status Read(size_t len, size_t* allowed) = 0;
+
+  // Same as Read(), but consulted before writes.
+  virtual Status Write(size_t len, size_t* allowed) = 0;
+};
+
+// An Env with underlying IO requests being inspected. It holds a reference to
+// an external FileSystemInspector to consult for IO inspection.
+class FileSystemInspectedEnv : public EnvWrapper {
+ public:
+  FileSystemInspectedEnv(Env* base_env,
+                         const std::shared_ptr<FileSystemInspector>& inspector);
+
+  Status NewSequentialFile(const std::string& fname,
+                           std::unique_ptr<SequentialFile>* result,
+                           const EnvOptions& options) override;
+  Status NewRandomAccessFile(const std::string& fname,
+                             std::unique_ptr<RandomAccessFile>* result,
+                             const EnvOptions& options) override;
+  Status NewWritableFile(const std::string& fname,
+                         std::unique_ptr<WritableFile>* result,
+                         const EnvOptions& options) override;
+  Status ReopenWritableFile(const std::string& fname,
+                            std::unique_ptr<WritableFile>* result,
+                            const EnvOptions& options) override;
+  Status ReuseWritableFile(const std::string& fname,
+                           const std::string& old_fname,
+                           std::unique_ptr<WritableFile>* result,
+                           const EnvOptions& options) override;
+  Status NewRandomRWFile(const std::string& fname,
+                         std::unique_ptr<RandomRWFile>* result,
+                         const EnvOptions& options) override;
+
+ private:
+  const std::shared_ptr<FileSystemInspector> inspector_;
+};
+
+// Creates a FileSystemInspectedEnv on top of `base_env`. The returned Env is
+// heap-allocated; the caller is responsible for deleting it.
+extern Env* NewFileSystemInspectedEnv(
+    Env* base_env, std::shared_ptr<FileSystemInspector> inspector);
+
+}  // namespace ROCKSDB_NAMESPACE
+
+#endif  // !ROCKSDB_LITE
diff --git a/src.mk b/src.mk
index f532888440b..12ceeed7e33 100644
--- a/src.mk
+++ b/src.mk
@@ -89,6 +89,7 @@ LIB_SOURCES = \
   env/env_chroot.cc \
   env/env_encryption.cc \
   env/env_hdfs.cc \
+  env/env_inspected.cc \
   env/env_posix.cc \
   env/file_system.cc \
   env/fs_posix.cc \