diff --git a/etcd/etcdv3.go b/etcd/etcdv3.go index 3dcd11e..2ee52cf 100644 --- a/etcd/etcdv3.go +++ b/etcd/etcdv3.go @@ -175,6 +175,12 @@ func (e *Backend) Get(key string, ops ...store.GetOption) ([]store.Entry, error) return []store.Entry{}, store.ErrKeyNotFound } + if opts.Filter == nil { + opts.Filter = func([]byte, []byte) bool { + return true + } + } + if opts.Handler == nil { opts.Handler = func([]byte, []byte) error { return nil @@ -184,6 +190,10 @@ func (e *Backend) Get(key string, ops ...store.GetOption) ([]store.Entry, error) result := []store.Entry{} for _, value := range resp.Kvs { + if ok := opts.Filter([]byte(e.RelKey(string(value.Key))), value.Value); !ok { + continue + } + if err := opts.Handler([]byte(e.RelKey(string(value.Key))), value.Value); err != nil { return result, err } diff --git a/etcd/etcdv3_test.go b/etcd/etcdv3_test.go index b9139ae..b27bd32 100644 --- a/etcd/etcdv3_test.go +++ b/etcd/etcdv3_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "testing" "time" @@ -105,6 +106,15 @@ func TestGet(t *testing.T) { assert.Equal(t, expEntries, e) }) + t.Run("get with filter", func(t *testing.T) { + filter := func(k []byte, v []byte) bool { + return strings.HasSuffix(string(k), "6") + } + e, err := b.Get("key", store.WithPrefix(), store.WithFilter(filter)) + require.NoError(t, err) + assert.Equal(t, expEntries[6], e[0]) + }) + t.Run("get with handler", func(t *testing.T) { entry := store.Entry{} handler := func(k []byte, v []byte) error { @@ -517,7 +527,7 @@ func setupTestStore(t *testing.T, log bool, opts []Opt) (store.BackendKeyer, *cl backend, err := New(append([]Opt{WithClient(cli)}, opts...)...) if err != nil { - t.Errorf("failed to create backend: %w", err) + t.Errorf("failed to create backend: %v", err) } return backend, cli, func() { cluster.Terminate(t) } diff --git a/hash/hash.go b/hash/hash.go index 3429a2a..0585176 100644 --- a/hash/hash.go +++ b/hash/hash.go @@ -148,6 +148,7 @@ func (h *Backend) keys(prefix string) []string { // Get returns a list of store entries // WithPrefix, WithHandler are supported // WithContext will be ignored +//nolint:gocyclo // review func (h *Backend) Get(key string, ops ...store.GetOption) ([]store.Entry, error) { opts := &store.GetOptions{} @@ -155,6 +156,12 @@ func (h *Backend) Get(key string, ops ...store.GetOption) ([]store.Entry, error) op.SetGetOption(opts) } + if opts.Filter == nil { + opts.Filter = func([]byte, []byte) bool { + return true + } + } + if opts.Handler == nil { opts.Handler = func([]byte, []byte) error { return nil @@ -176,6 +183,10 @@ func (h *Backend) Get(key string, ops ...store.GetOption) ([]store.Entry, error) h.RUnlock() if ok && h.exists(v) { + if ok := opts.Filter([]byte(v.Data.Key), v.Data.Value); !ok { + continue + } + if opts.Unmarshal != nil && !opts.Unmarshal.IsSlice() { return nil, json.Unmarshal(v.Data.Value, &opts.Unmarshal.Input) } diff --git a/hash/hash_test.go b/hash/hash_test.go index 171b7a2..0fad1be 100644 --- a/hash/hash_test.go +++ b/hash/hash_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "testing" "time" @@ -105,6 +106,15 @@ func TestGet(t *testing.T) { assert.Equal(t, expEntries, e) }) + t.Run("get with filter", func(t *testing.T) { + filter := func(k []byte, v []byte) bool { + return strings.HasSuffix(string(k), "6") + } + e, err := b.Get("key", store.WithPrefix(), store.WithFilter(filter)) + require.NoError(t, err) + assert.Equal(t, expEntries[6], e[0]) + }) + t.Run("get with handler", func(t *testing.T) { entry := store.Entry{} handler := func(k []byte, v []byte) error { diff --git a/store.go b/store.go index c53e316..6a6f76c 100644 --- a/store.go +++ b/store.go @@ -16,6 +16,10 @@ var ( ErrResponseChannelClosed = errors.New("keepalive response channel has been closed") ) +// FilterFunc is a function that is called on (each) returned +// key-value pair during a Get request. +type FilterFunc func([]byte, []byte) bool + // HandlerFunc is a function that is called on (each) returned // key-value pair during a Get request. type HandlerFunc func([]byte, []byte) error @@ -96,6 +100,7 @@ func Put(b Backend, key string, v interface{}, opts ...PutOption) (bool, error) // GetOptions represent all possible options for Get requests. type GetOptions struct { Prefix bool + Filter FilterFunc Handler HandlerFunc Context context.Context Unmarshal *unmarshal @@ -142,6 +147,14 @@ func WithContext(ctx context.Context) interface { return &contextOption{Context: ctx} } +// WithFilter is an option to use an FilterFunc on each +// key-value pair during a Get request. +func WithFilter(f FilterFunc) interface { + GetOption +} { + return &filterOption{Filter: f} +} + // WithHandler is an option to use an HandlerFunc on each // key-value pair during a Get request. func WithHandler(h HandlerFunc) interface { @@ -257,6 +270,15 @@ func (c *contextOption) SetDelOption(opts *DelOptions) { opts.Context = c.Context } +// filter +type filterOption struct { + Filter FilterFunc +} + +func (h *filterOption) SetGetOption(opts *GetOptions) { + opts.Filter = h.Filter +} + // handler type handlerOption struct { Handler HandlerFunc