From 292cf84532bcb40e203684c940b256c6a649b851 Mon Sep 17 00:00:00 2001 From: shen yushi Date: Thu, 7 Nov 2024 14:10:24 +0800 Subject: [PATCH] Fix bugs. (#2188) ### What problem does this PR solve? Fix bug issues. Issue link: Not solved: #2119 #2112 #2066 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --- python/restart_test/test_memidx.py | 33 ++++++++++--------- scripts/collect_log.py | 4 +-- src/main/infinity_context.cpp | 2 ++ .../knn_index/knn_hnsw/abstract_hnsw.cppm | 3 ++ .../persistence/persistence_manager.cpp | 14 ++++---- 5 files changed, 32 insertions(+), 24 deletions(-) diff --git a/python/restart_test/test_memidx.py b/python/restart_test/test_memidx.py index 1237733ac4..5c44a3b071 100644 --- a/python/restart_test/test_memidx.py +++ b/python/restart_test/test_memidx.py @@ -162,7 +162,6 @@ def part1(infinity_obj): table_obj1.insert( [{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)] ) - table_obj1.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)]) infinity_obj.drop_database("db2", conflict_type=ConflictType.Ignore) db_obj2 = infinity_obj.create_database("db2") @@ -177,6 +176,10 @@ def part1(infinity_obj): table_obj2.insert( [{"c1": 2, "c2": [0.1, 0.2, 0.3, -0.2]} for i in range(6)] ) + + # wait memidx1 dump + time.sleep(1) + table_obj1.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)]) table_obj2.insert([{"c1": 4, "c2": [0.2, 0.1, 0.3, 0.4]} for i in range(6)]) part1() @@ -186,25 +189,25 @@ def part2(infinity_obj): # wait for optimize time.sleep(3) - idx1_files = list(pathlib.Path(data_dir).rglob(f"*{idx1_name}*")) - idx2_files = list(pathlib.Path(data_dir).rglob(f"*{idx2_name}*")) - assert len(idx1_files) == 1 - assert len(idx2_files) == 1 + idx1_dirs = list(pathlib.Path(data_dir).rglob(f"*{idx1_name}*")) + idx2_dirs = list(pathlib.Path(data_dir).rglob(f"*{idx2_name}*")) + assert len(idx1_dirs) == 1 + assert len(idx2_dirs) == 1 - idx1_dir = idx1_files[0] - idx1_files_in_dir = list(idx1_dir.glob("*")) - assert len(idx1_files_in_dir) == 3 + idx1_dir = idx1_dirs[0] + idx1_files = list(idx1_dir.glob("*")) + assert len(idx1_files) == 3 - idx2_dir = idx2_files[0] - idx2_files_in_dir = list(idx2_dir.glob("*")) - assert len(idx2_files_in_dir) == 3 + idx2_dir = idx2_dirs[0] + idx2_files = list(idx2_dir.glob("*")) + assert len(idx2_files) == 3 infinity_obj.cleanup() - idx1_files_in_dir = list(idx1_dir.glob("*")) - assert len(idx1_files_in_dir) == 1 + idx1_files = list(idx1_dir.glob("*")) + assert len(idx1_files) == 1 - idx2_files_in_dir = list(idx2_dir.glob("*")) - assert len(idx2_files_in_dir) == 1 + idx2_files = list(idx2_dir.glob("*")) + assert len(idx2_files) == 1 part2() diff --git a/scripts/collect_log.py b/scripts/collect_log.py index 7af4d6e6bd..9cb70ff79d 100644 --- a/scripts/collect_log.py +++ b/scripts/collect_log.py @@ -52,7 +52,7 @@ print("Error: stdout file not found") else: if failure: - shutil.copy(stdout_path, f"{output_dir}/{random_name}_1.log") + shutil.copy(stdout_path, f"{output_dir}/{random_name}_stdout.log") print(f"Last {show_lines} lines from {stdout_path}:") with open(stdout_path, "r") as f: lines = f.readlines() @@ -63,7 +63,7 @@ print("Error: stderror file not found") else: if failure: - shutil.copy(stderror_path, f"{output_dir}/{random_name}_2.log") + shutil.copy(stderror_path, f"{output_dir}/{random_name}_stderror.log") print(f"Last {show_lines} lines from {stderror_path}:") with open(stderror_path, "r") as f: lines = f.readlines() diff --git a/src/main/infinity_context.cpp b/src/main/infinity_context.cpp index d905b98acf..49af54152a 100644 --- a/src/main/infinity_context.cpp +++ b/src/main/infinity_context.cpp @@ -441,12 +441,14 @@ void InfinityContext::SetIndexThreadPool(SizeT thread_num) { thread_num = thread_num / 2; if (thread_num < 2) thread_num = 2; + LOG_TRACE(fmt::format("Set index thread pool size to {}", thread_num)); inverting_thread_pool_.resize(thread_num); commiting_thread_pool_.resize(thread_num); hnsw_build_thread_pool_.resize(thread_num); } void InfinityContext::RestoreIndexThreadPoolToDefault() { + LOG_TRACE("Restore index thread pool size to default"); inverting_thread_pool_.resize(4); commiting_thread_pool_.resize(2); hnsw_build_thread_pool_.resize(4); diff --git a/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm b/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm index 9cb5651af4..c956ed5bd3 100644 --- a/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm +++ b/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm @@ -128,6 +128,9 @@ private: template static void InsertVecs(Index &index, Iter &&iter, const HnswInsertConfig &config, SizeT &mem_usage) { auto &thread_pool = InfinityContext::instance().GetHnswBuildThreadPool(); + if (thread_pool.size() == 0) { + UnrecoverableError("Hnsw build thread pool is not initialized."); + } using T = std::decay_t; if constexpr (!std::is_same_v) { SizeT mem1 = index->mem_usage(); diff --git a/src/storage/persistence/persistence_manager.cpp b/src/storage/persistence/persistence_manager.cpp index 76846b4e2c..059b10370d 100644 --- a/src/storage/persistence/persistence_manager.cpp +++ b/src/storage/persistence/persistence_manager.cpp @@ -264,7 +264,7 @@ PersistReadResult PersistenceManager::GetObjCache(const String &file_path) { } result.cached_ = true; } else if (ObjStat *obj_stat = objects_->Get(it->second.obj_key_); obj_stat != nullptr) { - LOG_TRACE(fmt::format("GetObjCache object {} ref count {}", it->second.obj_key_, obj_stat->ref_count_)); + LOG_TRACE(fmt::format("GetObjCache object {}, file_path: {}, ref count {}", it->second.obj_key_, file_path, obj_stat->ref_count_)); String read_path = GetObjPath(result.obj_addr_.obj_key_); if (!VirtualStore::Exists(read_path)) { obj_stat->cached_ = false; @@ -406,12 +406,6 @@ void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr, return; } } - if (check_ref_count) { - if (obj_stat->ref_count_ > 0) { - String error_message = fmt::format("CleanupNoLock object {} ref count is {}", object_addr.obj_key_, obj_stat->ref_count_); - UnrecoverableError(error_message); - } - } Range orig_range(object_addr.part_offset_, object_addr.part_offset_ + object_addr.part_size_); Range range(orig_range); auto inst_it = obj_stat->deleted_ranges_.lower_bound(range); @@ -473,6 +467,12 @@ void PersistenceManager::CleanupNoLock(const ObjAddr &object_addr, String error_message = fmt::format("Failed to find object key"); UnrecoverableError(error_message); } + if (check_ref_count) { + if (obj_stat->ref_count_ > 0) { + String error_message = fmt::format("CleanupNoLock object {} ref count is {}", object_addr.obj_key_, obj_stat->ref_count_); + UnrecoverableError(error_message); + } + } drop_from_remote_keys.emplace_back(object_addr.obj_key_); objects_->Invalidate(object_addr.obj_key_); LOG_TRACE(fmt::format("Deleted object {}", object_addr.obj_key_));