-
Notifications
You must be signed in to change notification settings - Fork 464
/
commit.go
519 lines (466 loc) · 19.8 KB
/
commit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
// Copyright 2018 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"runtime"
"sync"
"sync/atomic"
"github.com/cockroachdb/crlib/crtime"
"github.com/cockroachdb/pebble/batchrepr"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/record"
)
// commitQueue is a lock-free fixed-size single-producer, multi-consumer
// queue. The single producer can enqueue (push) to the head, and consumers can
// dequeue (pop) from the tail.
//
// It has the added feature that it nils out unused slots to avoid unnecessary
// retention of objects.
type commitQueue struct {
// headTail packs together a 32-bit head index and a 32-bit tail index. Both
// are indexes into slots modulo len(slots)-1.
//
// tail = index of oldest data in queue
// head = index of next slot to fill
//
// Slots in the range [tail, head) are owned by consumers. A consumer
// continues to own a slot outside this range until it nils the slot, at
// which point ownership passes to the producer.
//
// The head index is stored in the most-significant bits so that we can
// atomically add to it and the overflow is harmless.
headTail atomic.Uint64
// slots is a ring buffer of values stored in this queue. The size must be a
// power of 2. A slot is in use until *both* the tail index has moved beyond
// it and the slot value has been set to nil. The slot value is set to nil
// atomically by the consumer and read atomically by the producer.
slots [record.SyncConcurrency]atomic.Pointer[Batch]
}
const dequeueBits = 32
func (q *commitQueue) unpack(ptrs uint64) (head, tail uint32) {
const mask = 1<<dequeueBits - 1
head = uint32((ptrs >> dequeueBits) & mask)
tail = uint32(ptrs & mask)
return
}
func (q *commitQueue) pack(head, tail uint32) uint64 {
const mask = 1<<dequeueBits - 1
return (uint64(head) << dequeueBits) |
uint64(tail&mask)
}
func (q *commitQueue) enqueue(b *Batch) {
ptrs := q.headTail.Load()
head, tail := q.unpack(ptrs)
if (tail+uint32(len(q.slots)))&(1<<dequeueBits-1) == head {
// Queue is full. This should never be reached because commitPipeline.commitQueueSem
// limits the number of concurrent operations.
panic("pebble: not reached")
}
slot := &q.slots[head&uint32(len(q.slots)-1)]
// Check if the head slot has been released by dequeueApplied.
for slot.Load() != nil {
// Another goroutine is still cleaning up the tail, so the queue is
// actually still full. We spin because this should resolve itself
// momentarily.
runtime.Gosched()
}
// The head slot is free, so we own it.
slot.Store(b)
// Increment head. This passes ownership of slot to dequeueApplied and acts as a
// store barrier for writing the slot.
q.headTail.Add(1 << dequeueBits)
}
// dequeueApplied removes the earliest enqueued Batch, if it is applied.
//
// Returns nil if the commit queue is empty or the earliest Batch is not yet
// applied.
func (q *commitQueue) dequeueApplied() *Batch {
for {
ptrs := q.headTail.Load()
head, tail := q.unpack(ptrs)
if tail == head {
// Queue is empty.
return nil
}
slot := &q.slots[tail&uint32(len(q.slots)-1)]
b := slot.Load()
if b == nil || !b.applied.Load() {
// The batch is not ready to be dequeued, or another goroutine has
// already dequeued it.
return nil
}
// Confirm head and tail (for our speculative check above) and increment
// tail. If this succeeds, then we own the slot at tail.
ptrs2 := q.pack(head, tail+1)
if q.headTail.CompareAndSwap(ptrs, ptrs2) {
// We now own slot.
//
// Tell enqueue that we're done with this slot. Zeroing the slot is also
// important so we don't leave behind references that could keep this object
// live longer than necessary.
slot.Store(nil)
// At this point enqueue owns the slot.
return b
}
}
}
// commitEnv contains the environment that a commitPipeline interacts
// with. This allows fine-grained testing of commitPipeline behavior without
// construction of an entire DB.
type commitEnv struct {
// The next sequence number to give to a batch. Protected by
// commitPipeline.mu.
logSeqNum *base.AtomicSeqNum
// The visible sequence number at which reads should be performed. Ratcheted
// upwards atomically as batches are applied to the memtable.
visibleSeqNum *base.AtomicSeqNum
// Apply the batch to the specified memtable. Called concurrently.
apply func(b *Batch, mem *memTable) error
// Write the batch to the WAL. If wg != nil, the data will be persisted
// asynchronously and done will be called on wg upon completion. If wg != nil
// and err != nil, a failure to persist the WAL will populate *err. Returns
// the memtable the batch should be applied to. Serial execution enforced by
// commitPipeline.mu.
write func(b *Batch, wg *sync.WaitGroup, err *error) (*memTable, error)
}
// A commitPipeline manages the stages of committing a set of mutations
// (contained in a single Batch) atomically to the DB. The steps are
// conceptually:
//
// 1. Write the batch to the WAL and optionally sync the WAL
// 2. Apply the mutations in the batch to the memtable
//
// These two simple steps are made complicated by the desire for high
// performance. In the absence of concurrency, performance is limited by how
// fast a batch can be written (and synced) to the WAL and then added to the
// memtable, both of which are outside the purview of the commit
// pipeline. Performance under concurrency is the primary concern of the commit
// pipeline, though it also needs to maintain two invariants:
//
// 1. Batches need to be written to the WAL in sequence number order.
// 2. Batches need to be made visible for reads in sequence number order. This
// invariant arises from the use of a single sequence number which
// indicates which mutations are visible.
//
// Taking these invariants into account, let's revisit the work the commit
// pipeline needs to perform. Writing the batch to the WAL is necessarily
// serialized as there is a single WAL object. The order of the entries in the
// WAL defines the sequence number order. Note that writing to the WAL is
// extremely fast, usually just a memory copy. Applying the mutations in a
// batch to the memtable can occur concurrently as the underlying skiplist
// supports concurrent insertions. Publishing the visible sequence number is
// another serialization point, but one with a twist: the visible sequence
// number cannot be bumped until the mutations for earlier batches have
// finished applying to the memtable (the visible sequence number only ratchets
// up). Lastly, if requested, the commit waits for the WAL to sync. Note that
// waiting for the WAL sync after ratcheting the visible sequence number allows
// another goroutine to read committed data before the WAL has synced. This is
// similar behavior to RocksDB's manual WAL flush functionality. Application
// code needs to protect against this if necessary.
//
// The full outline of the commit pipeline operation is as follows:
//
// with commitPipeline mutex locked:
// assign batch sequence number
// write batch to WAL
// (optionally) add batch to WAL sync list
// apply batch to memtable (concurrently)
// wait for earlier batches to apply
// ratchet read sequence number
// (optionally) wait for the WAL to sync
//
// As soon as a batch has been written to the WAL, the commitPipeline mutex is
// released allowing another batch to write to the WAL. Each commit operation
// individually applies its batch to the memtable providing concurrency. The
// WAL sync happens concurrently with applying to the memtable (see
// commitPipeline.syncLoop).
//
// The "waits for earlier batches to apply" work is more complicated than might
// be expected. The obvious approach would be to keep a queue of pending
// batches and for each batch to wait for the previous batch to finish
// committing. This approach was tried initially and turned out to be too
// slow. The problem is that it causes excessive goroutine activity as each
// committing goroutine needs to wake up in order for the next goroutine to be
// unblocked. The approach taken in the current code is conceptually similar,
// though it avoids waking a goroutine to perform work that another goroutine
// can perform. A commitQueue (a single-producer, multiple-consumer queue)
// holds the ordered list of committing batches. Addition to the queue is done
// while holding commitPipeline.mutex ensuring the same ordering of batches in
// the queue as the ordering in the WAL. When a batch finishes applying to the
// memtable, it atomically updates its Batch.applied field. Ratcheting of the
// visible sequence number is done by commitPipeline.publish which loops
// dequeueing "applied" batches and ratcheting the visible sequence number. If
// we hit an unapplied batch at the head of the queue we can block as we know
// that committing of that unapplied batch will eventually find our (applied)
// batch in the queue. See commitPipeline.publish for additional commentary.
type commitPipeline struct {
// WARNING: The following struct `commitQueue` contains fields which will
// be accessed atomically.
//
// Go allocations are guaranteed to be 64-bit aligned which we take advantage
// of by placing the 64-bit fields which we access atomically at the beginning
// of the commitPipeline struct.
// For more information, see https://golang.org/pkg/sync/atomic/#pkg-note-BUG.
// Queue of pending batches to commit.
pending commitQueue
env commitEnv
// The commit path has two queues:
// - commitPipeline.pending contains batches whose seqnums have not yet been
// published. It is a lock-free single producer multi consumer queue.
// - LogWriter.flusher.syncQ contains state for batches that have asked for
// a sync. It is a lock-free single producer single consumer queue.
// These lock-free queues have a fixed capacity. And since they are
// lock-free, we cannot do blocking waits when pushing onto these queues, in
// case they are full. Additionally, adding to these queues happens while
// holding commitPipeline.mu, and we don't want to block while holding that
// mutex since it is also needed by other code.
//
// Popping from these queues is independent and for a particular batch can
// occur in either order, though it is more common that popping from the
// commitPipeline.pending will happen first.
//
// Due to these constraints, we reserve a unit of space in each queue before
// acquiring commitPipeline.mu, which also ensures that the push operation
// is guaranteed to have space in the queue. The commitQueueSem and
// logSyncQSem are used for this reservation.
commitQueueSem chan struct{}
logSyncQSem chan struct{}
ingestSem chan struct{}
// The mutex to use for synchronizing access to logSeqNum and serializing
// calls to commitEnv.write().
mu sync.Mutex
}
func newCommitPipeline(env commitEnv) *commitPipeline {
p := &commitPipeline{
env: env,
// The capacity of both commitQueue.slots and syncQueue.slots is set to
// record.SyncConcurrency, which also determines the value of these
// semaphores. We used to have a single semaphore, which required that the
// capacity of these queues be the same. Now that we have two semaphores,
// the capacity of these queues could be changed to be different. Say half
// of the batches asked to be synced, but syncing took 5x the latency of
// adding to the memtable and publishing. Then syncQueue.slots could be
// sized as 0.5*5 of the commitQueue.slots. We can explore this if we find
// that LogWriterMetrics.SyncQueueLen has high utilization under some
// workloads.
//
// NB: the commit concurrency is one less than SyncConcurrency because we
// have to allow one "slot" for a concurrent WAL rotation which will close
// and sync the WAL.
commitQueueSem: make(chan struct{}, record.SyncConcurrency-1),
logSyncQSem: make(chan struct{}, record.SyncConcurrency-1),
ingestSem: make(chan struct{}, 1),
}
return p
}
// directWrite is used to directly write to the WAL. commitPipeline.mu must be
// held while this is called. DB.mu must not be held. directWrite will only
// return once the WAL sync is complete. Note that DirectWrite is a special case
// function which is currently only used when ingesting sstables as a flushable.
// Reason carefully about the correctness argument when calling this function
// from any context.
func (p *commitPipeline) directWrite(b *Batch) error {
var syncWG sync.WaitGroup
var syncErr error
syncWG.Add(1)
p.logSyncQSem <- struct{}{}
_, err := p.env.write(b, &syncWG, &syncErr)
syncWG.Wait()
err = firstError(err, syncErr)
return err
}
// Commit the specified batch, writing it to the WAL, optionally syncing the
// WAL, and applying the batch to the memtable. Upon successful return the
// batch's mutations will be visible for reading.
// REQUIRES: noSyncWait => syncWAL
func (p *commitPipeline) Commit(b *Batch, syncWAL bool, noSyncWait bool) error {
if b.Empty() {
return nil
}
commitStartTime := crtime.NowMono()
// Acquire semaphores.
p.commitQueueSem <- struct{}{}
if syncWAL {
p.logSyncQSem <- struct{}{}
}
b.commitStats.SemaphoreWaitDuration = commitStartTime.Elapsed()
// Prepare the batch for committing: enqueuing the batch in the pending
// queue, determining the batch sequence number and writing the data to the
// WAL.
//
// NB: We set Batch.commitErr on error so that the batch won't be a candidate
// for reuse. See Batch.release().
mem, err := p.prepare(b, syncWAL, noSyncWait)
if err != nil {
b.db = nil // prevent batch reuse on error
// NB: we are not doing <-p.commitQueueSem since the batch is still
// sitting in the pending queue. We should consider fixing this by also
// removing the batch from the pending queue.
return err
}
// Apply the batch to the memtable.
if err := p.env.apply(b, mem); err != nil {
b.db = nil // prevent batch reuse on error
// NB: we are not doing <-p.commitQueueSem since the batch is still
// sitting in the pending queue. We should consider fixing this by also
// removing the batch from the pending queue.
return err
}
// Publish the batch sequence number.
p.publish(b)
<-p.commitQueueSem
if !noSyncWait {
// Already waited for commit, so look at the error.
if b.commitErr != nil {
b.db = nil // prevent batch reuse on error
err = b.commitErr
}
}
// Else noSyncWait. The LogWriter can be concurrently writing to
// b.commitErr. We will read b.commitErr in Batch.SyncWait after the
// LogWriter is done writing.
b.commitStats.TotalDuration = commitStartTime.Elapsed()
return err
}
// AllocateSeqNum allocates count sequence numbers, invokes the prepare
// callback, then the apply callback, and then publishes the sequence
// numbers. AllocateSeqNum does not write to the WAL or add entries to the
// memtable. AllocateSeqNum can be used to sequence an operation such as
// sstable ingestion within the commit pipeline. The prepare callback is
// invoked with commitPipeline.mu held, but note that DB.mu is not held and
// must be locked if necessary.
func (p *commitPipeline) AllocateSeqNum(
count int, prepare func(seqNum base.SeqNum), apply func(seqNum base.SeqNum),
) {
// This method is similar to Commit and prepare. Be careful about trying to
// share additional code with those methods because Commit and prepare are
// performance critical code paths.
b := newBatch(nil)
defer b.Close()
// Give the batch a count of 1 so that the log and visible sequence number
// are incremented correctly.
b.data = make([]byte, batchrepr.HeaderLen)
b.setCount(uint32(count))
b.commit.Add(1)
p.commitQueueSem <- struct{}{}
p.mu.Lock()
// Enqueue the batch in the pending queue. Note that while the pending queue
// is lock-free, we want the order of batches to be the same as the sequence
// number order.
p.pending.enqueue(b)
// Assign the batch a sequence number. Note that we use atomic operations
// here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
// mutual exclusion for other goroutines writing to logSeqNum.
logSeqNum := p.env.logSeqNum.Add(base.SeqNum(count)) - base.SeqNum(count)
seqNum := logSeqNum
if seqNum == 0 {
// We can't use the value 0 for the global seqnum during ingestion, because
// 0 indicates no global seqnum. So allocate one more seqnum.
p.env.logSeqNum.Add(1)
seqNum++
}
b.setSeqNum(seqNum)
// Wait for any outstanding writes to the memtable to complete. This is
// necessary for ingestion so that the check for memtable overlap can see any
// writes that were sequenced before the ingestion. The spin loop is
// unfortunate, but obviates the need for additional synchronization.
for {
visibleSeqNum := p.env.visibleSeqNum.Load()
if visibleSeqNum == logSeqNum {
break
}
runtime.Gosched()
}
// Invoke the prepare callback. Note the lack of error reporting. Even if the
// callback internally fails, the sequence number needs to be published in
// order to allow the commit pipeline to proceed.
prepare(b.SeqNum())
p.mu.Unlock()
// Invoke the apply callback.
apply(b.SeqNum())
// Publish the sequence number.
p.publish(b)
<-p.commitQueueSem
}
func (p *commitPipeline) prepare(b *Batch, syncWAL bool, noSyncWait bool) (*memTable, error) {
n := uint64(b.Count())
if n == invalidBatchCount {
return nil, ErrInvalidBatch
}
var syncWG *sync.WaitGroup
var syncErr *error
switch {
case !syncWAL:
// Only need to wait for the publish.
b.commit.Add(1)
// Remaining cases represent syncWAL=true.
case noSyncWait:
syncErr = &b.commitErr
syncWG = &b.fsyncWait
// Only need to wait synchronously for the publish. The user will
// (asynchronously) wait on the batch's fsyncWait.
b.commit.Add(1)
b.fsyncWait.Add(1)
case !noSyncWait:
syncErr = &b.commitErr
syncWG = &b.commit
// Must wait for both the publish and the WAL fsync.
b.commit.Add(2)
}
p.mu.Lock()
// Enqueue the batch in the pending queue. Note that while the pending queue
// is lock-free, we want the order of batches to be the same as the sequence
// number order.
p.pending.enqueue(b)
// Assign the batch a sequence number. Note that we use atomic operations
// here to handle concurrent reads of logSeqNum. commitPipeline.mu provides
// mutual exclusion for other goroutines writing to logSeqNum.
b.setSeqNum(p.env.logSeqNum.Add(base.SeqNum(n)) - base.SeqNum(n))
// Write the data to the WAL.
mem, err := p.env.write(b, syncWG, syncErr)
p.mu.Unlock()
return mem, err
}
func (p *commitPipeline) publish(b *Batch) {
// Mark the batch as applied.
b.applied.Store(true)
// Loop dequeuing applied batches from the pending queue. If our batch was
// the head of the pending queue we are guaranteed that either we'll publish
// it or someone else will dequeueApplied and publish it. If our batch is not the
// head of the queue then either we'll dequeueApplied applied batches and reach our
// batch or there is an unapplied batch blocking us. When that unapplied
// batch applies it will go through the same process and publish our batch
// for us.
for {
t := p.pending.dequeueApplied()
if t == nil {
// Wait for another goroutine to publish us. We might also be waiting for
// the WAL sync to finish.
now := crtime.NowMono()
b.commit.Wait()
b.commitStats.CommitWaitDuration += now.Elapsed()
break
}
if !t.applied.Load() {
panic("not reached")
}
// We're responsible for publishing the sequence number for batch t, but
// another concurrent goroutine might sneak in and publish the sequence
// number for a subsequent batch. That's ok as all we're guaranteeing is
// that the sequence number ratchets up.
for {
curSeqNum := p.env.visibleSeqNum.Load()
newSeqNum := t.SeqNum() + base.SeqNum(t.Count())
if newSeqNum <= curSeqNum {
// t's sequence number has already been published.
break
}
if p.env.visibleSeqNum.CompareAndSwap(curSeqNum, newSeqNum) {
// We successfully published t's sequence number.
break
}
}
t.commit.Done()
}
}