Skip to content

Commit 2ff5595

Browse files
authored
refactor: add bundle helpers (#54)
* spike * spike from * remove transactions * fix test * bundle params * fmt * txs helper * err type * spike new types * better err handling * reduce diff * more diff reduce * comments and diff * reduce clones * naming * fmt * fmt + parsed + change fn to new * accept bundle is err free * fix * no redudant closure
1 parent 44e6bef commit 2ff5595

File tree

16 files changed

+310
-185
lines changed

16 files changed

+310
-185
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ alloy-serde = "1.0.41"
3131

3232
# op-alloy
3333
op-alloy-network = { version = "0.20.0", default-features = false }
34-
op-alloy-consensus = { version = "0.20.0", features = ["k256"] }
34+
op-alloy-consensus = { version = "0.20.0", features = ["k256", "serde"] }
3535
op-alloy-rpc-types = { version = "0.20.0", default-features = true}
3636
op-alloy-flz = { version = "0.13.1" }
3737

crates/audit/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ name = "tips-audit"
1212
path = "src/bin/main.rs"
1313

1414
[dependencies]
15-
tips-core = { workspace = true }
15+
tips-core = { workspace = true, features = ["test-utils"] }
1616
tokio = { workspace = true }
1717
tracing = { workspace = true }
1818
tracing-subscriber = { workspace = true }

crates/audit/src/storage.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use aws_sdk_s3::primitives::ByteStream;
1010
use serde::{Deserialize, Serialize};
1111
use std::fmt;
1212
use std::fmt::Debug;
13-
use tips_core::Bundle;
13+
use tips_core::AcceptedBundle;
1414
use tracing::info;
1515

1616
#[derive(Debug)]
@@ -39,7 +39,7 @@ pub enum BundleHistoryEvent {
3939
Received {
4040
key: String,
4141
timestamp: i64,
42-
bundle: Bundle,
42+
bundle: Box<AcceptedBundle>,
4343
},
4444
Cancelled {
4545
key: String,
@@ -365,13 +365,9 @@ mod tests {
365365
use crate::reader::Event;
366366
use crate::types::{BundleEvent, DropReason};
367367
use alloy_primitives::TxHash;
368-
use tips_core::Bundle;
368+
use tips_core::test_utils::create_bundle_from_txn_data;
369369
use uuid::Uuid;
370370

371-
fn create_test_bundle() -> Bundle {
372-
Bundle::default()
373-
}
374-
375371
fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
376372
Event {
377373
key: key.to_string(),
@@ -383,11 +379,11 @@ mod tests {
383379
#[test]
384380
fn test_update_bundle_history_transform_adds_new_event() {
385381
let bundle_history = BundleHistory { history: vec![] };
386-
let bundle = create_test_bundle();
382+
let bundle = create_bundle_from_txn_data();
387383
let bundle_id = Uuid::new_v4();
388384
let bundle_event = BundleEvent::Received {
389385
bundle_id,
390-
bundle: bundle.clone(),
386+
bundle: Box::new(bundle.clone()),
391387
};
392388
let event = create_test_event("test-key", 1234567890, bundle_event);
393389

@@ -416,15 +412,18 @@ mod tests {
416412
let existing_event = BundleHistoryEvent::Received {
417413
key: "duplicate-key".to_string(),
418414
timestamp: 1111111111,
419-
bundle: create_test_bundle(),
415+
bundle: Box::new(create_bundle_from_txn_data()),
420416
};
421417
let bundle_history = BundleHistory {
422418
history: vec![existing_event],
423419
};
424420

425-
let bundle = create_test_bundle();
421+
let bundle = create_bundle_from_txn_data();
426422
let bundle_id = Uuid::new_v4();
427-
let bundle_event = BundleEvent::Received { bundle_id, bundle };
423+
let bundle_event = BundleEvent::Received {
424+
bundle_id,
425+
bundle: Box::new(bundle),
426+
};
428427
let event = create_test_event("duplicate-key", 1234567890, bundle_event);
429428

430429
let result = update_bundle_history_transform(bundle_history, &event);
@@ -437,10 +436,10 @@ mod tests {
437436
let bundle_history = BundleHistory { history: vec![] };
438437
let bundle_id = Uuid::new_v4();
439438

440-
let bundle = create_test_bundle();
439+
let bundle = create_bundle_from_txn_data();
441440
let bundle_event = BundleEvent::Received {
442441
bundle_id,
443-
bundle: bundle.clone(),
442+
bundle: Box::new(bundle),
444443
};
445444
let event = create_test_event("test-key", 1234567890, bundle_event);
446445
let result = update_bundle_history_transform(bundle_history.clone(), &event);

crates/audit/src/types.rs

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
use alloy_consensus::transaction::{SignerRecoverable, Transaction as ConsensusTransaction};
22
use alloy_primitives::{Address, TxHash, U256};
3-
use alloy_provider::network::eip2718::Decodable2718;
43
use bytes::Bytes;
5-
use op_alloy_consensus::OpTxEnvelope;
64
use serde::{Deserialize, Serialize};
7-
use tips_core::Bundle;
5+
use tips_core::AcceptedBundle;
86
use uuid::Uuid;
97

108
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
@@ -33,7 +31,7 @@ pub struct Transaction {
3331
pub enum BundleEvent {
3432
Received {
3533
bundle_id: BundleId,
36-
bundle: Bundle,
34+
bundle: Box<AcceptedBundle>,
3735
},
3836
Cancelled {
3937
bundle_id: BundleId,
@@ -72,19 +70,14 @@ impl BundleEvent {
7270
bundle
7371
.txs
7472
.iter()
75-
.filter_map(|tx_bytes| {
76-
match OpTxEnvelope::decode_2718_exact(tx_bytes.iter().as_slice()) {
77-
Ok(envelope) => {
78-
match envelope.recover_signer() {
79-
Ok(sender) => Some(TransactionId {
80-
sender,
81-
nonce: U256::from(envelope.nonce()),
82-
hash: *envelope.hash(),
83-
}),
84-
Err(_) => None, // Skip invalid transactions
85-
}
86-
}
87-
Err(_) => None, // Skip malformed transactions
73+
.filter_map(|envelope| {
74+
match envelope.recover_signer() {
75+
Ok(sender) => Some(TransactionId {
76+
sender,
77+
nonce: U256::from(envelope.nonce()),
78+
hash: *envelope.hash(),
79+
}),
80+
Err(_) => None, // Skip invalid transactions
8881
}
8982
})
9083
.collect()

crates/audit/tests/integration_tests.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use tips_audit::{
55
storage::{BundleEventS3Reader, S3EventReaderWriter},
66
types::{BundleEvent, DropReason},
77
};
8-
use tips_core::Bundle;
8+
use tips_core::test_utils::create_bundle_from_txn_data;
99
use uuid::Uuid;
1010
mod common;
1111
use common::TestHarness;
@@ -20,10 +20,10 @@ async fn test_kafka_publisher_s3_archiver_integration()
2020
S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
2121

2222
let test_bundle_id = Uuid::new_v4();
23-
let test_events = vec![
23+
let test_events = [
2424
BundleEvent::Received {
2525
bundle_id: test_bundle_id,
26-
bundle: Bundle::default(),
26+
bundle: Box::new(create_bundle_from_txn_data()),
2727
},
2828
BundleEvent::Dropped {
2929
bundle_id: test_bundle_id,

crates/audit/tests/s3_test.rs

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,16 @@
1-
use alloy_primitives::{Bytes, TxHash, b256, bytes};
1+
use alloy_primitives::TxHash;
22
use std::sync::Arc;
33
use tips_audit::{
44
reader::Event,
55
storage::{BundleEventS3Reader, EventWriter, S3EventReaderWriter},
66
types::BundleEvent,
77
};
8-
use tips_core::Bundle;
98
use tokio::task::JoinSet;
109
use uuid::Uuid;
1110

1211
mod common;
1312
use common::TestHarness;
14-
15-
// https://basescan.org/tx/0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e
16-
const TXN_DATA: Bytes = bytes!(
17-
"0x02f88f8221058304b6b3018315fb3883124f80948ff2f0a8d017c79454aa28509a19ab9753c2dd1480a476d58e1a0182426068c9ea5b00000000000000000002f84f00000000083e4fda54950000c080a086fbc7bbee41f441fb0f32f7aa274d2188c460fe6ac95095fa6331fa08ec4ce7a01aee3bcc3c28f7ba4e0c24da9ae85e9e0166c73cabb42c25ff7b5ecd424f3105"
18-
);
19-
const TXN_HASH: TxHash =
20-
b256!("0x4f7ddfc911f5cf85dd15a413f4cbb2a0abe4f1ff275ed13581958c0bcf043c5e");
21-
22-
fn create_test_bundle() -> Bundle {
23-
Bundle {
24-
txs: vec![TXN_DATA.clone()],
25-
..Default::default()
26-
}
27-
}
13+
use tips_core::test_utils::{TXN_HASH, create_bundle_from_txn_data};
2814

2915
fn create_test_event(key: &str, timestamp: i64, bundle_event: BundleEvent) -> Event {
3016
Event {
@@ -40,13 +26,13 @@ async fn test_event_write_and_read() -> Result<(), Box<dyn std::error::Error + S
4026
let writer = S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
4127

4228
let bundle_id = Uuid::new_v4();
43-
let bundle = create_test_bundle();
29+
let bundle = create_bundle_from_txn_data();
4430
let event = create_test_event(
4531
"test-key-1",
4632
1234567890,
4733
BundleEvent::Received {
4834
bundle_id,
49-
bundle: bundle.clone(),
35+
bundle: Box::new(bundle.clone()),
5036
},
5137
);
5238

@@ -67,13 +53,13 @@ async fn test_event_write_and_read() -> Result<(), Box<dyn std::error::Error + S
6753
}
6854

6955
let bundle_id_two = Uuid::new_v4();
70-
let bundle = create_test_bundle();
56+
let bundle = create_bundle_from_txn_data();
7157
let event = create_test_event(
7258
"test-key-2",
7359
1234567890,
7460
BundleEvent::Received {
7561
bundle_id: bundle_id_two,
76-
bundle: bundle.clone(),
62+
bundle: Box::new(bundle.clone()),
7763
},
7864
);
7965

@@ -96,15 +82,15 @@ async fn test_events_appended() -> Result<(), Box<dyn std::error::Error + Send +
9682
let writer = S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
9783

9884
let bundle_id = Uuid::new_v4();
99-
let bundle = create_test_bundle();
85+
let bundle = create_bundle_from_txn_data();
10086

101-
let events = vec![
87+
let events = [
10288
create_test_event(
10389
"test-key-1",
10490
1234567890,
10591
BundleEvent::Received {
10692
bundle_id,
107-
bundle: bundle.clone(),
93+
bundle: Box::new(bundle.clone()),
10894
},
10995
),
11096
create_test_event(
@@ -147,13 +133,13 @@ async fn test_event_deduplication() -> Result<(), Box<dyn std::error::Error + Se
147133
let writer = S3EventReaderWriter::new(harness.s3_client.clone(), harness.bucket_name.clone());
148134

149135
let bundle_id = Uuid::new_v4();
150-
let bundle = create_test_bundle();
136+
let bundle = create_bundle_from_txn_data();
151137
let event = create_test_event(
152138
"duplicate-key",
153139
1234567890,
154140
BundleEvent::Received {
155141
bundle_id,
156-
bundle: bundle.clone(),
142+
bundle: Box::new(bundle.clone()),
157143
},
158144
);
159145

@@ -197,14 +183,14 @@ async fn test_concurrent_writes_for_bundle() -> Result<(), Box<dyn std::error::E
197183
));
198184

199185
let bundle_id = Uuid::new_v4();
200-
let bundle = create_test_bundle();
186+
let bundle = create_bundle_from_txn_data();
201187

202188
let event = create_test_event(
203189
"hello-dan",
204190
1234567889i64,
205191
BundleEvent::Received {
206192
bundle_id,
207-
bundle: bundle.clone(),
193+
bundle: Box::new(bundle.clone()),
208194
},
209195
);
210196

@@ -225,7 +211,7 @@ async fn test_concurrent_writes_for_bundle() -> Result<(), Box<dyn std::error::E
225211
1234567890 + i as i64,
226212
BundleEvent::Received {
227213
bundle_id,
228-
bundle: bundle.clone(),
214+
bundle: Box::new(bundle.clone()),
229215
},
230216
);
231217

crates/bundle-pool/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ use tracing::error;
88

99
pub use pool::{Action, BundleStore, InMemoryBundlePool, ProcessedBundle};
1010
pub use source::KafkaBundleSource;
11-
pub use tips_core::{Bundle, BundleHash, BundleWithMetadata, CancelBundle};
11+
pub use tips_core::{AcceptedBundle, Bundle, CancelBundle};
1212

1313
pub fn connect_sources_to_pool<S, P>(
1414
sources: Vec<S>,
15-
bundle_rx: mpsc::UnboundedReceiver<BundleWithMetadata>,
15+
bundle_rx: mpsc::UnboundedReceiver<AcceptedBundle>,
1616
pool: Arc<Mutex<P>>,
1717
) where
1818
S: BundleSource + Send + 'static,

crates/bundle-pool/src/pool.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use alloy_primitives::map::HashMap;
33
use std::fmt::Debug;
44
use std::sync::{Arc, Mutex};
55
use tips_audit::{BundleEvent, DropReason};
6-
use tips_core::BundleWithMetadata;
6+
use tips_core::AcceptedBundle;
77
use tokio::sync::mpsc;
88
use tracing::warn;
99
use uuid::Uuid;
@@ -31,8 +31,8 @@ impl ProcessedBundle {
3131
}
3232

3333
pub trait BundleStore {
34-
fn add_bundle(&mut self, bundle: BundleWithMetadata);
35-
fn get_bundles(&self) -> Vec<BundleWithMetadata>;
34+
fn add_bundle(&mut self, bundle: AcceptedBundle);
35+
fn get_bundles(&self) -> Vec<AcceptedBundle>;
3636
fn built_flashblock(
3737
&mut self,
3838
block_number: u64,
@@ -44,7 +44,7 @@ pub trait BundleStore {
4444

4545
struct BundleData {
4646
flashblocks_in_block: HashMap<u64, Vec<ProcessedBundle>>,
47-
bundles: HashMap<Uuid, BundleWithMetadata>,
47+
bundles: HashMap<Uuid, AcceptedBundle>,
4848
}
4949

5050
#[derive(Clone)]
@@ -77,12 +77,12 @@ impl InMemoryBundlePool {
7777
}
7878

7979
impl BundleStore for InMemoryBundlePool {
80-
fn add_bundle(&mut self, bundle: BundleWithMetadata) {
80+
fn add_bundle(&mut self, bundle: AcceptedBundle) {
8181
let mut inner = self.inner.lock().unwrap();
8282
inner.bundles.insert(*bundle.uuid(), bundle);
8383
}
8484

85-
fn get_bundles(&self) -> Vec<BundleWithMetadata> {
85+
fn get_bundles(&self) -> Vec<AcceptedBundle> {
8686
let inner = self.inner.lock().unwrap();
8787
inner.bundles.values().cloned().collect()
8888
}

0 commit comments

Comments
 (0)