Skip to content

Commit

Permalink
http api support list segments and blocks (#925)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

_Briefly describe what this PR aims to solve. Include background context
that will help reviewers understand the purpose of the PR._

 - [x] List segments of a table
 - [x] List blocks of a segment

Issue link:#779

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

Signed-off-by: morphes1995 <morphes1995@gmail.com>
  • Loading branch information
morphes1995 authored Apr 2, 2024
1 parent 2f6f138 commit f3e4d3c
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 90 deletions.
61 changes: 61 additions & 0 deletions docs/references/http_api_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,37 @@ curl --request GET \
}
```

## Show segments list

Show segments list of specific table

#### Request

```
curl --request GET \
--url localhost:23820/databases/{database_name}/tables/{table_name}/segments/ \
--header 'accept: application/json'
```

#### Response

- 200 success

```
{
"error_code": 0,
"table_name": "test1",
"segments": [
{
"id": "0",
"size": "192.07KB",
"status": "Unsealed"
}
]
}
```


## Show segment

Show specific segment details
Expand Down Expand Up @@ -784,6 +815,36 @@ curl --request GET \
}
```

## Show blocks list

Show blocks list of specific segment

#### Request

```
curl --request GET \
--url localhost:23820/databases/{database_name}/tables/{table_name}/segments/{segment_id}/blocks/ \
--header 'accept: application/json'
```

#### Response

- 200 success

```
{
"error_code": 0,
"segment_id": 0,
"blocks": [
{
"id": "0",
"row_count": "2",
"size": "192.07KB"
}
]
}
```

## Show block

Show specific block detail
Expand Down
4 changes: 2 additions & 2 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ void ExplainPhysicalPlan::Explain(const PhysicalShow *show_node, SharedPtr<Vecto
result->emplace_back(MakeShared<String>(show_str));

String output_columns_str = String(intent_size, ' ');
output_columns_str += " - output columns: [path, size]";
output_columns_str += " - output columns: [id, status, size]";
result->emplace_back(MakeShared<String>(output_columns_str));
break;
}
Expand Down Expand Up @@ -1329,7 +1329,7 @@ void ExplainPhysicalPlan::Explain(const PhysicalShow *show_node, SharedPtr<Vecto
result->emplace_back(MakeShared<String>(show_str));

String output_columns_str = String(intent_size, ' ');
output_columns_str += " - output columns: [path, size]";
output_columns_str += " - output columns: [id, size, row_count]";
result->emplace_back(MakeShared<String>(output_columns_str));
break;
}
Expand Down
127 changes: 50 additions & 77 deletions src/executor/operator/physical_show.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,14 +207,16 @@ void PhysicalShow::Init() {
break;
}
case ShowType::kShowSegments: {
output_names_->reserve(2);
output_types_->reserve(2);
output_names_->reserve(3);
output_types_->reserve(3);

output_names_->emplace_back("path");
output_names_->emplace_back("id");
output_names_->emplace_back("status");
output_names_->emplace_back("size");

output_types_->emplace_back(bigint_type);
output_types_->emplace_back(varchar_type);
output_types_->emplace_back(varchar_type);
break;
}
case ShowType::kShowSegment: {
Expand Down Expand Up @@ -246,14 +248,16 @@ void PhysicalShow::Init() {
}

case ShowType::kShowBlocks: {
output_names_->reserve(2);
output_types_->reserve(2);
output_names_->reserve(3);
output_types_->reserve(3);

output_names_->emplace_back("path");
output_names_->emplace_back("id");
output_names_->emplace_back("size");
output_names_->emplace_back("row_count");

output_types_->emplace_back(bigint_type);
output_types_->emplace_back(varchar_type);
output_types_->emplace_back(varchar_type);
output_types_->emplace_back(bigint_type);
break;
}

Expand Down Expand Up @@ -1148,7 +1152,6 @@ void PhysicalShow::ExecuteShowColumns(QueryContext *query_context, ShowOperatorS

void PhysicalShow::ExecuteShowSegments(QueryContext *query_context, ShowOperatorState *show_operator_state) {
auto txn = query_context->GetTxn();
TxnTimeStamp begin_ts = txn->BeginTS();

auto [table_entry, status] = txn->GetTableByName(db_name_, object_name_);
if (!status.ok()) {
Expand All @@ -1158,70 +1161,39 @@ void PhysicalShow::ExecuteShowSegments(QueryContext *query_context, ShowOperator
}

auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);

Vector<SharedPtr<ColumnDef>> column_defs = {
MakeShared<ColumnDef>(0, varchar_type, "path", HashSet<ConstraintType>()),
MakeShared<ColumnDef>(1, varchar_type, "size", HashSet<ConstraintType>()),
auto bigint_type = MakeShared<DataType>(LogicalType::kBigInt);
UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
Vector<SharedPtr<DataType>> column_types{
bigint_type,
varchar_type,
varchar_type,
};
output_block_ptr->Init(column_types);

UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
auto chuck_filling = [&](const std::function<u64(const String &)> &file_size_func, const String &path) {
for (auto &[_, segment_entry] : table_entry->segment_map()) {
SizeT column_id = 0;
{
Value value = Value::MakeVarchar(path);
Value value = Value::MakeBigInt(segment_entry->segment_id());
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
Value value = Value::MakeVarchar(Utility::FormatByteSize(file_size_func(path)));

Value value = Value::MakeVarchar(SegmentEntry::SegmentStatusToString(segment_entry->status()));
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
};
Vector<SharedPtr<DataType>> column_types{
varchar_type,
varchar_type,
};
output_block_ptr->Init(column_types);

if (segment_id_.has_value() && block_id_.has_value()) {
if (auto segment_entry = table_entry->GetSegmentByID(*segment_id_, begin_ts); segment_entry) {
auto *block_entry = segment_entry->GetBlockEntryByID(*block_id_).get();
if (block_entry != nullptr) {
auto version_path = block_entry->VersionFilePath();

chuck_filling(LocalFileSystem::GetFileSizeByPath, version_path);
SizeT column_count = table_entry->ColumnCount();
for (SizeT column_id = 0; column_id < column_count; ++column_id) {
auto block_column_entry = block_entry->GetColumnBlockEntry(column_id);
auto col_file_path = block_column_entry->FilePath();

chuck_filling(LocalFileSystem::GetFileSizeByPath, col_file_path);
for (auto &outline : block_column_entry->OutlinePaths()) {
chuck_filling(LocalFileSystem::GetFileSizeByPath, outline);
}
}
}
}
} else if (segment_id_.has_value()) {
if (auto segment_entry = table_entry->GetSegmentByID(*segment_id_, begin_ts); segment_entry) {
auto block_entry_iter = BlockEntryIter(segment_entry.get());
for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
const auto &dir_path = *block_entry->base_dir();

chuck_filling(LocalFileSystem::GetFolderSizeByPath, dir_path);
}
}
} else {
for (auto &[_, segment] : table_entry->segment_map()) { // FIXME: use table_ref here.
const auto &dir_path = *segment->segment_dir();

chuck_filling(LocalFileSystem::GetFolderSizeByPath, dir_path);
++column_id;
{
const auto &seg_size = Utility::FormatByteSize(LocalFileSystem::GetFolderSizeByPath(*segment_entry->segment_dir()));
Value value = Value::MakeVarchar(seg_size);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

output_block_ptr->Finalize();
show_operator_state->output_.emplace_back(std::move(output_block_ptr));
}
Expand Down Expand Up @@ -1347,42 +1319,43 @@ void PhysicalShow::ExecuteShowBlocks(QueryContext *query_context, ShowOperatorSt
return;
}

auto bigint_type = MakeShared<DataType>(LogicalType::kBigInt);
auto varchar_type = MakeShared<DataType>(LogicalType::kVarchar);

Vector<SharedPtr<ColumnDef>> column_defs = {
MakeShared<ColumnDef>(0, varchar_type, "path", HashSet<ConstraintType>()),
MakeShared<ColumnDef>(1, varchar_type, "size", HashSet<ConstraintType>()),
UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
Vector<SharedPtr<DataType>> column_types{
bigint_type,
varchar_type,
bigint_type,
};
output_block_ptr->Init(column_types);

UniquePtr<DataBlock> output_block_ptr = DataBlock::MakeUniquePtr();
auto chuck_filling = [&](const std::function<u64(const String &)> &file_size_func, const String &path) {
auto segment_entry = table_entry->GetSegmentByID(*segment_id_, begin_ts);
if (!segment_entry) {
RecoverableError(Status::SegmentNotExist(*segment_id_));
return;
}
auto block_entry_iter = BlockEntryIter(segment_entry.get());
for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
SizeT column_id = 0;
{
Value value = Value::MakeVarchar(path);
Value value = Value::MakeBigInt(block_entry->block_id());
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}

++column_id;
{
Value value = Value::MakeVarchar(Utility::FormatByteSize(file_size_func(path)));

const auto &blk_size = Utility::FormatByteSize(LocalFileSystem::GetFolderSizeByPath(*block_entry->base_dir()));
Value value = Value::MakeVarchar(blk_size);
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
};
Vector<SharedPtr<DataType>> column_types{
varchar_type,
varchar_type,
};
output_block_ptr->Init(column_types);

if (auto segment_entry = table_entry->GetSegmentByID(*segment_id_, begin_ts); segment_entry) {
auto block_entry_iter = BlockEntryIter(segment_entry.get());
for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
const auto& dir_path = *block_entry->base_dir();

chuck_filling(LocalFileSystem::GetFolderSizeByPath, dir_path);
++column_id;
{
Value value = Value::MakeBigInt(block_entry->row_count());
ValueExpression value_expr(value);
value_expr.AppendToChunk(output_block_ptr->column_vectors[column_id]);
}
}

Expand Down
31 changes: 31 additions & 0 deletions src/main/infinity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,21 @@ QueryResult Infinity::ShowSegment(const String &db_name,const String &table_name
return result;
}

QueryResult Infinity::ShowSegments(const String &db_name,const String &table_name) {
UniquePtr<QueryContext> query_context_ptr = MakeUnique<QueryContext>(session_.get());
query_context_ptr->Init(InfinityContext::instance().config(),
InfinityContext::instance().task_scheduler(),
InfinityContext::instance().storage(),
InfinityContext::instance().resource_manager(),
InfinityContext::instance().session_manager());
UniquePtr<ShowStatement> show_statement = MakeUnique<ShowStatement>();
show_statement->schema_name_ = db_name;
show_statement->table_name_ = table_name;
show_statement->show_type_ = ShowStmtType::kSegments;
QueryResult result = query_context_ptr->QueryStatement(show_statement.get());
return result;
}

QueryResult Infinity::ShowBlock(const String &db_name,const String &table_name, const SegmentID &segment_id, const BlockID &block_id) {
UniquePtr<QueryContext> query_context_ptr = MakeUnique<QueryContext>(session_.get());
query_context_ptr->Init(InfinityContext::instance().config(),
Expand All @@ -439,6 +454,22 @@ QueryResult Infinity::ShowBlock(const String &db_name,const String &table_name,
return result;
}

QueryResult Infinity::ShowBlocks(const String &db_name,const String &table_name, const SegmentID &segment_id) {
UniquePtr<QueryContext> query_context_ptr = MakeUnique<QueryContext>(session_.get());
query_context_ptr->Init(InfinityContext::instance().config(),
InfinityContext::instance().task_scheduler(),
InfinityContext::instance().storage(),
InfinityContext::instance().resource_manager(),
InfinityContext::instance().session_manager());
UniquePtr<ShowStatement> show_statement = MakeUnique<ShowStatement>();
show_statement->schema_name_ = db_name;
show_statement->table_name_ = table_name;
show_statement->segment_id_ = segment_id;
show_statement->show_type_ = ShowStmtType::kBlocks;
QueryResult result = query_context_ptr->QueryStatement(show_statement.get());
return result;
}

QueryResult Infinity::Insert(const String &db_name, const String &table_name, Vector<String> *columns, Vector<Vector<ParsedExpr *> *> *values) {
UniquePtr<QueryContext> query_context_ptr = MakeUnique<QueryContext>(session_.get());
query_context_ptr->Init(InfinityContext::instance().config(),
Expand Down
4 changes: 4 additions & 0 deletions src/main/infinity.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,12 @@ public:

QueryResult ShowSegment(const String &db_name,const String &table_name, const SegmentID &segment_id);

QueryResult ShowSegments(const String &db_name,const String &table_name);

QueryResult ShowBlock(const String &db_name,const String &table_name, const SegmentID &segment_id, const BlockID &block_id);

QueryResult ShowBlocks(const String &db_name,const String &table_name, const SegmentID &segment_id);

QueryResult Insert(const String &db_name, const String &table_name, Vector<String> *columns, Vector<Vector<ParsedExpr *> *> *values);

QueryResult Import(const String &db_name, const String &table_name, const String &path, ImportOptions import_options);
Expand Down
Loading

0 comments on commit f3e4d3c

Please sign in to comment.