Skip to content

Commit

Permalink
feat: add a cli tool to read wal meta information (#1584)
Browse files Browse the repository at this point in the history
## Rationale
#1567 

## Detailed Changes
A cli tool to decode wal segment.

## Test Plan
Manual test

<img width="935" alt="image"
src="https://github.com/user-attachments/assets/a9a88ca8-74a9-41f7-b338-9ea642ea27be">


<img width="1053" alt="image"
src="https://github.com/user-attachments/assets/e9d80981-9940-43ac-9c36-546b7dd77b5f">
  • Loading branch information
zealchen authored Nov 1, 2024
1 parent 8fecd5d commit 4cbe82c
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 6 deletions.
44 changes: 40 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ sqlparser = { version = "0.39.0", features = ["serde"] }
system_catalog = { path = "src/system_catalog" }
table_engine = { path = "src/table_engine" }
table_kv = { path = "src/components/table_kv" }
tabled = "0.16.0"
tempfile = "3.1.0"
test_util = { path = "src/components/test_util" }
time_ext = { path = "src/components/time_ext" }
Expand Down
2 changes: 2 additions & 0 deletions src/tools/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,7 @@ parquet = { workspace = true }
parquet_ext = { workspace = true }
runtime = { workspace = true }
table_engine = { workspace = true }
tabled = { workspace = true }
time_ext = { workspace = true }
tokio = { workspace = true }
wal = { workspace = true, features = ["wal-local-storage"] }
163 changes: 163 additions & 0 deletions src/tools/src/bin/wal-reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! A cli to query region segment meta

use std::{
fs::{self},
sync::Arc,
};

use clap::Parser;
use runtime::Builder;
use tabled::{Table, Tabled};
use wal::local_storage_impl::segment::{Region, RegionManager, SegmentView};

#[derive(Parser, Debug)]
#[clap(author, version, about = "A command line tool to read and display WAL (Write-Ahead Log) segment metadata", long_about = None)]
struct Args {
/// Data directory path
#[clap(short, long, default_value = "/tmp/horaedb/wal")]
data_dir: String,

/// Region id
#[clap(short = 'r', long, default_value = None)]
region_id: Option<u64>,

/// Segment id
#[clap(short = 's', long, default_value = None)]
segment_id: Option<u64>,

/// Table id
#[clap(long, default_value = None)]
table_id: Option<u64>,
}

#[derive(Tabled)]
struct SegmentInfo {
segment_id: u64,
min_seq: u64,
max_seq: u64,
version: u8,
current_size: usize,
segment_size: usize,
number_of_records: usize,
}

#[derive(Tabled)]
struct TableInfo {
table_id: u64,
min_seq: u64,
max_seq: u64,
}

impl SegmentInfo {
fn load(stm: &SegmentView) -> Self {
Self {
segment_id: stm.id,
min_seq: stm.min_seq,
max_seq: stm.max_seq,
version: stm.version,
current_size: stm.current_size,
segment_size: stm.segment_size,
number_of_records: stm.number_of_records,
}
}
}

const SEGMENT_SIZE: usize = 64 * 1024 * 1024;

impl TableInfo {
fn load(stm: &SegmentView, table_id: &Option<u64>) -> Vec<TableInfo> {
let mut datas = Vec::new();
for (t_id, (min_seq, max_seq)) in stm.tables.iter() {
if table_id.is_some() && table_id.unwrap() != *t_id {
continue;
}
datas.push(TableInfo {
table_id: *t_id,
min_seq: *min_seq,
max_seq: *max_seq,
});
}
datas
}
}

fn region_meta_dump(region: Arc<Region>, segment_id: &Option<u64>, table_id: &Option<u64>) {
let segments = region.meta();
for stm in segments.iter() {
if segment_id.is_some() && segment_id.unwrap() != stm.id {
continue;
}
println!("{}", "-".repeat(94));
let pretty_segment = Table::new([SegmentInfo::load(stm)]);
println!("{}", pretty_segment);
let pretty_table = Table::new(TableInfo::load(stm, table_id));
println!("{}", pretty_table);
}
}

fn pretty_error_then_exit(err_msg: &str) {
eprintln!("\x1b[31m{}\x1b[0m", err_msg);
std::process::exit(1);
}

fn main() {
let args = Args::parse();
println!("Data directory: {}", args.data_dir);

if !std::path::Path::new(&args.data_dir).is_dir() {
pretty_error_then_exit(
format!("Error: Data directory '{}' does not exist", &args.data_dir).as_str(),
);
}

let runtime = Arc::new(Builder::default().build().unwrap());
let region_manager = RegionManager::new(args.data_dir.clone(), 32, SEGMENT_SIZE, runtime);
let region_manager = match region_manager {
Ok(v) => v,
Err(e) => {
pretty_error_then_exit(format!("Error: {}", e).as_str());
unreachable!();
}
};

if let Some(region_id) = args.region_id {
let region = region_manager.get_region(region_id);
region_meta_dump(region.unwrap(), &args.segment_id, &args.table_id);
} else {
for entry in fs::read_dir(&args.data_dir).unwrap() {
let entry = entry.unwrap();
let path = entry.path();

if path.is_file() {
continue;
}

if let Some(dir_name) = path.file_name().and_then(|n| n.to_str()) {
// Parse region id from directory name
if let Ok(region_id) = dir_name.parse::<u64>() {
let region = region_manager.get_region(region_id);
region_meta_dump(region.unwrap(), &args.segment_id, &args.table_id);
}
}
}
}

region_manager.close_all().unwrap();
}
2 changes: 1 addition & 1 deletion src/wal/src/local_storage_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@

pub mod config;
mod record_encoding;
mod segment;
pub mod segment;
pub mod wal_manager;
39 changes: 38 additions & 1 deletion src/wal/src/local_storage_impl/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,32 @@ impl SegmentManager {
}
}

pub struct SegmentView {
pub id: u64,
pub min_seq: SequenceNumber,
pub max_seq: SequenceNumber,
pub version: u8,
pub current_size: usize,
pub segment_size: usize,
pub number_of_records: usize,
pub tables: HashMap<TableId, (SequenceNumber, SequenceNumber)>,
}

impl SegmentView {
fn new(seg: &Segment) -> Self {
Self {
id: seg.id,
min_seq: seg.min_seq,
max_seq: seg.max_seq,
version: seg.version,
current_size: seg.current_size,
segment_size: seg.segment_size,
number_of_records: seg.record_position.len(),
tables: seg.table_ranges.clone(),
}
}
}

#[derive(Debug)]
pub struct Region {
/// Identifier for regions.
Expand Down Expand Up @@ -785,6 +811,17 @@ impl Region {
Ok(next_sequence_num - 1)
}

pub fn meta(&self) -> Vec<SegmentView> {
let mut segments: Vec<SegmentView> = Vec::with_capacity(10);
let all_segments = self.segment_manager.all_segments.lock().unwrap();
for lock_seg in all_segments.values() {
let seg = lock_seg.lock().unwrap();
let stm = SegmentView::new(&seg);
segments.push(stm);
}
segments
}

pub fn read(&self, ctx: &ReadContext, req: &ReadRequest) -> Result<BatchLogIteratorAdapter> {
// Check read range's validity.
let start = if let Some(start) = req.start.as_start_sequence_number() {
Expand Down Expand Up @@ -922,7 +959,7 @@ impl RegionManager {

/// Retrieve a region by its `region_id`. If the region does not exist,
/// create a new one.
fn get_region(&self, region_id: RegionId) -> Result<Arc<Region>> {
pub fn get_region(&self, region_id: RegionId) -> Result<Arc<Region>> {
let mut regions = self.regions.lock().unwrap();
if let Some(region) = regions.get(&region_id) {
return Ok(region.clone());
Expand Down

0 comments on commit 4cbe82c

Please sign in to comment.