Skip to content

Commit

Permalink
feat: add indexes to urm for user lookups (#16789)
Browse files Browse the repository at this point in the history
  • Loading branch information
lyondhill authored Feb 11, 2020
1 parent b6d1cc6 commit 9561d0a
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 51 deletions.
83 changes: 48 additions & 35 deletions kv/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,71 +13,84 @@ type kvIndexer struct {
ctx context.Context
cancel context.CancelFunc
indexChan chan indexBatch
finished chan struct{}
oncer sync.Once
wg sync.WaitGroup
working chan struct{}
}

type indexBatch struct {
bucketName []byte
keys [][]byte
idxs map[string][]byte
}

func NewIndexer(log *zap.Logger, kv Store) *kvIndexer {
ctx, cancel := context.WithCancel(context.Background())
i := &kvIndexer{
return &kvIndexer{
log: log,
kv: kv,
ctx: ctx,
cancel: cancel,
indexChan: make(chan indexBatch, 10),
finished: make(chan struct{}),
working: make(chan struct{}, 1),
}

go i.workIndexes()
return i
}

func (i *kvIndexer) AddToIndex(bucketName []byte, keys [][]byte) {
func (i *kvIndexer) AddToIndex(bucketName []byte, idxs map[string][]byte) {
// check for close
select {
case <-i.ctx.Done():
return
case i.indexChan <- indexBatch{bucketName, keys}:
case i.indexChan <- indexBatch{bucketName, idxs}:
}

// add to the waitgroup and start the work process
select {
case i.working <- struct{}{}:
// i was able to insert i should start a worker
i.wg.Add(1)
go i.workIndexes()
default:
// we have reached our worker limit and we cannot start any more.
return
}
}

func (i *kvIndexer) workIndexes() {
defer close(i.finished)
for batch := range i.indexChan {
// open update tx
err := i.kv.Update(i.ctx, func(tx Tx) error {
// create a bucket for this batch
bucket, err := tx.Bucket(batch.bucketName)
if err != nil {
return err
}
// insert all the keys
for _, key := range batch.keys {
err := bucket.Put(key, nil)
// let the system know we have finished
defer i.wg.Done()
// releasee the worker hold so the system can start more later
defer func() { <-i.working }()

for {
select {
case batch := <-i.indexChan:
// open update tx
err := i.kv.Update(i.ctx, func(tx Tx) error {
// create a bucket for this batch
bucket, err := tx.Bucket(batch.bucketName)
if err != nil {
return err
}
}
return nil
})
// insert all the keys
for k, v := range batch.idxs {
err := bucket.Put([]byte(k), v)
if err != nil {
return err
}
}
return nil
})

if err != nil {
//only option is to log
i.log.Error("failed to update index bucket", zap.Error(err))
if err != nil {
//only option is to log
i.log.Error("failed to update index bucket", zap.Error(err))
}
default:
// we have finished working
return
}
}
}

func (i *kvIndexer) Stop() {
i.cancel()
i.oncer.Do(func() {
close(i.indexChan)
})

<-i.finished
func (i *kvIndexer) Wait() {
i.wg.Wait()
}
16 changes: 8 additions & 8 deletions kv/indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ func TestIndexer(t *testing.T) {
store := inmem.NewKVStore()

indexer := kv.NewIndexer(zaptest.NewLogger(t), store)
indexes := [][]byte{
[]byte("1"),
[]byte("2"),
[]byte("3"),
[]byte("4"),
indexes := map[string][]byte{
"1": []byte("1"),
"2": []byte("2"),
"3": []byte("3"),
"4": []byte("4"),
}
indexer.AddToIndex([]byte("bucket"), indexes)
indexer.Stop()
indexer.Wait()

count := 0
err := store.View(context.Background(), func(tx kv.Tx) error {
Expand All @@ -33,8 +33,8 @@ func TestIndexer(t *testing.T) {
t.Fatal(err)
}
for k, _ := cur.Next(); k != nil; k, _ = cur.Next() {
if string(k) != string(indexes[count]) {
t.Fatalf("failed to find correct index, found: %s, expected: %s", k, indexes[count])
if string(k) != string(indexes[string(k)]) {
t.Fatalf("failed to find correct index, found: %s, expected: %s", k, indexes[string(k)])
}
count++
}
Expand Down
7 changes: 1 addition & 6 deletions kv/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ var (
)

type indexer interface {
AddToIndex([]byte, [][]byte)
Stop()
AddToIndex([]byte, map[string][]byte)
}

// OpPrefix is the prefix for kv errors.
Expand Down Expand Up @@ -183,10 +182,6 @@ func (s *Service) Initialize(ctx context.Context) error {

}

func (s *Service) Stop() {
s.indexer.Stop()
}

// WithResourceLogger sets the resource audit logger for the service.
func (s *Service) WithResourceLogger(audit resource.Logger) {
s.audit = audit
Expand Down
Loading

0 comments on commit 9561d0a

Please sign in to comment.