@@ -82,10 +82,6 @@ pub struct WriteBatch<K: StoreKey + Send, const FAMILIES: usize> {
8282 /// The list of new SST files that have been created.
8383 /// Tuple of (sequence number, file).
8484 new_sst_files : Mutex < Vec < ( u32 , File ) > > ,
85- /// Collectors that are currently unused, but have memory preallocated.
86- idle_collectors : Mutex < Vec < Collector < K > > > ,
87- /// Collectors that are currently unused, but have memory preallocated.
88- idle_thread_local_collectors : Mutex < Vec < Collector < K , THREAD_LOCAL_SIZE_SHIFT > > > ,
8985}
9086
9187impl < K : StoreKey + Send + Sync , const FAMILIES : usize > WriteBatch < K , FAMILIES > {
@@ -102,18 +98,9 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
10298 . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( Collector :: new ( ) ) ) ) ,
10399 meta_collectors : [ ( ) ; FAMILIES ] . map ( |_| Mutex :: new ( Vec :: new ( ) ) ) ,
104100 new_sst_files : Mutex :: new ( Vec :: new ( ) ) ,
105- idle_collectors : Mutex :: new ( Vec :: new ( ) ) ,
106- idle_thread_local_collectors : Mutex :: new ( Vec :: new ( ) ) ,
107101 }
108102 }
109103
110- /// Resets the write batch to a new sequence number. This is called when the WriteBatch is
111- /// reused.
112- pub ( crate ) fn reset ( & mut self , current : u32 ) {
113- self . current_sequence_number
114- . store ( current, Ordering :: SeqCst ) ;
115- }
116-
117104 /// Returns the thread local state for the current thread.
118105 #[ allow( clippy:: mut_from_ref) ]
119106 fn thread_local_state ( & self ) -> & mut ThreadLocalState < K , FAMILIES > {
@@ -134,12 +121,8 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
134121 family : u32 ,
135122 ) -> Result < & ' l mut Collector < K , THREAD_LOCAL_SIZE_SHIFT > > {
136123 debug_assert ! ( usize_from_u32( family) < FAMILIES ) ;
137- let collector = state. collectors [ usize_from_u32 ( family) ] . get_or_insert_with ( || {
138- self . idle_thread_local_collectors
139- . lock ( )
140- . pop ( )
141- . unwrap_or_else ( || Collector :: new ( ) )
142- } ) ;
124+ let collector =
125+ state. collectors [ usize_from_u32 ( family) ] . get_or_insert_with ( || Collector :: new ( ) ) ;
143126 if collector. is_full ( ) {
144127 self . flush_thread_local_collector ( family, collector) ?;
145128 }
@@ -172,7 +155,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
172155 for collector in shards. iter_mut ( ) {
173156 if collector. is_full ( ) {
174157 full_collectors
175- . push ( replace ( & mut * collector, self . get_new_collector ( ) ) ) ;
158+ . push ( replace ( & mut * collector, Collector :: new ( ) ) ) ;
176159 }
177160 }
178161 * global_collector_state = GlobalCollectorState :: Sharded ( shards) ;
@@ -183,8 +166,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
183166 let collector = & mut shards[ shard] ;
184167 collector. add_entry ( entry) ;
185168 if collector. is_full ( ) {
186- full_collectors
187- . push ( replace ( & mut * collector, self . get_new_collector ( ) ) ) ;
169+ full_collectors. push ( replace ( & mut * collector, Collector :: new ( ) ) ) ;
188170 }
189171 }
190172 }
@@ -193,28 +175,12 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
193175 for mut global_collector in full_collectors {
194176 // When the global collector is full, we create a new SST file.
195177 let sst = self . create_sst_file ( family, global_collector. sorted ( ) ) ?;
196- global_collector. clear ( ) ;
197178 self . new_sst_files . lock ( ) . push ( sst) ;
198- self . dispose_collector ( global_collector) ;
179+ drop ( global_collector) ;
199180 }
200181 Ok ( ( ) )
201182 }
202183
203- fn get_new_collector ( & self ) -> Collector < K > {
204- self . idle_collectors
205- . lock ( )
206- . pop ( )
207- . unwrap_or_else ( || Collector :: new ( ) )
208- }
209-
210- fn dispose_collector ( & self , collector : Collector < K > ) {
211- self . idle_collectors . lock ( ) . push ( collector) ;
212- }
213-
214- fn dispose_thread_local_collector ( & self , collector : Collector < K , THREAD_LOCAL_SIZE_SHIFT > ) {
215- self . idle_thread_local_collectors . lock ( ) . push ( collector) ;
216- }
217-
218184 /// Puts a key-value pair into the write batch.
219185 pub fn put ( & self , family : u32 , key : K , value : ValueBuffer < ' _ > ) -> Result < ( ) > {
220186 let state = self . thread_local_state ( ) ;
@@ -261,7 +227,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
261227 collectors. into_par_iter ( ) . try_for_each ( |mut collector| {
262228 let _span = span. clone ( ) . entered ( ) ;
263229 self . flush_thread_local_collector ( family, & mut collector) ?;
264- self . dispose_thread_local_collector ( collector) ;
230+ drop ( collector) ;
265231 anyhow:: Ok ( ( ) )
266232 } ) ?;
267233
@@ -278,7 +244,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
278244 GlobalCollectorState :: Sharded ( _) => {
279245 let GlobalCollectorState :: Sharded ( shards) = replace (
280246 & mut * collector_state,
281- GlobalCollectorState :: Unsharded ( self . get_new_collector ( ) ) ,
247+ GlobalCollectorState :: Unsharded ( Collector :: new ( ) ) ,
282248 ) else {
283249 unreachable ! ( ) ;
284250 } ;
@@ -288,7 +254,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
288254 let sst = self . create_sst_file ( family, collector. sorted ( ) ) ?;
289255 collector. clear ( ) ;
290256 self . new_sst_files . lock ( ) . push ( sst) ;
291- self . dispose_collector ( collector) ;
257+ drop ( collector) ;
292258 }
293259 anyhow:: Ok ( ( ) )
294260 } ) ?;
@@ -332,7 +298,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
332298 {
333299 * shared_error. lock ( ) = Err ( err) ;
334300 }
335- this . dispose_thread_local_collector ( collector) ;
301+ drop ( collector) ;
336302 } ) ;
337303 }
338304 }
@@ -344,8 +310,8 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
344310 let mut new_sst_files = take ( self . new_sst_files . get_mut ( ) ) ;
345311 let shared_new_sst_files = Mutex :: new ( & mut new_sst_files) ;
346312
347- let new_collectors = [ ( ) ; FAMILIES ]
348- . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( self . get_new_collector ( ) ) ) ) ;
313+ let new_collectors =
314+ [ ( ) ; FAMILIES ] . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( Collector :: new ( ) ) ) ) ;
349315 let collectors = replace ( & mut self . collectors , new_collectors) ;
350316 let span = Span :: current ( ) ;
351317 collectors
@@ -370,7 +336,7 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
370336 if !collector. is_empty ( ) {
371337 let sst = self . create_sst_file ( family, collector. sorted ( ) ) ?;
372338 collector. clear ( ) ;
373- self . dispose_collector ( collector) ;
339+ drop ( collector) ;
374340 shared_new_sst_files. lock ( ) . push ( sst) ;
375341 }
376342 anyhow:: Ok ( ( ) )
0 commit comments