1- use std:: {
2- collections:: { HashMap , HashSet } ,
3- path:: Path ,
4- } ;
1+ use std:: { collections:: HashMap , path:: Path } ;
52
63use crate :: state:: { AddressEntry , AddressStorageConfig , UtxoDelta } ;
74use acropolis_common:: { Address , AddressTotals , TxIdentifier , UTxOIdentifier } ;
@@ -16,6 +13,14 @@ const ADDRESS_UTXOS_EPOCH_COUNTER: &[u8] = b"utxos_epoch_last";
1613const ADDRESS_TXS_EPOCH_COUNTER : & [ u8 ] = b"txs_epoch_last" ;
1714const ADDRESS_TOTALS_EPOCH_COUNTER : & [ u8 ] = b"totals_epoch_last" ;
1815
16+ #[ derive( Default ) ]
17+ struct MergedDeltas {
18+ created_utxos : Vec < UTxOIdentifier > ,
19+ spent_utxos : Vec < UTxOIdentifier > ,
20+ txs : Vec < TxIdentifier > ,
21+ totals : AddressTotals ,
22+ }
23+
1924pub struct ImmutableAddressStore {
2025 utxos : Partition ,
2126 txs : Partition ,
@@ -26,7 +31,7 @@ pub struct ImmutableAddressStore {
2631
2732impl ImmutableAddressStore {
2833 pub fn new ( path : impl AsRef < Path > ) -> Result < Self > {
29- let cfg = fjall:: Config :: new ( path) . max_write_buffer_size ( 512 * 1024 * 1024 ) ;
34+ let cfg = fjall:: Config :: new ( path) . max_write_buffer_size ( 512 * 1024 * 1024 ) . temporary ( true ) ;
3035 let keyspace = Keyspace :: open ( cfg) ?;
3136
3237 let utxos = keyspace. open_partition ( "address_utxos" , PartitionCreateOptions :: default ( ) ) ?;
@@ -43,8 +48,8 @@ impl ImmutableAddressStore {
4348 } )
4449 }
4550
46- /// Persists volatile UTxOs, transactions, and totals into their respective Fjall partitions for an entire epoch.
47- /// Skips any partitions that have already stored the given epoch.
51+ /// Persists volatile UTxOs, transactions, and totals into their respective Fjall partitions
52+ /// for an entire epoch. Skips any partitions that have already stored the given epoch.
4853 /// All writes are batched and committed atomically, preventing on-disk corruption in case of failure.
4954 pub async fn persist_epoch ( & self , epoch : u64 , config : & AddressStorageConfig ) -> Result < ( ) > {
5055 let persist_utxos = config. store_info
@@ -55,7 +60,7 @@ impl ImmutableAddressStore {
5560 && !self . epoch_exists ( self . totals . clone ( ) , ADDRESS_TOTALS_EPOCH_COUNTER , epoch) . await ?;
5661
5762 if !( persist_utxos || persist_txs || persist_totals) {
58- debug ! ( "no persistence needed for epoch {epoch} (already persisted or disabled)" , ) ;
63+ debug ! ( "no persistence needed for epoch {epoch} (already persisted or disabled)" ) ;
5964 return Ok ( ( ) ) ;
6065 }
6166
@@ -67,70 +72,50 @@ impl ImmutableAddressStore {
6772 let mut batch = self . keyspace . batch ( ) ;
6873 let mut change_count = 0 ;
6974
70- for block_map in drained_blocks. into_iter ( ) {
71- if block_map. is_empty ( ) {
72- continue ;
73- }
75+ for ( address, deltas) in Self :: merge_block_deltas ( drained_blocks) {
76+ change_count += 1 ;
77+ let addr_key = address. to_bytes_key ( ) ?;
7478
75- for ( addr, entry) in block_map {
76- change_count += 1 ;
77- let addr_key = addr. to_bytes_key ( ) ?;
78-
79- if persist_utxos {
80- let mut live: HashSet < UTxOIdentifier > = self
81- . utxos
82- . get ( & addr_key) ?
83- . map ( |bytes| decode ( & bytes) )
84- . transpose ( ) ?
85- . unwrap_or_default ( ) ;
86-
87- if let Some ( deltas) = & entry. utxos {
88- for delta in deltas {
89- match delta {
90- UtxoDelta :: Created ( u) => {
91- live. insert ( * u) ;
92- }
93- UtxoDelta :: Spent ( u) => {
94- live. remove ( u) ;
95- }
96- }
97- }
98- }
79+ if persist_utxos && ( !deltas. created_utxos . is_empty ( ) || !deltas. spent_utxos . is_empty ( ) )
80+ {
81+ let mut live: Vec < UTxOIdentifier > = self
82+ . utxos
83+ . get ( & addr_key) ?
84+ . map ( |bytes| decode ( & bytes) )
85+ . transpose ( ) ?
86+ . unwrap_or_default ( ) ;
9987
100- batch. insert ( & self . utxos , & addr_key, to_vec ( & live) ?) ;
88+ live. extend ( & deltas. created_utxos ) ;
89+
90+ for u in & deltas. spent_utxos {
91+ live. retain ( |x| x != u) ;
10192 }
10293
103- if persist_txs {
104- let mut live: Vec < TxIdentifier > = self
105- . txs
106- . get ( & addr_key) ?
107- . map ( |bytes| decode ( & bytes) )
108- . transpose ( ) ?
109- . unwrap_or_default ( ) ;
94+ batch. insert ( & self . utxos , & addr_key, to_vec ( & live) ?) ;
95+ }
11096
111- if let Some ( txs_deltas) = & entry. transactions {
112- live. extend ( txs_deltas. iter ( ) . cloned ( ) ) ;
113- }
97+ if persist_txs && !deltas. txs . is_empty ( ) {
98+ let mut live: Vec < TxIdentifier > = self
99+ . txs
100+ . get ( & addr_key) ?
101+ . map ( |bytes| decode ( & bytes) )
102+ . transpose ( ) ?
103+ . unwrap_or_default ( ) ;
114104
115- batch. insert ( & self . txs , & addr_key, to_vec ( & live) ?) ;
116- }
105+ live. extend ( deltas. txs . iter ( ) . cloned ( ) ) ;
106+ batch. insert ( & self . txs , & addr_key, to_vec ( & live) ?) ;
107+ }
117108
118- if persist_totals {
119- let mut live: AddressTotals = self
120- . totals
121- . get ( & addr_key) ?
122- . map ( |bytes| decode ( & bytes) )
123- . transpose ( ) ?
124- . unwrap_or_default ( ) ;
125-
126- if let Some ( deltas) = & entry. totals {
127- for delta in deltas {
128- live. apply_delta ( delta) ;
129- }
130- }
109+ if persist_totals && deltas. totals . tx_count != 0 {
110+ let mut live: AddressTotals = self
111+ . totals
112+ . get ( & addr_key) ?
113+ . map ( |bytes| decode ( & bytes) )
114+ . transpose ( ) ?
115+ . unwrap_or_default ( ) ;
131116
132- batch . insert ( & self . totals , & addr_key , to_vec ( & live ) ? ) ;
133- }
117+ live += deltas . totals ;
118+ batch . insert ( & self . totals , & addr_key , to_vec ( & live ) ? ) ;
134119 }
135120 }
136121
@@ -173,7 +158,7 @@ impl ImmutableAddressStore {
173158 pub async fn get_utxos ( & self , address : & Address ) -> Result < Option < Vec < UTxOIdentifier > > > {
174159 let key = address. to_bytes_key ( ) ?;
175160
176- let mut live: HashSet < UTxOIdentifier > =
161+ let mut live: Vec < UTxOIdentifier > =
177162 self . utxos . get ( & key) ?. map ( |bytes| decode ( & bytes) ) . transpose ( ) ?. unwrap_or_default ( ) ;
178163
179164 let pending = self . pending . lock ( ) . await ;
@@ -182,12 +167,8 @@ impl ImmutableAddressStore {
182167 if let Some ( deltas) = & entry. utxos {
183168 for delta in deltas {
184169 match delta {
185- UtxoDelta :: Created ( u) => {
186- live. insert ( * u) ;
187- }
188- UtxoDelta :: Spent ( u) => {
189- live. remove ( u) ;
190- }
170+ UtxoDelta :: Created ( u) => live. push ( * u) ,
171+ UtxoDelta :: Spent ( u) => live. retain ( |x| x != u) ,
191172 }
192173 }
193174 }
@@ -197,8 +178,7 @@ impl ImmutableAddressStore {
197178 if live. is_empty ( ) {
198179 Ok ( None )
199180 } else {
200- let vec: Vec < _ > = live. into_iter ( ) . collect ( ) ;
201- Ok ( Some ( vec) )
181+ Ok ( Some ( live) )
202182 }
203183 }
204184
@@ -311,4 +291,46 @@ impl ImmutableAddressStore {
311291
312292 Ok ( exists)
313293 }
294+
295+ fn merge_block_deltas (
296+ drained_blocks : Vec < HashMap < Address , AddressEntry > > ,
297+ ) -> HashMap < Address , MergedDeltas > {
298+ let mut merged = HashMap :: new ( ) ;
299+
300+ for block_map in drained_blocks {
301+ for ( addr, entry) in block_map {
302+ let target = merged. entry ( addr. clone ( ) ) . or_insert_with ( MergedDeltas :: default) ;
303+
304+ // Remove UTxOs that are spent in the same epoch
305+ if let Some ( deltas) = & entry. utxos {
306+ for delta in deltas {
307+ match delta {
308+ UtxoDelta :: Created ( u) => target. created_utxos . push ( * u) ,
309+ UtxoDelta :: Spent ( u) => {
310+ if target. created_utxos . contains ( u) {
311+ target. created_utxos . retain ( |x| x != u) ;
312+ } else {
313+ target. spent_utxos . push ( * u) ;
314+ }
315+ }
316+ }
317+ }
318+ }
319+
320+ // Merge Tx vectors
321+ if let Some ( txs) = & entry. transactions {
322+ target. txs . extend ( txs. iter ( ) . cloned ( ) ) ;
323+ }
324+
325+ // Sum totals
326+ if let Some ( totals) = & entry. totals {
327+ for delta in totals {
328+ target. totals . apply_delta ( delta) ;
329+ }
330+ }
331+ }
332+ }
333+
334+ merged
335+ }
314336}
0 commit comments