diff --git a/kv/index.go b/kv/index.go
new file mode 100644
index 00000000000..59b73fb79ce
--- /dev/null
+++ b/kv/index.go
@@ -0,0 +1,570 @@
+package kv
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+)
+
+const (
+	defaultPopulateBatchSize = 100
+)
+
+// Index is used to define and manage an index for a source bucket.
+//
+// When using the index you must provide it with an IndexMapping.
+// The IndexMapping provides the index with the contract it needs to populate
+// the entire index and traverse a populated index correctly.
+// The IndexMapping provides a way to retrieve the key on which to index
+// when provided with the value from the source.
+// It also provides the way to access the source bucket.
+//
+// The following is an illustration of its use:
+//
+//  byUserID := func(v []byte) ([]byte, error) {
+//      auth := &influxdb.Authorization{}
+//
+//      if err := json.Unmarshal(v, auth); err != nil {
+//          return nil, err
+//      }
+//
+//      return auth.UserID.Encode()
+//  }
+//
+//  // configure a write-only index
+//  indexByUser := NewIndex(NewIndexMapping(sourceBucket, []byte("authorizationsbyuserv1/"), byUserID))
+//
+//  indexByUser.Insert(tx, someUserID, someAuthID)
+//
+//  indexByUser.Delete(tx, someUserID, someAuthID)
+//
+//  indexByUser.Walk(tx, someUserID, func(k, v []byte) error {
+//      auth := &influxdb.Authorization{}
+//      if err := json.Unmarshal(v, auth); err != nil {
+//          return err
+//      }
+//
+//      // do something with auth
+//
+//      return nil
+//  })
+//
+//  // populate entire index from source
+//  indexedCount, err := indexByUser.Populate(ctx, store)
+//
+//  // verify the current index against the source and return the differences
+//  // found in each
+//  diff, err := indexByUser.Verify(ctx, store)
+type Index struct {
+	IndexMapping
+
+	// populateBatchSize configures the size of the batch used for insertion
+	populateBatchSize int
+	// canRead configures whether or not Walk accesses the index at all
+	// or skips the index altogether and returns nothing.
+	// This is used when you want to integrate only the write path before
+	// releasing the read path.
+	canRead bool
+}
+
+// IndexOption is a function which configures an index
+type IndexOption func(*Index)
+
+// WithIndexReadPathEnabled enables the read paths of the index (Walk).
+// This should be enabled once the index has been fully populated and
+// the Insert and Delete paths are correctly integrated.
+func WithIndexReadPathEnabled(i *Index) {
+	i.canRead = true
+}
+
+// WithIndexPopulateBatchSize configures the size of each batch
+// used when fully populating an index (number of puts per tx).
+func WithIndexPopulateBatchSize(n int) IndexOption {
+	return func(i *Index) {
+		i.populateBatchSize = n
+	}
+}
+
+// IndexMapping is a type which configures an Index to map items
+// from a source bucket to an index bucket via a mapping known as
+// IndexSourceOn. This function is called on the values in the source
+// to derive the foreign key on which to index each item.
+type IndexMapping interface {
+	SourceBucket() []byte
+	IndexBucket() []byte
+	IndexSourceOn(value []byte) (foreignKey []byte, err error)
+}
+
+// IndexSourceOnFunc is a function which can be used to derive the foreign key
+// of a value in a source bucket.
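+//
+// For example, a hypothetical IndexSourceOnFunc for JSON-encoded values
+// carrying an OwnerID field (the field name and encoding are assumptions
+// made purely for illustration):
+//
+//  byOwnerID := IndexSourceOnFunc(func(v []byte) ([]byte, error) {
+//      var r struct{ OwnerID string }
+//      if err := json.Unmarshal(v, &r); err != nil {
+//          return nil, err
+//      }
+//      return []byte(r.OwnerID), nil
+//  })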
+type IndexSourceOnFunc func([]byte) ([]byte, error)
+
+type indexMapping struct {
+	source []byte
+	index  []byte
+	fn     IndexSourceOnFunc
+}
+
+func (i indexMapping) SourceBucket() []byte { return i.source }
+
+func (i indexMapping) IndexBucket() []byte { return i.index }
+
+func (i indexMapping) IndexSourceOn(v []byte) ([]byte, error) {
+	return i.fn(v)
+}
+
+// NewIndexMapping creates an implementation of IndexMapping which maps the
+// provided source bucket to a destination index bucket.
+func NewIndexMapping(sourceBucket, indexBucket []byte, fn IndexSourceOnFunc) IndexMapping {
+	return indexMapping{
+		source: sourceBucket,
+		index:  indexBucket,
+		fn:     fn,
+	}
+}
+
+// NewIndex configures and returns a new *Index for a given index mapping.
+// By default the read path (Walk) is disabled. This is because the index needs to
+// be fully populated before depending upon the read path.
+// The read path can be enabled using the WithIndexReadPathEnabled option.
+func NewIndex(mapping IndexMapping, opts ...IndexOption) *Index {
+	index := &Index{
+		IndexMapping:      mapping,
+		populateBatchSize: defaultPopulateBatchSize,
+	}
+
+	for _, opt := range opts {
+		opt(index)
+	}
+
+	return index
+}
+
+// Initialize creates the index bucket if it does not already exist.
+func (i *Index) Initialize(ctx context.Context, store Store) error {
+	return store.Update(ctx, func(tx Tx) error {
+		// create bucket if it does not exist
+		_, err := tx.Bucket(i.IndexBucket())
+		return err
+	})
+}
+
+func (i *Index) indexBucket(tx Tx) (Bucket, error) {
+	return tx.Bucket(i.IndexBucket())
+}
+
+func (i *Index) sourceBucket(tx Tx) (Bucket, error) {
+	return tx.Bucket(i.SourceBucket())
+}
+
+func indexKey(foreignKey, primaryKey []byte) (newKey []byte) {
+	newKey = make([]byte, len(primaryKey)+len(foreignKey)+1)
+	copy(newKey, foreignKey)
+	newKey[len(foreignKey)] = '/'
+	copy(newKey[len(foreignKey)+1:], primaryKey)
+
+	return
+}
+
+func indexKeyParts(indexKey []byte) (fk, pk []byte, err error) {
+	// this function is called with items missing in index
+	parts := bytes.SplitN(indexKey, []byte("/"), 2)
+	if len(parts) < 2 {
+		return nil, nil, errors.New("malformed index key")
+	}
+
+	// parts are fk/pk
+	fk, pk = parts[0], parts[1]
+
+	return
+}
+
+// Insert creates a single index entry for the provided primary key on the foreign key.
+func (i *Index) Insert(tx Tx, foreignKey, primaryKey []byte) error {
+	bkt, err := i.indexBucket(tx)
+	if err != nil {
+		return err
+	}
+
+	return bkt.Put(indexKey(foreignKey, primaryKey), primaryKey)
+}
+
+// Delete removes the foreignKey and primaryKey mapping from the underlying index.
+func (i *Index) Delete(tx Tx, foreignKey, primaryKey []byte) error {
+	bkt, err := i.indexBucket(tx)
+	if err != nil {
+		return err
+	}
+
+	return bkt.Delete(indexKey(foreignKey, primaryKey))
+}
+
+// Walk walks the source bucket using keys found in the index for the provided
+// foreign key, assuming the index has been fully populated.
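+//
+// A minimal sketch of a read, assuming a populated index with the read path
+// enabled and a visit function that collects primary keys:
+//
+//  var pks [][]byte
+//  err := store.View(ctx, func(tx Tx) error {
+//      return index.Walk(tx, []byte("some-owner-id"), func(pk, v []byte) error {
+//          pks = append(pks, pk)
+//          return nil
+//      })
+//  })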
+func (i *Index) Walk(tx Tx, foreignKey []byte, visitFn VisitFunc) error {
+	// skip walking if configured to do so, as the index
+	// is currently being used purely on the write path
+	if !i.canRead {
+		return nil
+	}
+
+	sourceBucket, err := i.sourceBucket(tx)
+	if err != nil {
+		return err
+	}
+
+	indexBucket, err := i.indexBucket(tx)
+	if err != nil {
+		return err
+	}
+
+	cursor, err := indexBucket.ForwardCursor(foreignKey,
+		WithCursorPrefix(foreignKey))
+	if err != nil {
+		return err
+	}
+
+	return indexWalk(cursor, sourceBucket, visitFn)
+}
+
+// PopulateConfig configures a call to Populate
+type PopulateConfig struct {
+	RemoveDanglingForeignKeys bool
+}
+
+// PopulateOption is a functional option for the Populate call
+type PopulateOption func(*PopulateConfig)
+
+// WithPopulateRemoveDanglingForeignKeys removes index entries which point to
+// missing items in the source bucket.
+func WithPopulateRemoveDanglingForeignKeys(c *PopulateConfig) {
+	c.RemoveDanglingForeignKeys = true
+}
+
+// Populate does a full population of the index using the IndexSourceOn IndexMapping function.
+// Once completed it marks the index as ready for use.
+// It returns the count of inserted items and a nil error on success.
+func (i *Index) Populate(ctx context.Context, store Store, opts ...PopulateOption) (n int, err error) {
+	var config PopulateConfig
+
+	for _, opt := range opts {
+		opt(&config)
+	}
+
+	// verify the index to derive the missing index entries;
+	// we can skip the missing-source lookup as we're only
+	// interested in populating the missing index entries
+	diff, err := i.verify(ctx, store, config.RemoveDanglingForeignKeys)
+	if err != nil {
+		return 0, fmt.Errorf("looking up missing indexes: %w", err)
+	}
+
+	flush := func(batch kvSlice) error {
+		if len(batch) == 0 {
+			return nil
+		}
+
+		if err := store.Update(ctx, func(tx Tx) error {
+			indexBucket, err := i.indexBucket(tx)
+			if err != nil {
+				return err
+			}
+
+			for _, pair := range batch {
+				// insert missing item into index
+				if err := indexBucket.Put(pair[0], pair[1]); err != nil {
+					return err
+				}
+
+				n++
+			}
+
+			return nil
+		}); err != nil {
+			return fmt.Errorf("updating index: %w", err)
+		}
+
+		return nil
+	}
+
+	var batch kvSlice
+
+	for fk, fkm := range diff.MissingFromIndex {
+		for pk := range fkm {
+			batch = append(batch, [2][]byte{indexKey([]byte(fk), []byte(pk)), []byte(pk)})
+
+			if len(batch) >= i.populateBatchSize {
+				if err := flush(batch); err != nil {
+					return n, err
+				}
+
+				batch = batch[:0]
+			}
+		}
+	}
+
+	if err := flush(batch); err != nil {
+		return n, err
+	}
+
+	if config.RemoveDanglingForeignKeys {
+		return n, i.remove(ctx, store, diff.MissingFromSource)
+	}
+
+	return n, nil
+}
+
+// DeleteAll removes the entire index in batches
+func (i *Index) DeleteAll(ctx context.Context, store Store) error {
+	diff, err := i.verify(ctx, store, true)
+	if err != nil {
+		return err
+	}
+
+	for k, v := range diff.MissingFromSource {
+		if fkm, ok := diff.PresentInIndex[k]; ok {
+			for pk := range v {
+				fkm[pk] = struct{}{}
+			}
+			continue
+		}
+
+		diff.PresentInIndex[k] = v
+	}
+
+	return i.remove(ctx, store, diff.PresentInIndex)
+}
+
+func (i *Index) remove(ctx context.Context, store Store, mappings map[string]map[string]struct{}) error {
+	var (
+		batch [][]byte
+		flush = func(batch [][]byte) error {
+			if len(batch) == 0 {
+				return nil
+			}
+
+			if err := store.Update(ctx, func(tx Tx) error {
+				indexBucket, err := i.indexBucket(tx)
+				if err != nil {
+					return err
+				}
+
+				for _, indexKey := range batch {
+					// delete dangling foreign key
+					if err := indexBucket.Delete(indexKey); err != nil {
+						return err
+					}
+				}
+
+				return nil
+			}); err != nil {
+				return fmt.Errorf("removing dangling foreign keys: %w", err)
+			}
+
+			return nil
+		}
+	)
+
+	for fk, fkm := range mappings {
+		for pk := range fkm {
+			batch = append(batch, indexKey([]byte(fk), []byte(pk)))
+
+			if len(batch) >= i.populateBatchSize {
+				if err := flush(batch); err != nil {
+					return err
+				}
+
+				batch = batch[:0]
+			}
+		}
+	}
+
+	return flush(batch)
+}
+
+// IndexDiff contains a set of items present in the source but not in the index,
+// along with a set of items present in the index but not in the source.
+type IndexDiff struct {
+	// PresentInIndex is a map of foreign key to primary keys
+	// present in the index.
+	PresentInIndex map[string]map[string]struct{}
+	// MissingFromIndex is a map of foreign key to associated primary keys
+	// missing from the index given the source bucket.
+	// These items could be missing because an index populate migration has
+	// not yet occurred, the index populate code is incorrect, or the write path
+	// for your resource type does not yet insert into the index as well (Create actions).
+	MissingFromIndex map[string]map[string]struct{}
+	// MissingFromSource is a map of foreign key to associated primary keys
+	// missing from the source but accounted for in the index.
+	// This happens when index items are not properly removed from the index
+	// when an item is removed from the source (Delete actions).
+	MissingFromSource map[string]map[string]struct{}
+}
+
+func (i *IndexDiff) addMissingSource(fk, pk []byte) {
+	if i.MissingFromSource == nil {
+		i.MissingFromSource = map[string]map[string]struct{}{}
+	}
+
+	if _, ok := i.MissingFromSource[string(fk)]; !ok {
+		i.MissingFromSource[string(fk)] = map[string]struct{}{}
+	}
+
+	i.MissingFromSource[string(fk)][string(pk)] = struct{}{}
+}
+
+func (i *IndexDiff) addMissingIndex(fk, pk []byte) {
+	if i.MissingFromIndex == nil {
+		i.MissingFromIndex = map[string]map[string]struct{}{}
+	}
+
+	if _, ok := i.MissingFromIndex[string(fk)]; !ok {
+		i.MissingFromIndex[string(fk)] = map[string]struct{}{}
+	}
+
+	i.MissingFromIndex[string(fk)][string(pk)] = struct{}{}
+}
+
+// Corrupt returns a list of foreign keys which have corrupted (partial) indexes.
+// These are foreign keys which map to a subset of the primary keys which they should
+// be associated with.
+func (i *IndexDiff) Corrupt() (corrupt []string) {
+	for fk := range i.MissingFromIndex {
+		if _, ok := i.PresentInIndex[fk]; ok {
+			corrupt = append(corrupt, fk)
+		}
+	}
+	return
+}
+
+// Verify returns the difference between a source and its index.
+// The difference contains items in the source that are not in the index
+// and vice-versa.
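+//
+// A sketch of how a caller might act on the result (error handling elided):
+//
+//  diff, _ := index.Verify(ctx, store)
+//  if len(diff.MissingFromIndex) > 0 {
+//      // e.g. schedule a Populate to backfill the index
+//  }
+//  if len(diff.MissingFromSource) > 0 {
+//      // e.g. remove dangling entries via
+//      // Populate(ctx, store, WithPopulateRemoveDanglingForeignKeys)
+//  }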
+func (i *Index) Verify(ctx context.Context, store Store) (diff IndexDiff, err error) {
+	return i.verify(ctx, store, true)
+}
+
+func (i *Index) verify(ctx context.Context, store Store, includeMissingSource bool) (diff IndexDiff, err error) {
+	diff.PresentInIndex, err = i.readEntireIndex(ctx, store)
+	if err != nil {
+		return diff, err
+	}
+
+	sourceKVs, err := consumeBucket(ctx, store, i.sourceBucket)
+	if err != nil {
+		return diff, err
+	}
+
+	// pks is a map of primary keys in source
+	pks := map[string]struct{}{}
+
+	// look for items missing from index
+	for _, kv := range sourceKVs {
+		pk, v := kv[0], kv[1]
+
+		if includeMissingSource {
+			// this is only useful for missing source
+			pks[string(pk)] = struct{}{}
+		}
+
+		fk, err := i.IndexSourceOn(v)
+		if err != nil {
+			return diff, err
+		}
+
+		fkm, ok := diff.PresentInIndex[string(fk)]
+		if ok {
+			_, ok = fkm[string(pk)]
+		}
+
+		if !ok {
+			diff.addMissingIndex(fk, pk)
+		}
+	}
+
+	if includeMissingSource {
+		// look for items missing from source
+		for fk, fkm := range diff.PresentInIndex {
+			for pk := range fkm {
+				if _, ok := pks[pk]; !ok {
+					diff.addMissingSource([]byte(fk), []byte(pk))
+				}
+			}
+		}
+	}
+
+	return
+}
+
+// indexWalk consumes the index key/primary key pairs in the index bucket and looks up
+// each primary key's associated value in the provided source bucket.
+// When an item is located in the source, the provided visit function is called with
+// the primary key and its associated value.
+func indexWalk(indexCursor ForwardCursor, sourceBucket Bucket, visit VisitFunc) (err error) {
+	defer func() {
+		if cerr := indexCursor.Close(); cerr != nil && err == nil {
+			err = cerr
+		}
+	}()
+
+	for ik, pk := indexCursor.Next(); ik != nil; ik, pk = indexCursor.Next() {
+		v, err := sourceBucket.Get(pk)
+		if err != nil {
+			return err
+		}
+
+		if err := visit(pk, v); err != nil {
+			return fmt.Errorf("for index entry %q: %w", string(ik), err)
+		}
+	}
+
+	return indexCursor.Err()
+}
+
+// readEntireIndex returns the entire current state of the index
+func (i *Index) readEntireIndex(ctx context.Context, store Store) (map[string]map[string]struct{}, error) {
+	kvs, err := consumeBucket(ctx, store, i.indexBucket)
+	if err != nil {
+		return nil, err
+	}
+
+	index := map[string]map[string]struct{}{}
+	for _, kv := range kvs {
+		fk, pk, err := indexKeyParts(kv[0])
+		if err != nil {
+			return nil, err
+		}
+
+		if fkm, ok := index[string(fk)]; ok {
+			fkm[string(pk)] = struct{}{}
+			continue
+		}
+
+		index[string(fk)] = map[string]struct{}{string(pk): struct{}{}}
+	}
+
+	return index, nil
+}
+
+type kvSlice [][2][]byte
+
+// consumeBucket consumes the entire k/v space of the bucket returned by the
+// provided bucket function when applied to the provided store
+func consumeBucket(ctx context.Context, store Store, fn func(tx Tx) (Bucket, error)) (kvs kvSlice, err error) {
+	return kvs, store.View(ctx, func(tx Tx) error {
+		bkt, err := fn(tx)
+		if err != nil {
+			return err
+		}
+
+		cursor, err := bkt.ForwardCursor(nil)
+		if err != nil {
+			return err
+		}
+
+		return WalkCursor(ctx, cursor, func(k, v []byte) error {
+			kvs = append(kvs, [2][]byte{k, v})
+			return nil
+		})
+	})
+}
diff --git a/kv/index_test.go b/kv/index_test.go
new file mode 100644
index 00000000000..2bbf11cc51b
--- /dev/null
+++ b/kv/index_test.go
@@ -0,0 +1,22 @@
+package kv_test
+
+import (
+	"testing"
+
+	"github.com/influxdata/influxdb/inmem"
+	influxdbtesting "github.com/influxdata/influxdb/testing"
+)
+
+func Test_Inmem_Index(t *testing.T) {
+	influxdbtesting.TestIndex(t, inmem.NewKVStore())
+}
+
+func 
Test_Bolt_Index(t *testing.T) { + s, closeBolt, err := NewTestBoltStore(t) + if err != nil { + t.Fatalf("failed to create new kv store: %v", err) + } + defer closeBolt() + + influxdbtesting.TestIndex(t, s) +} diff --git a/kv/indexer.go b/kv/indexer.go deleted file mode 100644 index c986e3ab8a8..00000000000 --- a/kv/indexer.go +++ /dev/null @@ -1,83 +0,0 @@ -package kv - -import ( - "context" - "sync" - - "go.uber.org/zap" -) - -type kvIndexer struct { - log *zap.Logger - kv Store - ctx context.Context - cancel context.CancelFunc - indexChan chan indexBatch - finished chan struct{} - oncer sync.Once -} - -type indexBatch struct { - bucketName []byte - keys [][]byte -} - -func NewIndexer(log *zap.Logger, kv Store) *kvIndexer { - ctx, cancel := context.WithCancel(context.Background()) - i := &kvIndexer{ - log: log, - kv: kv, - ctx: ctx, - cancel: cancel, - indexChan: make(chan indexBatch, 10), - finished: make(chan struct{}), - } - - go i.workIndexes() - return i -} - -func (i *kvIndexer) AddToIndex(bucketName []byte, keys [][]byte) { - // check for close - select { - case <-i.ctx.Done(): - return - case i.indexChan <- indexBatch{bucketName, keys}: - } -} - -func (i *kvIndexer) workIndexes() { - defer close(i.finished) - for batch := range i.indexChan { - // open update tx - err := i.kv.Update(i.ctx, func(tx Tx) error { - // create a bucket for this batch - bucket, err := tx.Bucket(batch.bucketName) - if err != nil { - return err - } - // insert all the keys - for _, key := range batch.keys { - err := bucket.Put(key, nil) - if err != nil { - return err - } - } - return nil - }) - - if err != nil { - //only option is to log - i.log.Error("failed to update index bucket", zap.Error(err)) - } - } -} - -func (i *kvIndexer) Stop() { - i.cancel() - i.oncer.Do(func() { - close(i.indexChan) - }) - - <-i.finished -} diff --git a/kv/indexer_test.go b/kv/indexer_test.go deleted file mode 100644 index 2cf689dd4dd..00000000000 --- a/kv/indexer_test.go +++ /dev/null @@ -1,49 +0,0 @@ -package kv_test - -import ( - "context" - "testing" - - "github.com/influxdata/influxdb/inmem" - "github.com/influxdata/influxdb/kv" - "go.uber.org/zap/zaptest" -) - -func TestIndexer(t *testing.T) { - store := inmem.NewKVStore() - - indexer := kv.NewIndexer(zaptest.NewLogger(t), store) - indexes := [][]byte{ - []byte("1"), - []byte("2"), - []byte("3"), - []byte("4"), - } - indexer.AddToIndex([]byte("bucket"), indexes) - indexer.Stop() - - count := 0 - err := store.View(context.Background(), func(tx kv.Tx) error { - bucket, err := tx.Bucket([]byte("bucket")) - if err != nil { - t.Fatal(err) - } - cur, err := bucket.ForwardCursor(nil) - if err != nil { - t.Fatal(err) - } - for k, _ := cur.Next(); k != nil; k, _ = cur.Next() { - if string(k) != string(indexes[count]) { - t.Fatalf("failed to find correct index, found: %s, expected: %s", k, indexes[count]) - } - count++ - } - return nil - }) - if err != nil { - t.Fatal(err) - } - if count != 4 { - t.Fatal("failed to retrieve indexes") - } -} diff --git a/kv/service.go b/kv/service.go index 5c21bd9b3b6..233ac63cdf1 100644 --- a/kv/service.go +++ b/kv/service.go @@ -18,11 +18,6 @@ var ( _ influxdb.UserService = (*Service)(nil) ) -type indexer interface { - AddToIndex([]byte, [][]byte) - Stop() -} - // OpPrefix is the prefix for kv errors. 
const OpPrefix = "kv/"
 
@@ -45,8 +40,6 @@ type Service struct {
 	influxdb.TimeGenerator
 	Hash Crypt
 
-	indexer indexer
-
 	checkStore    *IndexStore
 	endpointStore *IndexStore
 	variableStore *IndexStore
@@ -67,7 +60,6 @@ func NewService(log *zap.Logger, kv Store, configs ...ServiceConfig) *Service {
 		checkStore:    newCheckStore(),
 		endpointStore: newEndpointStore(),
 		variableStore: newVariableStore(),
-		indexer:       NewIndexer(log, kv),
 	}
 
 	if len(configs) > 0 {
@@ -183,10 +175,6 @@ func (s *Service) Initialize(ctx context.Context) error {
 
 }
 
-func (s *Service) Stop() {
-	s.indexer.Stop()
-}
-
 // WithResourceLogger sets the resource audit logger for the service.
 func (s *Service) WithResourceLogger(audit resource.Logger) {
 	s.audit = audit
diff --git a/kv/store.go b/kv/store.go
index bfd7ccec757..61efbf67549 100644
--- a/kv/store.go
+++ b/kv/store.go
@@ -199,3 +199,30 @@ func WithCursorSkipFirstItem() CursorOption {
 		c.SkipFirst = true
 	}
 }
+
+// VisitFunc is called for each k, v byte slice pair from the underlying source bucket
+// which is found in the index bucket for a provided foreign key.
+type VisitFunc func(k, v []byte) error
+
+// WalkCursor consumes the forward cursor, calling visit for each k/v pair found.
+func WalkCursor(ctx context.Context, cursor ForwardCursor, visit VisitFunc) (err error) {
+	defer func() {
+		if cerr := cursor.Close(); cerr != nil && err == nil {
+			err = cerr
+		}
+	}()
+
+	for k, v := cursor.Next(); k != nil; k, v = cursor.Next() {
+		if err := visit(k, v); err != nil {
+			return err
+		}
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+		}
+	}
+
+	return cursor.Err()
+}
diff --git a/testing/index.go b/testing/index.go
new file mode 100644
index 00000000000..12830549375
--- /dev/null
+++ b/testing/index.go
@@ -0,0 +1,387 @@
+package testing
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"reflect"
+	"sort"
+	"testing"
+
+	"github.com/influxdata/influxdb/kv"
+)
+
+const (
+	someResourceBucket = "aresource"
+)
+
+var (
+	mapping = kv.NewIndexMapping([]byte(someResourceBucket), []byte("aresourcebyowneridv1"), func(body []byte) ([]byte, error) {
+		var resource someResource
+		if err := json.Unmarshal(body, &resource); err != nil {
+			return nil, err
+		}
+
+		return []byte(resource.OwnerID), nil
+	})
+)
+
+type someResource struct {
+	ID      string
+	OwnerID string
+}
+
+type someResourceStore struct {
+	store        kv.Store
+	ownerIDIndex *kv.Index
+}
+
+func newSomeResourceStore(ctx context.Context, store kv.Store) (*someResourceStore, error) {
+	ownerIDIndex := kv.NewIndex(mapping)
+	if err := ownerIDIndex.Initialize(ctx, store); err != nil {
+		return nil, err
+	}
+
+	return &someResourceStore{
+		store:        store,
+		ownerIDIndex: ownerIDIndex,
+	}, nil
+}
+
+func (s *someResourceStore) FindByOwner(ctx context.Context, ownerID string) (resources []someResource, err error) {
+	err = s.store.View(ctx, func(tx kv.Tx) error {
+		return s.ownerIDIndex.Walk(tx, []byte(ownerID), func(k, v []byte) error {
+			var resource someResource
+			if err := json.Unmarshal(v, &resource); err != nil {
+				return err
+			}
+
+			resources = append(resources, resource)
+			return nil
+		})
+	})
+	return
+}
+
+func (s *someResourceStore) Create(ctx context.Context, resource someResource, index bool) error {
+	return s.store.Update(ctx, func(tx kv.Tx) error {
+		bkt, err := tx.Bucket(mapping.SourceBucket())
+		if err != nil {
+			return err
+		}
+
+		if index {
+			if err := s.ownerIDIndex.Insert(tx, []byte(resource.OwnerID), []byte(resource.ID)); err != nil {
+				return err
+			}
+		}
+
+		data, err := json.Marshal(resource)
+		if err != 
nil {
+			return err
+		}
+
+		return bkt.Put([]byte(resource.ID), data)
+	})
+}
+
+func newResource(id, owner string) someResource {
+	return someResource{ID: id, OwnerID: owner}
+}
+
+func newNResources(n int) (resources []someResource) {
+	for i := 0; i < n; i++ {
+		var (
+			id    = fmt.Sprintf("resource %d", i)
+			owner = fmt.Sprintf("owner %d", i%5)
+		)
+		resources = append(resources, newResource(id, owner))
+	}
+	return
+}
+
+func TestIndex(t *testing.T, store kv.Store) {
+	t.Run("Test_PopulateAndVerify", func(t *testing.T) {
+		testPopulateAndVerify(t, store)
+	})
+
+	t.Run("Test_Walk", func(t *testing.T) {
+		testWalk(t, store)
+	})
+}
+
+func testPopulateAndVerify(t *testing.T, store kv.Store) {
+	var (
+		ctx                = context.TODO()
+		resources          = newNResources(20)
+		resourceStore, err = newSomeResourceStore(ctx, store)
+	)
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// insert 20 resources, but only index the first half
+	for i, resource := range resources {
+		if err := resourceStore.Create(ctx, resource, i < len(resources)/2); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	// check that the index is populated with only 10 items
+	var count int
+	store.View(ctx, func(tx kv.Tx) error {
+		kvs, err := allKVs(tx, mapping.IndexBucket())
+		if err != nil {
+			return err
+		}
+
+		count = len(kvs)
+
+		return nil
+	})
+
+	if count > 10 {
+		t.Errorf("expected index to contain at most 10 items, found %d items", count)
+	}
+
+	// ensure verify identifies the 10 missing items from the index
+	diff, err := resourceStore.ownerIDIndex.Verify(ctx, store)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	expected := kv.IndexDiff{
+		PresentInIndex: map[string]map[string]struct{}{
+			"owner 0": map[string]struct{}{"resource 0": struct{}{}, "resource 5": struct{}{}},
+			"owner 1": map[string]struct{}{"resource 1": struct{}{}, "resource 6": struct{}{}},
+			"owner 2": map[string]struct{}{"resource 2": struct{}{}, "resource 7": struct{}{}},
+			"owner 3": map[string]struct{}{"resource 3": struct{}{}, "resource 8": struct{}{}},
+			"owner 4": map[string]struct{}{"resource 4": struct{}{}, "resource 9": struct{}{}},
+		},
+		MissingFromIndex: map[string]map[string]struct{}{
+			"owner 0": map[string]struct{}{"resource 10": struct{}{}, "resource 15": struct{}{}},
+			"owner 1": map[string]struct{}{"resource 11": struct{}{}, "resource 16": struct{}{}},
+			"owner 2": map[string]struct{}{"resource 12": struct{}{}, "resource 17": struct{}{}},
+			"owner 3": map[string]struct{}{"resource 13": struct{}{}, "resource 18": struct{}{}},
+			"owner 4": map[string]struct{}{"resource 14": struct{}{}, "resource 19": struct{}{}},
+		},
+	}
+
+	if !reflect.DeepEqual(expected, diff) {
+		t.Errorf("expected %#v, found %#v", expected, diff)
+	}
+
+	corrupt := diff.Corrupt()
+	sort.Strings(corrupt)
+
+	if expected := []string{
+		"owner 0",
+		"owner 1",
+		"owner 2",
+		"owner 3",
+		"owner 4",
+	}; !reflect.DeepEqual(expected, corrupt) {
+		t.Errorf("expected %#v, found %#v\n", expected, corrupt)
+	}
+
+	// populate the missing indexes
+	count, err = resourceStore.ownerIDIndex.Populate(ctx, store)
+	if err != nil {
+		t.Errorf("unexpected err %v", err)
+	}
+
+	// ensure only 10 items were reported as being indexed
+	if count != 10 {
+		t.Errorf("expected to index 10 items, instead indexed %d items", count)
+	}
+
+	// check the contents of the index
+	var allKvs [][2][]byte
+	store.View(ctx, func(tx kv.Tx) (err error) {
+		allKvs, err = allKVs(tx, mapping.IndexBucket())
+		return
+	})
+
+	if expected := [][2][]byte{
+		[2][]byte{[]byte("owner 0/resource 0"), []byte("resource 0")},
+		[2][]byte{[]byte("owner 0/resource 10"), []byte("resource 10")},
[2][]byte{[]byte("owner 0/resource 15"), []byte("resource 15")}, + [2][]byte{[]byte("owner 0/resource 5"), []byte("resource 5")}, + [2][]byte{[]byte("owner 1/resource 1"), []byte("resource 1")}, + [2][]byte{[]byte("owner 1/resource 11"), []byte("resource 11")}, + [2][]byte{[]byte("owner 1/resource 16"), []byte("resource 16")}, + [2][]byte{[]byte("owner 1/resource 6"), []byte("resource 6")}, + [2][]byte{[]byte("owner 2/resource 12"), []byte("resource 12")}, + [2][]byte{[]byte("owner 2/resource 17"), []byte("resource 17")}, + [2][]byte{[]byte("owner 2/resource 2"), []byte("resource 2")}, + [2][]byte{[]byte("owner 2/resource 7"), []byte("resource 7")}, + [2][]byte{[]byte("owner 3/resource 13"), []byte("resource 13")}, + [2][]byte{[]byte("owner 3/resource 18"), []byte("resource 18")}, + [2][]byte{[]byte("owner 3/resource 3"), []byte("resource 3")}, + [2][]byte{[]byte("owner 3/resource 8"), []byte("resource 8")}, + [2][]byte{[]byte("owner 4/resource 14"), []byte("resource 14")}, + [2][]byte{[]byte("owner 4/resource 19"), []byte("resource 19")}, + [2][]byte{[]byte("owner 4/resource 4"), []byte("resource 4")}, + [2][]byte{[]byte("owner 4/resource 9"), []byte("resource 9")}, + }; !reflect.DeepEqual(allKvs, expected) { + t.Errorf("expected %#v, found %#v", expected, allKvs) + } + + // remove the last 10 items from the source, but leave them in the index + store.Update(ctx, func(tx kv.Tx) error { + bkt, err := tx.Bucket(mapping.SourceBucket()) + if err != nil { + t.Fatal(err) + } + + for _, resource := range resources[10:] { + bkt.Delete([]byte(resource.ID)) + } + + return nil + }) + + // ensure verify identifies the last 10 items as missing from the source + diff, err = resourceStore.ownerIDIndex.Verify(ctx, store) + if err != nil { + t.Fatal(err) + } + + expected = kv.IndexDiff{ + PresentInIndex: map[string]map[string]struct{}{ + "owner 0": map[string]struct{}{"resource 0": struct{}{}, "resource 5": struct{}{}, "resource 10": struct{}{}, "resource 15": struct{}{}}, + "owner 1": map[string]struct{}{"resource 1": struct{}{}, "resource 6": struct{}{}, "resource 11": struct{}{}, "resource 16": struct{}{}}, + "owner 2": map[string]struct{}{"resource 2": struct{}{}, "resource 7": struct{}{}, "resource 12": struct{}{}, "resource 17": struct{}{}}, + "owner 3": map[string]struct{}{"resource 3": struct{}{}, "resource 8": struct{}{}, "resource 13": struct{}{}, "resource 18": struct{}{}}, + "owner 4": map[string]struct{}{"resource 4": struct{}{}, "resource 9": struct{}{}, "resource 14": struct{}{}, "resource 19": struct{}{}}, + }, + MissingFromSource: map[string]map[string]struct{}{ + "owner 0": map[string]struct{}{"resource 10": struct{}{}, "resource 15": struct{}{}}, + "owner 1": map[string]struct{}{"resource 11": struct{}{}, "resource 16": struct{}{}}, + "owner 2": map[string]struct{}{"resource 12": struct{}{}, "resource 17": struct{}{}}, + "owner 3": map[string]struct{}{"resource 13": struct{}{}, "resource 18": struct{}{}}, + "owner 4": map[string]struct{}{"resource 14": struct{}{}, "resource 19": struct{}{}}, + }, + } + if !reflect.DeepEqual(expected, diff) { + t.Errorf("expected %#v, found %#v", expected, diff) + } +} + +func testWalk(t *testing.T, store kv.Store) { + var ( + ctx = context.TODO() + resources = newNResources(20) + // configure resource store with read disabled + resourceStore, err = newSomeResourceStore(ctx, store) + + cases = []struct { + owner string + resources []someResource + }{ + { + owner: "owner 0", + resources: []someResource{ + newResource("resource 0", "owner 0"), + 
newResource("resource 10", "owner 0"), + newResource("resource 15", "owner 0"), + newResource("resource 5", "owner 0"), + }, + }, + { + owner: "owner 1", + resources: []someResource{ + newResource("resource 1", "owner 1"), + newResource("resource 11", "owner 1"), + newResource("resource 16", "owner 1"), + newResource("resource 6", "owner 1"), + }, + }, + { + owner: "owner 2", + resources: []someResource{ + newResource("resource 12", "owner 2"), + newResource("resource 17", "owner 2"), + newResource("resource 2", "owner 2"), + newResource("resource 7", "owner 2"), + }, + }, + { + owner: "owner 3", + resources: []someResource{ + newResource("resource 13", "owner 3"), + newResource("resource 18", "owner 3"), + newResource("resource 3", "owner 3"), + newResource("resource 8", "owner 3"), + }, + }, + { + owner: "owner 4", + resources: []someResource{ + newResource("resource 14", "owner 4"), + newResource("resource 19", "owner 4"), + newResource("resource 4", "owner 4"), + newResource("resource 9", "owner 4"), + }, + }, + } + ) + + if err != nil { + t.Fatal(err) + } + + // insert all 20 resources with indexing enabled + for _, resource := range resources { + if err := resourceStore.Create(ctx, resource, true); err != nil { + t.Fatal(err) + } + } + + for _, testCase := range cases { + found, err := resourceStore.FindByOwner(ctx, testCase.owner) + if err != nil { + t.Fatal(err) + } + + // expect resources to be empty while read path disabled disabled + if len(found) > 0 { + t.Fatalf("expected %#v to be empty", found) + } + } + + // configure index read path enabled + kv.WithIndexReadPathEnabled(resourceStore.ownerIDIndex) + + for _, testCase := range cases { + found, err := resourceStore.FindByOwner(ctx, testCase.owner) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(found, testCase.resources) { + t.Errorf("expected %#v, found %#v", testCase.resources, found) + } + } +} + +func allKVs(tx kv.Tx, bucket []byte) (kvs [][2][]byte, err error) { + idx, err := tx.Bucket(mapping.IndexBucket()) + if err != nil { + return + } + + cursor, err := idx.ForwardCursor(nil) + if err != nil { + return + } + + defer func() { + if cerr := cursor.Close(); cerr != nil && err == nil { + err = cerr + } + }() + + for k, v := cursor.Next(); k != nil; k, v = cursor.Next() { + kvs = append(kvs, [2][]byte{k, v}) + } + + return kvs, cursor.Err() +}