diff --git a/src/admin/admin_executor.cpp b/src/admin/admin_executor.cpp index 5eabd83358..cfca3fdc00 100644 --- a/src/admin/admin_executor.cpp +++ b/src/admin/admin_executor.cpp @@ -59,6 +59,7 @@ import utility; import peer_task; import infinity_exception; import node_info; +import catalog_delta_entry; namespace infinity { @@ -603,16 +604,43 @@ Vector> AdminExecutor::GetAllCheckpointEntries(QueryContext return checkpoint_entries; } + TxnTimeStamp max_checkpoint_ts = 0; WalListIterator iterator(wal_list); while (iterator.HasNext()) { auto wal_entry_ptr = iterator.Next(); for (auto &entry_cmd : wal_entry_ptr->cmds_) { if (entry_cmd->GetType() == WalCommandType::CHECKPOINT) { - checkpoint_entries.push_back(wal_entry_ptr); + WalCmdCheckpoint *checkpoint_cmd = static_cast(entry_cmd.get()); + max_checkpoint_ts = checkpoint_cmd->max_commit_ts_; } } } + std::string catalog_path = fmt::format("{}/{}", query_context->storage()->wal_manager()->data_path(), "catalog"); + auto catalog_fileinfo = CatalogFile::ParseValidCheckpointFilenames(catalog_path, max_checkpoint_ts); + if (!catalog_fileinfo.has_value()) { + String error_message = fmt::format("Wal Replay: Parse catalog file failed, catalog_path: {}", catalog_path); + UnrecoverableError(error_message); + } + + auto &[full_catalog_fileinfo, delta_catalog_fileinfo_array] = catalog_fileinfo.value(); + + auto full_ckp_file_name = Path(full_catalog_fileinfo.path_).filename().string(); + SharedPtr full_ckp_cmd = + MakeShared(full_catalog_fileinfo.max_commit_ts_, true, "catalog", full_ckp_file_name); + SharedPtr wal_entry = MakeShared(); + wal_entry->cmds_.emplace_back(full_ckp_cmd); + checkpoint_entries.emplace_back(wal_entry); + + for (const auto &delta_catalog_fileinfo : delta_catalog_fileinfo_array) { + auto delta_ckp_file_name = Path(delta_catalog_fileinfo.path_).filename().string(); + SharedPtr delta_ckp_cmd = + MakeShared(delta_catalog_fileinfo.max_commit_ts_, false, "catalog", delta_ckp_file_name); + wal_entry = MakeShared(); + wal_entry->cmds_.emplace_back(delta_ckp_cmd); + checkpoint_entries.emplace_back(wal_entry); + } + return checkpoint_entries; } @@ -730,6 +758,11 @@ QueryResult AdminExecutor::ShowCatalog(QueryContext *query_context, const AdminS } i64 catalog_file_index = admin_statement->catalog_file_index_.value(); + if (catalog_file_index >= (i64)(checkpoint_entries.size())) { + query_result.result_table_ = nullptr; + query_result.status_ = Status::UnexpectedError(fmt::format("Index is too large")); + return query_result; + } WalCmdCheckpoint *checkpoint_cmd = static_cast(checkpoint_entries[catalog_file_index]->cmds_[0].get()); if (checkpoint_cmd->is_full_checkpoint_) { @@ -816,7 +849,10 @@ QueryResult AdminExecutor::ShowCatalog(QueryContext *query_context, const AdminS ++column_id; { - String file_path = fmt::format("{}/{}", checkpoint_cmd->catalog_path_, checkpoint_cmd->catalog_name_); + String file_path = fmt::format("{}/{}/{}", + InfinityContext::instance().config()->DataDir(), + checkpoint_cmd->catalog_path_, + checkpoint_cmd->catalog_name_); SizeT file_size = VirtualStore::GetFileSize(file_path); Value value = Value::MakeVarchar(std::to_string(file_size)); ValueExpression value_expr(value); @@ -827,9 +863,123 @@ QueryResult AdminExecutor::ShowCatalog(QueryContext *query_context, const AdminS output_block_ptr->Finalize(); query_result.result_table_->Append(std::move(output_block_ptr)); } else { - query_result.result_table_ = nullptr; - query_result.status_ = Status::NotSupport("Delta catalog detail isn't implemented"); - return query_result; + + Vector> column_types{ + varchar_type, + varchar_type, + }; + + UniquePtr output_block_ptr = DataBlock::MakeUniquePtr(); + output_block_ptr->Init(column_types); + + { + SizeT column_id = 0; + { + Value value = Value::MakeVarchar("full_checkpoint"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + + ++column_id; + { + Value value = Value::MakeVarchar(checkpoint_cmd->is_full_checkpoint_ ? "true" : "false"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + } + + { + SizeT column_id = 0; + { + Value value = Value::MakeVarchar("max_commit_ts"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + + ++column_id; + { + Value value = Value::MakeVarchar(std::to_string(checkpoint_cmd->max_commit_ts_)); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + } + + { + SizeT column_id = 0; + { + Value value = Value::MakeVarchar("path"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + + ++column_id; + { + Value value = Value::MakeVarchar(checkpoint_cmd->catalog_path_); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + } + + { + SizeT column_id = 0; + { + Value value = Value::MakeVarchar("name"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + + ++column_id; + { + Value value = Value::MakeVarchar(checkpoint_cmd->catalog_name_); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + } + + { + SizeT column_id = 0; + { + Value value = Value::MakeVarchar("size"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + + ++column_id; + { + String file_path = fmt::format("{}/{}/{}", + InfinityContext::instance().config()->DataDir(), + checkpoint_cmd->catalog_path_, + checkpoint_cmd->catalog_name_); + SizeT file_size = VirtualStore::GetFileSize(file_path); + Value value = Value::MakeVarchar(std::to_string(file_size)); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + } + + { + SizeT column_id = 0; + { + Value value = Value::MakeVarchar("operation"); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + + ++column_id; + { + String file_path = fmt::format("{}/{}/{}", + InfinityContext::instance().config()->DataDir(), + checkpoint_cmd->catalog_path_, + checkpoint_cmd->catalog_name_); + UniquePtr catalog_delta_entry = Catalog::LoadFromFileDelta(file_path); + Value value = Value::MakeVarchar(catalog_delta_entry->ToString()); + ValueExpression value_expr(value); + value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]); + } + } + + output_block_ptr->Finalize(); + query_result.result_table_->Append(std::move(output_block_ptr)); } return query_result; diff --git a/src/admin/admin_executor.cppm b/src/admin/admin_executor.cppm index 0f8a8c9efb..01f71402f0 100644 --- a/src/admin/admin_executor.cppm +++ b/src/admin/admin_executor.cppm @@ -65,6 +65,7 @@ private: static QueryResult RecoverFromSnapshot(QueryContext* query_context, const AdminStatement* admin_statement); static Vector> GetAllCheckpointEntries(QueryContext* query_context, const AdminStatement* admin_statement); + static Vector GetAllCheckpointFiles(QueryContext* query_context, const AdminStatement* admin_statement); static Tuple, Status> LoadCatalogFiles(QueryContext* query_context, const AdminStatement* admin_statement, Vector>& ckp_entries); }; diff --git a/src/main/infinity_context.cpp b/src/main/infinity_context.cpp index f0deefb435..d417e16abd 100644 --- a/src/main/infinity_context.cpp +++ b/src/main/infinity_context.cpp @@ -129,7 +129,9 @@ Status InfinityContext::ChangeRole(NodeRole target_role, bool from_leader, const cluster_manager_->InitAsStandalone(); Status set_storage_status = storage_->SetStorageMode(StorageMode::kWritable); if (!set_storage_status.ok()) { - UnrecoverableError(fmt::format("Failed to init storage to standalone, messgae: {}", set_storage_status.message())); + cluster_manager_->UnInit(from_leader); + cluster_manager_->InitAsAdmin(); + return set_storage_status; } break; } diff --git a/src/storage/meta/catalog.cppm b/src/storage/meta/catalog.cppm index 79a5cf1675..8efb9883c3 100644 --- a/src/storage/meta/catalog.cppm +++ b/src/storage/meta/catalog.cppm @@ -244,11 +244,10 @@ public: static UniquePtr LoadFullCheckpoint(const String &file_name); void AttachDeltaCheckpoint(const String &file_name); + static UniquePtr LoadFromFileDelta(const String &catalog_path); private: static UniquePtr Deserialize(const nlohmann::json &catalog_json, BufferManager *buffer_mgr); - static UniquePtr LoadFromFileDelta(const String &catalog_path); - void LoadFromEntryDelta(UniquePtr delta_entry, BufferManager *buffer_mgr); public: