Skip to content

Commit

Permalink
feat: filter added
Browse files Browse the repository at this point in the history
  • Loading branch information
marcsauter committed Nov 9, 2021
1 parent ce5f984 commit c3dfed2
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 1 deletion.
10 changes: 10 additions & 0 deletions etcd/etcdv3.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ func (e *Backend) Get(key string, ops ...store.GetOption) ([]store.Entry, error)
return []store.Entry{}, store.ErrKeyNotFound
}

if opts.Filter == nil {
opts.Filter = func([]byte, []byte) bool {
return true
}
}

if opts.Handler == nil {
opts.Handler = func([]byte, []byte) error {
return nil
Expand All @@ -184,6 +190,10 @@ func (e *Backend) Get(key string, ops ...store.GetOption) ([]store.Entry, error)
result := []store.Entry{}

for _, value := range resp.Kvs {
if ok := opts.Filter([]byte(e.RelKey(string(value.Key))), value.Value); !ok {
continue
}

if err := opts.Handler([]byte(e.RelKey(string(value.Key))), value.Value); err != nil {
return result, err
}
Expand Down
12 changes: 11 additions & 1 deletion etcd/etcdv3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -105,6 +106,15 @@ func TestGet(t *testing.T) {
assert.Equal(t, expEntries, e)
})

t.Run("get with filter", func(t *testing.T) {
filter := func(k []byte, v []byte) bool {
return strings.HasSuffix(string(k), "6")
}
e, err := b.Get("key", store.WithPrefix(), store.WithFilter(filter))
require.NoError(t, err)
assert.Equal(t, expEntries[6], e[0])
})

t.Run("get with handler", func(t *testing.T) {
entry := store.Entry{}
handler := func(k []byte, v []byte) error {
Expand Down Expand Up @@ -517,7 +527,7 @@ func setupTestStore(t *testing.T, log bool, opts []Opt) (store.BackendKeyer, *cl

backend, err := New(append([]Opt{WithClient(cli)}, opts...)...)
if err != nil {
t.Errorf("failed to create backend: %w", err)
t.Errorf("failed to create backend: %v", err)
}

return backend, cli, func() { cluster.Terminate(t) }
Expand Down
11 changes: 11 additions & 0 deletions hash/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,20 @@ func (h *Backend) keys(prefix string) []string {
// Get returns a list of store entries
// WithPrefix, WithHandler are supported
// WithContext will be ignored
//nolint:gocyclo // review
func (h *Backend) Get(key string, ops ...store.GetOption) ([]store.Entry, error) {
opts := &store.GetOptions{}

for _, op := range ops {
op.SetGetOption(opts)
}

if opts.Filter == nil {
opts.Filter = func([]byte, []byte) bool {
return true
}
}

if opts.Handler == nil {
opts.Handler = func([]byte, []byte) error {
return nil
Expand All @@ -176,6 +183,10 @@ func (h *Backend) Get(key string, ops ...store.GetOption) ([]store.Entry, error)
h.RUnlock()

if ok && h.exists(v) {
if ok := opts.Filter([]byte(v.Data.Key), v.Data.Value); !ok {
continue
}

if opts.Unmarshal != nil && !opts.Unmarshal.IsSlice() {
return nil, json.Unmarshal(v.Data.Value, &opts.Unmarshal.Input)
}
Expand Down
10 changes: 10 additions & 0 deletions hash/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -105,6 +106,15 @@ func TestGet(t *testing.T) {
assert.Equal(t, expEntries, e)
})

t.Run("get with filter", func(t *testing.T) {
filter := func(k []byte, v []byte) bool {
return strings.HasSuffix(string(k), "6")
}
e, err := b.Get("key", store.WithPrefix(), store.WithFilter(filter))
require.NoError(t, err)
assert.Equal(t, expEntries[6], e[0])
})

t.Run("get with handler", func(t *testing.T) {
entry := store.Entry{}
handler := func(k []byte, v []byte) error {
Expand Down
22 changes: 22 additions & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ var (
ErrResponseChannelClosed = errors.New("keepalive response channel has been closed")
)

// FilterFunc is a function that is called on (each) returned
// key-value pair during a Get request.
type FilterFunc func([]byte, []byte) bool

// HandlerFunc is a function that is called on (each) returned
// key-value pair during a Get request.
type HandlerFunc func([]byte, []byte) error
Expand Down Expand Up @@ -96,6 +100,7 @@ func Put(b Backend, key string, v interface{}, opts ...PutOption) (bool, error)
// GetOptions represent all possible options for Get requests.
type GetOptions struct {
Prefix bool
Filter FilterFunc
Handler HandlerFunc
Context context.Context
Unmarshal *unmarshal
Expand Down Expand Up @@ -142,6 +147,14 @@ func WithContext(ctx context.Context) interface {
return &contextOption{Context: ctx}
}

// WithFilter is an option to use an FilterFunc on each
// key-value pair during a Get request.
func WithFilter(f FilterFunc) interface {
GetOption
} {
return &filterOption{Filter: f}
}

// WithHandler is an option to use an HandlerFunc on each
// key-value pair during a Get request.
func WithHandler(h HandlerFunc) interface {
Expand Down Expand Up @@ -257,6 +270,15 @@ func (c *contextOption) SetDelOption(opts *DelOptions) {
opts.Context = c.Context
}

// filter
type filterOption struct {
Filter FilterFunc
}

func (h *filterOption) SetGetOption(opts *GetOptions) {
opts.Filter = h.Filter
}

// handler
type handlerOption struct {
Handler HandlerFunc
Expand Down

0 comments on commit c3dfed2

Please sign in to comment.