Skip to content

Commit

Permalink
feat(kv): add Close to kv.ForwardCursor interface
Browse files Browse the repository at this point in the history
  • Loading branch information
GeorgeMac committed Dec 17, 2019
1 parent bbec85c commit 53f307d
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 5 deletions.
26 changes: 26 additions & 0 deletions bolt/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"go.uber.org/zap"
)

// check that *KVStore implement kv.Store interface.
var _ (kv.Store) = (*KVStore)(nil)

// KVStore is a kv.Store backed by boltdb.
type KVStore struct {
path string
Expand Down Expand Up @@ -224,10 +227,21 @@ type Cursor struct {
key, value []byte

config kv.CursorConfig
closed bool
}

// Close sets the closed to closed
func (c *Cursor) Close() error {
c.closed = true

return nil
}

// Seek seeks for the first key that matches the prefix provided.
func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.Seek(prefix)
if len(k) == 0 && len(v) == 0 {
return nil, nil
Expand All @@ -237,6 +251,9 @@ func (c *Cursor) Seek(prefix []byte) ([]byte, []byte) {

// First retrieves the first key value pair in the bucket.
func (c *Cursor) First() ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.First()
if len(k) == 0 && len(v) == 0 {
return nil, nil
Expand All @@ -246,6 +263,9 @@ func (c *Cursor) First() ([]byte, []byte) {

// Last retrieves the last key value pair in the bucket.
func (c *Cursor) Last() ([]byte, []byte) {
if c.closed {
return nil, nil
}
k, v := c.cursor.Last()
if len(k) == 0 && len(v) == 0 {
return nil, nil
Expand All @@ -255,6 +275,9 @@ func (c *Cursor) Last() ([]byte, []byte) {

// Next retrieves the next key in the bucket.
func (c *Cursor) Next() (k []byte, v []byte) {
if c.closed {
return nil, nil
}
// get and unset previously seeked values if they exist
k, v, c.key, c.value = c.key, c.value, nil, nil
if len(k) > 0 && len(v) > 0 {
Expand All @@ -275,6 +298,9 @@ func (c *Cursor) Next() (k []byte, v []byte) {

// Prev retrieves the previous key in the bucket.
func (c *Cursor) Prev() (k []byte, v []byte) {
if c.closed {
return nil, nil
}
// get and unset previously seeked values if they exist
k, v, c.key, c.value = c.key, c.value, nil, nil
if len(k) > 0 && len(v) > 0 {
Expand Down
48 changes: 45 additions & 3 deletions inmem/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,11 @@ type pair struct {

// ForwardCursor returns a directional cursor which starts at the provided seeked key
func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.ForwardCursor, error) {
pairs := make(chan []pair)
var (
pairs = make(chan []pair)
stop = make(chan struct{})
)

go func() {
defer close(pairs)

Expand All @@ -268,6 +272,13 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
var batch []pair

iterate(func(i btree.Item) bool {
select {
case <-stop:
// if signalled to stop then exit iteration
return false
default:
}

j, ok := i.(*item)
if !ok {
batch = append(batch, pair{err: fmt.Errorf("error item is type %T not *item", i)})
Expand Down Expand Up @@ -296,7 +307,7 @@ func (b *Bucket) ForwardCursor(seek []byte, opts ...kv.CursorOption) (kv.Forward
}
}()

return &ForwardCursor{pairs: pairs}, nil
return &ForwardCursor{pairs: pairs, stop: stop}, nil
}

// ForwardCursor is a kv.ForwardCursor which iterates over an in-memory btree
Expand All @@ -306,6 +317,8 @@ type ForwardCursor struct {
cur []pair
n int

stop chan struct{}
closed bool
// error found during iteration
err error
}
Expand All @@ -315,9 +328,38 @@ func (c *ForwardCursor) Err() error {
return c.err
}

// Close releases the producing goroutines for the forward cursor.
// It blocks until the producing goroutine exits.
func (c *ForwardCursor) Close() error {
if c.closed {
return nil
}

close(c.stop)

c.closed = true

// drain any existing pairs and set any encountered
// errors on cursor
for {
pairs, ok := <-c.pairs
if !ok {
break
}

if c.err == nil {
for _, pair := range pairs {
c.err = pair.err
}
}
}

return nil
}

// Next returns the next key/value pair in the cursor
func (c *ForwardCursor) Next() ([]byte, []byte) {
if c.err != nil {
if c.err != nil || c.closed {
return nil, nil
}

Expand Down
2 changes: 0 additions & 2 deletions kv/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ type staticCursor struct {
}

// Pair is a struct for key value pairs.
// It also includes an error which should only be non-nil
// when the Key and Value are nil.
type Pair struct {
Key []byte
Value []byte
Expand Down
2 changes: 2 additions & 0 deletions kv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type ForwardCursor interface {
// Err returns non-nil if an error occurred during cursor iteration.
// This should always be checked after Next returns a nil key/value.
Err() error
// Close is reponsible for freeing any resources created by the cursor.
Close() error
}

// CursorDirection is an integer used to define the direction
Expand Down
9 changes: 9 additions & 0 deletions testing/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ func KVForwardCursor(
fields KVStoreFields
args args
exp []string
expErr error
}{
{
name: "no hints",
Expand Down Expand Up @@ -899,6 +900,14 @@ func KVForwardCursor(
t.Errorf("unexpected cursor values: -got/+exp\n%v", cmp.Diff(got, exp))
}

if err := cur.Err(); !cmp.Equal(err, tt.expErr) {
t.Errorf("expected error to be %v, got %v", tt.expErr, err)
}

if err := cur.Close(); err != nil {
t.Errorf("expected cursor to close with nil error, found %v", err)
}

return nil
})

Expand Down

0 comments on commit 53f307d

Please sign in to comment.