Skip to content

Commit

Permalink
List and Get retry logic
Browse files Browse the repository at this point in the history
Some cloud provider k8s APIs have random timeouts on listing and getting.
This change introduces a retry on lists and gets and reduces the timeout
from 30s to 15s to mitigate this.
  • Loading branch information
Craig Jellick committed Nov 20, 2018
1 parent 5deb22f commit 03d294e
Showing 1 changed file with 51 additions and 21 deletions.
72 changes: 51 additions & 21 deletions store/proxy/proxy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func (s *Store) k8sClient(apiContext *types.APIContext) (rest.Interface, error)
}

func (s *Store) ByID(apiContext *types.APIContext, schema *types.Schema, id string) (map[string]interface{}, error) {
_, result, err := s.byID(apiContext, schema, id, true)
return result, err
}

func (s *Store) byID(apiContext *types.APIContext, schema *types.Schema, id string, retry bool) (string, map[string]interface{}, error) {
splitted := strings.Split(strings.TrimSpace(id), ":")
validID := false
namespaced := schema.Scope == types.NamespaceScope
Expand All @@ -150,25 +155,36 @@ func (s *Store) ByID(apiContext *types.APIContext, schema *types.Schema, id stri
validID = len(splitted) == 1 && len(strings.TrimSpace(splitted[0])) > 0
}
if !validID {
return nil, httperror.NewAPIError(httperror.NotFound, "failed to find resource by id")
return "", nil, httperror.NewAPIError(httperror.NotFound, "failed to find resource by id")
}

_, result, err := s.byID(apiContext, schema, id)
return result, err
}

func (s *Store) byID(apiContext *types.APIContext, schema *types.Schema, id string) (string, map[string]interface{}, error) {
namespace, id := splitID(id)

k8sClient, err := s.k8sClient(apiContext)
if err != nil {
return "", nil, err
}

req := s.common(namespace, k8sClient.Get()).
Name(id)
req := s.common(namespace, k8sClient.Get()).Name(id)
if !retry {
return s.singleResult(apiContext, schema, req)
}

return s.singleResult(apiContext, schema, req)
var version string
var data map[string]interface{}
for i := 0; i < 3; i++ {
req = s.common(namespace, k8sClient.Get()).Name(id)
version, data, err = s.singleResult(apiContext, schema, req)
if err != nil {
if i < 2 && strings.Contains(err.Error(), "Client.Timeout exceeded") {
logrus.Warnf("Retrying GET. Error: %v", err)
continue
}
return version, data, err
}
return version, data, err
}
return version, data, err
}

func (s *Store) Context() types.StorageContext {
Expand All @@ -178,17 +194,7 @@ func (s *Store) Context() types.StorageContext {
func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) ([]map[string]interface{}, error) {
namespace := getNamespace(apiContext, opt)

k8sClient, err := s.k8sClient(apiContext)
if err != nil {
return nil, err
}

req := s.common(namespace, k8sClient.Get())

resultList := &unstructured.UnstructuredList{}
start := time.Now()
err = req.Do().Into(resultList)
logrus.Debug("LIST: ", time.Now().Sub(start), s.resourcePlural)
resultList, err := s.retryList(namespace, apiContext)
if err != nil {
return nil, err
}
Expand All @@ -202,6 +208,30 @@ func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty
return apiContext.AccessControl.FilterList(apiContext, schema, result, s.authContext), nil
}

func (s *Store) retryList(namespace string, apiContext *types.APIContext) (*unstructured.UnstructuredList, error) {
resultList := &unstructured.UnstructuredList{}
k8sClient, err := s.k8sClient(apiContext)
if err != nil {
return nil, err
}

for i := 0; i < 3; i++ {
req := s.common(namespace, k8sClient.Get())
start := time.Now()
err = req.Do().Into(resultList)
logrus.Debugf("LIST: %v, %v", time.Now().Sub(start), s.resourcePlural)
if err != nil {
if i < 2 && strings.Contains(err.Error(), "Client.Timeout exceeded") {
logrus.Infof("Error on LIST %v: %v. Attempt: %v. Retrying", s.resourcePlural, err, i+1)
continue
}
return resultList, err
}
return resultList, err
}
return resultList, err
}

func (s *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) {
c, err := s.shareWatch(apiContext, schema, opt)
if err != nil {
Expand Down Expand Up @@ -410,7 +440,7 @@ func (s *Store) Delete(apiContext *types.APIContext, schema *types.Schema, id st
return nil, err
}

obj, err := s.ByID(apiContext, schema, id)
_, obj, err := s.byID(apiContext, schema, id, false)
if err != nil {
return nil, nil
}
Expand Down

0 comments on commit 03d294e

Please sign in to comment.