Skip to content

Commit

Permalink
Refactor cleanup ts
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
JinHai-CN committed Nov 6, 2024
1 parent a63ced2 commit 6dc5187
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 30 deletions.
13 changes: 7 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,17 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "Debug")
message(STATUS "Enable AddressSanitizer")

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-stack-protector -fno-var-tracking ")
add_compile_options(-fsanitize=address -fsanitize-recover=all -fsanitize=leak)
add_link_options(-fsanitize=address -fsanitize-recover=all -fsanitize=leak)
# add_compile_options(-fsanitize=address -fsanitize-recover=all -fsanitize=leak)
# add_link_options(-fsanitize=address -fsanitize-recover=all -fsanitize=leak)

add_compile_options("-fno-omit-frame-pointer")
add_link_options("-fno-omit-frame-pointer")

# add_compile_options("-fsanitize=undefined")
# add_link_options("-fsanitize=undefined")
# add_compile_options("-fsanitize=undefined")
# add_link_options("-fsanitize=undefined")

# add_compile_options("-fsanitize=thread")
# add_link_options("-fsanitize=thread")
add_compile_options("-fsanitize=thread")
add_link_options("-fsanitize=thread")
else()
message(STATUS "Disable AddressSanitizer because jemalloc")
endif()
Expand Down Expand Up @@ -202,6 +202,7 @@ endif()

if(CMAKE_BUILD_TYPE STREQUAL "Debug")
ADD_DEFINITIONS(-D INFINITY_DEBUG)
ADD_DEFINITIONS(-D INFINITY_STATS)
endif()

# find_package(Boost REQUIRED)
Expand Down
2 changes: 1 addition & 1 deletion src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ TxnTimeStamp Txn::Commit() {
txn_store_.PrepareCommit1(); // Only for import and compact, pre-commit segment
// LOG_INFO(fmt::format("Txn {} commit ts: {}", txn_id_, commit_ts));

if (txn_mgr_->CheckConflict(this)) {
if (txn_mgr_->CheckTxnConflict(this)) {
LOG_ERROR(fmt::format("Txn: {} is rolled back. rollback ts: {}", txn_id_, commit_ts));
wal_entry_ = nullptr;
txn_mgr_->SendToWAL(this);
Expand Down
37 changes: 21 additions & 16 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import global_resource_usage;
namespace infinity {

TxnManager::TxnManager(BufferManager *buffer_mgr, WalManager *wal_mgr, TxnTimeStamp start_ts)
: buffer_mgr_(buffer_mgr), wal_mgr_(wal_mgr), start_ts_(start_ts), is_running_(false) {
: buffer_mgr_(buffer_mgr), wal_mgr_(wal_mgr), current_ts_(start_ts), is_running_(false) {
#ifdef INFINITY_DEBUG
GlobalResourceUsage::IncrObjectCount("TxnManager");
#endif
Expand All @@ -65,7 +65,7 @@ Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text, bool ckp_txn) {
u64 new_txn_id = ++catalog_ptr->next_txn_id_;

// Record the start ts of the txn
TxnTimeStamp begin_ts = start_ts_ + 1;
TxnTimeStamp begin_ts = current_ts_ + 1;
if (ckp_txn) {
if (ckp_begin_ts_ != UNCOMMIT_TS) {
// not set ckp_begin_ts_ may not truncate the wal file.
Expand Down Expand Up @@ -120,23 +120,23 @@ bool TxnManager::CheckIfCommitting(TransactionID txn_id, TxnTimeStamp begin_ts)
// Prepare to commit ReadTxn
TxnTimeStamp TxnManager::GetReadCommitTS(Txn *txn) {
std::lock_guard guard(locker_);
TxnTimeStamp commit_ts = start_ts_ + 1;
TxnTimeStamp commit_ts = current_ts_ + 1;
txn->SetTxnRead();
return commit_ts;
}

// Prepare to commit WriteTxn
TxnTimeStamp TxnManager::GetWriteCommitTS(Txn *txn) {
std::lock_guard guard(locker_);
start_ts_ += 2;
TxnTimeStamp commit_ts = start_ts_;
current_ts_ += 2;
TxnTimeStamp commit_ts = current_ts_;
wait_conflict_ck_.emplace(commit_ts, nullptr);
committing_txns_.emplace(txn);
txn->SetTxnWrite();
return commit_ts;
}

bool TxnManager::CheckConflict(Txn *txn) {
bool TxnManager::CheckTxnConflict(Txn *txn) {
TxnTimeStamp commit_ts = txn->CommitTS();
Vector<SharedPtr<Txn>> candidate_txns;
TxnTimeStamp min_checking_ts = UNCOMMIT_TS;
Expand Down Expand Up @@ -286,29 +286,34 @@ UniquePtr<TxnInfo> TxnManager::GetTxnInfoByID(TransactionID txn_id) const {
return MakeUnique<TxnInfo>(iter->first, iter->second->GetTxnText());
}

TxnTimeStamp TxnManager::CurrentTS() const { return start_ts_; }
TxnTimeStamp TxnManager::CurrentTS() const { return current_ts_; }

TxnTimeStamp TxnManager::GetNewTimeStamp() { return start_ts_ + 1; }
TxnTimeStamp TxnManager::GetNewTimeStamp() { return current_ts_ + 1; }

TxnTimeStamp TxnManager::GetCleanupScanTS() {
std::lock_guard guard(locker_);
TxnTimeStamp first_uncommitted_begin_ts = start_ts_;
TxnTimeStamp first_uncommitted_begin_ts = current_ts_;
// Get earliest txn of the ongoing transactions
while (!beginned_txns_.empty()) {
auto first_txn = beginned_txns_.front().lock();
if (first_txn.get() != nullptr) {
Txn* first_txn = beginned_txns_.front().get();
if (first_txn != nullptr) {
first_uncommitted_begin_ts = first_txn->BeginTS();
break;
}
beginned_txns_.pop_front();
}
TxnTimeStamp checkpointed_ts = wal_mgr_->GetCheckpointedTS();
TxnTimeStamp res = std::min(first_uncommitted_begin_ts, checkpointed_ts);

// Get the earlier txn ts between current ongoing txn and last checkpoint ts
TxnTimeStamp last_checkpoint_ts = wal_mgr_->LastCheckpointTS();
TxnTimeStamp least_ts = std::min(first_uncommitted_begin_ts, last_checkpoint_ts);

// Get the earlier txn ts between current least txn and checking conflict TS
if (!checking_ts_.empty()) {
res = std::min(res, *checking_ts_.begin());
least_ts = std::min(least_ts, *checking_ts_.begin());
}

LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", res, checkpointed_ts));
return res;
LOG_INFO(fmt::format("Cleanup scan ts: {}, checkpoint ts: {}", least_ts, last_checkpoint_ts));
return least_ts;
}

void TxnManager::CleanupTxn(Txn *txn) {
Expand Down
10 changes: 5 additions & 5 deletions src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public:

TxnTimeStamp GetWriteCommitTS(Txn *txn);

bool CheckConflict(Txn *txn);
bool CheckTxnConflict(Txn *txn);

void SendToWAL(Txn *txn);

Expand Down Expand Up @@ -103,7 +103,7 @@ public:
bool InCheckpointProcess(TxnTimeStamp commit_ts);

// Only used by follower and learner when received the replicated log from leader
void SetStartTS(TxnTimeStamp new_start_ts) { start_ts_ = new_start_ts; }
void SetStartTS(TxnTimeStamp new_start_ts) { current_ts_ = new_start_ts; }

private:
mutable std::mutex locker_{};
Expand All @@ -112,14 +112,14 @@ private:
HashMap<TransactionID, SharedPtr<Txn>> txn_map_{};
WalManager *wal_mgr_;

Deque<WeakPtr<Txn>> beginned_txns_; // sorted by begin ts
List<SharedPtr<Txn>> beginned_txns_; // sorted by begin ts
HashSet<Txn *> committing_txns_; // the txns in committing stage, can use flat_map
Set<TxnTimeStamp> checking_ts_{}; // the begin ts of txn that is used to check conflict

Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts

Atomic<TxnTimeStamp> start_ts_{}; // The next txn ts
TxnTimeStamp ckp_begin_ts_ = UNCOMMIT_TS; // cur ckp begin ts, 0 if no ckp is happening
Atomic<TxnTimeStamp> current_ts_{}; // The next txn ts
TxnTimeStamp ckp_begin_ts_ = UNCOMMIT_TS; // current ckp begin ts, UNCOMMIT_TS if no ckp is happening, UNCOMMIT_TS is a maximum u64 integer

// For stop the txn manager
atomic_bool is_running_{false};
Expand Down
2 changes: 1 addition & 1 deletion src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void WalManager::PutEntries(Vector<WalEntry *> wal_entries) {
wait_flush_.EnqueueBulk(wal_entries);
}

TxnTimeStamp WalManager::GetCheckpointedTS() { return last_ckp_ts_ == UNCOMMIT_TS ? 0 : last_ckp_ts_; }
TxnTimeStamp WalManager::LastCheckpointTS() { return last_ckp_ts_ == UNCOMMIT_TS ? 0 : last_ckp_ts_; }

Vector<SharedPtr<String>> WalManager::GetDiffWalEntryString(TxnTimeStamp start_timestamp) const {

Expand Down
2 changes: 1 addition & 1 deletion src/storage/wal/wal_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public:

void ReplayWalEntry(const WalEntry &entry, bool on_startup, bool is_replay);

TxnTimeStamp GetCheckpointedTS();
TxnTimeStamp LastCheckpointTS();

Vector<SharedPtr<String>> GetDiffWalEntryString(TxnTimeStamp timestamp) const;
void UpdateCommitState(TxnTimeStamp commit_ts, i64 wal_size);
Expand Down

0 comments on commit 6dc5187

Please sign in to comment.