Skip to content
This repository has been archived by the owner on Jan 16, 2025. It is now read-only.

Commit

Permalink
feat: read tool
Browse files Browse the repository at this point in the history
  • Loading branch information
passcod committed Jan 16, 2025
1 parent b72c4c3 commit f3cea5f
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 17 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ Combine those like `?pub=pubname&slot=slotname` (URL syntax).

To start from scratch delete the output folder (defaults to `history`).

For convenience, there's a built-in tool that will read the files and dump the
metadata (i.e. excluding the full row data) to JSON.

```
prepper read
```

## File format

_Length-prefixed CBOR objects in rotated files._
Expand Down
2 changes: 1 addition & 1 deletion crates/prepper-event/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"
async-trait = "0.1.81"
chrono = "0.4.39"
clap = { version = "4.5.21", features = ["derive", "cargo"] }
jiff = "0.1.14"
jiff = { version = "0.1.14", features = ["serde"] }
machine-uid = "0.5.3"
miette = { workspace = true, features = ["fancy"] }
minicbor = { version = "0.25.1", features = ["derive", "std"] }
Expand Down
11 changes: 7 additions & 4 deletions crates/prepper-event/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use minicbor::{data::Int, Decode, Encode};
use pg_replicate::table::TableId;
use row_data::RowData;
use serde::Serialize;
pub use tamanu_id::TamanuId;
pub use timestamp::Timestamp;
use uuid::Uuid;
Expand All @@ -12,7 +13,7 @@ pub mod uuid;

pub const VERSION: u8 = 1;

#[derive(Clone, Debug, Encode, Decode)]
#[derive(Clone, Debug, Encode, Decode, Serialize)]
pub struct Event {
#[cbor(n(0))]
pub version: u8,
Expand All @@ -27,7 +28,7 @@ pub struct Event {
pub snapshot: Snapshot,
}

#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode, Serialize)]
#[cbor(map)]
pub struct Table {
#[cbor(n(1))]
Expand All @@ -40,7 +41,7 @@ pub struct Table {
pub name: String,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Encode, Decode)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Encode, Decode, Serialize)]
#[cbor(map)]
pub struct Device {
#[cbor(
Expand All @@ -54,7 +55,7 @@ pub struct Device {
pub ts: Timestamp,
}

#[derive(Clone, Debug, Encode, Decode)]
#[derive(Clone, Debug, Encode, Decode, Serialize)]
#[cbor(map)]
pub struct Snapshot {
#[cbor(n(1))]
Expand All @@ -70,6 +71,7 @@ pub struct Snapshot {
pub deleted_at: Option<Timestamp>,

#[cbor(n(5))]
#[serde(skip)]
pub sync_tick: Int,

#[cbor(n(6))]
Expand All @@ -78,5 +80,6 @@ pub struct Snapshot {
// this is at the bottom so it's serialised last, such that the
// metadata fields can be read and then the data skipped efficiently
#[cbor(n(23))]
#[serde(skip)]
pub data: RowData,
}
2 changes: 1 addition & 1 deletion crates/prepper-event/src/row_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const TAG_POSTGRES: Tag = Tag::new(u16::from_be_bytes([b'p', b'g']) as _);

const TAG_JSON: Tag = Tag::new(262);

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Default)]
pub struct RowData {
pub columns: Vec<String>,
pub cells: Vec<Cell>,
Expand Down
3 changes: 2 additions & 1 deletion crates/prepper-event/src/tamanu_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use minicbor::{
encode::{self, Write},
Decode, Decoder, Encode, Encoder,
};
use serde::Serialize;

const TAG_TAMANUID: Tag = Tag::new(u16::from_be_bytes([b'i', b'd']) as _);

#[derive(Clone, Debug, PartialEq)]
#[derive(Clone, Debug, PartialEq, Serialize)]
pub enum TamanuId {
Uuid(uuid::Uuid),
Free(String),
Expand Down
3 changes: 2 additions & 1 deletion crates/prepper-event/src/timestamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ use minicbor::{
encode::{self, Write},
Decode, Decoder, Encode, Encoder,
};
use serde::Serialize;

const TAG_ETIME: Tag = Tag::new(1001);

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub struct Timestamp(pub jiff::Timestamp);

impl From<jiff::Timestamp> for Timestamp {
Expand Down
3 changes: 3 additions & 0 deletions crates/prepper/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ mod read;

#[derive(Debug, Clone, Subcommand)]
pub enum Command {
/// Collect replication and store it in files
Daemon(daemon::Args),

/// Read the prepper files to JSON
Read(read::Args),
}

Expand Down
42 changes: 33 additions & 9 deletions crates/prepper/src/cli/read.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::path::PathBuf;
use std::{collections::BTreeSet, path::PathBuf};

use clap::{Parser, ValueHint};
use miette::Result;
use miette::{IntoDiagnostic, Result};
use minicbor_io::AsyncReader;
use prepper_event::Event;
use tokio::fs::{read_dir, File};
use tokio_util::compat::TokioAsyncReadCompatExt;
use uuid::Uuid;

use crate::time_input::TimeInput;
Expand All @@ -19,25 +23,45 @@ pub struct Args {
dir: PathBuf,

/// TODO Filter to a device ID
#[arg(long, value_name = "UUID")]
#[arg(long, hide = true, value_name = "UUID")]
device_id: Option<Uuid>,

/// TODO Filter to one or more table(s)
#[arg(long, value_name = "TABLE or SCHEMA.TABLE")]
#[arg(long, hide = true, value_name = "TABLE or SCHEMA.TABLE")]
tables: Vec<String>,

/// TODO Start from this timestamp
#[arg(long, short = 'S', value_name = "TIME or DURATION")]
#[arg(long, hide = true, short = 'S', value_name = "TIME or DURATION")]
since: Option<TimeInput>,

/// TODO Stop at this timestamp
#[arg(long, short = 'U', value_name = "TIME or DURATION")]
#[arg(long, hide = true, short = 'U', value_name = "TIME or DURATION")]
until: Option<TimeInput>,
}

pub async fn main(args: Args) -> Result<()> {
dbg!(args.since.clone().map(jiff::Timestamp::from));
dbg!(args.until.clone().map(jiff::Timestamp::from));
dbg!(args);
let mut files = read_dir(&args.dir).await.into_diagnostic()?;
let mut event_files = BTreeSet::new();
while let Some(entry) = files.next_entry().await.into_diagnostic()? {
if !entry.file_type().await.into_diagnostic()?.is_file() {
continue;
}

let path = entry.path();
if !path.extension().is_some_and(|ext| ext == "prp") {
continue;
}

event_files.insert(path);
}

for path in event_files {
let file = File::open(path).await.into_diagnostic()?;
let mut reader = AsyncReader::new(file.compat());
while let Some(event) = reader.read::<Event>().await.into_diagnostic()? {
println!("{}", serde_json::to_string(&event).into_diagnostic()?);
}
}

Ok(())
}

0 comments on commit f3cea5f

Please sign in to comment.