@@ -12,7 +12,7 @@ use byteorder::{WriteBytesExt, BE};
1212use lzzzz:: lz4:: { self , ACC_LEVEL_DEFAULT } ;
1313use parking_lot:: Mutex ;
1414use rayon:: {
15- iter:: { IndexedParallelIterator , IntoParallelIterator , ParallelIterator } ,
15+ iter:: { Either , IndexedParallelIterator , IntoParallelIterator , ParallelIterator } ,
1616 scope,
1717} ;
1818use smallvec:: SmallVec ;
@@ -40,6 +40,10 @@ struct ThreadLocalState<K: StoreKey + Send, const FAMILIES: usize> {
4040 new_blob_files : Vec < ( u32 , File ) > ,
4141}
4242
/// Number of collectors a family's global collector is split into once the
/// single initial collector has filled up.
///
/// Must be a power of two greater than one: the shard index is taken from the
/// leading `log2(COLLECTOR_SHARDS)` bits of the key hash.
const COLLECTOR_SHARDS: usize = 4;
/// Right shift that reduces a key hash to a shard index in
/// `0..COLLECTOR_SHARDS` by keeping only its most significant bits.
///
/// NOTE(review): derived from `u64::BITS`, so this assumes the key hash is a
/// `u64` — confirm against the key type.
const COLLECTOR_SHARD_SHIFT: usize =
    u64::BITS as usize - COLLECTOR_SHARDS.trailing_zeros() as usize;
// Compile-time guard: with a shard count of 1 or any odd value the shift would
// be 64 (a shift overflow), and with other non-powers-of-two some shards would
// never be selected.
const _: () = assert!(COLLECTOR_SHARDS.is_power_of_two() && COLLECTOR_SHARDS > 1);
46+
4347/// The result of a `WriteBatch::finish` operation.
4448pub ( crate ) struct FinishResult {
4549 pub ( crate ) sequence_number : u32 ,
@@ -49,6 +53,14 @@ pub(crate) struct FinishResult {
4953 pub ( crate ) new_blob_files : Vec < ( u32 , File ) > ,
5054}
5155
56+ enum GlobalCollectorState < K : StoreKey + Send > {
57+ /// Initial state. Single collector. Once the collector is full, we switch to sharded mode.
58+ Unsharded ( Collector < K > ) ,
59+ /// Sharded mode.
60+ /// We use multiple collectors, and select one based on the first bits of the key hash.
61+ Sharded ( [ Collector < K > ; COLLECTOR_SHARDS ] ) ,
62+ }
63+
5264/// A write batch.
5365pub struct WriteBatch < K : StoreKey + Send , const FAMILIES : usize > {
5466 /// The database path
@@ -58,7 +70,7 @@ pub struct WriteBatch<K: StoreKey + Send, const FAMILIES: usize> {
5870 /// The thread local state.
5971 thread_locals : ThreadLocal < UnsafeCell < ThreadLocalState < K , FAMILIES > > > ,
6072 /// Collectors in use. The thread local collectors flush into these when they are full.
61- collectors : [ Mutex < Collector < K > > ; FAMILIES ] ,
73+ collectors : [ Mutex < GlobalCollectorState < K > > ; FAMILIES ] ,
6274 /// The list of new SST files that have been created.
6375 /// Tuple of (sequence number, file).
6476 new_sst_files : Mutex < Vec < ( u32 , File ) > > ,
@@ -78,7 +90,8 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
7890 path,
7991 current_sequence_number : AtomicU32 :: new ( current) ,
8092 thread_locals : ThreadLocal :: new ( ) ,
81- collectors : [ ( ) ; FAMILIES ] . map ( |_| Mutex :: new ( Collector :: new ( ) ) ) ,
93+ collectors : [ ( ) ; FAMILIES ]
94+ . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( Collector :: new ( ) ) ) ) ,
8295 new_sst_files : Mutex :: new ( Vec :: new ( ) ) ,
8396 idle_collectors : Mutex :: new ( Vec :: new ( ) ) ,
8497 idle_thread_local_collectors : Mutex :: new ( Vec :: new ( ) ) ,
@@ -131,17 +144,39 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
131144 ) -> Result < ( ) > {
132145 let mut full_collectors = SmallVec :: < [ _ ; 2 ] > :: new ( ) ;
133146 {
134- let mut global_collector = self . collectors [ usize_from_u32 ( family) ] . lock ( ) ;
147+ let mut global_collector_state = self . collectors [ usize_from_u32 ( family) ] . lock ( ) ;
135148 for entry in collector. drain ( ) {
136- global_collector. add_entry ( entry) ;
137- if global_collector. is_full ( ) {
138- full_collectors. push ( replace (
139- & mut * global_collector,
140- self . idle_collectors
141- . lock ( )
142- . pop ( )
143- . unwrap_or_else ( || Collector :: new ( ) ) ,
144- ) ) ;
149+ match & mut * global_collector_state {
150+ GlobalCollectorState :: Unsharded ( collector) => {
151+ collector. add_entry ( entry) ;
152+ if collector. is_full ( ) {
153+ // When full, split the entries into shards.
154+ let mut shards: [ Collector < K > ; 4 ] =
155+ [ ( ) ; COLLECTOR_SHARDS ] . map ( |_| Collector :: new ( ) ) ;
156+ for entry in collector. drain ( ) {
157+ let shard = ( entry. key . hash >> COLLECTOR_SHARD_SHIFT ) as usize ;
158+ shards[ shard] . add_entry ( entry) ;
159+ }
160+ // There is a rare edge case where all entries are in the same shard,
161+ // and the collector is full after the split.
162+ for collector in shards. iter_mut ( ) {
163+ if collector. is_full ( ) {
164+ full_collectors
165+ . push ( replace ( & mut * collector, self . get_new_collector ( ) ) ) ;
166+ }
167+ }
168+ * global_collector_state = GlobalCollectorState :: Sharded ( shards) ;
169+ }
170+ }
171+ GlobalCollectorState :: Sharded ( shards) => {
172+ let shard = ( entry. key . hash >> COLLECTOR_SHARD_SHIFT ) as usize ;
173+ let collector = & mut shards[ shard] ;
174+ collector. add_entry ( entry) ;
175+ if collector. is_full ( ) {
176+ full_collectors
177+ . push ( replace ( & mut * collector, self . get_new_collector ( ) ) ) ;
178+ }
179+ }
145180 }
146181 }
147182 }
@@ -155,6 +190,13 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
155190 Ok ( ( ) )
156191 }
157192
193+ fn get_new_collector ( & self ) -> Collector < K > {
194+ self . idle_collectors
195+ . lock ( )
196+ . pop ( )
197+ . unwrap_or_else ( || Collector :: new ( ) )
198+ }
199+
158200 /// Puts a key-value pair into the write batch.
159201 pub fn put ( & self , family : u32 , key : K , value : ValueBuffer < ' _ > ) -> Result < ( ) > {
160202 let state = self . thread_local_state ( ) ;
@@ -217,23 +259,27 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
217259 let mut new_sst_files = take ( self . new_sst_files . get_mut ( ) ) ;
218260 let shared_new_sst_files = Mutex :: new ( & mut new_sst_files) ;
219261
220- let collectors = replace (
221- & mut self . collectors ,
222- [ ( ) ; FAMILIES ] . map ( |_| {
223- Mutex :: new (
224- self . idle_collectors
225- . lock ( )
226- . pop ( )
227- . unwrap_or_else ( || Collector :: new ( ) ) ,
228- )
229- } ) ,
230- ) ;
262+ let new_collectors = [ ( ) ; FAMILIES ]
263+ . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( self . get_new_collector ( ) ) ) ) ;
264+ let collectors = replace ( & mut self . collectors , new_collectors) ;
231265 collectors
232266 . into_par_iter ( )
233267 . enumerate ( )
234- . try_for_each ( |( family, collector) | {
268+ . flat_map ( |( family, state) | {
269+ let collector = state. into_inner ( ) ;
270+ match collector {
271+ GlobalCollectorState :: Unsharded ( collector) => {
272+ Either :: Left ( [ ( family, collector) ] . into_par_iter ( ) )
273+ }
274+ GlobalCollectorState :: Sharded ( shards) => Either :: Right (
275+ shards
276+ . into_par_iter ( )
277+ . map ( move |collector| ( family, collector) ) ,
278+ ) ,
279+ }
280+ } )
281+ . try_for_each ( |( family, mut collector) | {
235282 let family = family as u32 ;
236- let mut collector = collector. into_inner ( ) ;
237283 if !collector. is_empty ( ) {
238284 let sst = self . create_sst_file ( family, collector. sorted ( ) ) ?;
239285 collector. clear ( ) ;
0 commit comments