From 9a2413597c691be72c55cb09d87d4e083d691dbd Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Fri, 26 Sep 2025 14:24:59 -0700 Subject: [PATCH 1/6] Add a URI-like binary resolver, unit and integration tests --- Cargo.lock | 17 ++ common/Cargo.toml | 5 + common/src/lib.rs | 1 + common/src/resolver.rs | 383 +++++++++++++++++++++++++++++++++++ common/tests/loc_over_bus.rs | 209 +++++++++++++++++++ common/tests/test.toml | 20 ++ 6 files changed, 635 insertions(+) create mode 100644 common/src/resolver.rs create mode 100644 common/tests/loc_over_bus.rs create mode 100644 common/tests/test.toml diff --git a/Cargo.lock b/Cargo.lock index 69b7ffbf..c8e28236 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,10 +13,13 @@ dependencies = [ "bitmask-enum", "blake2 0.10.6", "bs58", + "bytes", "caryatid_module_clock", "caryatid_module_rest_server", + "caryatid_process", "caryatid_sdk", "chrono", + "config", "dashmap", "fraction", "futures", @@ -24,6 +27,7 @@ dependencies = [ "hex", "imbl", "lf-queue", + "memmap2", "minicbor 0.26.5", "num-rational", "num-traits", @@ -31,6 +35,7 @@ dependencies = [ "serde", "serde_json", "serde_with 3.14.1", + "tempfile", "tokio", "tracing", ] @@ -1206,6 +1211,9 @@ name = "bytes" version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" +dependencies = [ + "serde", +] [[package]] name = "byteview" @@ -3230,6 +3238,15 @@ version = "2.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" +[[package]] +name = "memmap2" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "843a98750cd611cc2965a8213b53b43e715f13c37a9e096c6408e69990961db7" +dependencies = [ + "libc", +] + [[package]] name = "mime" version = "0.3.17" diff --git a/common/Cargo.toml b/common/Cargo.toml index 11227d17..06d99b3f 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -19,16 +19,19 @@ bech32 = "0.11" bigdecimal = "0.4.8" bitmask-enum = "2.2" blake2 = "0.10" +bytes = { version = "1", features = ["serde"] } bs58 = "0.5" chrono = { workspace = true } gcd = "2.3" fraction = "0.15" hex = { workspace = true } lf-queue = "0.1.0" +memmap2 = "0.9" num-rational = { version = "0.4.2", features = ["serde"] } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true, features = ["base64"] } +tempfile = "3" tokio = { workspace = true } tracing = { workspace = true } futures = "0.3.31" @@ -37,6 +40,8 @@ num-traits = "0.2" imbl = { workspace = true } dashmap = { workspace = true } rayon = "1.11.0" +caryatid_process.workspace = true +config.workspace = true [lib] crate-type = ["rlib"] diff --git a/common/src/lib.rs b/common/src/lib.rs index cc564b42..17c554df 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -17,6 +17,7 @@ pub mod serialization; pub mod stake_addresses; pub mod state_history; pub mod types; +pub mod resolver; // Flattened re-exports pub use self::address::*; diff --git a/common/src/resolver.rs b/common/src/resolver.rs new file mode 100644 index 00000000..be04ff9b --- /dev/null +++ b/common/src/resolver.rs @@ -0,0 +1,383 @@ +//! # Resolver Module +//! +//! This module provides a uniform mechanism for resolving byte regions from either inline data or memory-mapped files. +//! It includes a registry for managing memory-mapped objects, a locator type for referencing regions, and a resolver for extracting slices. +//! +//! ## Features +//! - Registry for memory-mapped files with eviction support +//! - Uniform locator for inline or registry-backed data +//! - Thread-safe, concurrent access +//! - Out-of-bounds and overflow safety +//! +//! ## Example +//! ```rust +//! use acropolis_common::resolver::{Registry, Resolver, Loc, StoreId, ObjectId, Region}; +//! // ... see tests for usage ... +//! ``` + +use anyhow::{bail, Context, Result}; +use serde::{Serialize, Deserialize}; +use bytes::Bytes; +use dashmap::DashMap; +use memmap2::Mmap; +use std::{fs::File, ops::Range, sync::Arc}; + +/*** +Thoughts on eviction policy for the registry: + - Rollback safety: keep at least k + safe_zone worth of recently-touched + objects pinned in the registry; evict older ones. + - LRU/TTL: track last_access_slot per object (update on resolve); + run a periodic GC that calls evict for cold objects. + - Snapshot barrier: after you create a new epoch snapshot, + evict all objects strictly older than that snapshot (beyond rollback window). +Disk vs memory: + munmap frees address space and resident pages for the mapping; the file + stays on disk. If you also want to reclaim disk space, unlink the file + after registration and keep the FD/mapping alive (or use memfd); the kernel + discards it when all references are gone. +***/ + +/// Unique identifier for a storage backend (e.g., file, database). +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct StoreId(pub u16); + +/// Unique identifier for an object within a store. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct ObjectId(pub u128); + +/// A region within an object, specified by offset and length. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct Region { + /// Byte offset from the start of the object. + pub offset: u64, + /// Length of the region in bytes. + pub len: u32, +} + +/// Uniform locator for all cases (inline or registry-backed) that doesn't use strings. +/// +/// If `inline` is `Some`, the region is resolved from the provided bytes; otherwise, it is resolved from the registry. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct Loc { + /// Store identifier. + pub store: StoreId, + /// Object identifier within the store. + pub object: ObjectId, + /// Region to resolve. + pub region: Region, + /// Optional inline bytes for direct resolution. + pub inline: Option, +} + +enum Backing { + Mmap(Arc), + // add Memfd(Arc), Blob(Arc<[u8]>), etc., as needed +} + +/// Thread-safe registry for memory-mapped objects. +#[derive(Default)] +pub struct Registry { + map: DashMap<(StoreId, ObjectId), Backing>, +} + +impl Registry { + /// Register a file for memory-mapped access in the registry. + pub fn register_file(&self, store: StoreId, object: ObjectId, file: &File) -> Result<()> { + let mmap = unsafe { Mmap::map(file) }.context("mmap failed")?; + self.map.insert((store, object), Backing::Mmap(Arc::new(mmap))); + Ok(()) + } + + /// Remove an object from the registry. If other holders (Resolved) still exist, + /// memory is freed later when those holders are dropped. + pub fn evict(&self, store: StoreId, object: ObjectId) -> bool { + self.map.remove(&(store, object)).is_some() + } + + /// Get a backing object from the registry, if present. + fn get(&self, store: StoreId, object: ObjectId) -> Option { + self.map.get(&(store, object)).map(|e| match &*e { + Backing::Mmap(m) => Backing::Mmap(Arc::clone(m)), + }) + } +} + +/// A resolved region, holding a reference to the underlying memory. +pub struct Resolved { + backing: ResolvedBacking, // owns an Arc (or Bytes) to keep memory alive + range: Range, +} + +enum ResolvedBacking { + Inline(Bytes), + Mmap(Arc), +} + +impl Resolved { + /// Returns the resolved region as a byte slice. + pub fn as_slice(&self) -> &[u8] { + match &self.backing { + ResolvedBacking::Inline(b) => &b[self.range.clone()], + ResolvedBacking::Mmap(m) => &m[self.range.clone()], + } + } +} + +/// Resolves regions from either inline data or registered memory-mapped files. +pub struct Resolver<'r> { + registry: &'r Registry, +} + +impl<'r> Resolver<'r> { + /// Create a new resolver with a reference to a registry. + pub fn new(registry: &'r Registry) -> Self { + Self { registry } + } + + /// Resolve a region described by `loc` into a `Resolved` view. + /// + /// Returns an error if the region is out of bounds or not found. + pub fn resolve(&self, loc: &Loc) -> Result { + let start = loc.region.offset as usize; + let end = start.checked_add(loc.region.len as usize).context("range overflow")?; + + if let Some(bytes) = &loc.inline { + if end > bytes.len() { + bail!("inline payload shorter than region"); + } + return Ok(Resolved { + backing: ResolvedBacking::Inline(bytes.clone()), + range: start..end, + }); + } + + let backing = self.registry.get(loc.store, loc.object).context("object not found")?; + + match backing { + Backing::Mmap(mm) => { + if end > mm.len() { + bail!("region out of bounds"); + } + Ok(Resolved { + backing: ResolvedBacking::Mmap(mm), + range: start..end, + }) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::Result; + use bytes::Bytes; + use std::fs::{File, OpenOptions}; + use std::io::Write; + use std::time::{SystemTime, UNIX_EPOCH}; + // use std::{sync::Arc, thread}; + + fn unique_path() -> std::path::PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + std::env::temp_dir().join(format!("acropolis_resolver_{nanos}.bin")) + } + + fn create_file_with(bytes: &[u8]) -> Result { + let path = unique_path(); + let mut f = OpenOptions::new() + .read(true) + .write(true) + .create(true) + .truncate(true) + .open(&path)?; + f.write_all(bytes)?; + f.sync_all()?; // ensure size/content visible to mmap + // Reopen read-only (optional, but mirrors production “reader” role) + drop(f); + let f = OpenOptions::new().read(true).open(&path)?; + Ok(f) + } + + #[test] + fn resolves_inline_payload() -> Result<()> { + let reg = Registry::default(); + let resolver = Resolver::new(®); + + let payload = Bytes::from_static(b"\x82\x01\x02\x03\x04\x05"); // small CBOR-ish bytes + let loc = Loc { + store: StoreId(0), + object: ObjectId(0), + region: Region { offset: 2, len: 3 }, // expect [0x02,0x03,0x04] + inline: Some(payload.clone()), + }; + + let r = resolver.resolve(&loc)?; + assert_eq!(r.as_slice(), &payload[2..5]); + + // Out of bounds on inline should error + let bad = Loc { + region: Region { offset: 0, len: 99 }, + ..loc.clone() + }; + assert!(resolver.resolve(&bad).is_err()); + Ok(()) + } + + #[test] + fn register_and_resolve_file_slice() -> Result<()> { + // File content: 0..=255 twice + let mut bytes = Vec::with_capacity(512); + bytes.extend(0u8..=255u8); + bytes.extend(0u8..=255u8); + + let file = create_file_with(&bytes)?; + let reg = Registry::default(); + let store = StoreId(1); + let obj = ObjectId(0xDEAD_BEEF_A11C_E55Au128); + + reg.register_file(store, obj, &file)?; + + let resolver = Resolver::new(®); + let loc = Loc { + store, + object: obj, + region: Region { offset: 100, len: 20 }, + inline: None, + }; + + let r = resolver.resolve(&loc)?; + assert_eq!(r.as_slice(), &bytes[100..120]); + Ok(()) + } + + #[test] + fn evict_makes_future_resolves_fail_but_existing_views_survive() -> Result<()> { + // Create a small file + let bytes = (0u8..=63u8).collect::>(); + let file = create_file_with(&bytes)?; + + let reg = Registry::default(); + let store = StoreId(2); + let obj = ObjectId(42); + + reg.register_file(store, obj, &file)?; + let resolver = Resolver::new(®); + + // Take a view + let loc = Loc { + store, + object: obj, + region: Region { offset: 8, len: 8 }, + inline: None, + }; + let view = resolver.resolve(&loc)?; // holds Arc to mmap internally + + // Evict from registry + assert!(reg.evict(store, obj)); + // Second evict is idempotent (nothing to remove) + assert!(!reg.evict(store, obj)); + + // Existing view is still readable (Arc kept it alive) + assert_eq!(view.as_slice(), &bytes[8..16]); + + // New resolves must fail after eviction + assert!(resolver.resolve(&loc).is_err()); + Ok(()) + } + + #[test] + fn resolve_out_of_bounds_fails() -> Result<()> { + let file_bytes = vec![1u8, 2, 3, 4, 5]; + let file = create_file_with(&file_bytes)?; + let reg = Registry::default(); + let store = StoreId(3); + let obj = ObjectId(7); + reg.register_file(store, obj, &file)?; + + let resolver = Resolver::new(®); + // Ask past end of file + let loc = Loc { + store, + object: obj, + region: Region { offset: 4, len: 4 }, // end=8 > len=5 + inline: None, + }; + assert!(resolver.resolve(&loc).is_err()); + Ok(()) + } + + #[test] + fn range_overflow_fails_early() -> Result<()> { + let reg = Registry::default(); + let resolver = Resolver::new(®); + + let loc = Loc { + store: StoreId(9), + object: ObjectId(9), + region: Region { + offset: u64::MAX - 5, + len: 16, // offset + len overflows usize/checked_add + }, + inline: Some(Bytes::from_static(&[0u8; 32])), + }; + assert!(resolver.resolve(&loc).is_err()); + Ok(()) + } + + #[test] +fn concurrent_resolves_share_backing() -> Result<()> { + use std::sync::Arc; + + // 1) Create file content. + let mut bytes = Vec::with_capacity(1024); + for i in 0..1024u32 { + bytes.push((i % 251) as u8); + } + + // 2) Pre-compute the expected slice and share it via Arc. + let expected: Arc> = Arc::new(bytes[128..384].to_vec()); + + // 3) Register the file once. + let file = create_file_with(&bytes)?; + let reg = Arc::new(Registry::default()); + let store = StoreId(11); + let obj = ObjectId(0xABCD); + reg.register_file(store, obj, &file)?; + + let loc = Loc { + store, + object: obj, + region: Region { offset: 128, len: 256 }, + inline: None, + }; + + + // 4) Resolve in parallel without ever capturing `bytes`. + let mut handles = Vec::new(); + for _ in 0..8 { + let reg_cloned = Arc::clone(®); + let loc_cloned = loc.clone(); + let expected_cloned = Arc::clone(&expected); + + handles.push(std::thread::spawn(move || { + let resolver = Resolver::new(®_cloned); + let r = resolver.resolve(&loc_cloned).expect("resolve"); + assert_eq!(r.as_slice(), &expected_cloned[..]); + })); + } + + // 4) Concurrently, sanity check in the main thread. + let resolver_main = Resolver::new(®); + let r_main = resolver_main.resolve(&loc)?; + assert_eq!(r_main.as_slice(), &expected[..]); + + for h in handles { + h.join().expect("thread join ok"); + } + Ok(()) +} + +} diff --git a/common/tests/loc_over_bus.rs b/common/tests/loc_over_bus.rs new file mode 100644 index 00000000..7c96ff22 --- /dev/null +++ b/common/tests/loc_over_bus.rs @@ -0,0 +1,209 @@ +//! Integration test: send a Loc over the Caryatid bus and resolve it. +//! Everything in this process is used for testing, don't accidentally include in production builds +//! TODO: this could be broken into three parts: subscriber module, publisher module, and the test itself. +#![cfg(test)] +use std::{ + sync::{Arc, Mutex, OnceLock}, + time::Duration, +}; + +use anyhow::{Result}; +use caryatid_sdk::{module, Context, Module}; +use serde::{Deserialize, Serialize}; +use tempfile::NamedTempFile; +use tokio::{sync::watch, time::timeout}; +use tracing::{info}; + + +use config::{Config, Environment, File}; +use caryatid_process::Process; +use acropolis_common::resolver::{Loc, ObjectId, Region, Registry, Resolver, StoreId}; + +// --------- shared test completion signaling --------- +static TEST_COMPLETION_TX: Mutex>> = Mutex::new(None); +pub fn signal_test_completion() { + if let Ok(tx) = TEST_COMPLETION_TX.lock() { + if let Some(sender) = tx.as_ref() { + let _ = sender.send(true); + } + } +} + +// --------- shared registry (test process local) ---------- +static REGISTRY: OnceLock> = OnceLock::new(); +fn registry() -> Arc { + REGISTRY.get().cloned().expect("registry not set") +} + +// ---------- typed bus message carrying our Loc ---------- +#[derive(Clone, Debug, Serialize, Default, Deserialize, PartialEq)] +enum BusMsg { + #[default] + None, // Just so we have a simple default + + Loc(Loc), + Ack(String), // response back to publisher +} + +/// Typed subscriber module +#[module( + message_type(BusMsg), + name = "subscriber", + description = "Typed subscriber module" +)] +struct Subscriber; + +impl Subscriber { + // Implement the single initialisation function, with application + async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + let subscribe_topic = config.get_string("topic").unwrap_or("sample".to_string()); + let ack_topic = format!("{}.ack", subscribe_topic); + let mut sub = context.subscribe(&subscribe_topic).await?; + info!("Creating subscriber on '{}'", subscribe_topic); + // Let this run async + let ctx = context.clone(); + ctx.run(async move { + while let Ok((_, msg)) = sub.read().await { + match &*msg { + BusMsg::Loc(loc) => { + let res = Resolver::new(®istry()).resolve(loc); + match res { + Ok(view) => { + // touch the bytes so we know mapping worked + let slice = view.as_slice(); + // trivial check: non-empty + if !slice.is_empty() { + context.publish(&ack_topic, Arc::new(BusMsg::Ack("ok".to_string()))).await.expect("Failed to publish ACK"); + } else { + context.publish(&ack_topic, Arc::new(BusMsg::Ack("empty".to_string()))).await.expect("Failed to publish ACK"); + } + break; // test done + } + Err(_) => { + context.publish(&ack_topic, Arc::new(BusMsg::Ack("resolve_err".to_string()))).await.expect("Failed to publish ACK"); + break; + } + } + } + _ => {} + } + } + }); + Ok(()) + } +} + +/// Typed publisher module +#[module( + message_type(BusMsg), + name = "publisher", + description = "Typed publisher module" +)] +pub struct Publisher; + +impl Publisher { + // super::signal_test_completion(); + // Implement the single initialisation function, with application + async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + let message_bus = context.message_bus.clone(); + + // Get configuration + let topic = config.get_string("topic").unwrap_or("sample".to_string()); + + // Subscribe for the ACK *before* publishing to avoid races. + let mut ack_sub = context.subscribe(&format!("{}.ack", topic)).await?; + + info!("Creating publisher on '{}'", topic); + + // Send test messages to the message bus on 'sample_topic' + // Let this run async + context.run(async move { + // Custom struct + let message = BusMsg::Loc(Loc { + store: StoreId(1), + object: ObjectId(0xFEED_CAFE_BEEF), + region: Region { offset: 100, len: 40 }, + inline: None, + }); + info!("Sending {:?}", message); + message_bus + .publish(&topic, Arc::new(message)) + .await + .expect("Failed to publish message"); + // Wait for ACK from Publisher and then signal completion of test. + while let Ok((_, message)) = ack_sub.read().await { + if let BusMsg::Ack(ref s) = *message { + if s == "ok" { + // we're done! + signal_test_completion(); + break; + } else { + assert!(false, "Unexpected ACK message: {}", s); + } + } + } + }); + Ok(()) + } +} + +// -------------- the test -------------- +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn loc_round_trip_over_caryatid() -> Result<()> { + // 0) Prepare backing bytes on disk and register them. + let tmp = NamedTempFile::new()?; + let bytes: Vec = (0u8..=255).collect(); // 256 bytes + std::fs::write(tmp.path(), &bytes)?; + // reopen read-only for mmap stability + let file = std::fs::File::open(tmp.path())?; + + let reg = Arc::new(Registry::default()); + reg.register_file(StoreId(1), ObjectId(0xFEED_CAFE_BEEF), &file)?; + REGISTRY.set(reg.clone()).ok(); + + // Read the config + let config = Arc::new( + Config::builder() + .add_source(File::with_name("test")) + .add_source(Environment::with_prefix("CARYATID")) + .build() + .unwrap(), + ); + + let (completion_tx, mut completion_rx) = watch::channel(false); + + { + let mut tx = TEST_COMPLETION_TX.lock().unwrap(); + *tx = Some(completion_tx); + } + + // Create the process + let mut process = Process::::create(config).await; + + // Register modules + Subscriber::register(&mut process); + Publisher::register(&mut process); + + // Run the process (this will run until we signal completion) + // We wrap this in a timeout to avoid hanging the test indefinitely + + match timeout(Duration::from_secs(5), async { + tokio::select! { + // run everythng + result = process.run() => { + result + } + _ = completion_rx.changed() => { + Ok(()) + } + } + }) + .await + { + Ok(result) => result?, + Err(_) => { + assert!(false, "Test timed out after 5 seconds"); + } + } + Ok(()) +} diff --git a/common/tests/test.toml b/common/tests/test.toml new file mode 100644 index 00000000..e3d32a4c --- /dev/null +++ b/common/tests/test.toml @@ -0,0 +1,20 @@ +# Top-level configuration for Caryatid process using typed messages + +[module.subscriber] +topic = "acropolis.loc.test" + +[module.publisher] +topic = "acropolis.loc.test" + +[message-bus.external] +class = "rabbit-mq" +url = "amqp://127.0.0.1:5672/%2f" +exchange = "caryatid" + +[message-bus.internal] +class = "in-memory" +workers = 10 + +[[message-router.route]] # Everything goes to both +pattern = "#" +bus = [ "internal", "external" ] From b78ef737c283e1aa41b60fbce2cb7f9b89fcb7ac Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Thu, 2 Oct 2025 06:56:11 -0700 Subject: [PATCH 2/6] Apply some formatting and spelling corrections --- common/src/resolver.rs | 117 +++++++++++++++++------------------ common/tests/loc_over_bus.rs | 50 ++++++++++----- 2 files changed, 92 insertions(+), 75 deletions(-) diff --git a/common/src/resolver.rs b/common/src/resolver.rs index be04ff9b..731868e9 100644 --- a/common/src/resolver.rs +++ b/common/src/resolver.rs @@ -16,10 +16,10 @@ //! ``` use anyhow::{bail, Context, Result}; -use serde::{Serialize, Deserialize}; use bytes::Bytes; use dashmap::DashMap; use memmap2::Mmap; +use serde::{Deserialize, Serialize}; use std::{fs::File, ops::Range, sync::Arc}; /*** @@ -178,24 +178,17 @@ mod tests { // use std::{sync::Arc, thread}; fn unique_path() -> std::path::PathBuf { - let nanos = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_nanos(); + let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_nanos(); std::env::temp_dir().join(format!("acropolis_resolver_{nanos}.bin")) } fn create_file_with(bytes: &[u8]) -> Result { let path = unique_path(); - let mut f = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(&path)?; + let mut f = + OpenOptions::new().read(true).write(true).create(true).truncate(true).open(&path)?; f.write_all(bytes)?; f.sync_all()?; // ensure size/content visible to mmap - // Reopen read-only (optional, but mirrors production “reader” role) + // Reopen read-only (optional, but mirrors production “reader” role) drop(f); let f = OpenOptions::new().read(true).open(&path)?; Ok(f) @@ -244,7 +237,10 @@ mod tests { let loc = Loc { store, object: obj, - region: Region { offset: 100, len: 20 }, + region: Region { + offset: 100, + len: 20, + }, inline: None, }; @@ -328,56 +324,57 @@ mod tests { } #[test] -fn concurrent_resolves_share_backing() -> Result<()> { - use std::sync::Arc; + fn concurrent_resolves_share_backing() -> Result<()> { + use std::sync::Arc; - // 1) Create file content. - let mut bytes = Vec::with_capacity(1024); - for i in 0..1024u32 { - bytes.push((i % 251) as u8); - } + // 1) Create file content. + let mut bytes = Vec::with_capacity(1024); + for i in 0..1024u32 { + bytes.push((i % 251) as u8); + } - // 2) Pre-compute the expected slice and share it via Arc. - let expected: Arc> = Arc::new(bytes[128..384].to_vec()); - - // 3) Register the file once. - let file = create_file_with(&bytes)?; - let reg = Arc::new(Registry::default()); - let store = StoreId(11); - let obj = ObjectId(0xABCD); - reg.register_file(store, obj, &file)?; - - let loc = Loc { - store, - object: obj, - region: Region { offset: 128, len: 256 }, - inline: None, - }; - - - // 4) Resolve in parallel without ever capturing `bytes`. - let mut handles = Vec::new(); - for _ in 0..8 { - let reg_cloned = Arc::clone(®); - let loc_cloned = loc.clone(); - let expected_cloned = Arc::clone(&expected); - - handles.push(std::thread::spawn(move || { - let resolver = Resolver::new(®_cloned); - let r = resolver.resolve(&loc_cloned).expect("resolve"); - assert_eq!(r.as_slice(), &expected_cloned[..]); - })); - } + // 2) Pre-compute the expected slice and share it via Arc. + let expected: Arc> = Arc::new(bytes[128..384].to_vec()); - // 4) Concurrently, sanity check in the main thread. - let resolver_main = Resolver::new(®); - let r_main = resolver_main.resolve(&loc)?; - assert_eq!(r_main.as_slice(), &expected[..]); + // 3) Register the file once. + let file = create_file_with(&bytes)?; + let reg = Arc::new(Registry::default()); + let store = StoreId(11); + let obj = ObjectId(0xABCD); + reg.register_file(store, obj, &file)?; - for h in handles { - h.join().expect("thread join ok"); - } - Ok(()) -} + let loc = Loc { + store, + object: obj, + region: Region { + offset: 128, + len: 256, + }, + inline: None, + }; + // 4) Resolve in parallel without ever capturing `bytes`. + let mut handles = Vec::new(); + for _ in 0..8 { + let reg_cloned = Arc::clone(®); + let loc_cloned = loc.clone(); + let expected_cloned = Arc::clone(&expected); + + handles.push(std::thread::spawn(move || { + let resolver = Resolver::new(®_cloned); + let r = resolver.resolve(&loc_cloned).expect("resolve"); + assert_eq!(r.as_slice(), &expected_cloned[..]); + })); + } + + // 4) Concurrently, sanity check in the main thread. + let resolver_main = Resolver::new(®); + let r_main = resolver_main.resolve(&loc)?; + assert_eq!(r_main.as_slice(), &expected[..]); + + for h in handles { + h.join().expect("thread join ok"); + } + Ok(()) + } } diff --git a/common/tests/loc_over_bus.rs b/common/tests/loc_over_bus.rs index 7c96ff22..749ccc17 100644 --- a/common/tests/loc_over_bus.rs +++ b/common/tests/loc_over_bus.rs @@ -7,17 +7,16 @@ use std::{ time::Duration, }; -use anyhow::{Result}; +use anyhow::Result; use caryatid_sdk::{module, Context, Module}; use serde::{Deserialize, Serialize}; use tempfile::NamedTempFile; use tokio::{sync::watch, time::timeout}; -use tracing::{info}; +use tracing::info; - -use config::{Config, Environment, File}; -use caryatid_process::Process; use acropolis_common::resolver::{Loc, ObjectId, Region, Registry, Resolver, StoreId}; +use caryatid_process::Process; +use config::{Config, Environment, File}; // --------- shared test completion signaling --------- static TEST_COMPLETION_TX: Mutex>> = Mutex::new(None); @@ -40,7 +39,7 @@ fn registry() -> Arc { enum BusMsg { #[default] None, // Just so we have a simple default - + Loc(Loc), Ack(String), // response back to publisher } @@ -73,14 +72,32 @@ impl Subscriber { let slice = view.as_slice(); // trivial check: non-empty if !slice.is_empty() { - context.publish(&ack_topic, Arc::new(BusMsg::Ack("ok".to_string()))).await.expect("Failed to publish ACK"); + context + .publish( + &ack_topic, + Arc::new(BusMsg::Ack("ok".to_string())), + ) + .await + .expect("Failed to publish ACK"); } else { - context.publish(&ack_topic, Arc::new(BusMsg::Ack("empty".to_string()))).await.expect("Failed to publish ACK"); + context + .publish( + &ack_topic, + Arc::new(BusMsg::Ack("empty".to_string())), + ) + .await + .expect("Failed to publish ACK"); } break; // test done } Err(_) => { - context.publish(&ack_topic, Arc::new(BusMsg::Ack("resolve_err".to_string()))).await.expect("Failed to publish ACK"); + context + .publish( + &ack_topic, + Arc::new(BusMsg::Ack("resolve_err".to_string())), + ) + .await + .expect("Failed to publish ACK"); break; } } @@ -120,9 +137,12 @@ impl Publisher { context.run(async move { // Custom struct let message = BusMsg::Loc(Loc { - store: StoreId(1), + store: StoreId(1), object: ObjectId(0xFEED_CAFE_BEEF), - region: Region { offset: 100, len: 40 }, + region: Region { + offset: 100, + len: 40, + }, inline: None, }); info!("Sending {:?}", message); @@ -138,7 +158,7 @@ impl Publisher { signal_test_completion(); break; } else { - assert!(false, "Unexpected ACK message: {}", s); + panic!("Unexpected ACK message: {}", s); } } } @@ -179,7 +199,7 @@ async fn loc_round_trip_over_caryatid() -> Result<()> { // Create the process let mut process = Process::::create(config).await; - + // Register modules Subscriber::register(&mut process); Publisher::register(&mut process); @@ -189,7 +209,7 @@ async fn loc_round_trip_over_caryatid() -> Result<()> { match timeout(Duration::from_secs(5), async { tokio::select! { - // run everythng + // run everything result = process.run() => { result } @@ -202,7 +222,7 @@ async fn loc_round_trip_over_caryatid() -> Result<()> { { Ok(result) => result?, Err(_) => { - assert!(false, "Test timed out after 5 seconds"); + panic!("Test timed out after 5 seconds"); } } Ok(()) From d5bad582a2076b93ba3a7722a2e2e8a0e4770e56 Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Thu, 2 Oct 2025 07:08:13 -0700 Subject: [PATCH 3/6] Fix alphabetical ordering of my new module --- common/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/lib.rs b/common/src/lib.rs index 17c554df..42b7767f 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -13,11 +13,11 @@ pub mod protocol_params; pub mod queries; pub mod rational_number; pub mod rest_helper; +pub mod resolver; pub mod serialization; pub mod stake_addresses; pub mod state_history; pub mod types; -pub mod resolver; // Flattened re-exports pub use self::address::*; From 75eb37ccf8404ea946dd050f092684d90beb0465 Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Thu, 2 Oct 2025 07:10:10 -0700 Subject: [PATCH 4/6] Document config file path logic in integration test --- common/tests/loc_over_bus.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/common/tests/loc_over_bus.rs b/common/tests/loc_over_bus.rs index 749ccc17..7d73ba14 100644 --- a/common/tests/loc_over_bus.rs +++ b/common/tests/loc_over_bus.rs @@ -182,9 +182,11 @@ async fn loc_round_trip_over_caryatid() -> Result<()> { REGISTRY.set(reg.clone()).ok(); // Read the config + // Use an absolute path for the config file so the test works regardless of the working directory. + // This ensures config-rs always finds the file, even when integration tests are run from the workspace root or other locations. let config = Arc::new( Config::builder() - .add_source(File::with_name("test")) + .add_source(File::with_name(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/test"))) .add_source(Environment::with_prefix("CARYATID")) .build() .unwrap(), From 9e7b8fb1062411f560988b039d9a09bfa435c7c7 Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Thu, 2 Oct 2025 07:17:29 -0700 Subject: [PATCH 5/6] Fix ordering, I need a command for this --- common/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/lib.rs b/common/src/lib.rs index 42b7767f..5b34005a 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -12,8 +12,8 @@ pub mod params; pub mod protocol_params; pub mod queries; pub mod rational_number; -pub mod rest_helper; pub mod resolver; +pub mod rest_helper; pub mod serialization; pub mod stake_addresses; pub mod state_history; From 2f137f91f1c56bef66b06a1d55bf4ba0b5c3a045 Mon Sep 17 00:00:00 2001 From: Chris Tilt Date: Thu, 2 Oct 2025 07:20:13 -0700 Subject: [PATCH 6/6] Format again. I need to automate this --- common/tests/loc_over_bus.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/common/tests/loc_over_bus.rs b/common/tests/loc_over_bus.rs index 7d73ba14..559fe789 100644 --- a/common/tests/loc_over_bus.rs +++ b/common/tests/loc_over_bus.rs @@ -186,7 +186,10 @@ async fn loc_round_trip_over_caryatid() -> Result<()> { // This ensures config-rs always finds the file, even when integration tests are run from the workspace root or other locations. let config = Arc::new( Config::builder() - .add_source(File::with_name(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/test"))) + .add_source(File::with_name(concat!( + env!("CARGO_MANIFEST_DIR"), + "/tests/test" + ))) .add_source(Environment::with_prefix("CARYATID")) .build() .unwrap(),