forked from cockroachdb/pebble
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flushable.go
244 lines (218 loc) · 8.21 KB
/
flushable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
// Copyright 2020 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.
package pebble
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
)
// flushable defines the interface for immutable memtables.
type flushable interface {
newIter(o *IterOptions) internalIterator
newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator
newRangeDelIter(o *IterOptions) keyspan.FragmentIterator
newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator
containsRangeKeys() bool
// inuseBytes returns the number of inuse bytes by the flushable.
inuseBytes() uint64
// totalBytes returns the total number of bytes allocated by the flushable.
totalBytes() uint64
// readyForFlush returns true when the flushable is ready for flushing. See
// memTable.readyForFlush for one implementation which needs to check whether
// there are any outstanding write references.
readyForFlush() bool
}
// flushableEntry wraps a flushable and adds additional metadata and
// functionality that is common to all flushables.
type flushableEntry struct {
flushable
// Channel which is closed when the flushable has been flushed.
flushed chan struct{}
// flushForced indicates whether a flush was forced on this memtable (either
// manual, or due to ingestion). Protected by DB.mu.
flushForced bool
// delayedFlushForcedAt indicates whether a timer has been set to force a
// flush on this memtable at some point in the future. Protected by DB.mu.
// Holds the timestamp of when the flush will be issued.
delayedFlushForcedAt time.Time
// logNum corresponds to the WAL that contains the records present in the
// receiver.
logNum FileNum
// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
logSize uint64
// The current logSeqNum at the time the memtable was created. This is
// guaranteed to be less than or equal to any seqnum stored in the memtable.
logSeqNum uint64
// readerRefs tracks the read references on the flushable. The two sources of
// reader references are DB.mu.mem.queue and readState.memtables. The memory
// reserved by the flushable in the cache is released when the reader refs
// drop to zero. If the flushable is referencing sstables, then the file
// refount is also decreased once the reader refs drops to 0. If the
// flushable is a memTable, when the reader refs drops to zero, the writer
// refs will already be zero because the memtable will have been flushed and
// that only occurs once the writer refs drops to zero.
readerRefs int32
// Closure to invoke to release memory accounting.
releaseMemAccounting func()
// unrefFiles, if not nil, should be invoked to decrease the ref count of
// files which are backing the flushable.
unrefFiles func() []*fileBacking
// deleteFnLocked should be called if the caller is holding DB.mu.
deleteFnLocked func(obsolete []*fileBacking)
// deleteFn should be called if the caller is not holding DB.mu.
deleteFn func(obsolete []*fileBacking)
}
func (e *flushableEntry) readerRef() {
switch v := atomic.AddInt32(&e.readerRefs, 1); {
case v <= 1:
panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
}
}
// db.mu must not be held when this is called.
func (e *flushableEntry) readerUnref(deleteFiles bool) {
e.readerUnrefHelper(deleteFiles, e.deleteFn)
}
// db.mu must be held when this is called.
func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) {
e.readerUnrefHelper(deleteFiles, e.deleteFnLocked)
}
func (e *flushableEntry) readerUnrefHelper(
deleteFiles bool, deleteFn func(obsolete []*fileBacking),
) {
switch v := atomic.AddInt32(&e.readerRefs, -1); {
case v < 0:
panic(fmt.Sprintf("pebble: inconsistent reference count: %d", v))
case v == 0:
if e.releaseMemAccounting == nil {
panic("pebble: memtable reservation already released")
}
e.releaseMemAccounting()
e.releaseMemAccounting = nil
if e.unrefFiles != nil {
obsolete := e.unrefFiles()
e.unrefFiles = nil
if deleteFiles {
deleteFn(obsolete)
}
}
}
}
type flushableList []*flushableEntry
// ingestedFlushable is the implementation of the flushable interface for the
// ingesting sstables which are added to the flushable list.
type ingestedFlushable struct {
files []physicalMeta
cmp Compare
split Split
newIters tableNewIters
newRangeKeyIters keyspan.TableNewSpanIter
// Since the level slice is immutable, we construct and set it once. It
// should be safe to read from slice in future reads.
slice manifest.LevelSlice
// hasRangeKeys is set on ingestedFlushable construction.
hasRangeKeys bool
}
func newIngestedFlushable(
files []*fileMetadata,
cmp Compare,
split Split,
newIters tableNewIters,
newRangeKeyIters keyspan.TableNewSpanIter,
) *ingestedFlushable {
var physicalFiles []physicalMeta
var hasRangeKeys bool
for _, f := range files {
if f.HasRangeKeys {
hasRangeKeys = true
}
physicalFiles = append(physicalFiles, f.PhysicalMeta())
}
ret := &ingestedFlushable{
files: physicalFiles,
cmp: cmp,
split: split,
newIters: newIters,
newRangeKeyIters: newRangeKeyIters,
// slice is immutable and can be set once and used many times.
slice: manifest.NewLevelSliceKeySorted(cmp, files),
hasRangeKeys: hasRangeKeys,
}
return ret
}
// TODO(sumeer): ingestedFlushable iters also need to plumb context for
// tracing.
func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
var opts IterOptions
if o != nil {
opts = *o
}
// TODO(bananabrick): The manifest.Level in newLevelIter is only used for
// logging. Update the manifest.Level encoding to account for levels which
// aren't truly levels in the lsm. Right now, the encoding only supports
// L0 sublevels, and the rest of the levels in the lsm.
return newLevelIter(
opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), nil,
)
}
func (s *ingestedFlushable) newFlushIter(o *IterOptions, bytesFlushed *uint64) internalIterator {
// newFlushIter is only used for writing memtables to disk as sstables.
// Since ingested sstables are already present on disk, they don't need to
// make use of a flush iter.
panic("pebble: not implemented")
}
func (s *ingestedFlushable) constructRangeDelIter(
file *manifest.FileMetadata, _ *keyspan.SpanIterOptions,
) (keyspan.FragmentIterator, error) {
// Note that the keyspan level iter expects a non-nil iterator to be
// returned even if there is an error. So, we return the emptyKeyspanIter.
iter, rangeDelIter, err := s.newIters(context.Background(), file, nil, internalIterOpts{})
if err != nil {
return emptyKeyspanIter, err
}
iter.Close()
if rangeDelIter == nil {
return emptyKeyspanIter, nil
}
return rangeDelIter, nil
}
// TODO(bananabrick): Using a level iter instead of a keyspan level iter to
// surface range deletes is more efficient.
func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator {
return keyspan.NewLevelIter(
keyspan.SpanIterOptions{}, s.cmp,
s.constructRangeDelIter, s.slice.Iter(), manifest.Level(0),
manifest.KeyTypePoint,
)
}
func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator {
if !s.containsRangeKeys() {
return nil
}
return keyspan.NewLevelIter(
keyspan.SpanIterOptions{}, s.cmp, s.newRangeKeyIters,
s.slice.Iter(), manifest.Level(0), manifest.KeyTypeRange,
)
}
func (s *ingestedFlushable) containsRangeKeys() bool {
return s.hasRangeKeys
}
func (s *ingestedFlushable) inuseBytes() uint64 {
// inuseBytes is only used when memtables are flushed to disk as sstables.
panic("pebble: not implemented")
}
func (s *ingestedFlushable) totalBytes() uint64 {
// We don't allocate additional bytes for the ingestedFlushable.
return 0
}
func (s *ingestedFlushable) readyForFlush() bool {
// ingestedFlushable should always be ready to flush. However, note that
// memtables before the ingested sstables in the memtable queue must be
// flushed before an ingestedFlushable can be flushed. This is because the
// ingested sstables need an updated view of the Version to
// determine where to place the files in the lsm.
return true
}