Skip to content

Commit

Permalink
VFS first phrase (#1485)
Browse files Browse the repository at this point in the history
Introduced PersistenceManager.
Adapted FileWorker for PersistenceManager.

Issue link:#1184

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
  • Loading branch information
yuzhichang authored Jul 17, 2024
1 parent 3378cac commit 46937c4
Show file tree
Hide file tree
Showing 26 changed files with 492 additions and 15 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/slow_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ jobs:
- name: Test embedded infinity for Python 3.10
if: ${{ !cancelled() && !failure() }}
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && mkdir -p /var/infinity && source /usr/local/venv310/bin/activate && python3 tools/run_pysdk_local_infinity_test.py"
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && mkdir -p /var/infinity && source /usr/local/venv310/bin/activate && python3 tools/run_pysdk_local_infinity_test.py"

- name: Test embedded infinity for Python 3.11
if: ${{ !cancelled() && !failure() }}
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && mkdir -p /var/infinity && source /usr/local/venv310/bin/activate && python3 tools/run_pysdk_local_infinity_test.py"
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && mkdir -p /var/infinity && source /usr/local/venv311/bin/activate && python3 tools/run_pysdk_local_infinity_test.py"

- name: Test embedded infinity for Python 3.12
if: ${{ !cancelled() && !failure() }}
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && mkdir -p /var/infinity && source /usr/local/venv310/bin/activate && python3 tools/run_pysdk_local_infinity_test.py"
run: sudo docker exec ${BUILDER_CONTAINER} bash -c "cd /infinity/ && rm -fr /var/infinity && mkdir -p /var/infinity && source /usr/local/venv312/bin/activate && python3 tools/run_pysdk_local_infinity_test.py"

- name: Prepare sift dataset
if: ${{ !cancelled() && !failure() }}
Expand Down
2 changes: 2 additions & 0 deletions conf/infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,5 @@ wal_flush = "only_write"

[resource]
resource_dir = "/var/infinity/resource"

[persistence]
2 changes: 2 additions & 0 deletions conf/pytest_parallel_infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,5 @@ wal_dir = "/var/infinity/wal"

[resource]
resource_dir = "/var/infinity/resource"

[persistence]
8 changes: 8 additions & 0 deletions src/common/default_values.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ export {
constexpr SizeT DEFAULT_LOG_FILE_SIZE = 64 * 1024lu * 1024lu; // 64MB
constexpr std::string_view DEFAULT_LOG_FILE_SIZE_STR = "64MB"; // 64MB

// default persistence parameter
constexpr std::string_view DEFAULT_PERSISTENCE_DIR = ""; // Empty means disabled
constexpr std::string_view DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT_STR = "100MB"; // 100MB
constexpr SizeT DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT = 100 * 1024lu * 1024lu; // 100MB

// config name
constexpr std::string_view VERSION_OPTION_NAME = "version";
constexpr std::string_view TIME_ZONE_OPTION_NAME = "time_zone";
Expand All @@ -198,6 +203,9 @@ export {
constexpr std::string_view OPTIMIZE_INTERVAL_OPTION_NAME = "optimize_interval";
constexpr std::string_view MEM_INDEX_CAPACITY_OPTION_NAME = "mem_index_capacity";

constexpr std::string_view PERSISTENCE_DIR_OPTION_NAME = "persistence_dir";
constexpr std::string_view PERSISTENCE_OBJECT_SIZE_LIMIT_OPTION_NAME = "persistence_object_size_limit";

constexpr std::string_view BUFFER_MANAGER_SIZE_OPTION_NAME = "buffer_manager_size";
constexpr std::string_view LRU_NUM_OPTION_NAME = "lru_num";
constexpr std::string_view TEMP_DIR_OPTION_NAME = "temp_dir";
Expand Down
19 changes: 14 additions & 5 deletions src/common/stl.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ module;
#include <cstdlib>
#include <cstring>
#include <exception>
#include <source_location>
#include <filesystem>
#include <forward_list>
#include <functional>
Expand All @@ -41,9 +40,12 @@ module;
#include <random>
#include <set>
#include <shared_mutex>
#include <source_location>
#include <span>
#include <sstream>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <type_traits>
#include <typeinfo>
Expand All @@ -52,7 +54,6 @@ module;
#include <utility>
#include <variant>
#include <vector>
#include <span>

export module stl;

Expand All @@ -72,9 +73,10 @@ export namespace std {
using std::max;
using std::min;

using std::to_string;
using std::from_chars;
using std::errc;
using std::error_code;
using std::from_chars;
using std::to_string;

using std::stoi;
using std::stol;
Expand All @@ -90,8 +92,10 @@ export namespace std {
using std::memcmp;
using std::strlen;

using std::time;
using std::fprintf;
using std::printf;
using std::sprintf;
using std::time;

using std::is_same;
using std::fill;
Expand Down Expand Up @@ -198,6 +202,8 @@ export namespace std {
using std::fstream;
using std::ios;

using std::endian;

using std::align;

using std::ptrdiff_t;
Expand All @@ -208,6 +214,8 @@ export namespace std {
namespace filesystem {
using std::filesystem::canonical;
using std::filesystem::copy;
using std::filesystem::copy_file;
using std::filesystem::copy_options;
using std::filesystem::exists;
using std::filesystem::file_size;
using std::filesystem::path;
Expand All @@ -222,6 +230,7 @@ export namespace std {
using std::iota;
using std::mt19937;
using std::random_device;
using std::uniform_int_distribution;
using std::uniform_real_distribution;

using std::exception;
Expand Down
57 changes: 57 additions & 0 deletions src/common/utility/uuid.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Refers to
// https://datatracker.ietf.org/doc/rfc9562/, chapter 5.7, UUID Version 7

module;

export module uuid;

import stl;

export namespace infinity {

struct UUID {
union {
u8 data[16];
u64 data64[2];
};

UUID() {
auto now = std::chrono::system_clock::now().time_since_epoch();
u64 ms = std::chrono::duration_cast<std::chrono::milliseconds>(now).count();
u64 ns = std::chrono::duration_cast<std::chrono::nanoseconds>(now).count();
data64[0] = (ms << 16) | (ns & 0xffff);
if constexpr (std::endian::native == std::endian::little) {
data64[0] = __builtin_bswap64(data64[0]);
}
std::mt19937 rng(std::random_device{}());
std::uniform_int_distribution<u64> dist;
data64[1] = dist(rng);
data[6] = (data[6] & 0x0f) | 0x70; // ver, 4-bit, 0b0111
data[8] = (data[8] & 0x3f) | 0x80; // var, 2-bit, 0b10
}

String to_string() const {
char buf[37];
std::sprintf(buf,
"%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x",
data[0],
data[1],
data[2],
data[3],
data[4],
data[5],
data[6],
data[7],
data[8],
data[9],
data[10],
data[11],
data[12],
data[13],
data[14],
data[15]);
return buf;
}
};

} // namespace infinity
88 changes: 88 additions & 0 deletions src/main/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,16 @@ Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig* default
UnrecoverableError(status.message());
}

// Persistence Dir
String persistence_dir = DEFAULT_PERSISTENCE_DIR.data();
UniquePtr<StringOption> persistence_dir_option = MakeUnique<StringOption>(PERSISTENCE_DIR_OPTION_NAME, persistence_dir);
global_options_.AddOption(std::move(persistence_dir_option));

// Persistence Object Size Limit
i64 persistence_object_size_limit = DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT;
UniquePtr<IntegerOption> persistence_object_size_limit_option =
MakeUnique<IntegerOption>(PERSISTENCE_OBJECT_SIZE_LIMIT_OPTION_NAME, persistence_object_size_limit, std::numeric_limits<i64>::max(), 0);
global_options_.AddOption(std::move(persistence_object_size_limit_option));
} else {
config_toml = toml::parse_file(*config_path);

Expand Down Expand Up @@ -1188,6 +1198,73 @@ Status Config::Init(const SharedPtr<String> &config_path, DefaultConfig* default
}
}

// Persistence
{
if (config_toml.contains("persistence")) {
auto persistence_config = config_toml["persistence"];
auto persistence_config_table = persistence_config.as_table();
for (auto &elem : *persistence_config_table) {
String var_name = String(elem.first);
GlobalOptionIndex option_index = global_options_.GetOptionIndex(var_name);
switch (option_index) {
case GlobalOptionIndex::kPersistenceDir: {
String persistence_dir;
if (elem.second.is_string()) {
persistence_dir = elem.second.value_or(DEFAULT_PERSISTENCE_DIR.data());
} else {
return Status::InvalidConfig("'persistence_dir' field isn't string, such as \"persistence\"");
}
UniquePtr<StringOption> persistence_dir_option = MakeUnique<StringOption>(PERSISTENCE_DIR_OPTION_NAME, persistence_dir);
global_options_.AddOption(std::move(persistence_dir_option));
break;
}
case GlobalOptionIndex::kPersistenceObjectSizeLimit: {
i64 persistence_object_size_limit;
if (elem.second.is_string()) {
String persistence_object_size_limit_str = elem.second.value_or(DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT_STR.data());
auto res = ParseByteSize(persistence_object_size_limit_str, persistence_object_size_limit);
if (!res.ok()) {
return res;
}
} else {
return Status::InvalidConfig("'persistence_object_size_limit' field isn't string, such as \"100MB\"");
}
UniquePtr<IntegerOption> persistence_object_size_limit_option =
MakeUnique<IntegerOption>(PERSISTENCE_OBJECT_SIZE_LIMIT_OPTION_NAME,
persistence_object_size_limit,
std::numeric_limits<i64>::max(),
0);
if (!persistence_object_size_limit_option->Validate()) {
return Status::InvalidConfig(fmt::format("Invalid persistence_object_size_limit: {}", persistence_object_size_limit));
}
global_options_.AddOption(std::move(persistence_object_size_limit_option));
break;
}
case GlobalOptionIndex::kInvalid:
default: {
return Status::InvalidConfig(fmt::format("Unrecognized config parameter: {} in 'persistence' field", var_name));
}
}
}
if (global_options_.GetOptionByIndex(GlobalOptionIndex::kPersistenceDir) == nullptr) {
String persistence_dir = DEFAULT_PERSISTENCE_DIR.data();
UniquePtr<StringOption> persistence_dir_option = MakeUnique<StringOption>(PERSISTENCE_DIR_OPTION_NAME, persistence_dir);
global_options_.AddOption(std::move(persistence_dir_option));
}
if (global_options_.GetOptionByIndex(GlobalOptionIndex::kPersistenceObjectSizeLimit) == nullptr) {
i64 persistence_object_size_limit = DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT;
UniquePtr<IntegerOption> persistence_object_size_limit_option =
MakeUnique<IntegerOption>(PERSISTENCE_OBJECT_SIZE_LIMIT_OPTION_NAME,
persistence_object_size_limit,
std::numeric_limits<i64>::max(),
0);
global_options_.AddOption(std::move(persistence_object_size_limit_option));
}
} else {
return Status::InvalidConfig("No 'persistence' section in configure file.");
}
}

// Buffer
{
if (config_toml.contains("buffer")) {
Expand Down Expand Up @@ -1728,6 +1805,17 @@ i64 Config::MemIndexCapacity() {
return global_options_.GetIntegerValue(GlobalOptionIndex::kMemIndexCapacity);
}

// Persistence
String Config::PersistenceDir() {
std::lock_guard<std::mutex> guard(mutex_);
return global_options_.GetStringValue(GlobalOptionIndex::kPersistenceDir);
}

i64 Config::PersistenceObjectSizeLimit() {
std::lock_guard<std::mutex> guard(mutex_);
return global_options_.GetIntegerValue(GlobalOptionIndex::kPersistenceObjectSizeLimit);
}

// Buffer
i64 Config::BufferManagerSize() {
std::lock_guard<std::mutex> guard(mutex_);
Expand Down
4 changes: 4 additions & 0 deletions src/main/config.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public:

i64 MemIndexCapacity();

// Persistence
String PersistenceDir();
i64 PersistenceObjectSizeLimit();

// Buffer
i64 BufferManagerSize();
SizeT LRUNum();
Expand Down
6 changes: 6 additions & 0 deletions src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ void InfinityContext::Init(const SharedPtr<String> &config_path, DefaultConfig*
storage_ = MakeUnique<Storage>(config_.get());
storage_->Init();

String persistence_dir = config_->PersistenceDir();
if (!persistence_dir.empty()) {
i64 persistence_object_size_limit = config_->PersistenceObjectSizeLimit();
persistence_manager_ = MakeUnique<PersistenceManager>(persistence_dir, (SizeT)persistence_object_size_limit);
}

inverting_thread_pool_.resize(config_->CPULimit());
commiting_thread_pool_.resize(config_->CPULimit());
initialized_ = true;
Expand Down
4 changes: 4 additions & 0 deletions src/main/infinity_context.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import task_scheduler;
import storage;
import singleton;
import session_manager;
import persistence_manager;
import third_party;

namespace infinity {
Expand All @@ -35,6 +36,8 @@ public:

[[nodiscard]] inline Storage *storage() noexcept { return storage_.get(); }

[[nodiscard]] inline PersistenceManager *persistence_manager() noexcept { return persistence_manager_.get(); }

[[nodiscard]] inline ResourceManager *resource_manager() noexcept { return resource_manager_.get(); }

[[nodiscard]] inline SessionManager *session_manager() noexcept { return session_mgr_.get(); }
Expand All @@ -55,6 +58,7 @@ private:
UniquePtr<ResourceManager> resource_manager_{};
UniquePtr<TaskScheduler> task_scheduler_{};
UniquePtr<Storage> storage_{};
UniquePtr<PersistenceManager> persistence_manager_{};
UniquePtr<SessionManager> session_mgr_{};
// For fulltext index
ThreadPool inverting_thread_pool_{4};
Expand Down
3 changes: 3 additions & 0 deletions src/main/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ GlobalOptions::GlobalOptions() {
name2index_[String(OPTIMIZE_INTERVAL_OPTION_NAME)] = GlobalOptionIndex::kOptimizeIndexInterval;
name2index_[String(MEM_INDEX_CAPACITY_OPTION_NAME)] = GlobalOptionIndex::kMemIndexCapacity;

name2index_[String(PERSISTENCE_DIR_OPTION_NAME)] = GlobalOptionIndex::kPersistenceDir;
name2index_[String(PERSISTENCE_OBJECT_SIZE_LIMIT_OPTION_NAME)] = GlobalOptionIndex::kPersistenceObjectSizeLimit;

name2index_[String(BUFFER_MANAGER_SIZE_OPTION_NAME)] = GlobalOptionIndex::kBufferManagerSize;
name2index_[String(LRU_NUM_OPTION_NAME)] = GlobalOptionIndex::kLRUNum;
name2index_[String(TEMP_DIR_OPTION_NAME)] = GlobalOptionIndex::kTempDir;
Expand Down
4 changes: 3 additions & 1 deletion src/main/options.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ export enum class GlobalOptionIndex {
kFlushMethodAtCommit = 28,
kResourcePath = 29,
kRecordRunningQuery = 30,
kInvalid = 31
kPersistenceDir = 31,
kPersistenceObjectSizeLimit = 32,
kInvalid = 33
};

export struct GlobalOptions {
Expand Down
Loading

0 comments on commit 46937c4

Please sign in to comment.