Skip to content

Commit

Permalink
add tests for the indexed store
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 5, 2017
1 parent aabe021 commit 240792c
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 22 deletions.
4 changes: 2 additions & 2 deletions services/replay/dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func newRecordingKV(store storage.Interface) (*recordingKV, error) {
})
c.Indexes = append(c.Indexes, storage.Index{
Name: recordingDateIndex,
ValueF: func(o storage.BinaryObject) (string, error) {
ValueFunc: func(o storage.BinaryObject) (string, error) {
r, ok := o.(*Recording)
if !ok {
return "", storage.ImpossibleTypeErr(r, o)
Expand Down Expand Up @@ -246,7 +246,7 @@ func newReplayKV(store storage.Interface) (*replayKV, error) {
})
c.Indexes = append(c.Indexes, storage.Index{
Name: replayDateIndex,
ValueF: func(o storage.BinaryObject) (string, error) {
ValueFunc: func(o storage.BinaryObject) (string, error) {
r, ok := o.(*Replay)
if !ok {
return "", storage.ImpossibleTypeErr(r, o)
Expand Down
1 change: 0 additions & 1 deletion services/slack/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ func (s *Service) Handler(c HandlerConfig, l *log.Logger) alert.Handler {
}

func (h *handler) Handle(event alert.Event) {
h.logger.Printf("D! Slack event %v channel %s", event.State.ID, h.c.Channel)
if err := h.s.Alert(
h.c.Channel,
event.State.Message,
Expand Down
1 change: 0 additions & 1 deletion services/smtp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (s *Service) runMailer() {
}

func (s *Service) SendMail(to []string, subject, body string) error {
s.logger.Println("D! SendMail", to, subject)
m, err := s.prepareMessge(to, subject, body)
if err != nil {
return err
Expand Down
39 changes: 22 additions & 17 deletions services/storage/indexed.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,23 @@ type BinaryObject interface {
}

type NewObjectF func() BinaryObject
type ValueF func(BinaryObject) (string, error)
type ValueFunc func(BinaryObject) (string, error)

type Index struct {
Name string
ValueF ValueF
Unique bool
Name string
ValueFunc ValueFunc
Unique bool
}

func (idx Index) ValueOf(o BinaryObject) (string, error) {
value, err := idx.ValueFunc(o)
if err != nil {
return "", err
}
if !idx.Unique {
value = value + "/" + o.ObjectID()
}
return value, nil
}

// Indexed provides basic CRUD operations and maintains indexes.
Expand Down Expand Up @@ -64,7 +75,7 @@ func DefaultIndexedStoreConfig(prefix string, newObject NewObjectF) IndexedStore
Indexes: []Index{{
Name: DefaultIDIndex,
Unique: true,
ValueF: func(o BinaryObject) (string, error) {
ValueFunc: func(o BinaryObject) (string, error) {
return o.ObjectID(), nil
},
}},
Expand Down Expand Up @@ -98,7 +109,7 @@ func (c IndexedStoreConfig) Validate() error {
if !validPath(idx.Name) {
return fmt.Errorf("invalid index name %q", idx.Name)
}
if idx.ValueF == nil {
if idx.ValueFunc == nil {
return fmt.Errorf("index %q does not have a ValueF function", idx.Name)
}
}
Expand Down Expand Up @@ -132,7 +143,7 @@ func (s *IndexedStore) dataKey(id string) string {
//
// As such to list all handlers in ID sorted order use the /<indexesPrefix>/id/ directory.
func (s *IndexedStore) indexKey(index, value string) string {
return path.Join(s.indexesPrefix, index, value) + "/"
return path.Join(s.indexesPrefix, index, value)
}

func (s *IndexedStore) get(tx ReadOnlyTx, id string) (BinaryObject, error) {
Expand Down Expand Up @@ -201,26 +212,20 @@ func (s *IndexedStore) put(o BinaryObject, allowReplace, requireReplace bool) er
// Put all indexes
for _, idx := range s.indexes {
// Get new index key
newValue, err := idx.ValueF(o)
newValue, err := idx.ValueOf(o)
if err != nil {
return err
}
if !idx.Unique {
newValue = newValue + "/" + o.ObjectID()
}
newIndexKey := s.indexKey(idx.Name, newValue)

// Get old index key, if we are replacing
var oldValue string
if replacing {
var err error
oldValue, err = idx.ValueF(old)
oldValue, err = idx.ValueOf(old)
if err != nil {
return err
}
if !idx.Unique {
oldValue = oldValue + "/" + o.ObjectID()
}
}
oldIndexKey := s.indexKey(idx.Name, oldValue)

Expand Down Expand Up @@ -262,7 +267,7 @@ func (s *IndexedStore) Delete(id string) error {

// Delete all indexes
for _, idx := range s.indexes {
value, err := idx.ValueF(o)
value, err := idx.ValueOf(o)
if err != nil {
return err
}
Expand Down Expand Up @@ -291,7 +296,7 @@ func (s *IndexedStore) ReverseList(index, pattern string, offset, limit int) ([]
func (s *IndexedStore) list(index, pattern string, offset, limit int, reverse bool) (objects []BinaryObject, err error) {
err = s.store.View(func(tx ReadOnlyTx) error {
// List all object ids sorted by index
ids, err := tx.List(s.indexKey(index, ""))
ids, err := tx.List(s.indexKey(index, "") + "/")
if err != nil {
return err
}
Expand Down
220 changes: 220 additions & 0 deletions services/storage/indexed_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package storage_test

import (
"encoding/json"
"reflect"
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/influxdata/kapacitor/services/storage"
)

type object struct {
ID string
Value string
Date time.Time
}

func (o object) ObjectID() string {
return o.ID
}

func (o object) MarshalBinary() ([]byte, error) {
return json.Marshal(o)
}

func (o *object) UnmarshalBinary(data []byte) error {
return json.Unmarshal(data, o)
}

func TestIndexedStore_CRUD(t *testing.T) {
for name, sc := range stores {
t.Run(name, func(t *testing.T) {
db, err := sc()
if err != nil {
t.Fatal(err)
}
defer db.Close()

s := db.Store("crud")
c := storage.DefaultIndexedStoreConfig("crud", func() storage.BinaryObject {
return new(object)
})
c.Indexes = append(c.Indexes, storage.Index{
Name: "date",
ValueFunc: func(o storage.BinaryObject) (string, error) {
obj, ok := o.(*object)
if !ok {
return "", storage.ImpossibleTypeErr(obj, o)
}
return obj.Date.UTC().Format(time.RFC3339), nil
},
})
is, err := storage.NewIndexedStore(s, c)
if err != nil {
t.Fatal(err)
}

// Create new object
o1 := &object{
ID: "1",
Value: "obj1",
Date: time.Date(2017, 1, 1, 0, 0, 0, 0, time.UTC),
}
if err := is.Create(o1); err != nil {
t.Fatal(err)
}
if err := is.Create(o1); err != storage.ErrObjectExists {
t.Fatal("expected ErrObjectExists creating object1 got", err)
}
// Check o1
got1, err := is.Get("1")
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(got1, o1) {
t.Errorf("unexpected object 1 retrieved:\ngot\n%s\nexp\n%s\n", spew.Sdump(got1), spew.Sdump(o1))
}
// Check ID list
expIDList := []storage.BinaryObject{o1}
gotIDList, err := is.List("id", "", 0, 100)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(gotIDList, expIDList) {
t.Errorf("unexpected object list by ID:\ngot\n%s\nexp\n%s\n", spew.Sdump(gotIDList), spew.Sdump(expIDList))
}
// Check Date list
expDateList := []storage.BinaryObject{o1}
gotDateList, err := is.List("date", "", 0, 100)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(gotDateList, expDateList) {
t.Errorf("unexpected object list by Date:\ngot\n%s\nexp\n%s\n", spew.Sdump(gotDateList), spew.Sdump(expDateList))
}

// Create second object, using put
o2 := &object{
ID: "2",
Value: "obj2",
Date: time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC),
}
if err := is.Put(o2); err != nil {
t.Fatal(err)
}
if err := is.Create(o2); err != storage.ErrObjectExists {
t.Fatal("expected ErrObjectExists creating object2 got", err)
}
// Check o2
got2, err := is.Get("2")
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(got2, o2) {
t.Errorf("unexpected object 2 retrieved:\ngot\n%s\nexp\n%s\n", spew.Sdump(got2), spew.Sdump(o2))
}
// Check ID list
expIDList = []storage.BinaryObject{o1, o2}
gotIDList, err = is.List("id", "", 0, 100)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(gotIDList, expIDList) {
t.Errorf("unexpected object list by ID:\ngot\n%s\nexp\n%s\n", spew.Sdump(gotIDList), spew.Sdump(expIDList))
}
// Check Date list
expDateList = []storage.BinaryObject{o2, o1}
gotDateList, err = is.List("date", "", 0, 100)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(gotDateList, expDateList) {
t.Errorf("unexpected object list by Date:\ngot\n%s\nexp\n%s\n", spew.Sdump(gotDateList), spew.Sdump(expDateList))
}

// Modify objects
o1.Value = "modified obj1"
is.Replace(o1)
o2.Date = time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
is.Put(o2)

// Check o1
got1, err = is.Get("1")
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(got1, o1) {
t.Errorf("unexpected object 1 retrieved after modification:\ngot\n%s\nexp\n%s\n", spew.Sdump(got1), spew.Sdump(o1))
}

// Check o2
got2, err = is.Get("2")
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(got2, o2) {
t.Errorf("unexpected object 2 retrieved after modification:\ngot\n%s\nexp\n%s\n", spew.Sdump(got2), spew.Sdump(o2))
}

// Check ID list
expIDList = []storage.BinaryObject{o1, o2}
gotIDList, err = is.List("id", "", 0, 100)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(gotIDList, expIDList) {
t.Errorf("unexpected object list by ID after modification:\ngot\n%s\nexp\n%s\n", spew.Sdump(gotIDList), spew.Sdump(expIDList))
}
// Check Date list
expDateList = []storage.BinaryObject{o1, o2}
gotDateList, err = is.List("date", "", 0, 100)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(gotDateList, expDateList) {
t.Errorf("unexpected object list by Date after modification:\ngot\n%s\nexp\n%s\n", spew.Sdump(gotDateList), spew.Sdump(expDateList))
}

// Delete object 2
if err := is.Delete("2"); err != nil {
t.Fatal(err)
}

// Check o2
if _, err := is.Get("2"); err != storage.ErrNoObjectExists {
t.Error("expected ErrNoObjectExists for delete object 2, got:", err)
}

// Check ID list
expIDList = []storage.BinaryObject{o1}
gotIDList, err = is.List("id", "", 0, 100)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(gotIDList, expIDList) {
t.Errorf("unexpected object list by ID after modification:\ngot\n%s\nexp\n%s\n", spew.Sdump(gotIDList), spew.Sdump(expIDList))
}
// Check Date list
expDateList = []storage.BinaryObject{o1}
gotDateList, err = is.List("date", "", 0, 100)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(gotDateList, expDateList) {
t.Errorf("unexpected object list by Date after modification:\ngot\n%s\nexp\n%s\n", spew.Sdump(gotDateList), spew.Sdump(expDateList))
}

// Try to replace non existent object
o3 := &object{
ID: "3",
Value: "obj3",
Date: time.Date(2016, 1, 1, 0, 0, 0, 0, time.UTC),
}
if err := is.Replace(o3); err != storage.ErrNoObjectExists {
t.Error("expected error replacing non existent object, got:", err)
}
})
}
}
9 changes: 9 additions & 0 deletions services/storage/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storage

import (
"fmt"
"sort"
"strings"
"sync"
)
Expand Down Expand Up @@ -63,6 +64,12 @@ func (s *MemStore) Exists(key string) (bool, error) {
return ok, nil
}

type keySortedKVs []*KeyValue

func (s keySortedKVs) Len() int { return len(s) }
func (s keySortedKVs) Less(i int, j int) bool { return s[i].Key < s[j].Key }
func (s keySortedKVs) Swap(i int, j int) { s[i], s[j] = s[j], s[i] }

func (s *MemStore) List(prefix string) ([]*KeyValue, error) {
s.mu.Lock()
kvs := make([]*KeyValue, 0, len(s.store))
Expand All @@ -72,6 +79,7 @@ func (s *MemStore) List(prefix string) ([]*KeyValue, error) {
}
}
s.mu.Unlock()
sort.Sort(keySortedKVs(kvs))
return kvs, nil
}

Expand Down Expand Up @@ -130,6 +138,7 @@ func (t *memTx) List(prefix string) ([]*KeyValue, error) {
kvs = append(kvs, &KeyValue{Key: k, Value: v})
}
}
sort.Sort(keySortedKVs(kvs))
return kvs, nil
}

Expand Down
Loading

0 comments on commit 240792c

Please sign in to comment.