@@ -12,7 +12,7 @@ use byteorder::{WriteBytesExt, BE};
1212use lzzzz:: lz4:: { self , ACC_LEVEL_DEFAULT } ;
1313use parking_lot:: Mutex ;
1414use rayon:: {
15- iter:: { IndexedParallelIterator , IntoParallelIterator , ParallelIterator } ,
15+ iter:: { Either , IndexedParallelIterator , IntoParallelIterator , ParallelIterator } ,
1616 scope,
1717} ;
1818use smallvec:: SmallVec ;
@@ -39,13 +39,25 @@ struct ThreadLocalState<K: StoreKey + Send, const FAMILIES: usize> {
3939 new_blob_files : Vec < ( u32 , File ) > ,
4040}
4141
/// Number of collectors a family's global collector is split into once it fills up.
///
/// Must be a power of two greater than one: the shard index is taken from the top
/// `log2(COLLECTOR_SHARDS)` bits of the key hash (see [`COLLECTOR_SHARD_SHIFT`]).
const COLLECTOR_SHARDS: usize = 4;

// Compile-time guard. With a shard count of 1 the shift below would be 64, which overflows
// when shifting a `u64`; with a non-power-of-two count some shards would never be selected.
const _: () = assert!(
    COLLECTOR_SHARDS.is_power_of_two() && COLLECTOR_SHARDS > 1,
    "COLLECTOR_SHARDS must be a power of two greater than one"
);

/// Right-shift that reduces a key hash to a shard index in `0..COLLECTOR_SHARDS`.
///
/// Derived from `u64::BITS`, so it assumes the key hash is a `u64`.
const COLLECTOR_SHARD_SHIFT: usize =
    u64::BITS as usize - COLLECTOR_SHARDS.trailing_zeros() as usize;
45+
/// Outcome of a `WriteBatch::finish` call.
pub(crate) struct FinishResult {
    /// Sequence number reported by the finished batch.
    // NOTE(review): presumably the last sequence number handed out — confirm in `finish`.
    pub(crate) sequence_number: u32,
    /// SST files created by the batch, each paired with a `u32` identifier.
    // NOTE(review): the `u32` looks like the file's sequence number — confirm.
    pub(crate) new_sst_files: Vec<(u32, File)>,
    /// Blob files created by the batch, each paired with a `u32` identifier.
    pub(crate) new_blob_files: Vec<(u32, File)>,
}
4852
53+ enum GlobalCollectorState < K : StoreKey + Send > {
54+ /// Initial state. Single collector. Once the collector is full, we switch to sharded mode.
55+ Unsharded ( Collector < K > ) ,
56+ /// Sharded mode.
57+ /// We use multiple collectors, and select one based on the first bits of the key hash.
58+ Sharded ( [ Collector < K > ; COLLECTOR_SHARDS ] ) ,
59+ }
60+
4961/// A write batch.
5062pub struct WriteBatch < K : StoreKey + Send , const FAMILIES : usize > {
5163 /// The database path
@@ -55,7 +67,7 @@ pub struct WriteBatch<K: StoreKey + Send, const FAMILIES: usize> {
5567 /// The thread local state.
5668 thread_locals : ThreadLocal < UnsafeCell < ThreadLocalState < K , FAMILIES > > > ,
5769 /// Collectors in use. The thread local collectors flush into these when they are full.
58- collectors : [ Mutex < Collector < K > > ; FAMILIES ] ,
70+ collectors : [ Mutex < GlobalCollectorState < K > > ; FAMILIES ] ,
5971 /// The list of new SST files that have been created.
6072 new_sst_files : Mutex < Vec < ( u32 , File ) > > ,
6173 /// Collectors that are currently unused, but have memory preallocated.
@@ -74,7 +86,8 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
7486 path,
7587 current_sequence_number : AtomicU32 :: new ( current) ,
7688 thread_locals : ThreadLocal :: new ( ) ,
77- collectors : [ ( ) ; FAMILIES ] . map ( |_| Mutex :: new ( Collector :: new ( ) ) ) ,
89+ collectors : [ ( ) ; FAMILIES ]
90+ . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( Collector :: new ( ) ) ) ) ,
7891 new_sst_files : Mutex :: new ( Vec :: new ( ) ) ,
7992 idle_collectors : Mutex :: new ( Vec :: new ( ) ) ,
8093 idle_thread_local_collectors : Mutex :: new ( Vec :: new ( ) ) ,
@@ -127,17 +140,39 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
127140 ) -> Result < ( ) > {
128141 let mut full_collectors = SmallVec :: < [ _ ; 2 ] > :: new ( ) ;
129142 {
130- let mut global_collector = self . collectors [ usize_from_u32 ( family) ] . lock ( ) ;
143+ let mut global_collector_state = self . collectors [ usize_from_u32 ( family) ] . lock ( ) ;
131144 for entry in collector. drain ( ) {
132- global_collector. add_entry ( entry) ;
133- if global_collector. is_full ( ) {
134- full_collectors. push ( replace (
135- & mut * global_collector,
136- self . idle_collectors
137- . lock ( )
138- . pop ( )
139- . unwrap_or_else ( || Collector :: new ( ) ) ,
140- ) ) ;
145+ match & mut * global_collector_state {
146+ GlobalCollectorState :: Unsharded ( collector) => {
147+ collector. add_entry ( entry) ;
148+ if collector. is_full ( ) {
149+ // When full, split the entries into shards.
150+ let mut shards: [ Collector < K > ; 4 ] =
151+ [ ( ) ; COLLECTOR_SHARDS ] . map ( |_| Collector :: new ( ) ) ;
152+ for entry in collector. drain ( ) {
153+ let shard = ( entry. key . hash >> COLLECTOR_SHARD_SHIFT ) as usize ;
154+ shards[ shard] . add_entry ( entry) ;
155+ }
156+ // There is a rare edge case where all entries are in the same shard,
157+ // and the collector is full after the split.
158+ for collector in shards. iter_mut ( ) {
159+ if collector. is_full ( ) {
160+ full_collectors
161+ . push ( replace ( & mut * collector, self . get_new_collector ( ) ) ) ;
162+ }
163+ }
164+ * global_collector_state = GlobalCollectorState :: Sharded ( shards) ;
165+ }
166+ }
167+ GlobalCollectorState :: Sharded ( shards) => {
168+ let shard = ( entry. key . hash >> COLLECTOR_SHARD_SHIFT ) as usize ;
169+ let collector = & mut shards[ shard] ;
170+ collector. add_entry ( entry) ;
171+ if collector. is_full ( ) {
172+ full_collectors
173+ . push ( replace ( & mut * collector, self . get_new_collector ( ) ) ) ;
174+ }
175+ }
141176 }
142177 }
143178 }
@@ -151,6 +186,13 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
151186 Ok ( ( ) )
152187 }
153188
189+ fn get_new_collector ( & self ) -> Collector < K > {
190+ self . idle_collectors
191+ . lock ( )
192+ . pop ( )
193+ . unwrap_or_else ( || Collector :: new ( ) )
194+ }
195+
154196 /// Puts a key-value pair into the write batch.
155197 pub fn put ( & self , family : u32 , key : K , value : ValueBuffer < ' _ > ) -> Result < ( ) > {
156198 let state = self . thread_local_state ( ) ;
@@ -213,23 +255,27 @@ impl<K: StoreKey + Send + Sync, const FAMILIES: usize> WriteBatch<K, FAMILIES> {
213255 // Now we reduce the global collectors in parallel
214256 let shared_new_sst_files = Mutex :: new ( & mut new_sst_files) ;
215257
216- let collectors = replace (
217- & mut self . collectors ,
218- [ ( ) ; FAMILIES ] . map ( |_| {
219- Mutex :: new (
220- self . idle_collectors
221- . lock ( )
222- . pop ( )
223- . unwrap_or_else ( || Collector :: new ( ) ) ,
224- )
225- } ) ,
226- ) ;
258+ let new_collectors = [ ( ) ; FAMILIES ]
259+ . map ( |_| Mutex :: new ( GlobalCollectorState :: Unsharded ( self . get_new_collector ( ) ) ) ) ;
260+ let collectors = replace ( & mut self . collectors , new_collectors) ;
227261 collectors
228262 . into_par_iter ( )
229263 . enumerate ( )
230- . try_for_each ( |( family, collector) | {
264+ . flat_map ( |( family, state) | {
265+ let collector = state. into_inner ( ) ;
266+ match collector {
267+ GlobalCollectorState :: Unsharded ( collector) => {
268+ Either :: Left ( [ ( family, collector) ] . into_par_iter ( ) )
269+ }
270+ GlobalCollectorState :: Sharded ( shards) => Either :: Right (
271+ shards
272+ . into_par_iter ( )
273+ . map ( move |collector| ( family, collector) ) ,
274+ ) ,
275+ }
276+ } )
277+ . try_for_each ( |( family, mut collector) | {
231278 let family = family as u32 ;
232- let mut collector = collector. into_inner ( ) ;
233279 if !collector. is_empty ( ) {
234280 let sst = self . create_sst_file ( family, collector. sorted ( ) ) ?;
235281 collector. clear ( ) ;
0 commit comments