Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: restart from compact and add test case. #2363

Merged
merged 11 commits into from
Dec 13, 2024
8 changes: 8 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ jobs:
echo "MINIO_CONTAINER=${MINIO_CONTAINER}" >> $GITHUB_ENV
echo "MINIO_DIR=${MINIO_DIR}" >> $GITHUB_ENV
sudo docker rm -f -v ${MINIO_CONTAINER} && sudo rm -fr ${MINIO_DIR} && sudo mkdir ${MINIO_DIR} && sudo docker run -d --net=container:${BUILDER_CONTAINER} --name ${MINIO_CONTAINER} -e "MINIO_ROOT_PASSWORD=minioadmin" -e "MINIO_ROOT_USER=minioadmin" -v ${MINIO_DIR}:/data quay.io/minio/minio server /data --console-address ":9006" --address ":9005" && sleep 5s
if ! sudo docker ps --filter "name=${MINIO_CONTAINER}" --filter "status=running" | grep -q ${MINIO_CONTAINER}; then
echo "Minio container is not running"
exit 1
fi

- name: Start infinity debug version with minio
if: ${{ !cancelled() && !failure() }}
Expand Down Expand Up @@ -291,6 +295,10 @@ jobs:
echo "MINIO_CONTAINER=${MINIO_CONTAINER}" >> $GITHUB_ENV
echo "MINIO_DIR=${MINIO_DIR}" >> $GITHUB_ENV
sudo docker rm -f -v ${MINIO_CONTAINER} && sudo rm -fr ${MINIO_DIR} && sudo mkdir ${MINIO_DIR} && sudo docker run -d --net=container:${BUILDER_CONTAINER} --name ${MINIO_CONTAINER} -e "MINIO_ROOT_PASSWORD=minioadmin" -e "MINIO_ROOT_USER=minioadmin" -v ${MINIO_DIR}:/data quay.io/minio/minio server /data --console-address ":9006" --address ":9005" && sleep 5s
if ! sudo docker ps --filter "name=${MINIO_CONTAINER}" --filter "status=running" | grep -q ${MINIO_CONTAINER}; then
echo "Minio container is not running"
exit 1
fi

- name: Start infinity release version with minio
if: ${{ !cancelled() && !failure() }}
Expand Down
2 changes: 0 additions & 2 deletions conf/pytest_parallel_infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ log_level = "trace"

[storage]
persistence_dir = "/var/infinity/persistence"
compact_interval = "10s"
cleanup_interval = "0s"

[buffer]
buffer_manager_size = "8GB"
Expand Down
27 changes: 23 additions & 4 deletions python/restart_test/infinity_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,23 @@ def init(self, config_path: str | None = None):
self.process = subprocess.Popen(cmd, shell=True, env=my_env)
self.i += 1

def uninit(self):
def uninit(self, kill: bool = False, timeout: int = 60):
if self.process is None:
return
timeout = 60
pids = []
for child in psutil.Process(self.process.pid).children(recursive=True):
pids.append(child.pid)
if len(pids) == 0:
raise Exception("Cannot find infinity process.")

if kill:
os.system(f"kill -9 {' '.join(map(str, pids))}")
time.sleep(1)
while any(psutil.pid_exists(pid) for pid in pids):
time.sleep(1)
self.process = None
return

ret = os.system(f"bash {self.script_path} {timeout} {' '.join(map(str, pids))}")
if ret != 0:
raise Exception("An error occurred.")
Expand Down Expand Up @@ -122,7 +130,14 @@ def connect(self, uri: str):


def infinity_runner_decorator_factory(
config_path: str | None, uri: str, infinity_runner: InfinityRunner, shutdown_out: bool = False
config_path: str | None,
uri: str,
infinity_runner: InfinityRunner,
*,
shutdown_out: bool = False,
kill: bool = False,
terminate_timeout: int = 60,
check_kill: bool = True
):
def decorator(f):
def wrapper(*args, **kwargs):
Expand All @@ -136,7 +151,11 @@ def wrapper(*args, **kwargs):
except Exception:
if not shutdown_out:
raise
infinity_runner.uninit()
try:
infinity_runner.uninit(kill, terminate_timeout)
except Exception:
if check_kill:
raise

return wrapper

Expand Down
56 changes: 55 additions & 1 deletion python/restart_test/test_compact.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ def part1(infinity_obj):
table_obj.import_data(dataset_path, import_options)
table_obj.compact()


part1()
import_time = 4

Expand All @@ -89,3 +88,58 @@ def part2(infinity_obj):
assert count_star == 9 * import_time

part2()

def test_compact_restart_repeatedly(self, infinity_runner: InfinityRunner):
config1 = "test/data/config/restart_test/test_compact/1.toml"
config2 = "test/data/config/restart_test/test_compact/2.toml"

uri = common_values.TEST_LOCAL_HOST
infinity_runner.clear()

decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)
decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)

table_name = "test_compact2"
import_path = "test/data/csv/embedding_int_dim3.csv"
import_num = 1000
import_options = None
kill_num = 10
file_lines = 3

@decorator1
def part1(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
db_obj.drop_table(table_name, ConflictType.Ignore)
table_obj = db_obj.create_table(
table_name,
{
"col1": {"type": "int"},
"col2": {"type": "vector, 3, float"},
},
)

part1()

import_n = 0

@decorator2
def part2(infinity_obj):
nonlocal import_n
table_obj = infinity_obj.get_database("default_db").get_table(table_name)
data_dict, _, _ = table_obj.output(["count(*)"]).to_result()
count_star = data_dict["count(star)"][0]
assert count_star == import_n * file_lines

for i in range(import_num):
table_obj.import_data(import_path, import_options)
import_n += 1

for i in range(kill_num):
part2()

@decorator1
def part3(infinity_obj):
db_obj = infinity_obj.get_database("default_db")
db_obj.drop_table(table_name)

part3()
2 changes: 1 addition & 1 deletion python/restart_test/test_fulltext.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def test_fulltext(self, infinity_runner: InfinityRunner, config: str):
infinity_runner.clear()

decorator = infinity_runner_decorator_factory(config, uri, infinity_runner)
decorator2 = infinity_runner_decorator_factory(config, uri, infinity_runner, True)
decorator2 = infinity_runner_decorator_factory(config, uri, infinity_runner, shutdown_out=True)

@decorator
def part1(infinity_obj):
Expand Down
2 changes: 1 addition & 1 deletion python/restart_test/test_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def insert_inner(
shutdown = False
error = False

decorator = infinity_runner_decorator_factory(config, uri, infinity_runner, True)
decorator = infinity_runner_decorator_factory(config, uri, infinity_runner, shutdown_out=True)

def insert_func(table_obj):
nonlocal cur_insert_n, shutdown, error
Expand Down
2 changes: 1 addition & 1 deletion python/restart_test/test_insert_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def insert_import_inner(
logger = infinity_runner.logger
write_i = 0

decorator = infinity_runner_decorator_factory(config, uri, infinity_runner, True)
decorator = infinity_runner_decorator_factory(config, uri, infinity_runner, shutdown_out=True)

def insert_import_func(table_obj):
nonlocal cur_n, insert_finish, shutdown, error, write_i
Expand Down
2 changes: 1 addition & 1 deletion python/restart_test/test_shutdown_pytest.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def test_shutdown_pytest(self, infinity_runner: InfinityRunner, pytest_mark: str
gen = self.run_pytest_seperately(test_dir, pytest_mark=pytest_mark)

decorator = infinity_runner_decorator_factory(
config, uri, infinity_runner, True
config, uri, infinity_runner, shutdown_out=True
)

def shutdown_func():
Expand Down
2 changes: 1 addition & 1 deletion src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void FileWorker::ReadFromFile(bool from_spill) {
SizeT file_size = 0;
auto [file_handle, status] = VirtualStore::Open(read_path, FileAccessMode::kRead);
if (!status.ok()) {
UnrecoverableError(status.message());
UnrecoverableError(fmt::format("Read path: {}, error: {}", read_path, status.message()));
}
if (use_object_cache) {
file_handle->Seek(obj_addr_.part_offset_);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ void Catalog::LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, Buffe
auto begin_ts = op->begin_ts_;
std::string_view encode = *op->encode_;
MergeFlag merge_flag = op->merge_flag_;
if (op->commit_ts_ < full_ckp_commit_ts_) {
if (op->commit_ts_ <= full_ckp_commit_ts_) {
// Ignore the old txn
continue;
}
Expand Down
18 changes: 17 additions & 1 deletion src/storage/meta/entry/segment_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,22 @@ SharedPtr<BlockEntry> SegmentEntry::GetBlockEntryByID(BlockID block_id) const {
return block_entries_[block_id];
}

SegmentStatus SegmentEntry::GetSaveStatus(TxnTimeStamp ts) const {
switch (status_) {
case SegmentStatus::kUnsealed:
case SegmentStatus::kSealed: {
return status_;
}
case SegmentStatus::kCompacting:
case SegmentStatus::kNoDelete: {
return SegmentStatus::kSealed;
}
case SegmentStatus::kDeprecated: {
return ts >= deprecate_ts_ ? SegmentStatus::kDeprecated : SegmentStatus::kSealed;
}
}
};

nlohmann::json SegmentEntry::Serialize(TxnTimeStamp max_commit_ts) {
nlohmann::json json_res;

Expand All @@ -554,7 +570,7 @@ nlohmann::json SegmentEntry::Serialize(TxnTimeStamp max_commit_ts) {
json_res["commit_ts"] = TxnTimeStamp(this->commit_ts_);
json_res["begin_ts"] = TxnTimeStamp(this->begin_ts_);
json_res["txn_id"] = TransactionID(this->txn_id_);
json_res["status"] = static_cast<std::underlying_type_t<SegmentStatus>>(this->status_);
json_res["status"] = static_cast<std::underlying_type_t<SegmentStatus>>(this->GetSaveStatus(max_commit_ts));
if (status_ != SegmentStatus::kUnsealed) {
LOG_TRACE(fmt::format("SegmentEntry::Serialize: Begin try to save FastRoughFilter to json file"));
this->GetFastRoughFilter()->SaveToJsonFile(json_res);
Expand Down
2 changes: 2 additions & 0 deletions src/storage/meta/entry/segment_entry.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public:

static SharedPtr<SegmentEntry> Deserialize(const nlohmann::json &table_entry_json, TableEntry *table_entry, BufferManager *buffer_mgr);

SegmentStatus GetSaveStatus(TxnTimeStamp ts) const;

public:
void AddBlockReplay(SharedPtr<BlockEntry> block_entry);

Expand Down
3 changes: 2 additions & 1 deletion src/storage/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ Status Storage::AdminToWriter() {
UnrecoverableError("Memory index tracer was initialized before.");
}
memory_index_tracer_ = MakeUnique<BGMemIndexTracer>(config_ptr_->MemIndexMemoryQuota(), new_catalog_.get(), txn_mgr_.get());
cleanup_info_tracer_ = MakeUnique<CleanupInfoTracer>();

bg_processor_->Start();

Expand Down Expand Up @@ -634,7 +635,6 @@ Status Storage::SetStorageMode(StorageMode target_mode) {
LOG_WARN(fmt::format("Set unchanged mode"));
return Status::OK();
}
cleanup_info_tracer_ = MakeUnique<CleanupInfoTracer>();
switch (current_mode) {
case StorageMode::kUnInitialized: {
if (target_mode != StorageMode::kAdmin) {
Expand Down Expand Up @@ -732,6 +732,7 @@ Status Storage::AdminToReaderBottom(TxnTimeStamp system_start_ts) {
UnrecoverableError("Memory index tracer was initialized before.");
}
memory_index_tracer_ = MakeUnique<BGMemIndexTracer>(config_ptr_->MemIndexMemoryQuota(), new_catalog_.get(), txn_mgr_.get());
cleanup_info_tracer_ = MakeUnique<CleanupInfoTracer>();

new_catalog_->StartMemoryIndexCommit();
new_catalog_->MemIndexRecover(buffer_mgr_.get(), system_start_ts);
Expand Down
14 changes: 9 additions & 5 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import global_resource_usage;
namespace infinity {

TxnManager::TxnManager(BufferManager *buffer_mgr, WalManager *wal_mgr, TxnTimeStamp start_ts)
: buffer_mgr_(buffer_mgr), wal_mgr_(wal_mgr), current_ts_(start_ts), is_running_(false) {
: buffer_mgr_(buffer_mgr), wal_mgr_(wal_mgr), current_ts_(start_ts), max_committed_ts_(start_ts), is_running_(false) {
#ifdef INFINITY_DEBUG
GlobalResourceUsage::IncrObjectCount("TxnManager");
#endif
Expand Down Expand Up @@ -131,7 +131,7 @@ TxnTimeStamp TxnManager::GetWriteCommitTS(Txn *txn) {
current_ts_ += 2;
TxnTimeStamp commit_ts = current_ts_;
wait_conflict_ck_.emplace(commit_ts, nullptr);
committing_txns_.emplace(txn);
committing_txns_.emplace(commit_ts, txn);
txn->SetTxnWrite();
return commit_ts;
}
Expand All @@ -143,8 +143,7 @@ bool TxnManager::CheckTxnConflict(Txn *txn) {
{
std::lock_guard guard(locker_);
// LOG_INFO(fmt::format("Txn {}(commit_ts:{}) check conflict", txn->TxnID(), txn->CommitTS()));
for (Txn *committing_txn : committing_txns_) {
TxnTimeStamp committing_ts = committing_txn->CommitTS();
for (auto &[committing_ts, committing_txn] : committing_txns_) {
if (commit_ts > committing_ts) {
candidate_txns.push_back(committing_txn->shared_from_this());
min_checking_ts = std::min(min_checking_ts, committing_txn->BeginTS());
Expand Down Expand Up @@ -345,8 +344,9 @@ void TxnManager::CleanupTxn(Txn *txn) {
TransactionID txn_id = txn->TxnID();
{
// cleanup the txn from committing_txn and txm_map
auto commit_ts = txn->CommitTS();
std::lock_guard guard(locker_);
SizeT remove_n = committing_txns_.erase(txn);
SizeT remove_n = committing_txns_.erase(commit_ts);
if (remove_n == 0) {
UnrecoverableError("Txn not found in committing_txns_");
}
Expand All @@ -355,6 +355,10 @@ void TxnManager::CleanupTxn(Txn *txn) {
String error_message = fmt::format("Txn: {} not found in txn map", txn_id);
UnrecoverableError(error_message);
}

if (committing_txns_.empty() || committing_txns_.begin()->first > commit_ts) {
max_committed_ts_ = commit_ts;
}
}
break;
}
Expand Down
5 changes: 4 additions & 1 deletion src/storage/txn/txn_manager.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public:
// Only used by follower and learner when received the replicated log from leader
void SetStartTS(TxnTimeStamp new_start_ts) { current_ts_ = new_start_ts; }

TxnTimeStamp max_committed_ts() { return max_committed_ts_; }

private:
mutable std::mutex locker_{};
BufferManager *buffer_mgr_{};
Expand All @@ -113,12 +115,13 @@ private:
WalManager *wal_mgr_;

Deque<WeakPtr<Txn>> beginned_txns_; // sorted by begin ts
HashSet<Txn *> committing_txns_; // the txns in committing stage, can use flat_map
Map<TxnTimeStamp, Txn *> committing_txns_; // the txns in committing stage
Set<TxnTimeStamp> checking_ts_{}; // the begin ts of txn that is used to check conflict

Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts

Atomic<TxnTimeStamp> current_ts_{}; // The next txn ts
Atomic<TxnTimeStamp> max_committed_ts_{};
TxnTimeStamp ckp_begin_ts_ = UNCOMMIT_TS; // current ckp begin ts, UNCOMMIT_TS if no ckp is happening, UNCOMMIT_TS is a maximum u64 integer

// For stop the txn manager
Expand Down
2 changes: 1 addition & 1 deletion src/storage/wal/catalog_delta_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ AddTableEntryOp::AddTableEntryOp(TableEntry *table_entry, TxnTimeStamp commit_ts
table_comment_(table_entry->GetTableComment()) {}

AddSegmentEntryOp::AddSegmentEntryOp(SegmentEntry *segment_entry, TxnTimeStamp commit_ts, String segment_filter_binary_data)
: CatalogDeltaOperation(CatalogDeltaOpType::ADD_SEGMENT_ENTRY, segment_entry, commit_ts), status_(segment_entry->status()),
: CatalogDeltaOperation(CatalogDeltaOpType::ADD_SEGMENT_ENTRY, segment_entry, commit_ts), status_(segment_entry->GetSaveStatus(commit_ts)),
column_count_(segment_entry->column_count()), row_count_(segment_entry->row_count()), // FIXME: use append_state
actual_row_count_(segment_entry->actual_row_count()), // FIXME: use append_state
row_capacity_(segment_entry->row_capacity()), min_row_ts_(segment_entry->min_row_ts()), max_row_ts_(segment_entry->max_row_ts()),
Expand Down
4 changes: 3 additions & 1 deletion src/storage/wal/wal_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,9 @@ void WalManager::FullCheckpointInner(Txn *txn) {
TxnTimeStamp last_ckp_ts = last_ckp_ts_;
TxnTimeStamp last_full_ckp_ts = last_full_ckp_ts_;
auto [max_commit_ts, wal_size] = GetCommitState();
max_commit_ts = std::min(max_commit_ts, txn->BeginTS()); // txn commit after txn->BeginTS() should be ignored
// max_commit_ts = std::min(max_commit_ts, txn->BeginTS()); // txn commit after txn->BeginTS() should be ignored
TxnManager *txn_mgr = storage_->txn_manager();
max_commit_ts = txn_mgr->max_committed_ts();
// wal_size may be larger than the actual size. but it's ok. it only makes the swap of wal file a little bit later.

if (max_commit_ts == last_full_ckp_ts) {
Expand Down
22 changes: 22 additions & 0 deletions test/data/config/restart_test/test_compact/2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[general]
version = "0.5.0"
time_zone = "utc-8"

[network]
[log]
log_to_stdout = true
log_level = "trace"

[storage]
data_dir = "/var/infinity/data"
optimize_interval = "0s"
cleanup_interval = "2s"
compact_interval = "1s"
persistence_dir = ""

[buffer]
[wal]
delta_checkpoint_interval = "1s"
full_checkpoint_interval = "3s"

[resource]
Loading
Loading