Skip to content

Commit

Permalink
added typer to store
Browse files Browse the repository at this point in the history
  • Loading branch information
paynejacob committed Oct 1, 2021
1 parent afd06f5 commit e2cdad8
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 71 deletions.
48 changes: 0 additions & 48 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions store/crd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (f *Factory) AssignStores(ctx context.Context, storageContext types.Storage

schema.Store = proxy.NewProxyStore(ctx, f.ClientGetter,
storageContext,
nil,
[]string{"apis"},
crd.Spec.Group,
// Even if CRD is created as v1beta1, it's served as v1 with a single element in Versions
Expand Down
104 changes: 82 additions & 22 deletions store/proxy/proxy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package proxy
import (
"context"
ejson "encoding/json"
"github.com/rancher/norman/types/mapper"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -85,6 +86,11 @@ func (s *simpleClientGetter) APIExtClient(apiContext *types.APIContext, context
return s.apiExtClient, nil
}

type StoreTyper interface {
runtime.ObjectConvertor
runtime.ObjectCreater
}

type Store struct {
sync.Mutex

Expand All @@ -98,10 +104,17 @@ type Store struct {
authContext map[string]string
close context.Context
broadcasters map[rest.Interface]*broadcast.Broadcaster
typer StoreTyper
}

func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContext types.StorageContext,
func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContext types.StorageContext, typer StoreTyper,
prefix []string, group, version, kind, resourcePlural string) types.Store {

// Default to an empty scheme, all types will default to generic
if typer == nil {
typer = runtime.NewScheme()
}

return &errorStore{
Store: &Store{
clientGetter: clientGetter,
Expand All @@ -117,6 +130,7 @@ func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContex
},
close: ctx,
broadcasters: map[rest.Interface]*broadcast.Broadcaster{},
typer: typer,
},
}
}
Expand Down Expand Up @@ -201,16 +215,19 @@ func (s *Store) Context() types.StorageContext {
}

func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) ([]map[string]interface{}, error) {
var resultList unstructured.UnstructuredList
result := make([]map[string]interface{}, 0)

// if there are no namespaces field in options, a single request is made
if opt == nil || opt.Namespaces == nil {
ns := getNamespace(apiContext, opt)
list, err := s.retryList(ns, apiContext)
resultList := s.getListStruct()

err := s.retryList(ns, apiContext, resultList)
if err != nil {
return nil, err
}
resultList = *list

result = append(result, s.collectionFromInternal(resultList, apiContext, schema)...)
} else {
var (
errGroup errgroup.Group
Expand All @@ -221,13 +238,15 @@ func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty
for _, ns := range allNS {
nsCopy := ns
errGroup.Go(func() error {
list, err := s.retryList(nsCopy, apiContext)
resultList := s.getListStruct()

err := s.retryList(nsCopy, apiContext, resultList)
if err != nil {
return err
}

mux.Lock()
resultList.Items = append(resultList.Items, list.Items...)
result = append(result, s.collectionFromInternal(resultList, apiContext, schema)...)
mux.Unlock()

return nil
Expand All @@ -238,38 +257,30 @@ func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty
}
}

var result []map[string]interface{}

for _, obj := range resultList.Items {
result = append(result, s.fromInternal(apiContext, schema, obj.Object))
}

return apiContext.AccessControl.FilterList(apiContext, schema, result, s.authContext), nil
}

func (s *Store) retryList(namespace string, apiContext *types.APIContext) (*unstructured.UnstructuredList, error) {
var resultList *unstructured.UnstructuredList
func (s *Store) retryList(namespace string, apiContext *types.APIContext, resultList runtime.Object) error {
k8sClient, err := s.k8sClient(apiContext)
if err != nil {
return nil, err
return err
}

for i := 0; i < 3; i++ {
req := s.common(namespace, k8sClient.Get())
start := time.Now()
resultList = &unstructured.UnstructuredList{}
err = req.Do(apiContext.Request.Context()).Into(resultList)
logrus.Tracef("LIST: %v, %v", time.Now().Sub(start), s.resourcePlural)
if err != nil {
if i < 2 && strings.Contains(err.Error(), "Client.Timeout exceeded") {
logrus.Infof("Error on LIST %v: %v. Attempt: %v. Retrying", s.resourcePlural, err, i+1)
continue
}
return resultList, err
return err
}
return resultList, err
return err
}
return resultList, err
return err
}

func (s *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) {
Expand Down Expand Up @@ -559,12 +570,61 @@ func (s *Store) common(namespace string, req *rest.Request) *rest.Request {
}

func (s *Store) fromInternal(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}) map[string]interface{} {
var mappers types.Mappers

// add the schema mappers
mappers = append(mappers, schema.Mapper)

// if this is an export drop the status field
if apiContext.Option("export") == "true" {
delete(data, "status")
mappers = append(mappers, mapper.Drop{Field: "status"})
}
if schema.Mapper != nil {
schema.Mapper.FromInternal(data)

// if the drop option is set drop any fields given
dropFields := apiContext.Option("drop")
if dropFields != "" {
for _, field := range strings.Split(dropFields, ",") {
mappers = append(mappers, mapper.Drop{Field: field})
}
}

mappers.FromInternal(data)

return data
}

// getListStruct returns a runtime object for storing results from list requests. If the Store's scheme does not return
// a type for the resource associated with the store, a generic type will be used.
func (s *Store) getListStruct() runtime.Object {
// try to find the list type for this store
obj, err := s.typer.New(schema.GroupVersionKind{
Group: s.group,
Version: s.version,
Kind: s.kind + "List",
})

// if we cannot get the specific type default to a generic parser
if err != nil {
return new(unstructured.UnstructuredList)
}

return obj
}

// collectionFromInternal maps a collection runtime object to an array of maps.
func (s *Store) collectionFromInternal(list runtime.Object, apiContext *types.APIContext, schema *types.Schema) []map[string]interface{} {
var ul unstructured.UnstructuredList

// Since we are converting to the generic version of a collection any collection type will be convertable. If a non
//collection type is passed to this method the line below will return an error and the method will return an empty
//result set. There is no way to handle this error returned because it is only returned if this method is used
//improperly. We swallow the error here to prevent having to check it in the caller.
_ = s.typer.Convert(list, &ul, nil)

results := make([]map[string]interface{}, len(ul.Items))
for i := range ul.Items {
results[i] = s.fromInternal(apiContext, schema, ul.Items[i].Object)
}

return results
}
Loading

0 comments on commit e2cdad8

Please sign in to comment.