diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index 4c9727aecc..4050acc35b 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -12,6 +12,10 @@ NewCatalog::NewCatalog(SharedPtr dir) : current_dir_(std::move(dir)) { } +// do not only use this method to create database +// it will not record database in transaction, so when you commit transaction +// it will lost operation +// use Txn::CreateDatabase instead EntryResult NewCatalog::CreateDatabase(NewCatalog* catalog, const String& db_name, @@ -48,6 +52,10 @@ NewCatalog::CreateDatabase(NewCatalog* catalog, return res; } +// do not only use this method to drop database +// it will not record database in transaction, so when you commit transaction +// it will lost operation +// use Txn::DropDatabase instead EntryResult NewCatalog::DropDatabase(NewCatalog* catalog, const String& db_name, @@ -115,8 +123,6 @@ NewCatalog::Databases(NewCatalog* catalog, Txn* txn) { nlohmann::json NewCatalog::Serialize(const NewCatalog* catalog) { - SharedPtr current_dir_{nullptr}; - nlohmann::json json_res; json_res["current_dir"] = *catalog->current_dir_; diff --git a/src/storage/meta/db_meta.cpp b/src/storage/meta/db_meta.cpp index bee2e44c2d..8a43c36eb6 100644 --- a/src/storage/meta/db_meta.cpp +++ b/src/storage/meta/db_meta.cpp @@ -3,6 +3,7 @@ // #include "db_meta.h" +#include "common/types/internal_types.h" #include "main/logger.h" #include "common/utility/defer_op.h" #include "storage/txn/txn_manager.h" @@ -33,7 +34,7 @@ DBMeta::CreateNewEntry(DBMeta* db_meta, res = db_entry.get(); db_meta->entry_list_.emplace_front(std::move(db_entry)); -// rw_locker_.unlock(); + LOG_TRACE("New database entry is added."); return {res, nullptr}; } else { @@ -50,8 +51,7 @@ DBMeta::CreateNewEntry(DBMeta* db_meta, } DBEntry* header_db_entry = (DBEntry*)header_base_entry; - if(header_db_entry->commit_ts_ < UNCOMMIT_TS) { - // Committed + if(header_db_entry->Committed()) { if(begin_ts > header_db_entry->commit_ts_) { if(header_db_entry->deleted_) { // No conflict @@ -144,8 +144,7 @@ DBMeta::DropNewEntry(DBMeta* db_meta, u64 txn_id, TxnTimeStamp begin_ts, TxnMana } DBEntry* header_db_entry = (DBEntry*)header_base_entry; - if(header_db_entry->commit_ts_ < UNCOMMIT_TS) { - // Committed + if(header_db_entry->Committed()) { if(begin_ts > header_db_entry->commit_ts_) { // No conflict if(header_db_entry->deleted_) { @@ -193,7 +192,7 @@ DBMeta::DeleteNewEntry(DBMeta* db_meta, u64 txn_id, TxnManager* txn_mgr) { auto removed_iter = std::remove_if(db_meta->entry_list_.begin(), db_meta->entry_list_.end(), - [&](UniquePtr& entry)->bool { + [&](auto& entry)->bool { return entry->txn_id_ == txn_id; }); @@ -215,8 +214,7 @@ DBMeta::GetEntry(DBMeta* db_meta, u64 txn_id, TxnTimeStamp begin_ts) { return {nullptr, MakeUnique("No valid db entry.")}; } - if(db_entry->commit_ts_ < UNCOMMIT_TS) { - // committed + if(db_entry->Committed()) { if(begin_ts > db_entry->commit_ts_) { if(db_entry->deleted_) { LOG_TRACE("DB is dropped.") @@ -226,12 +224,15 @@ DBMeta::GetEntry(DBMeta* db_meta, u64 txn_id, TxnTimeStamp begin_ts) { } } } else { - // Only committed txn is visible. Committing txn isn't visble. - - // not committed, but the same txn is also visible - if(txn_id == db_entry->txn_id_) { - // same txn - return {db_entry.get(), nullptr}; + // Only committed txn is visible. Committing txn isn't visble, + // except same txn is visible + if(txn_id == db_entry->txn_id_ ) { + if (db_entry->deleted_) { + LOG_TRACE("DB is dropped.") + return {nullptr, MakeUnique("DB is dropped.")}; + } else { + return {db_entry.get(), nullptr}; + } } } } diff --git a/src/storage/meta/entry/base_entry.h b/src/storage/meta/entry/base_entry.h index 66f6dd948f..97d5b41490 100644 --- a/src/storage/meta/entry/base_entry.h +++ b/src/storage/meta/entry/base_entry.h @@ -61,6 +61,21 @@ struct BaseEntry { struct EntryResult { BaseEntry* entry_; UniquePtr err_; + + bool Success() { + return err_ == nullptr; + } + + bool Fail() { + return err_ != nullptr; + } + + String ToString() { + if(err_ == nullptr) { + return "Success"; + } + return *err_.get(); + } }; } diff --git a/src/storage/txn/txn.h b/src/storage/txn/txn.h index 14a10ddebf..915cdcc6aa 100644 --- a/src/storage/txn/txn.h +++ b/src/storage/txn/txn.h @@ -116,6 +116,11 @@ class Txn { return txn_context_.GetCommitTS(); } + inline TxnTimeStamp + BeginTS() { + return txn_context_.GetBeginTS(); + } + inline TxnState GetTxnState() { return txn_context_.GetTxnState(); diff --git a/test/unittest/storage/meta/catalog.cpp b/test/unittest/storage/meta/catalog.cpp new file mode 100644 index 0000000000..345188a7d9 --- /dev/null +++ b/test/unittest/storage/meta/catalog.cpp @@ -0,0 +1,286 @@ +// +// Created by tangdonghai on 23-9-5. +// + +#include +#include +#include +#include +#include "base_test.h" +#include "common/types/internal_types.h" +#include "main/infinity.h" +#include "storage/buffer/buffer_manager.h" +#include "storage/meta/catalog.h" +#include "storage/meta/entry/base_entry.h" +#include "storage/txn/txn.h" +#include "storage/txn/txn_manager.h" +class CatalogTest : public BaseTest { + void + SetUp() override { + infinity::GlobalResourceUsage::Init(); + std::shared_ptr config_path = nullptr; + infinity::Infinity::instance().Init(config_path); + + system("rm -rf /tmp/infinity/data/db"); + system("rm -rf /tmp/infinity/_tmp"); + } + + void + TearDown() override { + infinity::Infinity::instance().UnInit(); + EXPECT_EQ(infinity::GlobalResourceUsage::GetObjectCount(), 0); + EXPECT_EQ(infinity::GlobalResourceUsage::GetRawMemoryCount(), 0); + infinity::GlobalResourceUsage::UnInit(); + } + +}; + +// txn1: create db1, get db1, delete db1, get db1, commit +// txn2: get db1, get db1, commit +TEST_F(CatalogTest, simple_test1) { + using namespace infinity; + + // create bufferManager && TxnManager + SizeT memory_limit = 1024 * 1024 * 1024; + SharedPtr temp_path = MakeShared("/tmp/infinity/_tmp"); + SharedPtr base_path = MakeShared("/tmp/infinity/data"); + BufferManager buffer_mgr(memory_limit, base_path, temp_path); + + UniquePtr dir = MakeUnique("db"); + NewCatalog new_catalog(std::move(dir)); + TxnManager txn_mgr(&new_catalog, &buffer_mgr); + + // start txn1 + auto* txn1 = txn_mgr.CreateTxn(); + txn1->BeginTxn(); + + // start txn2 + auto* txn2 = txn_mgr.CreateTxn(); + txn2->BeginTxn(); + HashMap databases; + + // create db in empty catalog should be success + { + EntryResult res; + res = NewCatalog::CreateDatabase(&new_catalog, "db1", txn1->TxnID(), txn1->BeginTS(), &txn_mgr); + EXPECT_TRUE(res.Success()); + // store this entry + databases["db1"] = res.entry_; + } + + { + EntryResult res; + res = NewCatalog::GetDatabase(&new_catalog, "db1", txn1->TxnID(), txn1->BeginTS()); + // should be visible to same txn + EXPECT_TRUE(res.Success()); + EXPECT_EQ(res.entry_, databases["db1"]); + + // should not be visible to other txn + res = NewCatalog::GetDatabase(&new_catalog, "db1", txn2->TxnID(), txn2->BeginTS()); + EXPECT_TRUE(res.Fail()); + } + + // drop db should be success + { + EntryResult res; + res = NewCatalog::DropDatabase(&new_catalog, "db1", txn1->TxnID(), txn1->BeginTS(), &txn_mgr); + EXPECT_TRUE(res.Success()); + EXPECT_EQ(res.entry_, databases["db1"]); + // remove this entry + databases.erase("db1"); + + res = NewCatalog::GetDatabase(&new_catalog, "db1", txn1->TxnID(), txn1->BeginTS()); + // should not be visible to same txn + EXPECT_TRUE(res.Fail()); + + // should not be visible to other txn + res = NewCatalog::GetDatabase(&new_catalog, "db1", txn2->TxnID(), txn2->BeginTS()); + EXPECT_TRUE(res.Fail()); + } + + txn1->CommitTxn(); + txn2->CommitTxn(); +} + +// txn1: create db1, commit. +// txn2: start, get db1, commit +// txn3: start, get db1, delete db1, commit +TEST_F(CatalogTest, simple_test2) { + using namespace infinity; + + // create bufferManager && TxnManager + SizeT memory_limit = 1024 * 1024 * 1024; + SharedPtr temp_path = MakeShared("/tmp/infinity/_tmp"); + SharedPtr base_path = MakeShared("/tmp/infinity/data"); + BufferManager buffer_mgr(memory_limit, base_path, temp_path); + + UniquePtr dir = MakeUnique("db"); + NewCatalog new_catalog(std::move(dir)); + TxnManager txn_mgr(&new_catalog, &buffer_mgr); + + // start txn1 + auto* txn1 = txn_mgr.CreateTxn(); + txn1->BeginTxn(); + + // start txn2 + auto* txn2 = txn_mgr.CreateTxn(); + txn2->BeginTxn(); + + HashMap databases; + + // create db in empty catalog should be success + { + EntryResult res; + res = txn1->CreateDatabase("db1"); + EXPECT_TRUE(res.Success()); + // store this entry + databases["db1"] = res.entry_; + } + + txn1->CommitTxn(); + + // should not be visible to txn2 + { + EntryResult res; + res = NewCatalog::GetDatabase(&new_catalog, "db1", txn2->TxnID(), txn2->BeginTS()); + EXPECT_TRUE(res.Fail()); + } + + txn2->CommitTxn(); + + auto* txn3 = txn_mgr.CreateTxn(); + txn3->BeginTxn(); + + + // should be visible to txn3 + { + EntryResult res; + res = NewCatalog::GetDatabase(&new_catalog, "db1", txn3->TxnID(), txn3->BeginTS()); + EXPECT_TRUE(res.Success()); + EXPECT_EQ(res.entry_, databases["db1"]); + + res = txn3->DropDatabase("db1"); + EXPECT_TRUE(res.Success()); + // should be different db entry + EXPECT_NE(res.entry_, databases["db1"]); + // remove this entry + databases.erase("db1"); + + // should not be visible to other txn + res = NewCatalog::GetDatabase(&new_catalog, "db1", txn3->TxnID(), txn3->BeginTS()); + EXPECT_TRUE(res.Fail()); + } + + txn3->CommitTxn(); +} + +TEST_F(CatalogTest, concurrent_test) { + using namespace infinity; + + // create bufferManager && TxnManager + SizeT memory_limit = 1024 * 1024 * 1024; + SharedPtr temp_path = MakeShared("/tmp/infinity/_tmp"); + SharedPtr base_path = MakeShared("/tmp/infinity/data"); + BufferManager buffer_mgr(memory_limit, base_path, temp_path); + + UniquePtr dir = MakeUnique("db"); + NewCatalog new_catalog(std::move(dir)); + TxnManager txn_mgr(&new_catalog, &buffer_mgr); + + // start txn1 && txn2 + auto* txn1 = txn_mgr.CreateTxn(); + txn1->BeginTxn(); + auto* txn2 = txn_mgr.CreateTxn(); + txn2->BeginTxn(); + + // lock protect databases + std::mutex lock; + HashMap databases; + + auto write_routine = [&](int start, Txn* txn) { + EntryResult res; + for(int i = start; i < 1000; i += 2) { + String db_name = "db" + std::to_string(i); + res = txn1->CreateDatabase(db_name); + EXPECT_TRUE(res.Success()); + // store this entry + lock.lock(); + databases[db_name] = res.entry_; + lock.unlock(); + } + }; + + std::thread write_thread1(write_routine, 0, txn1); + std::thread write_thread2(write_routine, 1, txn2); + + write_thread1.join(); + write_thread2.join(); + + txn1->CommitTxn(); + txn2->CommitTxn(); + + // start txn3 && txn4 + auto* txn3 = txn_mgr.CreateTxn(); + txn3->BeginTxn(); + auto* txn4 = txn_mgr.CreateTxn(); + txn4->BeginTxn(); + + auto read_routine = [&](Txn* txn) { + EntryResult res; + for(int i = 0; i < 1000; i++) { + String db_name = "db" + std::to_string(i); + res = NewCatalog::GetDatabase(&new_catalog, db_name, txn->TxnID(), txn->BeginTS()); + EXPECT_TRUE(res.Success()); + // only read, don't need lock + EXPECT_EQ(res.entry_, databases[db_name]); + } + }; + + std::thread read_thread1(read_routine, txn3); + std::thread read_thread2(read_routine, txn4); + read_thread1.join(); + read_thread2.join(); + + txn3->CommitTxn(); + txn4->CommitTxn(); + + // start txn5 && txn6 + auto* txn5 = txn_mgr.CreateTxn(); + txn5->BeginTxn(); + auto* txn6 = txn_mgr.CreateTxn(); + txn6->BeginTxn(); + + auto drop_routine = [&](int start, Txn* txn) { + EntryResult res; + for(int i = start; i < 1000; i += 2) { + String db_name = "db" + std::to_string(i); + res = txn->DropDatabase(db_name); + EXPECT_TRUE(res.Success()); + // store this entry + lock.lock(); + databases.erase(db_name); + lock.unlock(); + } + }; + + std::thread drop_thread1(drop_routine,0, txn5); + std::thread drop_thread2(drop_routine,1, txn6); + drop_thread1.join(); + drop_thread2.join(); + + txn5->CommitTxn(); + txn6->CommitTxn(); + + // start txn7 + auto* txn7 = txn_mgr.CreateTxn(); + txn7->BeginTxn(); + + // check all has been dropped + EntryResult res; + for(int i = 0; i < 1000; i++) { + String db_name = "db" + std::to_string(i); + res = NewCatalog::GetDatabase(&new_catalog, db_name, txn7->TxnID(), txn7->BeginTS()); + EXPECT_TRUE(res.Fail()); + } +} +