From f3cea5f470d8eb93c1229d576b1edd1ebc457e01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Thu, 16 Jan 2025 16:51:45 +1300 Subject: [PATCH] feat: read tool --- README.md | 7 +++++ crates/prepper-event/Cargo.toml | 2 +- crates/prepper-event/src/lib.rs | 11 ++++--- crates/prepper-event/src/row_data.rs | 2 +- crates/prepper-event/src/tamanu_id.rs | 3 +- crates/prepper-event/src/timestamp.rs | 3 +- crates/prepper/src/cli.rs | 3 ++ crates/prepper/src/cli/read.rs | 42 +++++++++++++++++++++------ 8 files changed, 56 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 84730f6..2b806cb 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,13 @@ Combine those like `?pub=pubname&slot=slotname` (URL syntax). To start from scratch delete the output folder (defaults to `history`). +For convenience, there's a built-in tool that will read the files and dump the +metadata (i.e. excluding the full row data) to JSON. + +``` +prepper read +``` + ## File format _Length-prefixed CBOR objects in rotated files._ diff --git a/crates/prepper-event/Cargo.toml b/crates/prepper-event/Cargo.toml index d7235d2..7810b39 100644 --- a/crates/prepper-event/Cargo.toml +++ b/crates/prepper-event/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" async-trait = "0.1.81" chrono = "0.4.39" clap = { version = "4.5.21", features = ["derive", "cargo"] } -jiff = "0.1.14" +jiff = { version = "0.1.14", features = ["serde"] } machine-uid = "0.5.3" miette = { workspace = true, features = ["fancy"] } minicbor = { version = "0.25.1", features = ["derive", "std"] } diff --git a/crates/prepper-event/src/lib.rs b/crates/prepper-event/src/lib.rs index 30c9022..026d550 100644 --- a/crates/prepper-event/src/lib.rs +++ b/crates/prepper-event/src/lib.rs @@ -1,6 +1,7 @@ use minicbor::{data::Int, Decode, Encode}; use pg_replicate::table::TableId; use row_data::RowData; +use serde::Serialize; pub use tamanu_id::TamanuId; pub use timestamp::Timestamp; use uuid::Uuid; @@ -12,7 +13,7 @@ pub mod uuid; pub const VERSION: u8 = 1; -#[derive(Clone, Debug, Encode, Decode)] +#[derive(Clone, Debug, Encode, Decode, Serialize)] pub struct Event { #[cbor(n(0))] pub version: u8, @@ -27,7 +28,7 @@ pub struct Event { pub snapshot: Snapshot, } -#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)] +#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode, Serialize)] #[cbor(map)] pub struct Table { #[cbor(n(1))] @@ -40,7 +41,7 @@ pub struct Table { pub name: String, } -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Encode, Decode)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Encode, Decode, Serialize)] #[cbor(map)] pub struct Device { #[cbor( @@ -54,7 +55,7 @@ pub struct Device { pub ts: Timestamp, } -#[derive(Clone, Debug, Encode, Decode)] +#[derive(Clone, Debug, Encode, Decode, Serialize)] #[cbor(map)] pub struct Snapshot { #[cbor(n(1))] @@ -70,6 +71,7 @@ pub struct Snapshot { pub deleted_at: Option, #[cbor(n(5))] + #[serde(skip)] pub sync_tick: Int, #[cbor(n(6))] @@ -78,5 +80,6 @@ pub struct Snapshot { // this is at the bottom so it's serialised last, such that the // metadata fields can be read and then the data skipped efficiently #[cbor(n(23))] + #[serde(skip)] pub data: RowData, } diff --git a/crates/prepper-event/src/row_data.rs b/crates/prepper-event/src/row_data.rs index 062f933..e5a242a 100644 --- a/crates/prepper-event/src/row_data.rs +++ b/crates/prepper-event/src/row_data.rs @@ -13,7 +13,7 @@ const TAG_POSTGRES: Tag = Tag::new(u16::from_be_bytes([b'p', b'g']) as _); const TAG_JSON: Tag = Tag::new(262); -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct RowData { pub columns: Vec, pub cells: Vec, diff --git a/crates/prepper-event/src/tamanu_id.rs b/crates/prepper-event/src/tamanu_id.rs index e162a02..b0e937b 100644 --- a/crates/prepper-event/src/tamanu_id.rs +++ b/crates/prepper-event/src/tamanu_id.rs @@ -8,10 +8,11 @@ use minicbor::{ encode::{self, Write}, Decode, Decoder, Encode, Encoder, }; +use serde::Serialize; const TAG_TAMANUID: Tag = Tag::new(u16::from_be_bytes([b'i', b'd']) as _); -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize)] pub enum TamanuId { Uuid(uuid::Uuid), Free(String), diff --git a/crates/prepper-event/src/timestamp.rs b/crates/prepper-event/src/timestamp.rs index 82a2312..b167a27 100644 --- a/crates/prepper-event/src/timestamp.rs +++ b/crates/prepper-event/src/timestamp.rs @@ -6,10 +6,11 @@ use minicbor::{ encode::{self, Write}, Decode, Decoder, Encode, Encoder, }; +use serde::Serialize; const TAG_ETIME: Tag = Tag::new(1001); -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)] pub struct Timestamp(pub jiff::Timestamp); impl From for Timestamp { diff --git a/crates/prepper/src/cli.rs b/crates/prepper/src/cli.rs index 2b101e6..0fbf505 100644 --- a/crates/prepper/src/cli.rs +++ b/crates/prepper/src/cli.rs @@ -11,7 +11,10 @@ mod read; #[derive(Debug, Clone, Subcommand)] pub enum Command { + /// Collect replication and store it in files Daemon(daemon::Args), + + /// Read the prepper files to JSON Read(read::Args), } diff --git a/crates/prepper/src/cli/read.rs b/crates/prepper/src/cli/read.rs index 60a41e2..ff020d9 100644 --- a/crates/prepper/src/cli/read.rs +++ b/crates/prepper/src/cli/read.rs @@ -1,7 +1,11 @@ -use std::path::PathBuf; +use std::{collections::BTreeSet, path::PathBuf}; use clap::{Parser, ValueHint}; -use miette::Result; +use miette::{IntoDiagnostic, Result}; +use minicbor_io::AsyncReader; +use prepper_event::Event; +use tokio::fs::{read_dir, File}; +use tokio_util::compat::TokioAsyncReadCompatExt; use uuid::Uuid; use crate::time_input::TimeInput; @@ -19,25 +23,45 @@ pub struct Args { dir: PathBuf, /// TODO Filter to a device ID - #[arg(long, value_name = "UUID")] + #[arg(long, hide = true, value_name = "UUID")] device_id: Option, /// TODO Filter to one or more table(s) - #[arg(long, value_name = "TABLE or SCHEMA.TABLE")] + #[arg(long, hide = true, value_name = "TABLE or SCHEMA.TABLE")] tables: Vec, /// TODO Start from this timestamp - #[arg(long, short = 'S', value_name = "TIME or DURATION")] + #[arg(long, hide = true, short = 'S', value_name = "TIME or DURATION")] since: Option, /// TODO Stop at this timestamp - #[arg(long, short = 'U', value_name = "TIME or DURATION")] + #[arg(long, hide = true, short = 'U', value_name = "TIME or DURATION")] until: Option, } pub async fn main(args: Args) -> Result<()> { - dbg!(args.since.clone().map(jiff::Timestamp::from)); - dbg!(args.until.clone().map(jiff::Timestamp::from)); - dbg!(args); + let mut files = read_dir(&args.dir).await.into_diagnostic()?; + let mut event_files = BTreeSet::new(); + while let Some(entry) = files.next_entry().await.into_diagnostic()? { + if !entry.file_type().await.into_diagnostic()?.is_file() { + continue; + } + + let path = entry.path(); + if !path.extension().is_some_and(|ext| ext == "prp") { + continue; + } + + event_files.insert(path); + } + + for path in event_files { + let file = File::open(path).await.into_diagnostic()?; + let mut reader = AsyncReader::new(file.compat()); + while let Some(event) = reader.read::().await.into_diagnostic()? { + println!("{}", serde_json::to_string(&event).into_diagnostic()?); + } + } + Ok(()) }