Skip to content

Commit

Permalink
fix(kv): remove URM and Authorizations by UserID Index (#16852)
Browse files Browse the repository at this point in the history
* Revert "fix(kv): Don't stop when key not found from index."

This reverts commit bd9167d.

* Revert "fix(kv): push down org ID to skip in delete URM (#16841)"

This reverts commit a5f508d.

* Revert "fix(kv): delete authorization from correct index bucket (#16835)"

This reverts commit 7349216.

* Revert "feat(kv): Index Authorizations by User ID (#16818)"

This reverts commit df36fe9.

* Revert "feat: add indexes to urm for user lookups (#16789)"

This reverts commit 9561d0a.
  • Loading branch information
GeorgeMac authored Feb 13, 2020
1 parent a174716 commit f239a2e
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 877 deletions.
13 changes: 4 additions & 9 deletions http/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"io/ioutil"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/influxdata/httprouter"
Expand Down Expand Up @@ -858,12 +857,12 @@ func TestService_handleDeleteAuthorization(t *testing.T) {

func initAuthorizationService(f platformtesting.AuthorizationFields, t *testing.T) (platform.AuthorizationService, string, func()) {
t.Helper()
if strings.Contains(t.Name(), "find_authorization_by_token") {
if t.Name() == "TestAuthorizationService_FindAuthorizations/find_authorization_by_token" {
/*
TODO(goller): need a secure way to communicate get
authorization by token string via headers or something
*/
t.Skipf("%s skipped because user tokens cannot be queried", t.Name())
t.Skip("TestAuthorizationService_FindAuthorizations/find_authorization_by_token skipped because user tokens cannot be queried")
}

if t.Name() == "TestAuthorizationService_CreateAuthorization/providing_a_non_existing_user_is_invalid" {
Expand Down Expand Up @@ -958,15 +957,11 @@ func TestAuthorizationService_FindAuthorizationByToken(t *testing.T) {
}

func TestAuthorizationService_FindAuthorizations(t *testing.T) {
// with pre-populated index
platformtesting.FindAuthorizations(initAuthorizationService, t, true)
// without pre-populated index
platformtesting.FindAuthorizations(initAuthorizationService, t, false)
platformtesting.FindAuthorizations(initAuthorizationService, t)
}

func TestAuthorizationService_DeleteAuthorization(t *testing.T) {
// without pre-populated index
platformtesting.DeleteAuthorization(initAuthorizationService, t, true)
platformtesting.DeleteAuthorization(initAuthorizationService, t)
}

func TestAuthorizationService_UpdateAuthorization(t *testing.T) {
Expand Down
242 changes: 26 additions & 216 deletions kv/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kv
import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/buger/jsonparser"
Expand All @@ -12,27 +11,19 @@ import (
)

var (
authBucket = []byte("authorizationsv1")
authIndex = []byte("authorizationindexv1")
authByUserIndex = []byte("authorizationbyuserindexv1")

_ influxdb.AuthorizationService = (*Service)(nil)

// ErrMissingUserFromFilter is returned when a lookup by user is performed
// but neither a user ID or user resource is provided
ErrMissingUserFromFilter = errors.New("no user parameter in filter")
authBucket = []byte("authorizationsv1")
authIndex = []byte("authorizationindexv1")
)

var _ influxdb.AuthorizationService = (*Service)(nil)

func (s *Service) initializeAuths(ctx context.Context, tx Tx) error {
if _, err := tx.Bucket(authBucket); err != nil {
return err
}
if _, err := authIndexBucket(tx); err != nil {
return err
}
if _, err := authByUserIndexBucket(tx); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -138,80 +129,6 @@ func (s *Service) findAuthorizationByToken(ctx context.Context, tx Tx, n string)
return s.findAuthorizationByID(ctx, tx, id)
}

func (s *Service) findAuthorizationsByUser(ctx context.Context, tx Tx, filter influxdb.AuthorizationFilter) (auths []*influxdb.Authorization, userID *influxdb.ID, err error) {
userID = filter.UserID
if userID == nil && filter.User != nil {
user, err := s.findUserByName(ctx, tx, *filter.User)
if err != nil {
return nil, nil, err
}

userID = &user.ID
}

if userID == nil {
err = ErrMissingUserFromFilter
return
}

var (
prefix = authByUserIndexPrefix(*userID)
filterFn = filterAuthorizationsFn(filter)
wrapInternal = func(err error) *influxdb.Error {
return &influxdb.Error{
Code: influxdb.EInternal,
Err: err,
}
}
)

bkt, err := tx.Bucket(authBucket)
if err != nil {
return nil, nil, err
}

idx, err := authByUserIndexBucket(tx)
if err != nil {
return nil, nil, err
}

// index scan
cursor, err := idx.ForwardCursor(prefix, WithCursorPrefix(prefix))
if err != nil {
return nil, nil, wrapInternal(err)
}

for k, v := cursor.Next(); k != nil && v != nil; k, v = cursor.Next() {
v, err := bkt.Get(v)
if err != nil {
return nil, nil, err
}

// preallocate Permissions to reduce multiple slice re-allocations
a := &influxdb.Authorization{
Permissions: make([]influxdb.Permission, 64),
}

if err := decodeAuthorization(v, a); err != nil {
return nil, nil, err
}

if filterFn(a) {
auths = append(auths, a)
}
}

if err := cursor.Err(); err != nil {
return nil, nil, wrapInternal(err)
}

if err := cursor.Close(); err != nil {
return nil, nil, wrapInternal(err)
}

return
}

func authorizationsPredicateFn(f influxdb.AuthorizationFilter) CursorPredicateFunc {
// if any errors occur reading the JSON data, the predicate will always return true
// to ensure the value is included and handled higher up.
Expand Down Expand Up @@ -326,42 +243,14 @@ func (s *Service) FindAuthorizations(ctx context.Context, filter influxdb.Author
return []*influxdb.Authorization{a}, 1, nil
}

var (
auths []*influxdb.Authorization
err error
findOptions []findOption
)

if filter.UserID != nil || filter.User != nil {
var userID *influxdb.ID

// attempt index lookup
if err := s.kv.View(ctx, func(tx Tx) error {
auths, userID, err = s.findAuthorizationsByUser(ctx, tx, filter)
as := []*influxdb.Authorization{}
err := s.kv.View(ctx, func(tx Tx) error {
auths, err := s.findAuthorizations(ctx, tx, filter)
if err != nil {
return err
}); err != nil {
return nil, 0, &influxdb.Error{
Err: err,
}
}

if len(auths) > 0 {
return auths, len(auths), nil
}

// when found using full keyspace scan then publish authorization
// to indexer
findOptions = append(findOptions, withVisitFunc(func(a *influxdb.Authorization) {
id, _ := a.ID.Encode()
s.indexer.AddToIndex(authByUserIndex, map[string][]byte{
authByUserIndexKey(*userID, a.ID): id,
})
}))
}

err = s.kv.View(ctx, func(tx Tx) error {
auths, err = s.findAuthorizations(ctx, tx, filter, findOptions...)
return err
as = auths
return nil
})

if err != nil {
Expand All @@ -370,34 +259,10 @@ func (s *Service) FindAuthorizations(ctx context.Context, filter influxdb.Author
}
}

return auths, len(auths), nil
}

type findConfig struct {
visit func(*influxdb.Authorization)
}

func newFindConfig(opts ...findOption) findConfig {
config := findConfig{
visit: func(*influxdb.Authorization) {},
}

for _, opt := range opts {
opt(&config)
}

return config
return as, len(as), nil
}

type findOption func(*findConfig)

func withVisitFunc(fn func(*influxdb.Authorization)) findOption {
return func(c *findConfig) {
c.visit = fn
}
}

func (s *Service) findAuthorizations(ctx context.Context, tx Tx, f influxdb.AuthorizationFilter, opts ...findOption) ([]*influxdb.Authorization, error) {
func (s *Service) findAuthorizations(ctx context.Context, tx Tx, f influxdb.AuthorizationFilter) ([]*influxdb.Authorization, error) {
// If the users name was provided, look up user by ID first
if f.User != nil {
u, err := s.findUserByName(ctx, tx, *f.User)
Expand All @@ -415,23 +280,20 @@ func (s *Service) findAuthorizations(ctx context.Context, tx Tx, f influxdb.Auth
f.OrgID = &o.ID
}

var (
conf = newFindConfig(opts...)
as []*influxdb.Authorization
pred = authorizationsPredicateFn(f)
filterFn = filterAuthorizationsFn(f)
err = s.forEachAuthorization(ctx, tx, pred, func(a *influxdb.Authorization) bool {
if filterFn(a) {
// visit using find config visit func
conf.visit(a)
// append to resulting slice
as = append(as, a)
}
return true
})
)
var as []*influxdb.Authorization
pred := authorizationsPredicateFn(f)
filterFn := filterAuthorizationsFn(f)
err := s.forEachAuthorization(ctx, tx, pred, func(a *influxdb.Authorization) bool {
if filterFn(a) {
as = append(as, a)
}
return true
})
if err != nil {
return nil, err
}

return as, err
return as, nil
}

// CreateAuthorization creates a influxdb authorization and sets b.ID, and b.UserID if not provided.
Expand Down Expand Up @@ -483,12 +345,6 @@ func (s *Service) createAuthorization(ctx context.Context, tx Tx, a *influxdb.Au
return nil
}

type authSkipIndexOnPutContextKey struct{}

func authDoSkipIndexOnPut(ctx context.Context) bool {
return ctx.Value(authSkipIndexOnPutContextKey{}) != nil
}

// PutAuthorization will put a authorization without setting an ID.
func (s *Service) PutAuthorization(ctx context.Context, a *influxdb.Authorization) error {
return s.kv.Update(ctx, func(tx Tx) error {
Expand Down Expand Up @@ -540,23 +396,6 @@ func (s *Service) putAuthorization(ctx context.Context, tx Tx, a *influxdb.Autho
}
}

// this should only be configurable via test package
// it is used to test behavior with empty caches
if !authDoSkipIndexOnPut(ctx) {
idx, err := authByUserIndexBucket(tx)
if err != nil {
return err
}

fk := authByUserIndexKey(a.UserID, a.ID)
if err := idx.Put([]byte(fk), encodedID); err != nil {
return &influxdb.Error{
Code: influxdb.EInternal,
Err: err,
}
}
}

b, err := tx.Bucket(authBucket)
if err != nil {
return err
Expand All @@ -575,16 +414,6 @@ func authIndexKey(n string) []byte {
return []byte(n)
}

func authByUserIndexKey(userID, authID influxdb.ID) string {
id, _ := authID.Encode()
return string(append(authByUserIndexPrefix(userID), id...))
}

func authByUserIndexPrefix(userID influxdb.ID) []byte {
id, _ := userID.Encode()
return append(id, '/')
}

func decodeAuthorization(b []byte, a *influxdb.Authorization) error {
if err := json.Unmarshal(b, a); err != nil {
return err
Expand Down Expand Up @@ -652,16 +481,6 @@ func (s *Service) deleteAuthorization(ctx context.Context, tx Tx, id influxdb.ID
Err: err,
}
}

uidx, err := authByUserIndexBucket(tx)
if err != nil {
return &influxdb.Error{Err: err}
}

if err := uidx.Delete([]byte(authByUserIndexKey(a.UserID, id))); err != nil {
return err
}

encodedID, err := id.Encode()
if err != nil {
return &influxdb.Error{
Expand Down Expand Up @@ -717,16 +536,7 @@ func (s *Service) updateAuthorization(ctx context.Context, tx Tx, id influxdb.ID
}

func authIndexBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(authIndex)
if err != nil {
return nil, UnexpectedAuthIndexError(err)
}

return b, nil
}

func authByUserIndexBucket(tx Tx) (Bucket, error) {
b, err := tx.Bucket(authByUserIndex)
b, err := tx.Bucket([]byte(authIndex))
if err != nil {
return nil, UnexpectedAuthIndexError(err)
}
Expand Down
Loading

0 comments on commit f239a2e

Please sign in to comment.