Skip to content

Commit

Permalink
Implement admin show delta catalog
Browse files Browse the repository at this point in the history
Signed-off-by: Jin Hai <haijin.chn@gmail.com>
  • Loading branch information
JinHai-CN committed Nov 9, 2024
1 parent 0a485eb commit 8a09e1c
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 8 deletions.
160 changes: 155 additions & 5 deletions src/admin/admin_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import utility;
import peer_task;
import infinity_exception;
import node_info;
import catalog_delta_entry;

namespace infinity {

Expand Down Expand Up @@ -603,16 +604,43 @@ Vector<SharedPtr<WalEntry>> AdminExecutor::GetAllCheckpointEntries(QueryContext
return checkpoint_entries;
}

TxnTimeStamp max_checkpoint_ts = 0;
WalListIterator iterator(wal_list);
while (iterator.HasNext()) {
auto wal_entry_ptr = iterator.Next();
for (auto &entry_cmd : wal_entry_ptr->cmds_) {
if (entry_cmd->GetType() == WalCommandType::CHECKPOINT) {
checkpoint_entries.push_back(wal_entry_ptr);
WalCmdCheckpoint *checkpoint_cmd = static_cast<WalCmdCheckpoint *>(entry_cmd.get());
max_checkpoint_ts = checkpoint_cmd->max_commit_ts_;
}
}
}

std::string catalog_path = fmt::format("{}/{}", query_context->storage()->wal_manager()->data_path(), "catalog");
auto catalog_fileinfo = CatalogFile::ParseValidCheckpointFilenames(catalog_path, max_checkpoint_ts);
if (!catalog_fileinfo.has_value()) {
String error_message = fmt::format("Wal Replay: Parse catalog file failed, catalog_path: {}", catalog_path);
UnrecoverableError(error_message);
}

auto &[full_catalog_fileinfo, delta_catalog_fileinfo_array] = catalog_fileinfo.value();

auto full_ckp_file_name = Path(full_catalog_fileinfo.path_).filename().string();
SharedPtr<WalCmdCheckpoint> full_ckp_cmd =
MakeShared<WalCmdCheckpoint>(full_catalog_fileinfo.max_commit_ts_, true, "catalog", full_ckp_file_name);
SharedPtr<WalEntry> wal_entry = MakeShared<WalEntry>();
wal_entry->cmds_.emplace_back(full_ckp_cmd);
checkpoint_entries.emplace_back(wal_entry);

for (const auto &delta_catalog_fileinfo : delta_catalog_fileinfo_array) {
auto delta_ckp_file_name = Path(delta_catalog_fileinfo.path_).filename().string();
SharedPtr<WalCmdCheckpoint> delta_ckp_cmd =
MakeShared<WalCmdCheckpoint>(delta_catalog_fileinfo.max_commit_ts_, false, "catalog", delta_ckp_file_name);
wal_entry = MakeShared<WalEntry>();
wal_entry->cmds_.emplace_back(delta_ckp_cmd);
checkpoint_entries.emplace_back(wal_entry);
}

return checkpoint_entries;
}

Expand Down Expand Up @@ -730,6 +758,11 @@ QueryResult AdminExecutor::ShowCatalog(QueryContext *query_context, const AdminS
}

i64 catalog_file_index = admin_statement->catalog_file_index_.value();
if (catalog_file_index >= (i64)(checkpoint_entries.size())) {
query_result.result_table_ = nullptr;
query_result.status_ = Status::UnexpectedError(fmt::format("Index is too large"));
return query_result;
}
WalCmdCheckpoint *checkpoint_cmd = static_cast<WalCmdCheckpoint *>(checkpoint_entries[catalog_file_index]->cmds_[0].get());

if (checkpoint_cmd->is_full_checkpoint_) {
Expand Down Expand Up @@ -816,7 +849,10 @@ QueryResult AdminExecutor::ShowCatalog(QueryContext *query_context, const AdminS

++column_id;
{
String file_path = fmt::format("{}/{}", checkpoint_cmd->catalog_path_, checkpoint_cmd->catalog_name_);
String file_path = fmt::format("{}/{}/{}",
InfinityContext::instance().config()->DataDir(),
checkpoint_cmd->catalog_path_,
checkpoint_cmd->catalog_name_);
SizeT file_size = VirtualStore::GetFileSize(file_path);
Value value = Value::MakeVarchar(std::to_string(file_size));
ValueExpression value_expr(value);
Expand All @@ -827,9 +863,123 @@ QueryResult AdminExecutor::ShowCatalog(QueryContext *query_context, const AdminS
output_block_ptr->Finalize();
query_result.result_table_->Append(std::move(output_block_ptr));
} else {
query_result.result_table_ = nullptr;
query_result.status_ = Status::NotSupport("Delta catalog detail isn't implemented");
return query_result;

Vector<SharedPtr<DataType>> column_types{
varchar_type,
varchar_type,
};

UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
output_block_ptr->Init(column_types);

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("full_checkpoint");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
Value value = Value::MakeVarchar(checkpoint_cmd->is_full_checkpoint_ ? "true" : "false");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("max_commit_ts");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
Value value = Value::MakeVarchar(std::to_string(checkpoint_cmd->max_commit_ts_));
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("path");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
Value value = Value::MakeVarchar(checkpoint_cmd->catalog_path_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("name");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
Value value = Value::MakeVarchar(checkpoint_cmd->catalog_name_);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("size");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
String file_path = fmt::format("{}/{}/{}",
InfinityContext::instance().config()->DataDir(),
checkpoint_cmd->catalog_path_,
checkpoint_cmd->catalog_name_);
SizeT file_size = VirtualStore::GetFileSize(file_path);
Value value = Value::MakeVarchar(std::to_string(file_size));
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

{
SizeT column_id = 0;
{
Value value = Value::MakeVarchar("operation");
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
String file_path = fmt::format("{}/{}/{}",
InfinityContext::instance().config()->DataDir(),
checkpoint_cmd->catalog_path_,
checkpoint_cmd->catalog_name_);
UniquePtr<CatalogDeltaEntry> catalog_delta_entry = Catalog::LoadFromFileDelta(file_path);
Value value = Value::MakeVarchar(catalog_delta_entry->ToString());
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

output_block_ptr->Finalize();
query_result.result_table_->Append(std::move(output_block_ptr));
}

return query_result;
Expand Down
1 change: 1 addition & 0 deletions src/admin/admin_executor.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private:
static QueryResult RecoverFromSnapshot(QueryContext* query_context, const AdminStatement* admin_statement);

static Vector<SharedPtr<WalEntry>> GetAllCheckpointEntries(QueryContext* query_context, const AdminStatement* admin_statement);
static Vector<String> GetAllCheckpointFiles(QueryContext* query_context, const AdminStatement* admin_statement);
static Tuple<UniquePtr<Catalog>, Status> LoadCatalogFiles(QueryContext* query_context, const AdminStatement* admin_statement, Vector<SharedPtr<WalEntry>>& ckp_entries);
};

Expand Down
4 changes: 3 additions & 1 deletion src/main/infinity_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ Status InfinityContext::ChangeRole(NodeRole target_role, bool from_leader, const
cluster_manager_->InitAsStandalone();
Status set_storage_status = storage_->SetStorageMode(StorageMode::kWritable);
if (!set_storage_status.ok()) {
UnrecoverableError(fmt::format("Failed to init storage to standalone, messgae: {}", set_storage_status.message()));
cluster_manager_->UnInit(from_leader);
cluster_manager_->InitAsAdmin();
return set_storage_status;
}
break;
}
Expand Down
3 changes: 1 addition & 2 deletions src/storage/meta/catalog.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -244,11 +244,10 @@ public:
static UniquePtr<Catalog> LoadFullCheckpoint(const String &file_name);
void AttachDeltaCheckpoint(const String &file_name);

static UniquePtr<CatalogDeltaEntry> LoadFromFileDelta(const String &catalog_path);
private:
static UniquePtr<Catalog> Deserialize(const nlohmann::json &catalog_json, BufferManager *buffer_mgr);

static UniquePtr<CatalogDeltaEntry> LoadFromFileDelta(const String &catalog_path);

void LoadFromEntryDelta(UniquePtr<CatalogDeltaEntry> delta_entry, BufferManager *buffer_mgr);

public:
Expand Down

0 comments on commit 8a09e1c

Please sign in to comment.