Skip to content

Commit

Permalink
Merge pull request #121040 from Jefftree/automated-cherry-pick-of-#12…
Browse files Browse the repository at this point in the history
…0814-upstream-release-1.28

Automated cherry pick of #120814: Fix 120758 - prevent cache Load on uninitialized spec

Kubernetes-commit: e3b5e621f07f0fee298f641ebded61b8f393fe27
  • Loading branch information
k8s-publishing-bot committed Oct 12, 2023
2 parents 1283b9c + b46d105 commit ba91f08
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 33 deletions.
31 changes: 13 additions & 18 deletions pkg/controllers/openapi/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ type openAPISpecInfo struct {

// The downloader is used only for non-local apiservices to
// re-update the spec every so often.
downloader cached.Data[*spec.Swagger]
// Calling Get() is not thread safe and should only be called by a single
// thread via the openapi controller.
downloader CacheableDownloader
}

type specAggregator struct {
Expand All @@ -93,8 +95,7 @@ func buildAndRegisterSpecAggregatorForLocalServices(downloader *Downloader, aggr
for i, handler := range delegationHandlers {
name := fmt.Sprintf(localDelegateChainNamePattern, i+1)

spec := NewCacheableDownloader(downloader, handler)
spec = decorateError(name, spec)
spec := NewCacheableDownloader(name, downloader, handler)
s.addLocalSpec(name, spec)
}

Expand Down Expand Up @@ -218,16 +219,21 @@ func (s *specAggregator) AddUpdateAPIService(apiService *v1.APIService, handler
s.mutex.Lock()
defer s.mutex.Unlock()

_, exists := s.specsByAPIServiceName[apiService.Name]
existingSpec, exists := s.specsByAPIServiceName[apiService.Name]
if !exists {
s.specsByAPIServiceName[apiService.Name] = &openAPISpecInfo{
specInfo := &openAPISpecInfo{
apiService: *apiService,
downloader: decorateError(apiService.Name, NewCacheableDownloader(s.downloader, handler)),
downloader: NewCacheableDownloader(apiService.Name, s.downloader, handler),
}
specInfo.spec.Replace(cached.Result[*spec.Swagger]{Err: fmt.Errorf("spec for apiservice %s is not yet available", apiService.Name)})
s.specsByAPIServiceName[apiService.Name] = specInfo
s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
} else {
existingSpec.apiService = *apiService
existingSpec.downloader.UpdateHandler(handler)
}

return s.updateServiceLocked(apiService.Name)
return nil
}

// RemoveAPIService removes an api service from OpenAPI aggregation. If it does not exist, no error is returned.
Expand All @@ -243,14 +249,3 @@ func (s *specAggregator) RemoveAPIService(apiServiceName string) {
// Re-create the mergeSpec for the new list of apiservices
s.openAPIVersionedService.UpdateSpecLazy(s.buildMergeSpecLocked())
}

// decorateError creates a new cache that wraps a downloader
// cache the name of the apiservice to help with debugging.
func decorateError(name string, cache cached.Data[*spec.Swagger]) cached.Data[*spec.Swagger] {
return cached.NewTransformer(func(result cached.Result[*spec.Swagger]) cached.Result[*spec.Swagger] {
if result.Err != nil {
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to download %v: %v", name, result.Err))
}
return result
}, cache)
}
172 changes: 166 additions & 6 deletions pkg/controllers/openapi/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"bytes"

v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
"k8s.io/kube-openapi/pkg/common"
"k8s.io/kube-openapi/pkg/validation/spec"
Expand Down Expand Up @@ -90,6 +91,9 @@ func TestAddUpdateAPIService(t *testing.T) {
if err := s.AddUpdateAPIService(apiService, handler); err != nil {
t.Error(err)
}
if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil {
t.Error(err)
}

swagger, err := fetchOpenAPI(mux)
if err != nil {
Expand All @@ -109,7 +113,9 @@ func TestAddUpdateAPIService(t *testing.T) {
},
},
}
s.UpdateAPIServiceSpec(apiService.Name)
if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil {
t.Error(err)
}

swagger, err = fetchOpenAPI(mux)
if err != nil {
Expand Down Expand Up @@ -158,6 +164,9 @@ func TestAddRemoveAPIService(t *testing.T) {
if err := s.AddUpdateAPIService(apiService, handler); err != nil {
t.Error(err)
}
if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil {
t.Error(err)
}

swagger, err := fetchOpenAPI(mux)
if err != nil {
Expand All @@ -178,6 +187,78 @@ func TestAddRemoveAPIService(t *testing.T) {
expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
}

func TestUpdateAPIService(t *testing.T) {
mux := http.NewServeMux()
var delegationHandlers []http.Handler
delegate1 := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/foo/v1": {},
},
},
},
}}
delegationHandlers = append(delegationHandlers, delegate1)

s := buildAndRegisterSpecAggregator(delegationHandlers, mux)

apiService := &v1.APIService{
Spec: v1.APIServiceSpec{
Service: &v1.ServiceReference{Name: "dummy"},
},
}
apiService.Name = "apiservice"

handler := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/apiservicegroup/v1": {},
},
},
},
}}

handler2 := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{},
},
},
}}

if err := s.AddUpdateAPIService(apiService, handler); err != nil {
t.Error(err)
}
if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil {
t.Error(err)
}

swagger, err := fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
expectPath(t, swagger, "/apis/apiservicegroup/v1")
expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")

t.Logf("Updating APIService %s", apiService.Name)
if err := s.AddUpdateAPIService(apiService, handler2); err != nil {
t.Error(err)
}
if err := s.UpdateAPIServiceSpec(apiService.Name); err != nil {
t.Error(err)
}

swagger, err = fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
// Ensure that the if the APIService is added and then handler is modified, the new data is reflected in the aggregated OpenAPI.
expectNoPath(t, swagger, "/apis/apiservicegroup/v1")
expectPath(t, swagger, "/apis/apiregistration.k8s.io/v1")
}

func TestFailingAPIServiceSkippedAggregation(t *testing.T) {
mux := http.NewServeMux()
var delegationHandlers []http.Handler
Expand Down Expand Up @@ -233,8 +314,19 @@ func TestFailingAPIServiceSkippedAggregation(t *testing.T) {
},
}

s.AddUpdateAPIService(apiServiceFailed, handlerFailed)
s.AddUpdateAPIService(apiServiceSuccess, handlerSuccess)
if err := s.AddUpdateAPIService(apiServiceSuccess, handlerSuccess); err != nil {
t.Error(err)
}
if err := s.AddUpdateAPIService(apiServiceFailed, handlerFailed); err != nil {
t.Error(err)
}
if err := s.UpdateAPIServiceSpec(apiServiceSuccess.Name); err != nil {
t.Error(err)
}
err := s.UpdateAPIServiceSpec(apiServiceFailed.Name)
if err == nil {
t.Errorf("Expected updating failing apiService %s to return error", apiServiceFailed.Name)
}

swagger, err := fetchOpenAPI(mux)
if err != nil {
Expand Down Expand Up @@ -281,7 +373,12 @@ func TestAPIServiceFailSuccessTransition(t *testing.T) {
},
}

s.AddUpdateAPIService(apiService, handler)
if err := s.AddUpdateAPIService(apiService, handler); err != nil {
t.Error(err)
}
if err := s.UpdateAPIServiceSpec(apiService.Name); err == nil {
t.Errorf("Expected error for when updating spec for failing apiservice")
}

swagger, err := fetchOpenAPI(mux)
if err != nil {
Expand All @@ -304,12 +401,75 @@ func TestAPIServiceFailSuccessTransition(t *testing.T) {
expectPath(t, swagger, "/apis/apiservicegroup/v1")
}

func TestFailingAPIServiceDoesNotBlockAdd(t *testing.T) {
mux := http.NewServeMux()
var delegationHandlers []http.Handler
delegate1 := &openAPIHandler{openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/foo/v1": {},
},
},
},
}}
delegationHandlers = append(delegationHandlers, delegate1)

s := buildAndRegisterSpecAggregator(delegationHandlers, mux)

apiServiceFailed := &v1.APIService{
Spec: v1.APIServiceSpec{
Service: &v1.ServiceReference{Name: "dummy"},
},
}
apiServiceFailed.Name = "apiserviceFailed"

// Create a handler that has a long response time and ensure that
// adding the APIService does not block.
handlerFailed := &openAPIHandler{
delaySeconds: 5,
returnErr: true,
openapi: &spec.Swagger{
SwaggerProps: spec.SwaggerProps{
Paths: &spec.Paths{
Paths: map[string]spec.PathItem{
"/apis/failed/v1": {},
},
},
},
},
}

updateDone := make(chan bool)
go func() {
if err := s.AddUpdateAPIService(apiServiceFailed, handlerFailed); err != nil {
t.Error(err)
}
close(updateDone)
}()

select {
case <-updateDone:
case <-time.After(2 * time.Second):
t.Errorf("AddUpdateAPIService affected by APIService response time")
}

swagger, err := fetchOpenAPI(mux)
if err != nil {
t.Error(err)
}
expectPath(t, swagger, "/apis/foo/v1")
expectNoPath(t, swagger, "/apis/failed/v1")
}

type openAPIHandler struct {
openapi *spec.Swagger
returnErr bool
delaySeconds int
openapi *spec.Swagger
returnErr bool
}

func (o *openAPIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
time.Sleep(time.Duration(o.delaySeconds) * time.Second)
if o.returnErr {
w.WriteHeader(500)
return
Expand Down
38 changes: 30 additions & 8 deletions pkg/controllers/openapi/aggregator/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,54 @@ import (
"fmt"
"net/http"
"strings"
"sync/atomic"

"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/kube-openapi/pkg/cached"
"k8s.io/kube-openapi/pkg/validation/spec"
)

type CacheableDownloader interface {
UpdateHandler(http.Handler)
Get() cached.Result[*spec.Swagger]
}

// cacheableDownloader is a downloader that will always return the data
// and the etag.
type cacheableDownloader struct {
name string
downloader *Downloader
handler http.Handler
etag string
spec *spec.Swagger
// handler is the http Handler for the apiservice that can be replaced
handler atomic.Pointer[http.Handler]
etag string
spec *spec.Swagger
}

// Creates a downloader that also returns the etag, making it useful to use as a cached dependency.
func NewCacheableDownloader(downloader *Downloader, handler http.Handler) cached.Data[*spec.Swagger] {
return &cacheableDownloader{
// NewCacheableDownloader creates a downloader that also returns the etag, making it useful to use as a cached dependency.
func NewCacheableDownloader(apiServiceName string, downloader *Downloader, handler http.Handler) CacheableDownloader {
c := &cacheableDownloader{
name: apiServiceName,
downloader: downloader,
handler: handler,
}
c.handler.Store(&handler)
return c
}
func (d *cacheableDownloader) UpdateHandler(handler http.Handler) {
d.handler.Store(&handler)
}

func (d *cacheableDownloader) Get() cached.Result[*spec.Swagger] {
swagger, etag, status, err := d.downloader.Download(d.handler, d.etag)
r := d.get()
if r.Err != nil {
return cached.NewResultErr[*spec.Swagger](fmt.Errorf("failed to download %v: %v", d.name, r.Err))
}
return r
}

func (d *cacheableDownloader) get() cached.Result[*spec.Swagger] {
h := *d.handler.Load()
swagger, etag, status, err := d.downloader.Download(h, d.etag)
if err != nil {
return cached.NewResultErr[*spec.Swagger](err)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/controllers/openapi/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,10 @@ func (c *AggregationController) AddAPIService(handler http.Handler, apiService *

// UpdateAPIService updates API Service's info and handler.
func (c *AggregationController) UpdateAPIService(handler http.Handler, apiService *v1.APIService) {
if err := c.openAPIAggregationManager.AddUpdateAPIService(apiService, handler); err != nil {
if apiService.Spec.Service == nil {
return
}
if err := c.openAPIAggregationManager.UpdateAPIServiceSpec(apiService.Name); err != nil {
utilruntime.HandleError(fmt.Errorf("Error updating APIService %q with err: %v", apiService.Name, err))
}
key := apiService.Name
Expand Down

0 comments on commit ba91f08

Please sign in to comment.