Skip to content

Commit 48b8cb8

Browse files
authored
feat(kv): define forward cursor interface (#16212)
* feat(kv): define forward cursor interface * feat(kv): implement ForwardCursor on bolt and inmem buckets * feat(kv): update tests to capture forward cursor * fix(kv): typo in docs * feat(kv): add Err method to ForwardCursor interface * feat(inmem): batch pair channel sends in forward cursor * fix(kv): remove Err field from kv.Pair * feat(kv): add Close to kv.ForwardCursor interface
1 parent cc0943c commit 48b8cb8

File tree

6 files changed

+542
-8
lines changed

6 files changed

+542
-8
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
4. [16262](https://github.com/influxdata/influxdb/pull/16262): add support for check resource dry run functionality
99
5. [16275](https://github.com/influxdata/influxdb/pull/16275): add support for check resource apply functionality
1010
6. [16283](https://github.com/influxdata/influxdb/pull/16283): add support for check resource export functionality
11+
1. [16212](https://github.com/influxdata/influxdb/pull/16212): Add new kv.ForwardCursor interface
1112

1213
### Bug Fixes
1314

bolt/kv.go

+78-4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ import (
1313
"go.uber.org/zap"
1414
)
1515

16+
// check that *KVStore implement kv.Store interface.
17+
var _ (kv.Store) = (*KVStore)(nil)
18+
1619
// KVStore is a kv.Store backed by boltdb.
1720
type KVStore struct {
1821
path string
@@ -191,6 +194,22 @@ func (b *Bucket) Delete(key []byte) error {
191194
return err
192195
}
193196

197+
// ForwardCursor retrieves a cursor for iterating through the entries
198+
// in the key value store in a given direction (ascending / descending).
199+
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
200+
var (
201+
cursor = b.bucket.Cursor()
202+
key, value = cursor.Seek(seek)
203+
)
204+
205+
return &Cursor{
206+
cursor: cursor,
207+
key: key,
208+
value: value,
209+
config: kv.NewCursorConfig(opts...),
210+
}, nil
211+
}
212+
194213
// Cursor retrieves a cursor for iterating through the entries
195214
// in the key value store.
196215
func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
@@ -203,10 +222,26 @@ func (b *Bucket) Cursor(opts ...kv.CursorHint) (kv.Cursor, error) {
203222
// in the key value store.
204223
type Cursor struct {
205224
cursor *bolt.Cursor
225+
226+
// previously seeked key/value
227+
key, value []byte
228+
229+
config kv.CursorConfig
230+
closed bool
231+
}
232+
233+
// Close sets the closed to closed
234+
func (c *Cursor) Close() error {
235+
c.closed = true
236+
237+
return nil
206238
}
207239

208240
// Seek seeks for the first key that matches the prefix provided.
209241
func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {
242+
if c.closed {
243+
return nil, nil
244+
}
210245
k, v := c.cursor.Seek(prefix)
211246
if len(k) == 0 && len(v) == 0 {
212247
return nil, nil
@@ -216,6 +251,9 @@ func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {
216251

217252
// First retrieves the first key value pair in the bucket.
218253
func (c *Cursor) First() ([]byte, []byte) {
254+
if c.closed {
255+
return nil, nil
256+
}
219257
k, v := c.cursor.First()
220258
if len(k) == 0 && len(v) == 0 {
221259
return nil, nil
@@ -225,6 +263,9 @@ func (c *Cursor) First() ([]byte, []byte) {
225263

226264
// Last retrieves the last key value pair in the bucket.
227265
func (c *Cursor) Last() ([]byte, []byte) {
266+
if c.closed {
267+
return nil, nil
268+
}
228269
k, v := c.cursor.Last()
229270
if len(k) == 0 && len(v) == 0 {
230271
return nil, nil
@@ -233,19 +274,52 @@ func (c *Cursor) Last() ([]byte, []byte) {
233274
}
234275

235276
// Next retrieves the next key in the bucket.
236-
func (c *Cursor) Next() ([]byte, []byte) {
237-
k, v := c.cursor.Next()
277+
func (c *Cursor) Next() (k []byte, v []byte) {
278+
if c.closed {
279+
return nil, nil
280+
}
281+
// get and unset previously seeked values if they exist
282+
k, v, c.key, c.value = c.key, c.value, nil, nil
283+
if len(k) > 0 && len(v) > 0 {
284+
return
285+
}
286+
287+
next := c.cursor.Next
288+
if c.config.Direction == kv.CursorDescending {
289+
next = c.cursor.Prev
290+
}
291+
292+
k, v = next()
238293
if len(k) == 0 && len(v) == 0 {
239294
return nil, nil
240295
}
241296
return k, v
242297
}
243298

244299
// Prev retrieves the previous key in the bucket.
245-
func (c *Cursor) Prev() ([]byte, []byte) {
246-
k, v := c.cursor.Prev()
300+
func (c *Cursor) Prev() (k []byte, v []byte) {
301+
if c.closed {
302+
return nil, nil
303+
}
304+
// get and unset previously seeked values if they exist
305+
k, v, c.key, c.value = c.key, c.value, nil, nil
306+
if len(k) > 0 && len(v) > 0 {
307+
return
308+
}
309+
310+
prev := c.cursor.Prev
311+
if c.config.Direction == kv.CursorDescending {
312+
prev = c.cursor.Next
313+
}
314+
315+
k, v = prev()
247316
if len(k) == 0 && len(v) == 0 {
248317
return nil, nil
249318
}
250319
return k, v
251320
}
321+
322+
// Err always returns nil as nothing can go wrong™ during iteration
323+
func (c *Cursor) Err() error {
324+
return nil
325+
}

inmem/kv.go

+146
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,13 @@ import (
1010
"github.com/influxdata/influxdb/kv"
1111
)
1212

13+
// ensure *KVStore implement kv.Store interface
14+
var _ kv.Store = (*KVStore)(nil)
15+
16+
// cursorBatchSize is the size of a batch sent by a forward cursors
17+
// tree iterator
18+
const cursorBatchSize = 1000
19+
1320
// KVStore is an in memory btree backed kv.Store.
1421
type KVStore struct {
1522
mu sync.RWMutex
@@ -225,3 +232,142 @@ func (b *Bucket) getAll(o *kv.CursorHints) ([]kv.Pair, error) {
225232

226233
return pairs, nil
227234
}
235+
236+
type pair struct {
237+
kv.Pair
238+
err error
239+
}
240+
241+
// ForwardCursor returns a directional cursor which starts at the provided seeked key
242+
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
243+
var (
244+
pairs = make(chan []pair)
245+
stop = make(chan struct{})
246+
send = func(batch []pair) bool {
247+
if len(batch) == 0 {
248+
return true
249+
}
250+
251+
select {
252+
case pairs <- batch:
253+
return true
254+
case <-stop:
255+
return false
256+
}
257+
}
258+
)
259+
260+
go func() {
261+
defer close(pairs)
262+
263+
var (
264+
batch []pair
265+
config = kv.NewCursorConfig(opts...)
266+
fn = config.Hints.PredicateFn
267+
iterate = func(it btree.ItemIterator) {
268+
b.btree.AscendGreaterOrEqual(&item{key: seek}, it)
269+
}
270+
)
271+
272+
if config.Direction == kv.CursorDescending {
273+
iterate = func(it btree.ItemIterator) {
274+
b.btree.DescendLessOrEqual(&item{key: seek}, it)
275+
}
276+
}
277+
278+
iterate(func(i btree.Item) bool {
279+
select {
280+
case <-stop:
281+
// if signalled to stop then exit iteration
282+
return false
283+
default:
284+
}
285+
286+
j, ok := i.(*item)
287+
if !ok {
288+
batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)})
289+
290+
return false
291+
}
292+
293+
if fn == nil || fn(j.key, j.value) {
294+
batch = append(batch, pair{Pair: kv.Pair{Key: j.key, Value: j.value}})
295+
}
296+
297+
if len(batch) < cursorBatchSize {
298+
return true
299+
}
300+
301+
if send(batch) {
302+
// batch flushed successfully so we can
303+
// begin a new batch
304+
batch = nil
305+
306+
return true
307+
}
308+
309+
// we've been signalled to stop
310+
return false
311+
})
312+
313+
// send if any left in batch
314+
send(batch)
315+
}()
316+
317+
return &ForwardCursor{pairs: pairs, stop: stop}, nil
318+
}
319+
320+
// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree
321+
type ForwardCursor struct {
322+
pairs <-chan []pair
323+
324+
cur []pair
325+
n int
326+
327+
stop chan struct{}
328+
closed bool
329+
// error found during iteration
330+
err error
331+
}
332+
333+
// Err returns a non-nil error when an error occurred during cursor iteration.
334+
func (c *ForwardCursor) Err() error {
335+
return c.err
336+
}
337+
338+
// Close releases the producing goroutines for the forward cursor.
339+
// It blocks until the producing goroutine exits.
340+
func (c *ForwardCursor) Close() error {
341+
if c.closed {
342+
return nil
343+
}
344+
345+
close(c.stop)
346+
347+
c.closed = true
348+
349+
return nil
350+
}
351+
352+
// Next returns the next key/value pair in the cursor
353+
func (c *ForwardCursor) Next() ([]byte, []byte) {
354+
if c.err != nil || c.closed {
355+
return nil, nil
356+
}
357+
358+
if c.n >= len(c.cur) {
359+
var ok bool
360+
c.cur, ok = <-c.pairs
361+
if !ok {
362+
return nil, nil
363+
}
364+
365+
c.n = 0
366+
}
367+
368+
pair := c.cur[c.n]
369+
c.err = pair.err
370+
c.n++
371+
372+
return pair.Key, pair.Value
373+
}

kv/store.go

+61
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ type Bucket interface {
9494
Put(key, value []byte) error
9595
// Delete should error if the transaction it was called in is not writable.
9696
Delete(key []byte) error
97+
// ForwardCursor returns a forward cursor from the seek position provided.
98+
// Other options can be supplied to provide direction and hints.
99+
ForwardCursor(seek []byte, opts ...CursorOption) (ForwardCursor, error)
97100
}
98101

99102
// Cursor is an abstraction for iterating/ranging through data. A concrete implementation
@@ -110,3 +113,61 @@ type Cursor interface {
110113
// Prev moves the cursor to the prev key in the bucket.
111114
Prev() (k []byte, v []byte)
112115
}
116+
117+
// ForwardCursor is an abstraction for interacting/ranging through data in one direction.
118+
type ForwardCursor interface {
119+
// Next moves the cursor to the next key in the bucket.
120+
Next() (k, v []byte)
121+
// Err returns non-nil if an error occurred during cursor iteration.
122+
// This should always be checked after Next returns a nil key/value.
123+
Err() error
124+
// Close is reponsible for freeing any resources created by the cursor.
125+
Close() error
126+
}
127+
128+
// CursorDirection is an integer used to define the direction
129+
// a request cursor operates in.
130+
type CursorDirection int
131+
132+
const (
133+
// CursorAscending directs a cursor to range in ascending order
134+
CursorAscending CursorDirection = iota
135+
// CursorAscending directs a cursor to range in descending order
136+
CursorDescending
137+
)
138+
139+
// CursorConfig is a type used to configure a new forward cursor.
140+
// It includes a direction and a set of hints
141+
type CursorConfig struct {
142+
Direction CursorDirection
143+
Hints CursorHints
144+
}
145+
146+
// NewCursorConfig constructs and configures a CursorConfig used to configure
147+
// a forward cursor.
148+
func NewCursorConfig(opts ...CursorOption) CursorConfig {
149+
conf := CursorConfig{}
150+
for _, opt := range opts {
151+
opt(&conf)
152+
}
153+
return conf
154+
}
155+
156+
// CursorOption is a functional option for configuring a forward cursor
157+
type CursorOption func(*CursorConfig)
158+
159+
// WithCursorDirection sets the cursor direction on a provided cursor config
160+
func WithCursorDirection(direction CursorDirection) CursorOption {
161+
return func(c *CursorConfig) {
162+
c.Direction = direction
163+
}
164+
}
165+
166+
// WithCursorHints configs the provided hints on the cursor config
167+
func WithCursorHints(hints ...CursorHint) CursorOption {
168+
return func(c *CursorConfig) {
169+
for _, hint := range hints {
170+
hint(&c.Hints)
171+
}
172+
}
173+
}

0 commit comments

Comments
 (0)