Skip to content
Merged
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ alloy-serde = "1.0.41"

# op-alloy
op-alloy-network = { version = "0.20.0", default-features = false }
op-alloy-consensus = { version = "0.20.0", features = ["k256"] }
op-alloy-consensus = { version = "0.20.0", features = ["k256", "serde"] }
op-alloy-rpc-types = { version = "0.20.0", default-features = true}
op-alloy-flz = { version = "0.13.1" }

Expand Down
2 changes: 1 addition & 1 deletion crates/audit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ name = "tips-audit"
path = "src/bin/main.rs"

[dependencies]
tips-core = { workspace = true }
tips-core = { workspace = true, features = ["test-utils"] }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
Expand Down
27 changes: 13 additions & 14 deletions crates/audit/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use aws_sdk_s3::primitives::ByteStream;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::fmt::Debug;
use tips_core::Bundle;
use tips_core::AcceptedBundle;
use tracing::info;

#[derive(Debug)]
Expand Down Expand Up @@ -39,7 +39,7 @@ pub enum BundleHistoryEvent {
Received {
key: String,
timestamp: i64,
bundle: Bundle,
bundle: Box<AcceptedBundle>,
},
Cancelled {
key: String,
Expand Down Expand Up @@ -365,13 +365,9 @@ mod tests {
use crate::reader::Event;
use crate::types::{BundleEvent, DropReason};
use alloy_primitives::TxHash;
use tips_core::Bundle;
use tips_core::test_utils::create_bundle_from_txn_data;
use uuid::Uuid;

fn create_test_bundle() -> Bundle {
Bundle::default()
}

fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
Event {
key: key.to_string(),
Expand All @@ -383,11 +379,11 @@ mod tests {
#[test]
fn test_update_bundle_history_transform_adds_new_event() {
let bundle_history = BundleHistory { history: vec![] };
let bundle = create_test_bundle();
let bundle = create_bundle_from_txn_data();
let bundle_id = Uuid::new_v4();
let bundle_event = BundleEvent::Received {
bundle_id,
bundle: bundle.clone(),
bundle: Box::new(bundle.clone()),
};
let event = create_test_event("test-key", 1234567890, bundle_event);

Expand Down Expand Up @@ -416,15 +412,18 @@ mod tests {
let existing_event = BundleHistoryEvent::Received {
key: "duplicate-key".to_string(),
timestamp: 1111111111,
bundle: create_test_bundle(),
bundle: Box::new(create_bundle_from_txn_data()),
};
let bundle_history = BundleHistory {
history: vec![existing_event],
};

let bundle = create_test_bundle();
let bundle = create_bundle_from_txn_data();
let bundle_id = Uuid::new_v4();
let bundle_event = BundleEvent::Received { bundle_id, bundle };
let bundle_event = BundleEvent::Received {
bundle_id,
bundle: Box::new(bundle),
};
let event = create_test_event("duplicate-key", 1234567890, bundle_event);

let result = update_bundle_history_transform(bundle_history, &event);
Expand All @@ -437,10 +436,10 @@ mod tests {
let bundle_history = BundleHistory { history: vec![] };
let bundle_id = Uuid::new_v4();

let bundle = create_test_bundle();
let bundle = create_bundle_from_txn_data();
let bundle_event = BundleEvent::Received {
bundle_id,
bundle: bundle.clone(),
bundle: Box::new(bundle),
};
let event = create_test_event("test-key", 1234567890, bundle_event);
let result = update_bundle_history_transform(bundle_history.clone(), &event);
Expand Down
27 changes: 10 additions & 17 deletions crates/audit/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use alloy_consensus::transaction::{SignerRecoverable, Transaction as ConsensusTransaction};
use alloy_primitives::{Address, TxHash, U256};
use alloy_provider::network::eip2718::Decodable2718;
use bytes::Bytes;
use op_alloy_consensus::OpTxEnvelope;
use serde::{Deserialize, Serialize};
use tips_core::Bundle;
use tips_core::AcceptedBundle;
use uuid::Uuid;

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
Expand Down Expand Up @@ -33,7 +31,7 @@ pub struct Transaction {
pub enum BundleEvent {
Received {
bundle_id: BundleId,
bundle: Bundle,
bundle: Box<AcceptedBundle>,
},
Cancelled {
bundle_id: BundleId,
Expand Down Expand Up @@ -72,19 +70,14 @@ impl BundleEvent {
bundle
.txs
.iter()
.filter_map(|tx_bytes| {
match OpTxEnvelope::decode_2718_exact(tx_bytes.iter().as_slice()) {
Ok(envelope) => {
match envelope.recover_signer() {
Ok(sender) => Some(TransactionId {
sender,
nonce: U256::from(envelope.nonce()),
hash: *envelope.hash(),
}),
Err(_) => None, // Skip invalid transactions
}
}
Err(_) => None, // Skip malformed transactions
.filter_map(|envelope| {
match envelope.recover_signer() {
Ok(sender) => Some(TransactionId {
sender,
nonce: U256::from(envelope.nonce()),
hash: *envelope.hash(),
}),
Err(_) => None, // Skip invalid transactions
}
})
.collect()
Expand Down
6 changes: 3 additions & 3 deletions crates/audit/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tips_audit::{
storage::{BundleEventS3Reader, S3EventReaderWriter},
types::{BundleEvent, DropReason},
};
use tips_core::Bundle;
use tips_core::test_utils::create_bundle_from_txn_data;
use uuid::Uuid;
mod common;
use common::TestHarness;
Expand All @@ -20,10 +20,10 @@ async fn test_kafka_publisher_s3_archiver_integration()
S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());

let test_bundle_id = Uuid::new_v4();
let test_events = vec![
let test_events = [
BundleEvent::Received {
bundle_id: test_bundle_id,
bundle: Bundle::default(),
bundle: Box::new(create_bundle_from_txn_data()),
},
BundleEvent::Dropped {
bundle_id: test_bundle_id,
Expand Down
42 changes: 14 additions & 28 deletions crates/audit/tests/s3_test.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,16 @@
use alloy_primitives::{Bytes, TxHash, b256, bytes};
use alloy_primitives::TxHash;
use std::sync::Arc;
use tips_audit::{
reader::Event,
storage::{BundleEventS3Reader, EventWriter, S3EventReaderWriter},
types::BundleEvent,
};
use tips_core::Bundle;
use tokio::task::JoinSet;
use uuid::Uuid;

mod common;
use common::TestHarness;

// https://basescan.org/tx/0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e
const TXN_DATA: Bytes = bytes!(
"0x02f88f8221058304b6b3018315fb3883124f80948ff2f0a8d017c79454aa28509a19ab9753c2dd1480a476d58e1a0182426068c9ea5b00000000000000000002f84f00000000083e4fda54950000c080a086fbc7bbee41f441fb0f32f7aa274d2188c460fe6ac95095fa6331fa08ec4ce7a01aee3bcc3c28f7ba4e0c24da9ae85e9e0166c73cabb42c25ff7b5ecd424f3105"
);
const TXN_HASH: TxHash =
b256!("0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e");

fn create_test_bundle() -> Bundle {
Bundle {
txs: vec![TXN_DATA.clone()],
..Default::default()
}
}
use tips_core::test_utils::{TXN_HASH, create_bundle_from_txn_data};

fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
Event {
Expand All @@ -40,13 +26,13 @@ async fn test_event_write_and_read() -> Result<(), Box<dyn std::error::Error + S
let writer = S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());

let bundle_id = Uuid::new_v4();
let bundle = create_test_bundle();
let bundle = create_bundle_from_txn_data();
let event = create_test_event(
"test-key-1",
1234567890,
BundleEvent::Received {
bundle_id,
bundle: bundle.clone(),
bundle: Box::new(bundle.clone()),
},
);

Expand All @@ -67,13 +53,13 @@ async fn test_event_write_and_read() -> Result<(), Box<dyn std::error::Error + S
}

let bundle_id_two = Uuid::new_v4();
let bundle = create_test_bundle();
let bundle = create_bundle_from_txn_data();
let event = create_test_event(
"test-key-2",
1234567890,
BundleEvent::Received {
bundle_id: bundle_id_two,
bundle: bundle.clone(),
bundle: Box::new(bundle.clone()),
},
);

Expand All @@ -96,15 +82,15 @@ async fn test_events_appended() -> Result<(), Box<dyn std::error::Error + Send +
let writer = S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());

let bundle_id = Uuid::new_v4();
let bundle = create_test_bundle();
let bundle = create_bundle_from_txn_data();

let events = vec![
let events = [
create_test_event(
"test-key-1",
1234567890,
BundleEvent::Received {
bundle_id,
bundle: bundle.clone(),
bundle: Box::new(bundle.clone()),
},
),
create_test_event(
Expand Down Expand Up @@ -147,13 +133,13 @@ async fn test_event_deduplication() -> Result<(), Box<dyn std::error::Error + Se
let writer = S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());

let bundle_id = Uuid::new_v4();
let bundle = create_test_bundle();
let bundle = create_bundle_from_txn_data();
let event = create_test_event(
"duplicate-key",
1234567890,
BundleEvent::Received {
bundle_id,
bundle: bundle.clone(),
bundle: Box::new(bundle.clone()),
},
);

Expand Down Expand Up @@ -197,14 +183,14 @@ async fn test_concurrent_writes_for_bundle() -> Result<(), Box<dyn std::error::E
));

let bundle_id = Uuid::new_v4();
let bundle = create_test_bundle();
let bundle = create_bundle_from_txn_data();

let event = create_test_event(
"hello-dan",
1234567889i64,
BundleEvent::Received {
bundle_id,
bundle: bundle.clone(),
bundle: Box::new(bundle.clone()),
},
);

Expand All @@ -225,7 +211,7 @@ async fn test_concurrent_writes_for_bundle() -> Result<(), Box<dyn std::error::E
1234567890 + i as i64,
BundleEvent::Received {
bundle_id,
bundle: bundle.clone(),
bundle: Box::new(bundle.clone()),
},
);

Expand Down
4 changes: 2 additions & 2 deletions crates/bundle-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use tracing::error;

pub use pool::{Action, BundleStore, InMemoryBundlePool, ProcessedBundle};
pub use source::KafkaBundleSource;
pub use tips_core::{Bundle, BundleHash, BundleWithMetadata, CancelBundle};
pub use tips_core::{AcceptedBundle, Bundle, CancelBundle};

pub fn connect_sources_to_pool<S, P>(
sources: Vec<S>,
bundle_rx: mpsc::UnboundedReceiver<BundleWithMetadata>,
bundle_rx: mpsc::UnboundedReceiver<AcceptedBundle>,
pool: Arc<Mutex<P>>,
) where
S: BundleSource + Send + 'static,
Expand Down
12 changes: 6 additions & 6 deletions crates/bundle-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use alloy_primitives::map::HashMap;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use tips_audit::{BundleEvent, DropReason};
use tips_core::BundleWithMetadata;
use tips_core::AcceptedBundle;
use tokio::sync::mpsc;
use tracing::warn;
use uuid::Uuid;
Expand Down Expand Up @@ -31,8 +31,8 @@ impl ProcessedBundle {
}

pub trait BundleStore {
fn add_bundle(&mut self, bundle: BundleWithMetadata);
fn get_bundles(&self) -> Vec<BundleWithMetadata>;
fn add_bundle(&mut self, bundle: AcceptedBundle);
fn get_bundles(&self) -> Vec<AcceptedBundle>;
fn built_flashblock(
&mut self,
block_number: u64,
Expand All @@ -44,7 +44,7 @@ pub trait BundleStore {

struct BundleData {
flashblocks_in_block: HashMap<u64, Vec<ProcessedBundle>>,
bundles: HashMap<Uuid, BundleWithMetadata>,
bundles: HashMap<Uuid, AcceptedBundle>,
}

#[derive(Clone)]
Expand Down Expand Up @@ -77,12 +77,12 @@ impl InMemoryBundlePool {
}

impl BundleStore for InMemoryBundlePool {
fn add_bundle(&mut self, bundle: BundleWithMetadata) {
fn add_bundle(&mut self, bundle: AcceptedBundle) {
let mut inner = self.inner.lock().unwrap();
inner.bundles.insert(*bundle.uuid(), bundle);
}

fn get_bundles(&self) -> Vec<BundleWithMetadata> {
fn get_bundles(&self) -> Vec<AcceptedBundle> {
let inner = self.inner.lock().unwrap();
inner.bundles.values().cloned().collect()
}
Expand Down
Loading