Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions plugins/experimental/stek_share/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
#include <mutex>
#include <deque>
#include <cmath>
#include <atomic>

#define PLUGIN "stek_share"
#define PLUGIN_NAME "stek_share"

class PluginThreads
{
Expand Down Expand Up @@ -64,7 +65,7 @@ class PluginThreads
}

private:
bool shut_down = false;
std::atomic<bool> shut_down = false;
std::deque<pthread_t> threads_queue;
std::mutex threads_mutex;
};
Expand Down
26 changes: 13 additions & 13 deletions plugins/experimental/stek_share/log_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,33 +34,33 @@ class STEKShareLogStore : public nuraft::log_store

__nocopy__(STEKShareLogStore);

uint64_t next_slot() const;
uint64_t next_slot() const override;

uint64_t start_index() const;
uint64_t start_index() const override;

nuraft::ptr<nuraft::log_entry> last_entry() const;
nuraft::ptr<nuraft::log_entry> last_entry() const override;

uint64_t append(nuraft::ptr<nuraft::log_entry> &entry);
uint64_t append(nuraft::ptr<nuraft::log_entry> &entry) override;

void write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> &entry);
void write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> &entry) override;

nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(uint64_t start, uint64_t end);
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(uint64_t start, uint64_t end) override;

nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries_ext(uint64_t start, uint64_t end,
int64_t batch_size_hint_in_bytes = 0);
int64_t batch_size_hint_in_bytes = 0) override;

nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index);
nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index) override;

uint64_t term_at(uint64_t index);
uint64_t term_at(uint64_t index) override;

nuraft::ptr<nuraft::buffer> pack(uint64_t index, int32_t cnt);
nuraft::ptr<nuraft::buffer> pack(uint64_t index, int32_t cnt) override;

void apply_pack(uint64_t index, nuraft::buffer &pack);
void apply_pack(uint64_t index, nuraft::buffer &pack) override;

bool compact(uint64_t last_log_index);
bool compact(uint64_t last_log_index) override;

bool
flush()
flush() override
{
return true;
}
Expand Down
32 changes: 16 additions & 16 deletions plugins/experimental/stek_share/state_machine.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ class STEKShareSM : public nuraft::state_machine
~STEKShareSM() {}

nuraft::ptr<nuraft::buffer>
pre_commit(const uint64_t log_idx, nuraft::buffer &data)
pre_commit(const uint64_t log_idx, nuraft::buffer &data) override
{
return nullptr;
}

nuraft::ptr<nuraft::buffer>
commit(const uint64_t log_idx, nuraft::buffer &data)
commit(const uint64_t log_idx, nuraft::buffer &data) override
{
// Extract bytes from "data".
size_t len = 0;
nuraft::buffer_serializer bs_data(data);
void *byte_array = bs_data.get_bytes(len);
// TSDebug(PLUGIN, "commit %lu: %s", log_idx, hex_str(std::string(reinterpret_cast<char *>(byte_array), len)).c_str());
// TSDebug(PLUGIN_NAME, "commit %lu: %s", log_idx, hex_str(std::string(reinterpret_cast<char *>(byte_array), len)).c_str());

assert(len == SSL_TICKET_KEY_SIZE);

Expand Down Expand Up @@ -89,23 +89,23 @@ class STEKShareSM : public nuraft::state_machine
}

void
commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> &new_conf)
commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> &new_conf) override
{
// Nothing to do with configuration change. Just update committed index.
last_committed_idx_ = log_idx;
}

void
rollback(const uint64_t log_idx, nuraft::buffer &data)
rollback(const uint64_t log_idx, nuraft::buffer &data) override
{
// Nothing to do here since we don't have pre-commit.
}

int
read_logical_snp_obj(nuraft::snapshot &s, void *&user_snp_ctx, uint64_t obj_id, nuraft::ptr<nuraft::buffer> &data_out,
bool &is_last_obj)
bool &is_last_obj) override
{
// TSDebug(PLUGIN, "read snapshot %lu term %lu object ID %lu", s.get_last_log_idx(), s.get_last_log_term(), obj_id);
// TSDebug(PLUGIN_NAME, "read snapshot %lu term %lu object ID %lu", s.get_last_log_idx(), s.get_last_log_term(), obj_id);

is_last_obj = true;

Expand All @@ -124,9 +124,9 @@ class STEKShareSM : public nuraft::state_machine
}

void
save_logical_snp_obj(nuraft::snapshot &s, uint64_t &obj_id, nuraft::buffer &data, bool is_first_obj, bool is_last_obj)
save_logical_snp_obj(nuraft::snapshot &s, uint64_t &obj_id, nuraft::buffer &data, bool is_first_obj, bool is_last_obj) override
{
// TSDebug(PLUGIN, "save snapshot %lu term %lu object ID %lu", s.get_last_log_idx(), s.get_last_log_term(), obj_id);
// TSDebug(PLUGIN_NAME, "save snapshot %lu term %lu object ID %lu", s.get_last_log_idx(), s.get_last_log_term(), obj_id);

size_t len = 0;
nuraft::buffer_serializer bs_data(data);
Expand All @@ -150,9 +150,9 @@ class STEKShareSM : public nuraft::state_machine
}

bool
apply_snapshot(nuraft::snapshot &s)
apply_snapshot(nuraft::snapshot &s) override
{
// TSDebug(PLUGIN, "apply snapshot %lu term %lu", s.get_last_log_idx(), s.get_last_log_term());
// TSDebug(PLUGIN_NAME, "apply snapshot %lu term %lu", s.get_last_log_idx(), s.get_last_log_term());

{
std::lock_guard<std::mutex> l(snapshot_lock_);
Expand All @@ -168,12 +168,12 @@ class STEKShareSM : public nuraft::state_machine
}

void
free_user_snp_ctx(void *&user_snp_ctx)
free_user_snp_ctx(void *&user_snp_ctx) override
{
}

nuraft::ptr<nuraft::snapshot>
last_snapshot()
last_snapshot() override
{
// Just return the latest snapshot.
std::lock_guard<std::mutex> l(snapshot_lock_);
Expand All @@ -184,15 +184,15 @@ class STEKShareSM : public nuraft::state_machine
}

uint64_t
last_commit_index()
last_commit_index() override
{
return last_committed_idx_;
}

void
create_snapshot(nuraft::snapshot &s, nuraft::async_result<bool>::handler_type &when_done)
create_snapshot(nuraft::snapshot &s, nuraft::async_result<bool>::handler_type &when_done) override
{
// TSDebug(PLUGIN, "create snapshot %lu term %lu", s.get_last_log_idx(), s.get_last_log_term());
// TSDebug(PLUGIN_NAME, "create snapshot %lu term %lu", s.get_last_log_idx(), s.get_last_log_term());

ssl_ticket_key_t local_stek;
{
Expand Down
14 changes: 7 additions & 7 deletions plugins/experimental/stek_share/state_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,45 @@ class STEKShareSMGR : public nuraft::state_mgr
~STEKShareSMGR() {}

nuraft::ptr<nuraft::cluster_config>
load_config()
load_config() override
{
return saved_config_;
}

void
save_config(const nuraft::cluster_config &config)
save_config(const nuraft::cluster_config &config) override
{
nuraft::ptr<nuraft::buffer> buf = config.serialize();
saved_config_ = nuraft::cluster_config::deserialize(*buf);
}

void
save_state(const nuraft::srv_state &state)
save_state(const nuraft::srv_state &state) override
{
nuraft::ptr<nuraft::buffer> buf = state.serialize();
saved_state_ = nuraft::srv_state::deserialize(*buf);
}

nuraft::ptr<nuraft::srv_state>
read_state()
read_state() override
{
return saved_state_;
}

nuraft::ptr<nuraft::log_store>
load_log_store()
load_log_store() override
{
return cur_log_store_;
}

int32_t
server_id()
server_id() override
{
return my_id_;
}

void
system_exit(const int exit_code)
system_exit(const int exit_code) override
{
}

Expand Down
Loading